Composable Rust Sagas Expert
Expert knowledge for implementing distributed sagas in Composable Rust - multi-aggregate coordination, compensation logic, state machines, event bus patterns, and orchestration vs choreography.
When to Use This Skill
Automatically apply when:
- •Coordinating multiple aggregates in a distributed transaction
- •Implementing compensation logic or rollback flows
- •Working with the `EventBus` trait or Redpanda integration
- •Designing saga state machines
- •Questions about eventual consistency or distributed transactions
- •Debugging saga failures or compensation flows
Saga Pattern Fundamentals
What is a Saga?
A saga is a sequence of local transactions across multiple aggregates, where each transaction publishes events. If a step fails, execute compensating transactions to undo completed work.
Success flow: Order → Payment → Inventory → Shipping → ✅ Complete
Failure flow (payment fails): Order → Payment ❌ → Compensate Order → ✅ Rolled back
Why Sagas?
Problem: Distributed transactions (2PC) are usually a poor fit across services because of:
- •High latency and contention
- •Poor availability (all participants must be up)
- •Poor scalability across services/databases
Solution: Saga pattern with eventual consistency:
- •Each aggregate commits independently
- •If failure occurs, compensate completed steps
- •Eventually consistent (all aggregates converge to correct state)
Saga vs 2PC
| Aspect | 2PC (Distributed Transaction) | Saga |
|---|---|---|
| Consistency | Strong (ACID) | Eventual |
| Availability | Low (blocks on coordinator) | High (no blocking) |
| Latency | High (2 phases, locks) | Low (async events) |
| Failure handling | Rollback (automatic) | Compensate (manual) |
| Scalability | Poor (locks, contention) | Good (no locks) |
Saga as Reducer (Core Pattern)
Key insight: A saga is just a reducer with a state machine.
Saga State Pattern
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckoutSagaState {
pub checkout_id: String,
pub current_step: SagaStep,
pub completed_steps: Vec<SagaStep>,
// IDs for compensation
pub order_id: Option<String>,
pub payment_id: Option<String>,
pub reservation_id: Option<String>,
// Data
pub customer_id: String,
pub items: Vec<Item>,
pub amount: Decimal,
// Tracking
pub started_at: DateTime<Utc>,
pub retry_count: u32,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SagaStep {
NotStarted,
CreatingOrder,
ProcessingPayment,
ReservingInventory,
Completed,
Compensating { failed_at: Box<SagaStep> },
Failed,
}
Pattern: Track current step, completed steps, and IDs needed for compensation.
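The happy-path ordering of steps can be made explicit with a small helper. The following is a minimal sketch, assuming a simplified `Step` enum (a hypothetical stand-in for `SagaStep`, without the serde derives or the `Compensating` payload); `next` and `is_terminal` are illustrative names, not part of the framework.

```rust
/// Simplified stand-in for `SagaStep` (hypothetical; no serde, no boxed payload).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Step {
    NotStarted,
    CreatingOrder,
    ProcessingPayment,
    ReservingInventory,
    Completed,
    Failed,
}

impl Step {
    /// Next step on the success path, or `None` for terminal states.
    pub fn next(self) -> Option<Step> {
        match self {
            Step::NotStarted => Some(Step::CreatingOrder),
            Step::CreatingOrder => Some(Step::ProcessingPayment),
            Step::ProcessingPayment => Some(Step::ReservingInventory),
            Step::ReservingInventory => Some(Step::Completed),
            Step::Completed | Step::Failed => None,
        }
    }

    /// Terminal states accept no further saga actions.
    pub fn is_terminal(self) -> bool {
        matches!(self, Step::Completed | Step::Failed)
    }
}
```

Keeping the ordering in one place makes it harder for the reducer arms and the compensation logic to disagree about which step follows which.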
Saga Action Pattern
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CheckoutSagaAction {
// Initiating command
StartCheckout {
customer_id: String,
items: Vec<Item>,
amount: Decimal,
},
// Success events from aggregates
OrderCreated { order_id: String },
PaymentProcessed { payment_id: String },
InventoryReserved { reservation_id: String },
// Failure events from aggregates
OrderCreationFailed { reason: String },
PaymentFailed { reason: String },
InventoryReservationFailed { reason: String },
// Compensation events
OrderCompensated,
PaymentRefunded,
// Saga completion
SagaCompleted,
SagaFailed { reason: String },
}
Pattern: Commands to start, events for each step (success/failure), compensation events, terminal states.
Saga Reducer Implementation
pub struct CheckoutSagaReducer;
impl Reducer for CheckoutSagaReducer {
type State = CheckoutSagaState;
type Action = CheckoutSagaAction;
type Environment = SagaEnvironment;
fn reduce(
&self,
state: &mut Self::State,
action: Self::Action,
env: &Self::Environment,
) -> Vec<Effect<Self::Action>> {
match (&state.current_step, action) {
// Start saga
(SagaStep::NotStarted, CheckoutSagaAction::StartCheckout { customer_id, items, amount }) => {
state.current_step = SagaStep::CreatingOrder;
state.customer_id = customer_id.clone();
state.items = items.clone();
state.amount = amount;
vec![
Effect::PublishEvent(OrderCommand::CreateOrder {
customer_id,
items,
}),
]
}
// Order created → proceed to payment
(SagaStep::CreatingOrder, CheckoutSagaAction::OrderCreated { order_id }) => {
state.order_id = Some(order_id.clone());
state.completed_steps.push(SagaStep::CreatingOrder);
state.current_step = SagaStep::ProcessingPayment;
vec![
Effect::PublishEvent(PaymentCommand::ProcessPayment {
order_id,
amount: state.amount,
}),
]
}
// Payment processed → proceed to inventory
(SagaStep::ProcessingPayment, CheckoutSagaAction::PaymentProcessed { payment_id }) => {
state.payment_id = Some(payment_id.clone());
state.completed_steps.push(SagaStep::ProcessingPayment);
state.current_step = SagaStep::ReservingInventory;
vec![
Effect::PublishEvent(InventoryCommand::ReserveItems {
order_id: state.order_id.clone().unwrap(),
items: state.items.clone(),
}),
]
}
// Inventory reserved → complete
(SagaStep::ReservingInventory, CheckoutSagaAction::InventoryReserved { reservation_id }) => {
state.reservation_id = Some(reservation_id);
state.completed_steps.push(SagaStep::ReservingInventory);
state.current_step = SagaStep::Completed;
vec![
Effect::PublishEvent(CheckoutSagaAction::SagaCompleted),
]
}
// Payment failed → compensate order
(SagaStep::ProcessingPayment, CheckoutSagaAction::PaymentFailed { reason }) => {
state.current_step = SagaStep::Compensating {
failed_at: Box::new(SagaStep::ProcessingPayment),
};
// Compensate completed steps in reverse order
self.compensate(state, env)
}
// Inventory failed → compensate payment and order
(SagaStep::ReservingInventory, CheckoutSagaAction::InventoryReservationFailed { reason }) => {
state.current_step = SagaStep::Compensating {
failed_at: Box::new(SagaStep::ReservingInventory),
};
self.compensate(state, env)
}
_ => vec![Effect::None],
}
}
}
impl CheckoutSagaReducer {
/// Compensate completed steps in reverse order
fn compensate(
&self,
state: &CheckoutSagaState,
env: &SagaEnvironment,
) -> Vec<Effect<CheckoutSagaAction>> {
let mut effects = vec![];
// Compensate in reverse order
for step in state.completed_steps.iter().rev() {
match step {
SagaStep::CreatingOrder => {
if let Some(order_id) = &state.order_id {
effects.push(Effect::PublishEvent(OrderCommand::CancelOrder {
order_id: order_id.clone(),
reason: "Saga compensation".to_string(),
}));
}
}
SagaStep::ProcessingPayment => {
if let Some(payment_id) = &state.payment_id {
effects.push(Effect::PublishEvent(PaymentCommand::RefundPayment {
payment_id: payment_id.clone(),
}));
}
}
_ => {}
}
}
effects
}
}
Pattern:
- •Match on the `(current_step, action)` tuple
- •Update state, track completed steps
- •Publish next command via `Effect::PublishEvent`
- •On failure, compensate completed steps in reverse order
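The same four steps can be exercised without the framework. This is a minimal, std-only sketch: `Step`, `Action`, `Command`, and `State` are simplified hypothetical types, and the reducer returns plain commands instead of `Effect` values, so it shows the shape of the pattern rather than the library's API.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Step { NotStarted, CreatingOrder, ProcessingPayment, Compensating, Completed }

#[derive(Debug, Clone, PartialEq)]
pub enum Action {
    Start,
    OrderCreated { order_id: String },
    PaymentProcessed,
    PaymentFailed,
}

/// Commands the saga asks other aggregates to run (hypothetical names).
#[derive(Debug, Clone, PartialEq)]
pub enum Command {
    CreateOrder,
    ProcessPayment { order_id: String },
    CancelOrder { order_id: String },
}

#[derive(Debug)]
pub struct State {
    pub step: Step,
    pub completed: Vec<Step>,
    pub order_id: Option<String>,
}

impl State {
    pub fn new() -> Self {
        State { step: Step::NotStarted, completed: Vec::new(), order_id: None }
    }
}

/// Match on (current step, action), update state, return the next commands.
pub fn reduce(state: &mut State, action: Action) -> Vec<Command> {
    match (state.step.clone(), action) {
        (Step::NotStarted, Action::Start) => {
            state.step = Step::CreatingOrder;
            vec![Command::CreateOrder]
        }
        (Step::CreatingOrder, Action::OrderCreated { order_id }) => {
            state.order_id = Some(order_id.clone());
            state.completed.push(Step::CreatingOrder);
            state.step = Step::ProcessingPayment;
            vec![Command::ProcessPayment { order_id }]
        }
        (Step::ProcessingPayment, Action::PaymentProcessed) => {
            state.completed.push(Step::ProcessingPayment);
            state.step = Step::Completed;
            vec![]
        }
        (Step::ProcessingPayment, Action::PaymentFailed) => {
            state.step = Step::Compensating;
            compensate(state)
        }
        // Any action that does not fit the current step is ignored.
        _ => vec![],
    }
}

/// Walk completed steps newest-first and emit their compensating commands.
fn compensate(state: &State) -> Vec<Command> {
    state
        .completed
        .iter()
        .rev()
        .filter_map(|step| match step {
            Step::CreatingOrder => state
                .order_id
                .clone()
                .map(|order_id| Command::CancelOrder { order_id }),
            _ => None,
        })
        .collect()
}
```

Because the reducer is a pure function of state and action, the failure path can be tested by feeding it `Start`, `OrderCreated`, and `PaymentFailed` in sequence and checking that the last call returns a `CancelOrder` command.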
EventBus Pattern (Multi-Aggregate Communication)
EventBus Trait
pub trait EventBus: Send + Sync {
type Event: Send + Sync;
/// Publish event to topic
async fn publish(&self, topic: &str, event: Self::Event) -> Result<(), Error>;
/// Subscribe to topic with consumer group
async fn subscribe(
&self,
topic: &str,
group_id: &str,
handler: impl Fn(Self::Event) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> + Send + Sync + 'static,
) -> Result<(), Error>;
}
Publish Pattern
// In reducer, return Effect::PublishEvent
vec![
Effect::PublishEvent {
topic: "orders".to_string(),
event: OrderEvent::OrderCreated { order_id: "123".to_string() },
},
]
// Store executes effect via event bus
async fn execute_effect(&self, effect: Effect<Action>) -> Result<(), Error> {
match effect {
Effect::PublishEvent { topic, event } => {
// `?` needs a `Result` return type to propagate publish errors
self.event_bus.publish(&topic, event).await?;
}
// ...
}
Ok(())
}
Subscribe Pattern
// Payment aggregate subscribes to orders topic
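event_bus
.subscribe("orders", "payment-service", move |event| {
// Clone the store handle per event so the closure stays `Fn`
// (assumes the handle is cheaply cloneable, e.g. wraps an `Arc`)
let payment_store = payment_store.clone();
Box::pin(async move {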
match event {
OrderEvent::OrderCreated { order_id } => {
// Send payment processing action to payment store
payment_store
.send(PaymentAction::ProcessPayment { order_id })
.await?;
}
_ => {}
}
Ok(())
})
})
.await?;
Pattern: Each aggregate subscribes to the events it cares about and translates them into actions for its own store. Cross-aggregate coordination emerges from these subscriptions rather than from direct calls.
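To see the publish/subscribe wiring in isolation, here is a minimal sketch of an in-process bus. `InMemoryBus` is a hypothetical, synchronous stand-in: it does not implement the async `EventBus` trait, has no consumer groups, and delivers each event exactly once to every handler on the topic.

```rust
use std::collections::HashMap;

/// Hypothetical synchronous, in-process bus, for illustration only.
pub struct InMemoryBus<E> {
    handlers: HashMap<String, Vec<Box<dyn Fn(&E)>>>,
}

impl<E> InMemoryBus<E> {
    pub fn new() -> Self {
        InMemoryBus { handlers: HashMap::new() }
    }

    /// Register a handler for one topic.
    pub fn subscribe(&mut self, topic: &str, handler: impl Fn(&E) + 'static) {
        self.handlers
            .entry(topic.to_string())
            .or_default()
            .push(Box::new(handler));
    }

    /// Deliver the event to every handler on the topic.
    /// Returns the number of handlers that were invoked.
    pub fn publish(&self, topic: &str, event: &E) -> usize {
        match self.handlers.get(topic) {
            Some(list) => {
                for handler in list {
                    handler(event);
                }
                list.len()
            }
            None => 0,
        }
    }
}
```

A handler registered on `"orders"` sees only events published to `"orders"`; publishing to a topic with no subscribers is a no-op that returns 0.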
At-Least-Once Delivery
Events may be delivered multiple times. Design for idempotency:
// Idempotent action handling
fn reduce(&self, state: &mut State, action: Action, env: &Env) -> Vec<Effect> {
match action {
PaymentAction::ProcessPayment { order_id } => {
// Check if already processed
if state.processed_order_ids.contains(&order_id) {
return vec![Effect::None]; // ✅ Idempotent
}
// Process payment
state.processed_order_ids.insert(order_id.clone());
// ... payment logic
vec![Effect::PublishEvent(PaymentEvent::PaymentProcessed { order_id })]
}
_ => vec![Effect::None],
}
}
Pattern: Track processed IDs or use unique keys to prevent duplicate processing.
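The "unique keys" variant can be sketched by deduplicating on an event ID rather than on a domain ID. This is a minimal sketch under assumptions: `Account`, `apply_debit`, and the idea that every event carries a unique `event_id` are hypothetical and not part of the framework.

```rust
use std::collections::HashSet;

/// Hypothetical aggregate state that remembers which events it has applied.
pub struct Account {
    pub balance: i64,
    seen_event_ids: HashSet<String>,
}

impl Account {
    pub fn new(balance: i64) -> Self {
        Account { balance, seen_event_ids: HashSet::new() }
    }

    /// Apply a debit at most once per event ID.
    /// Returns `true` if the debit was applied, `false` for a redelivery.
    pub fn apply_debit(&mut self, event_id: &str, amount: i64) -> bool {
        // `insert` returns false when the ID was already present.
        if !self.seen_event_ids.insert(event_id.to_string()) {
            return false;
        }
        self.balance -= amount;
        true
    }
}
```

In a persistent aggregate the set of seen IDs would have to be stored together with the balance, otherwise a restart would forget which events were already applied.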
Orchestration vs Choreography
Choreography (Event-Driven)
Each aggregate listens to events and reacts independently:
Order creates order → publishes OrderCreated
↓
Payment listens → processes payment → publishes PaymentProcessed
↓
Inventory listens → reserves items → publishes InventoryReserved
Benefits:
- •Decoupled (no central coordinator)
- •Aggregates are independent
- •Easy to add new participants
Drawbacks:
- •Hard to understand full flow
- •Difficult to handle failures (who compensates?)
- •No single source of saga state
Orchestration (Saga Coordinator)
Central saga reducer coordinates the flow:
Saga: Create order → command
↓
Order: Order created → event
↓
Saga: Process payment → command
↓
Payment: Payment processed → event
↓
Saga: Reserve inventory → command
Benefits:
- •Clear flow (visible in saga reducer)
- •Centralized compensation logic
- •Easy to track saga state
- •Easier to debug
Drawbacks:
- •Central coordinator (potential bottleneck)
- •Saga knows about all participants
Recommendation: Use orchestration (saga as reducer) for complex flows with compensation. Use choreography for simple event cascades.
Redpanda/Kafka Integration
RedpandaEventBus Pattern
pub struct RedpandaEventBus {
producer: FutureProducer,
consumer: StreamConsumer,
}
impl RedpandaEventBus {
pub fn builder() -> RedpandaEventBusBuilder {
RedpandaEventBusBuilder::new()
}
}
// Usage
let event_bus = RedpandaEventBus::builder()
.broker("localhost:9092")
.build()?;
Publish Implementation
async fn publish(&self, topic: &str, event: Event) -> Result<(), Error> {
let payload = bincode::serialize(&event)?;
// Bind the key first: the record borrows it, so it must outlive the record
let key = event.aggregate_id();
let record = FutureRecord::to(topic)
.payload(&payload)
.key(&key); // Partition by aggregate ID
self.producer
.send(record, Duration::from_secs(5))
.await
.map_err(|(err, _)| Error::PublishFailed(err))?;
Ok(())
}
Pattern: Serialize with bincode. Use aggregate ID as key (ensures ordering per aggregate).
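Keying by aggregate ID preserves per-aggregate ordering because a keyed partitioner maps equal keys to the same partition, and a partition is an ordered log. The sketch below illustrates only that property; `partition_for` is a hypothetical helper, and real Kafka clients use their own hash functions, so the partition numbers it returns will not match a real cluster.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative key-to-partition mapping (not the hash a Kafka client uses).
/// Equal keys always land on the same partition.
pub fn partition_for(key: &str, partitions: u64) -> u64 {
    assert!(partitions > 0, "a topic has at least one partition");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % partitions
}
```

Events for different aggregates may interleave across partitions, which is acceptable because the saga only relies on ordering within a single aggregate.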
Subscribe Implementation
async fn subscribe<F>(
&self,
topic: &str,
group_id: &str,
handler: F,
) -> Result<(), Error>
where
F: Fn(Event) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> + Send + Sync + 'static,
{
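// With rdkafka the consumer group comes from `group.id` in the `ClientConfig` used to
// build the consumer, so `group_id` here must match that setting
self.consumer.subscribe(&[topic])?;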
loop {
match self.consumer.recv().await {
Ok(message) => {
let payload = message.payload().ok_or(Error::EmptyMessage)?;
let event: Event = bincode::deserialize(payload)?;
// Process event
handler(event).await?;
// Commit offset (at-least-once delivery)
self.consumer.commit_message(&message, CommitMode::Async)?;
}
Err(e) => {
eprintln!("Error receiving message: {}", e);
}
}
}
}
Pattern:
- •Receive message
- •Deserialize event
- •Call handler
- •Commit offset (manual commit for at-least-once)
Consumer Groups Pattern
Multiple instances of the same aggregate can share work:
Topic: orders (3 partitions)
Consumer Group: payment-service
├─ Instance 1 → Partition 0
├─ Instance 2 → Partition 1
└─ Instance 3 → Partition 2
Pattern: Use the same group_id for all instances. Kafka assigns partitions automatically, and within a group each partition is consumed by exactly one instance at a time. Instances beyond the partition count stay idle.
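The effect of partition assignment can be illustrated with a round-robin sketch. `assign_round_robin` is a hypothetical helper; the broker-side protocol and the real assignment strategies are more involved, so treat this only as a picture of who ends up owning what.

```rust
use std::collections::BTreeMap;

/// Spread partitions 0..partitions over the given instances, round-robin.
/// Every instance appears in the result, possibly with no partitions.
pub fn assign_round_robin(partitions: u32, instances: &[&str]) -> BTreeMap<String, Vec<u32>> {
    let mut assignment: BTreeMap<String, Vec<u32>> = instances
        .iter()
        .map(|name| (name.to_string(), Vec::new()))
        .collect();
    if instances.is_empty() {
        return assignment;
    }
    for partition in 0..partitions {
        let owner = instances[partition as usize % instances.len()];
        assignment
            .get_mut(owner)
            .expect("owner was inserted above")
            .push(partition);
    }
    assignment
}
```

With 3 partitions and 2 instances, one instance owns two partitions; with 2 partitions and 3 instances, the third instance owns none, which is why adding instances beyond the partition count does not add throughput.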
Compensation Patterns
Pattern 1: Reverse Order Compensation
Compensate in reverse order of execution:
fn compensate(state: &SagaState) -> Vec<Effect> {
let mut effects = vec![];
for step in state.completed_steps.iter().rev() {
effects.push(compensation_for_step(step));
}
effects
}
Pattern 2: Compensating Actions
Each action has a compensating action:
| Action | Compensating Action |
|---|---|
| Create order | Cancel order |
| Charge payment | Refund payment |
| Reserve inventory | Release inventory |
| Send email | Send cancellation email |
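The table maps directly onto a lookup function such as the `compensation_for_step` used in Pattern 1. A minimal sketch, assuming simplified `Step` and `Compensation` enums (both hypothetical) in place of the framework's effects:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Step { CreateOrder, ChargePayment, ReserveInventory, SendEmail }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compensation { CancelOrder, RefundPayment, ReleaseInventory, SendCancellationEmail }

/// One compensating action per forward action, as in the table above.
pub fn compensation_for_step(step: Step) -> Compensation {
    match step {
        Step::CreateOrder => Compensation::CancelOrder,
        Step::ChargePayment => Compensation::RefundPayment,
        Step::ReserveInventory => Compensation::ReleaseInventory,
        Step::SendEmail => Compensation::SendCancellationEmail,
    }
}

/// Compensations for the completed steps, newest step first.
pub fn compensation_plan(completed: &[Step]) -> Vec<Compensation> {
    completed
        .iter()
        .rev()
        .map(|step| compensation_for_step(*step))
        .collect()
}
```

Because the `match` has no wildcard arm, adding a new forward step without deciding on its compensation becomes a compile error.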
Pattern 3: Idempotent Compensation
Compensation must be idempotent (may be retried):
fn compensate_order(state: &mut State, order_id: &str) -> Vec<Effect> {
// Check if already compensated
if state.compensated_order_ids.contains(order_id) {
return vec![Effect::None];
}
state.compensated_order_ids.insert(order_id.to_string());
vec![Effect::PublishEvent(OrderCommand::CancelOrder {
order_id: order_id.to_string(),
reason: "Saga compensation".to_string(),
})]
}
Pattern 4: Partial Compensation
Some actions can't be fully compensated. Use best-effort:
match failed_step {
SagaStep::EmailSent => {
// Can't unsend email, send apology email instead
vec![Effect::PublishEvent(EmailCommand::SendApology {
customer_id: state.customer_id.clone(),
})]
}
SagaStep::ExternalApiCalled => {
// External API may not support compensation
// Log for manual intervention
vec![Effect::Log {
level: LogLevel::Error,
message: "Manual compensation required for external API".to_string(),
}]
}
// Steps with a regular compensating action are handled elsewhere
_ => vec![Effect::None],
}
Error Handling and Retries
Transient vs Permanent Failures
use composable_rust_core::delay;
match error {
Error::NetworkTimeout | Error::ServiceUnavailable => {
// Transient error → retry
if state.retry_count < MAX_RETRIES {
state.retry_count += 1;
vec![delay! {
duration: exponential_backoff(state.retry_count),
action: original_action
}]
} else {
// Max retries → compensate
self.compensate(state, env)
}
}
Error::InvalidData | Error::InsufficientFunds => {
// Permanent error → compensate immediately
self.compensate(state, env)
}
}
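The snippet above calls `exponential_backoff` without defining it. One possible definition is sketched below; the base delay, the cap, and the convention that the first retry has `retry_count == 1` are assumptions, and jitter is left out for brevity.

```rust
use std::time::Duration;

const BASE_DELAY_MS: u64 = 100; // assumed base delay
const MAX_DELAY_MS: u64 = 30_000; // assumed upper bound

/// Delay before the given retry: base * 2^(retry_count - 1), capped.
pub fn exponential_backoff(retry_count: u32) -> Duration {
    // Clamp the exponent so the shift cannot overflow a u64.
    let exponent = retry_count.saturating_sub(1).min(32);
    let factor = 1u64 << exponent;
    let delay_ms = BASE_DELAY_MS.saturating_mul(factor).min(MAX_DELAY_MS);
    Duration::from_millis(delay_ms)
}
```

In production, adding random jitter to the delay helps avoid many sagas retrying against a recovering service at the same instant.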
Dead Letter Queue Pattern
if state.retry_count >= MAX_RETRIES {
// Send to the DLQ first, then append the compensation effects
let mut effects = vec![Effect::PublishEvent {
topic: "dead-letter-queue".to_string(),
event: SagaFailedEvent {
saga_id: state.saga_id.clone(),
failed_at: state.current_step.clone(),
reason: error.to_string(),
},
}];
effects.extend(self.compensate(state, env));
return effects;
}
Pattern: Send to DLQ for manual review, then compensate.
Testing Patterns
Unit Test: Saga State Machine
#[test]
fn test_checkout_saga_success_flow() {
let env = test_environment();
let reducer = CheckoutSagaReducer;
let mut state = CheckoutSagaState::new("saga-1".to_string());
// Start checkout
let effects = reducer.reduce(
&mut state,
CheckoutSagaAction::StartCheckout { ... },
&env,
);
assert_eq!(state.current_step, SagaStep::CreatingOrder);
// Order created
let effects = reducer.reduce(
&mut state,
CheckoutSagaAction::OrderCreated { order_id: "order-1".to_string() },
&env,
);
assert_eq!(state.current_step, SagaStep::ProcessingPayment);
// Payment processed
let effects = reducer.reduce(
&mut state,
CheckoutSagaAction::PaymentProcessed { payment_id: "pay-1".to_string() },
&env,
);
assert_eq!(state.current_step, SagaStep::ReservingInventory);
// Inventory reserved
let effects = reducer.reduce(
&mut state,
CheckoutSagaAction::InventoryReserved { reservation_id: "res-1".to_string() },
&env,
);
assert_eq!(state.current_step, SagaStep::Completed);
}
Unit Test: Compensation
#[test]
fn test_checkout_saga_compensation() {
let env = test_environment();
let reducer = CheckoutSagaReducer;
let mut state = CheckoutSagaState::new("saga-1".to_string());
// Simulate completed steps
state.current_step = SagaStep::ProcessingPayment;
state.completed_steps.push(SagaStep::CreatingOrder);
state.order_id = Some("order-1".to_string());
// Payment fails
let effects = reducer.reduce(
&mut state,
CheckoutSagaAction::PaymentFailed { reason: "Insufficient funds".to_string() },
&env,
);
// Should compensate order
assert!(matches!(state.current_step, SagaStep::Compensating { .. }));
assert!(matches!(effects[0], Effect::PublishEvent(OrderCommand::CancelOrder { .. })));
}
Integration Test: With InMemoryEventBus
#[tokio::test]
async fn test_saga_with_event_bus() -> Result<(), Error> {
let event_bus = InMemoryEventBus::new();
// Create stores for each aggregate
let order_store = Store::new(OrderState::default(), OrderReducer, order_env);
let payment_store = Store::new(PaymentState::default(), PaymentReducer, payment_env);
let saga_store = Store::new(CheckoutSagaState::new("saga-1".to_string()), CheckoutSagaReducer, saga_env);
// Subscribe aggregates to events
event_bus.subscribe("orders", "payment-service", move |event| {
// Clone per event so the closure stays `Fn` (assumes a cloneable store handle)
let payment_store = payment_store.clone();
Box::pin(async move {
payment_store.send(PaymentAction::from_order_event(event)).await
})
}).await?;
// Start saga
saga_store.send(CheckoutSagaAction::StartCheckout { ... }).await;
// Wait for completion (a fixed sleep keeps the example short but can be flaky;
// polling the saga state until a deadline is more robust)
tokio::time::sleep(Duration::from_millis(100)).await;
let saga_state = saga_store.state().await;
assert_eq!(saga_state.current_step, SagaStep::Completed);
Ok(())
}
Common Anti-Patterns to Avoid
❌ Anti-Pattern 1: Synchronous Cross-Aggregate Calls
// ❌ Don't call other aggregates directly
fn reduce(...) -> Vec<Effect> {
let payment_result = payment_service.process_payment().await; // ❌ Tight coupling
}
Solution: Use event bus for async communication.
❌ Anti-Pattern 2: Not Tracking Compensation Data
// ❌ Can't compensate without IDs
struct SagaState {
current_step: Step,
// ❌ Missing: order_id, payment_id, etc.
}
Solution: Store all IDs needed for compensation.
❌ Anti-Pattern 3: Ignoring Idempotency
// ❌ Processing same event twice
fn reduce(...) -> Vec<Effect> {
state.balance -= amount; // ❌ Double-deduct if event replays
}
Solution: Check if event already processed.
❌ Anti-Pattern 4: Compensation Order Errors
// ❌ Compensating in wrong order
fn compensate(state: &SagaState) -> Vec<Effect> {
for step in state.completed_steps.iter() { // ❌ Forward order
// Compensation
}
}
Solution: Compensate in reverse order.
❌ Anti-Pattern 5: No Timeout for Saga Steps
// ❌ Saga waits forever for event
fn reduce(...) -> Vec<Effect> {
// Waits indefinitely for PaymentProcessed event
}
Solution: Use timeouts and retry logic:
use composable_rust_core::delay;
vec![
Effect::PublishEvent(command),
delay! {
duration: Duration::from_secs(30),
action: SagaAction::StepTimeout { step: current_step }
},
]
Advanced Patterns
Pattern: Nested Sagas (Hierarchical Workflows)
Use Case: Complex workflows where one saga orchestrates multiple child sagas.
Example: Order Fulfillment saga coordinates Payment saga + Shipping saga + Notification saga.
Parent Saga State
#[derive(Clone, Debug)]
pub struct OrderFulfillmentState {
pub order_id: String,
pub status: FulfillmentStatus,
// Child saga states (owned by parent)
pub payment_saga: PaymentSagaState,
pub shipping_saga: Option<ShippingSagaState>,
pub notification_saga: Option<NotificationSagaState>,
// Track which child sagas are active
pub active_children: Vec<ChildSaga>,
}
#[derive(Clone, Debug)]
pub enum FulfillmentStatus {
Idle,
ProcessingPayment,
ProcessingShipping,
SendingNotifications,
Completed,
Compensating { failed_child: ChildSaga },
Failed,
}
#[derive(Clone, Debug, PartialEq)]
pub enum ChildSaga {
Payment,
Shipping,
Notification,
}
Parent Saga Reducer
impl Reducer for OrderFulfillmentSaga {
type State = OrderFulfillmentState;
type Action = FulfillmentAction;
type Environment = FulfillmentEnvironment;
fn reduce(
&self,
state: &mut Self::State,
action: Self::Action,
env: &Self::Environment,
) -> SmallVec<[Effect<Self::Action>; 4]> {
use FulfillmentStatus::*;
match (&state.status, action) {
// Parent starts: delegate to Payment child saga
(Idle, FulfillmentAction::StartFulfillment { order_id, amount }) => {
state.status = ProcessingPayment;
state.order_id = order_id.clone();
state.active_children.push(ChildSaga::Payment);
// Delegate to child Payment saga
smallvec![Effect::PublishEvent(PaymentAction::InitiatePayment {
order_id,
amount,
})]
}
// Child Payment saga completed successfully
(ProcessingPayment, FulfillmentAction::PaymentCompleted { transaction_id }) => {
state.payment_saga.transaction_id = Some(transaction_id.clone());
state.status = ProcessingShipping;
state.active_children.push(ChildSaga::Shipping);
// Start next child saga
smallvec![Effect::PublishEvent(ShippingAction::InitiateShipping {
order_id: state.order_id.clone(),
transaction_id,
})]
}
// Child Payment saga failed → Compensate
(ProcessingPayment, FulfillmentAction::PaymentFailed { error }) => {
state.status = Compensating {
failed_child: ChildSaga::Payment,
};
// No compensation needed (nothing to undo yet)
smallvec![Effect::PublishEvent(FulfillmentAction::FulfillmentFailed {
reason: format!("Payment failed: {error}"),
})]
}
// Child Shipping saga failed → Compensate Payment
(ProcessingShipping, FulfillmentAction::ShippingFailed { error }) => {
state.status = Compensating {
failed_child: ChildSaga::Shipping,
};
// Compensate: Refund payment (undo child Payment saga)
if let Some(transaction_id) = &state.payment_saga.transaction_id {
smallvec![Effect::PublishEvent(PaymentAction::RefundPayment {
transaction_id: transaction_id.clone(),
})]
} else {
smallvec![Effect::None]
}
}
// All child sagas succeeded
(ProcessingShipping, FulfillmentAction::ShippingCompleted { tracking }) => {
state.status = Completed;
state.active_children.clear();
smallvec![Effect::PublishEvent(FulfillmentAction::FulfillmentCompleted {
order_id: state.order_id.clone(),
tracking,
})]
}
_ => smallvec![Effect::None],
}
}
}
Key Patterns for Nested Sagas
1. Parent Owns Child State
pub struct ParentState {
pub child1: Child1State, // ✅ Owned by parent
pub child2: Child2State, // ✅ Owned by parent
}
Why: Parent needs visibility into child state for compensation and coordination.
2. Explicit Child Tracking
pub active_children: Vec<ChildSaga>, // Track which children are running
Why: Know which children to compensate if parent saga fails.
3. Error Propagation Up, Compensation Down
// Error propagates UP from child to parent
(ProcessingChild, Action::ChildFailed { error }) => {
state.status = Compensating { failed_child };
// Parent decides what to do
}
// Compensation flows DOWN from parent to children
(Compensating { .. }, _) => {
// Parent triggers compensation in children
smallvec![Effect::PublishEvent(ChildAction::Compensate)]
}
4. Sequential vs Parallel Children
// Sequential: Start child2 after child1 completes
(ChildCompleted, Child1Success) => {
smallvec![Effect::PublishEvent(StartChild2)]
}
// Parallel: Start all children at once
(Idle, Start) => {
smallvec![
Effect::PublishEvent(StartChild1),
Effect::PublishEvent(StartChild2),
Effect::PublishEvent(StartChild3),
]
}
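With parallel children the parent also has to detect when the last child has finished. A minimal sketch of that bookkeeping, assuming a hypothetical `ParallelParent` type that tracks active children in a set (the nested-saga example above uses a `Vec<ChildSaga>` instead):

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Child { Payment, Shipping, Notification }

/// Hypothetical parent-side bookkeeping for children started in parallel.
pub struct ParallelParent {
    active: HashSet<Child>,
    pub completed: bool,
}

impl ParallelParent {
    /// Start all given children at once.
    pub fn start(children: &[Child]) -> Self {
        ParallelParent { active: children.iter().copied().collect(), completed: false }
    }

    /// Record one child's completion.
    /// Returns `true` only for the call that finishes the last active child;
    /// duplicate completion events return `false`.
    pub fn child_completed(&mut self, child: Child) -> bool {
        let was_active = self.active.remove(&child);
        if was_active && self.active.is_empty() {
            self.completed = true;
        }
        was_active && self.completed
    }

    /// Number of children still running (candidates for compensation on failure).
    pub fn active_count(&self) -> usize {
        self.active.len()
    }
}
```

Using a set makes redelivered completion events harmless, which matters under at-least-once delivery.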
Benefits of Nested Sagas
- •✅ Modularity: Child sagas are reusable in different parent contexts
- •✅ Clear ownership: Parent owns coordination, children own domain logic
- •✅ Explicit compensation: Parent decides compensation strategy
- •✅ Testing: Test child sagas independently, parent tests coordination
When to Use Nested Sagas
Use nested sagas when:
- •Workflow has clear hierarchical structure
- •Child workflows are reusable across different parents
- •Need different compensation strategies per parent context
Avoid nested sagas when:
- •Workflow is simple and flat (use single saga)
- •Children are tightly coupled (merge into one saga)
- •Communication overhead exceeds benefit (inline the logic)
Pattern: Saga Timeouts
use composable_rust_core::delay;
vec![
Effect::PublishEvent(command),
delay! {
duration: Duration::from_secs(30),
action: SagaAction::Timeout {
step: state.current_step.clone(),
}
},
]
// In reducer
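// `step` is a reference here, so dereference it for the comparison.
// Timeouts for steps the saga has already left do not match and are ignored.
(step, SagaAction::Timeout { step: timeout_step }) if *step == timeout_step => {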
// Step timed out, compensate
self.compensate(state, env)
}
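The guard in the reducer arm matters because a timeout scheduled for one step can fire after the saga has already moved on. The sketch below isolates that behavior with simplified, hypothetical `Step` and `Action` types; `reduce` returns `true` when compensation should start.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Step { ProcessingPayment, ReservingInventory, Compensating }

pub enum Action {
    PaymentProcessed,
    Timeout { step: Step },
}

/// Returns `true` when the action triggered compensation.
pub fn reduce(current: &mut Step, action: Action) -> bool {
    match action {
        Action::PaymentProcessed if *current == Step::ProcessingPayment => {
            *current = Step::ReservingInventory;
            false
        }
        // Only a timeout for the step we are still waiting on counts.
        Action::Timeout { step } if step == *current => {
            *current = Step::Compensating;
            true
        }
        // Stale timeouts and out-of-order events are ignored.
        _ => false,
    }
}
```

If the payment succeeds before its 30-second timeout fires, the later `Timeout { step: ProcessingPayment }` no longer matches the current step and is dropped instead of rolling back a healthy saga.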
Pattern: Saga Persistence
Persist saga state to recover from crashes:
fn reduce(...) -> Vec<Effect> {
vec![
Effect::Database(SaveSagaState(state.clone())),
Effect::PublishEvent(next_command),
]
}
Quick Reference Checklist
When implementing sagas:
- • State machine: Track current step and completed steps
- • Compensation data: Store IDs needed for rollback
- • Idempotent actions: Check if already processed
- • Reverse compensation: Undo in reverse order
- • Timeout handling: Don't wait forever for events
- • Retry logic: Distinguish transient vs permanent errors
- • Dead letter queue: Handle max retries
- • Event bus: Use for cross-aggregate communication
- • Consumer groups: For parallel processing
- • Manual offset commit: For at-least-once delivery
Performance Considerations
- •Orchestration overhead: The saga coordinator adds an extra hop, but the cost is usually negligible (<10ms)
- •Event bus latency: Kafka/Redpanda typically add ~5-20ms per event, depending on broker and producer configuration
- •Compensation cost: Paid only on failure paths, which are rare (typically <1% of sagas compensate)
- •Consumer lag: Monitor with Kafka metrics, scale consumers if needed
See Also
- •Architecture: `composable-rust-architecture.skill` - Core reducer/effect patterns
- •Event Sourcing: `composable-rust-event-sourcing.skill` - Event store and persistence
- •Documentation: `docs/sagas.md` - Comprehensive saga guide
- •Patterns: `docs/saga-patterns.md` - Additional saga patterns
- •Consistency: `docs/consistency-patterns.md` - Consistency guarantees
Remember: A saga is just a reducer with a state machine. Compensate in reverse order. Design for idempotency. Use event bus for coordination.