Disbursement Processing Workflow
Temporal Workflow Definition
Service: payout-svc
Markets: PK, BD, NP, IQ, EG
Scale: 270M+ transactions, $1B+ processed
Overview
The Disbursement Workflow orchestrates the full lifecycle of a single payout — from validation and balance reservation through channel submission, polling, ledger update, and merchant notification. It implements the Saga pattern with compensating actions to ensure consistency across distributed services.
A parent BatchDisbursementWorkflow fans out to individual DisbursementWorkflow instances as child workflows.
Workflow Definition
```go
package workflows

import (
	"time"

	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/workflow"
)

// NOTE: decimal is not a built-in Go type. This listing assumes a fixed-point
// decimal type is declared elsewhere in the package, along with the activity
// functions, their input/result types, and executeSagaCompensations.

// DisbursementRequest is the input to the disbursement workflow.
type DisbursementRequest struct {
	DisbursementID string
	MerchantID     string
	BeneficiaryID  string
	Amount         decimal
	Currency       string // ISO 4217: PKR, BDT, NPR, IQD, EGP
	Channel        string // mobile_wallet, bank_transfer, agent_network
	IdempotencyKey string
	TraceID        string
	ScheduledAt    *time.Time // nil = immediate
	Metadata       map[string]interface{}
}

// DisbursementResult is the output of a completed disbursement.
type DisbursementResult struct {
	DisbursementID string
	Status         string
	ChannelRef     string
	CompletedAt    time.Time
	Fee            decimal
	NetAmount      decimal
}

// DisbursementWorkflow orchestrates a single payout with saga compensation.
//
// Workflow ID: disbursement-{idempotencyKey}
// Task Queue: payout-worker
func DisbursementWorkflow(ctx workflow.Context, req DisbursementRequest) (*DisbursementResult, error) {
	// -----------------------------------------------------------------
	// Activity options with per-channel retry policies
	// -----------------------------------------------------------------
	validateOpts := workflow.ActivityOptions{
		StartToCloseTimeout: 10 * time.Second,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts: 3,
		},
	}
	reserveOpts := workflow.ActivityOptions{
		StartToCloseTimeout: 15 * time.Second,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts:    3,
			BackoffCoefficient: 2.0,
		},
	}
	channelOpts := channelRetryPolicy(req.Channel, req.Currency)
	ledgerOpts := workflow.ActivityOptions{
		StartToCloseTimeout: 30 * time.Second,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts:    5,
			BackoffCoefficient: 2.0,
		},
	}
	notifyOpts := workflow.ActivityOptions{
		StartToCloseTimeout: 60 * time.Second,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts: 3,
		},
	}

	// -----------------------------------------------------------------
	// Signal channel for manual intervention on STUCK state
	// -----------------------------------------------------------------
	manualResolveCh := workflow.GetSignalChannel(ctx, "manual-resolve")

	// -----------------------------------------------------------------
	// Step 1: Validate Request
	// -----------------------------------------------------------------
	validateCtx := workflow.WithActivityOptions(ctx, validateOpts)
	var validationResult ValidationResult
	err := workflow.ExecuteActivity(validateCtx, ValidateRequest, req).Get(ctx, &validationResult)
	if err != nil {
		return nil, err
	}

	// -----------------------------------------------------------------
	// Step 2: Reserve Balance (compensatable)
	// -----------------------------------------------------------------
	reserveCtx := workflow.WithActivityOptions(ctx, reserveOpts)
	var reservation BalanceReservation
	err = workflow.ExecuteActivity(reserveCtx, ReserveBalance, ReserveBalanceInput{
		MerchantID:     req.MerchantID,
		DisbursementID: req.DisbursementID,
		Amount:         req.Amount,
		Currency:       req.Currency,
	}).Get(ctx, &reservation)
	if err != nil {
		return nil, err
	}

	// From this point, we must compensate on failure (release reservation).
	var sagaCompensations []func()
	sagaCompensations = append(sagaCompensations, func() {
		// A disconnected context lets the release run even if the workflow
		// context has already been cancelled.
		dctx, cancel := workflow.NewDisconnectedContext(ctx)
		defer cancel()
		compensateCtx := workflow.WithActivityOptions(dctx, reserveOpts)
		_ = workflow.ExecuteActivity(compensateCtx, ReleaseBalance, ReleaseBalanceInput{
			ReservationID:  reservation.ReservationID,
			DisbursementID: req.DisbursementID,
			Reason:         "saga_compensation",
		}).Get(dctx, nil)
	})

	// -----------------------------------------------------------------
	// Step 3: Submit to Channel
	// -----------------------------------------------------------------
	submitCtx := workflow.WithActivityOptions(ctx, channelOpts)
	var submitResult ChannelSubmitResult
	err = workflow.ExecuteActivity(submitCtx, SubmitToChannel, SubmitToChannelInput{
		DisbursementID: req.DisbursementID,
		BeneficiaryID:  req.BeneficiaryID,
		Amount:         req.Amount,
		Currency:       req.Currency,
		Channel:        req.Channel,
		TraceID:        req.TraceID,
	}).Get(ctx, &submitResult)
	if err != nil {
		// Saga compensation: release reserved balance
		executeSagaCompensations(sagaCompensations)
		return nil, err
	}

	// -----------------------------------------------------------------
	// Step 4: Poll Channel Status (with timeout and STUCK handling)
	// -----------------------------------------------------------------
	pollCtx := workflow.WithActivityOptions(ctx, channelOpts)
	var channelStatus ChannelStatusResult
	maxPollDuration := 30 * time.Minute
	pollInterval := 15 * time.Second
	pollDeadline := workflow.Now(ctx).Add(maxPollDuration)
	for {
		err = workflow.ExecuteActivity(pollCtx, PollChannelStatus, PollChannelInput{
			DisbursementID: req.DisbursementID,
			ChannelRef:     submitResult.ChannelRef,
			Channel:        req.Channel,
		}).Get(ctx, &channelStatus)
		if err != nil {
			// Poll retries are exhausted and the payout outcome is unknown, so
			// the reservation is left in place instead of being released.
			return nil, err
		}
		if channelStatus.IsFinal() {
			break
		}

		// Check if we've exceeded the poll deadline
		if workflow.Now(ctx).After(pollDeadline) {
			// Mark as STUCK and wait for manual intervention signal
			_ = workflow.ExecuteActivity(pollCtx, MarkAsStuck, req.DisbursementID).Get(ctx, nil)

			// Wait for manual resolution signal (no timeout; a human decides)
			var resolution ManualResolution
			manualResolveCh.Receive(ctx, &resolution)
			channelStatus = ChannelStatusResult{
				Status:     resolution.Status,
				ChannelRef: submitResult.ChannelRef,
			}
			break
		}

		// Wait before next poll; Sleep returns an error if the workflow is cancelled.
		if err = workflow.Sleep(ctx, pollInterval); err != nil {
			return nil, err
		}
	}

	// Handle channel failure
	if channelStatus.Status == "FAILED" {
		executeSagaCompensations(sagaCompensations)
		return &DisbursementResult{
			DisbursementID: req.DisbursementID,
			Status:         "FAILED",
			ChannelRef:     submitResult.ChannelRef,
			CompletedAt:    workflow.Now(ctx),
		}, nil
	}

	// -----------------------------------------------------------------
	// Step 5: Update Ledger
	// -----------------------------------------------------------------
	ledgerCtx := workflow.WithActivityOptions(ctx, ledgerOpts)
	var ledgerResult LedgerUpdateResult
	err = workflow.ExecuteActivity(ledgerCtx, UpdateLedger, UpdateLedgerInput{
		DisbursementID: req.DisbursementID,
		ReservationID:  reservation.ReservationID,
		MerchantID:     req.MerchantID,
		Amount:         req.Amount,
		Currency:       req.Currency,
		ChannelRef:     submitResult.ChannelRef,
		Status:         channelStatus.Status,
	}).Get(ctx, &ledgerResult)
	if err != nil {
		return nil, err
	}

	// -----------------------------------------------------------------
	// Step 6: Notify Merchant (webhook)
	// -----------------------------------------------------------------
	notifyCtx := workflow.WithActivityOptions(ctx, notifyOpts)
	_ = workflow.ExecuteActivity(notifyCtx, NotifyMerchant, NotifyMerchantInput{
		MerchantID:     req.MerchantID,
		DisbursementID: req.DisbursementID,
		Status:         channelStatus.Status,
		Amount:         req.Amount,
		Currency:       req.Currency,
		ChannelRef:     submitResult.ChannelRef,
		CompletedAt:    workflow.Now(ctx),
	}).Get(ctx, nil)
	// Webhook failures are non-blocking; delivery is retried by WebhookDeliveryWorkflow.

	return &DisbursementResult{
		DisbursementID: req.DisbursementID,
		Status:         channelStatus.Status,
		ChannelRef:     submitResult.ChannelRef,
		CompletedAt:    workflow.Now(ctx),
		Fee:            ledgerResult.Fee,
		NetAmount:      ledgerResult.NetAmount,
	}, nil
}
```
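The workflow calls executeSagaCompensations without showing its body. A minimal sketch of what such a helper could look like, assuming compensations are meant to run in reverse registration order so the most recent side effect is undone first. The name runCompensations and the step labels are hypothetical, and the real helper may differ.

```go
package main

import "fmt"

// runCompensations is a hypothetical stand-in for executeSagaCompensations.
// It invokes the registered compensations in last-in, first-out order.
func runCompensations(compensations []func()) {
	for i := len(compensations) - 1; i >= 0; i-- {
		compensations[i]()
	}
}

func main() {
	var undone []string
	var compensations []func()

	// Each completed step registers the action that reverses it.
	compensations = append(compensations, func() { undone = append(undone, "undo step A") })
	compensations = append(compensations, func() { undone = append(undone, "undo step B") })

	runCompensations(compensations)
	fmt.Println(undone)
}
```

In the listing only one compensation (ReleaseBalance) is registered, so ordering does not matter yet; it starts to matter once a later step also registers an undo action.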
Batch Disbursement (Child Workflow Fan-Out)
```go
// BatchDisbursementWorkflow processes a batch by fanning out child workflows.
//
// Workflow ID: batch-{batchId}
// Task Queue: payout-worker
func BatchDisbursementWorkflow(ctx workflow.Context, batch BatchDisbursementRequest) (*BatchResult, error) {
	var futures []workflow.ChildWorkflowFuture
	for _, item := range batch.Items {
		childOpts := workflow.ChildWorkflowOptions{
			WorkflowID: "disbursement-" + item.IdempotencyKey,
			TaskQueue:  "payout-worker",
			RetryPolicy: &temporal.RetryPolicy{
				MaximumAttempts: 1, // Individual items handle their own retries
			},
		}
		childCtx := workflow.WithChildOptions(ctx, childOpts)
		future := workflow.ExecuteChildWorkflow(childCtx, DisbursementWorkflow, item)
		futures = append(futures, future)
	}

	// Collect results
	result := &BatchResult{
		BatchID:    batch.BatchID,
		MerchantID: batch.MerchantID,
	}
	for _, future := range futures {
		var disbResult DisbursementResult
		err := future.Get(ctx, &disbResult)
		if err != nil {
			result.FailedCount++
		} else if disbResult.Status == "SUCCESS" {
			result.SuccessCount++
		} else {
			result.FailedCount++
		}
		result.TotalProcessed++
	}

	// Determine batch status
	if result.FailedCount == 0 {
		result.Status = "COMPLETED"
	} else if result.SuccessCount == 0 {
		result.Status = "FAILED"
	} else {
		result.Status = "PARTIALLY_COMPLETED"
	}
	return result, nil
}
```
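The batch status rules can be checked in isolation. The sketch below lifts them into a pure function (batchStatus is a hypothetical name, not part of the service) and runs the four count combinations, including the edge case where an empty batch has no failures and therefore reports COMPLETED.

```go
package main

import "fmt"

// batchStatus mirrors the status rules at the end of BatchDisbursementWorkflow.
func batchStatus(successCount, failedCount int) string {
	switch {
	case failedCount == 0:
		return "COMPLETED"
	case successCount == 0:
		return "FAILED"
	default:
		return "PARTIALLY_COMPLETED"
	}
}

func main() {
	fmt.Println(batchStatus(3, 0)) // all items succeeded
	fmt.Println(batchStatus(0, 3)) // all items failed
	fmt.Println(batchStatus(2, 1)) // mixed outcome
	fmt.Println(batchStatus(0, 0)) // empty batch: no failures, so COMPLETED
}
```

If an empty batch should not count as completed, a guard on the item count before fan-out would be one way to handle it; whether that is desired is a product decision the source does not state.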
Channel-Specific Retry Policies
```go
// channelRetryPolicy returns activity options tuned per channel.
// The currency argument is accepted for per-market tuning, but this
// listing does not use it yet: every market shares the same values.
func channelRetryPolicy(channel, currency string) workflow.ActivityOptions {
	switch channel {
	case "mobile_wallet":
		// Mobile wallets: fast but occasionally flaky
		return workflow.ActivityOptions{
			StartToCloseTimeout: 60 * time.Second,
			RetryPolicy: &temporal.RetryPolicy{
				InitialInterval:    2 * time.Second,
				BackoffCoefficient: 2.0,
				MaximumInterval:    30 * time.Second,
				MaximumAttempts:    5,
			},
		}
	case "bank_transfer":
		// Bank transfers: slower, more reliable
		return workflow.ActivityOptions{
			StartToCloseTimeout: 5 * time.Minute,
			RetryPolicy: &temporal.RetryPolicy{
				InitialInterval:    5 * time.Second,
				BackoffCoefficient: 2.0,
				MaximumInterval:    2 * time.Minute,
				MaximumAttempts:    3,
			},
		}
	case "agent_network":
		// Agent networks: variable latency
		return workflow.ActivityOptions{
			StartToCloseTimeout: 3 * time.Minute,
			RetryPolicy: &temporal.RetryPolicy{
				InitialInterval:    3 * time.Second,
				BackoffCoefficient: 2.0,
				MaximumInterval:    1 * time.Minute,
				MaximumAttempts:    4,
			},
		}
	default:
		// Unknown channel: conservative defaults
		return workflow.ActivityOptions{
			StartToCloseTimeout: 2 * time.Minute,
			RetryPolicy: &temporal.RetryPolicy{
				MaximumAttempts: 3,
			},
		}
	}
}
```
Activities Summary
| Activity | Description | Idempotent | Compensating Action |
|---|---|---|---|
| ValidateRequest | Validates beneficiary, amount limits, channel availability | Yes | N/A |
| ReserveBalance | Holds funds in merchant balance | Yes | ReleaseBalance |
| SubmitToChannel | Sends disbursement to operator/bank channel | Yes (via idempotency key) | N/A (channel handles) |
| PollChannelStatus | Polls channel for terminal status | Yes | N/A |
| MarkAsStuck | Updates status to STUCK for manual intervention | Yes | N/A |
| UpdateLedger | Commits or releases reservation, records settlement | Yes | N/A |
| NotifyMerchant | Enqueues webhook delivery to merchant | Yes | N/A |
| ReleaseBalance | Releases reserved funds (saga compensation) | Yes | N/A |
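Every activity is listed as idempotent because Temporal may run an activity more than once when it retries. A minimal sketch of what that property means for ReserveBalance, assuming the reservation is keyed by disbursement ID. The reservationStore type, its in-memory map, and the integer minor-unit amounts are hypothetical stand-ins for the real balance service.

```go
package main

import (
	"errors"
	"fmt"
)

var errInsufficientFunds = errors.New("insufficient funds")

// reservationStore is a hypothetical in-memory stand-in for the balance service.
type reservationStore struct {
	available      int64             // merchant balance in minor units
	byDisbursement map[string]string // disbursement ID -> reservation ID
}

// Reserve holds the amount once per disbursement ID. A retried call with the
// same ID returns the existing reservation instead of holding funds twice.
func (s *reservationStore) Reserve(disbursementID string, amount int64) (string, error) {
	if id, ok := s.byDisbursement[disbursementID]; ok {
		return id, nil
	}
	if amount > s.available {
		return "", errInsufficientFunds
	}
	s.available -= amount
	id := "res-" + disbursementID
	s.byDisbursement[disbursementID] = id
	return id, nil
}

func main() {
	store := &reservationStore{available: 10000, byDisbursement: map[string]string{}}
	first, _ := store.Reserve("d-1", 4000)
	second, _ := store.Reserve("d-1", 4000) // simulated activity retry
	fmt.Println(first == second, store.available)
}
```

A production version would need the lookup and the hold to happen atomically (for example, inside one database transaction); the sketch leaves that out.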
Signals
| Signal | Payload | Purpose |
|---|---|---|
| manual-resolve | ManualResolution{Status, ResolvedBy, Notes} | Allows operations team to resolve STUCK disbursements |
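For operators preparing a manual-resolve signal, it can help to see the payload shape. The sketch below assumes all three ManualResolution fields are strings and that the payload is JSON-encoded with the Go field names as keys, which is how the Temporal Go SDK's default data converter handles plain structs. The encodeResolution helper and the sample values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ManualResolution mirrors the payload fields listed in the Signals table.
// Field types are an assumption; the source names only the fields.
type ManualResolution struct {
	Status     string
	ResolvedBy string
	Notes      string
}

// encodeResolution renders the payload as JSON for inspection.
func encodeResolution(r ManualResolution) (string, error) {
	b, err := json.Marshal(r)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	payload, err := encodeResolution(ManualResolution{
		Status:     "FAILED",
		ResolvedBy: "ops-oncall",
		Notes:      "Channel confirmed the payout was rejected",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(payload)
}
```

In the workflow listing, a Status of "FAILED" triggers the saga compensation and releases the reservation; any other value proceeds to the ledger update with that status.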
Failure Modes & Recovery
| Failure | Behaviour |
|---|---|
| Validation fails | Workflow returns error, no side effects |
| Balance reservation fails | Workflow returns error (for example, insufficient funds); nothing has been reserved, so no compensation runs |
| Channel submission fails | Saga compensation releases reserved balance |
| Channel polling times out (30m) | Marked STUCK, awaits manual signal |
| Ledger update fails | Attempted up to 5 times with exponential backoff; on exhaustion, the workflow returns an error and an alert is raised |
| Webhook delivery fails | Non-blocking; retried by separate WebhookDeliveryWorkflow |