Skip to content

Disbursement Processing Workflow

Temporal Workflow Definition · Service: payout-svc · Markets: PK, BD, NP, IQ, EG · Scale: 270M+ transactions, $1B+ processed


Overview

The Disbursement Workflow orchestrates the full lifecycle of a single payout — from validation and balance reservation through channel submission, polling, ledger update, and merchant notification. It implements the Saga pattern with compensating actions to ensure consistency across distributed services.

A parent BatchDisbursementWorkflow fans out to individual DisbursementWorkflow instances as child workflows.


Workflow Definition

package workflows

import (
    "time"

    "go.temporal.io/sdk/temporal"
    "go.temporal.io/sdk/workflow"
)

// DisbursementRequest is the input to the disbursement workflow.
//
// It describes a single payout. DisbursementWorkflow hands the whole request
// to the ValidateRequest activity and copies individual fields into the
// inputs of the later activities.
type DisbursementRequest struct {
    // DisbursementID identifies the payout; it is included in every
    // activity input built by DisbursementWorkflow.
    DisbursementID  string
    // MerchantID is the merchant whose balance is reserved and who is notified.
    MerchantID      string
    // BeneficiaryID is forwarded to the SubmitToChannel activity.
    BeneficiaryID   string
    // Amount is the payout amount (decimal type is declared elsewhere).
    Amount          decimal
    Currency        string  // ISO 4217, e.g. PKR, BDT, NPR, IQD, EGP
    Channel         string  // mobile_wallet, bank_transfer, agent_network
    // IdempotencyKey is used to build the workflow ID ("disbursement-" + key).
    IdempotencyKey  string
    // TraceID is forwarded to the SubmitToChannel activity.
    TraceID         string
    // NOTE(review): ScheduledAt is not read by DisbursementWorkflow in this
    // file — presumably the caller delays the start; confirm.
    ScheduledAt     *time.Time // nil = immediate
    // Metadata is free-form; it is not read by the workflow code shown here.
    Metadata        map[string]interface{}
}

// DisbursementResult is the output of a completed disbursement.
type DisbursementResult struct {
    // DisbursementID echoes DisbursementRequest.DisbursementID.
    DisbursementID string
    // Status is the final channel status (or the manually resolved status).
    // BatchDisbursementWorkflow counts only "SUCCESS" as a success.
    Status         string
    // ChannelRef is the reference returned by the SubmitToChannel activity.
    ChannelRef     string
    // CompletedAt is the workflow time at which the result was built.
    CompletedAt    time.Time
    // Fee and NetAmount come from the UpdateLedger activity result; they are
    // left at their zero value when the workflow returns a "FAILED" result.
    Fee            decimal
    NetAmount      decimal
}

// DisbursementWorkflow orchestrates a single payout with saga compensation.
//
// Steps, in order: ValidateRequest, ReserveBalance, SubmitToChannel,
// PollChannelStatus (looped), UpdateLedger, NotifyMerchant.
//
// Compensation: once the balance is reserved, a ReleaseBalance compensation
// is registered. It runs when channel submission fails or when the final
// channel status is "FAILED". It executes on a disconnected context so that
// it is still attempted after the workflow context has been cancelled.
//
// Unknown outcomes: if polling exceeds 30 minutes without a final status, or
// the poll activity fails for a reason other than cancellation, the payout is
// marked STUCK and the workflow blocks until a "manual-resolve" signal
// supplies the final status.
//
// Returns:
//   - (*DisbursementResult, nil) when a final status was reached, including
//     a result with Status "FAILED" after compensation.
//   - (nil, err) when validation, reservation, submission or the ledger
//     update fails, or when the workflow is cancelled while polling.
//
// NOTE(review): req.ScheduledAt is not consulted here; presumably delayed
// start is handled by whoever starts the workflow — confirm.
//
// Workflow ID: disbursement-{idempotencyKey}
// Task Queue:  payout-worker
func DisbursementWorkflow(ctx workflow.Context, req DisbursementRequest) (*DisbursementResult, error) {
    logger := workflow.GetLogger(ctx)

    // -----------------------------------------------------------------
    // Activity options with per-channel retry policies
    // -----------------------------------------------------------------
    validateOpts := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts: 3,
        },
    }

    reserveOpts := workflow.ActivityOptions{
        StartToCloseTimeout: 15 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts:    3,
            BackoffCoefficient: 2.0,
        },
    }

    channelOpts := channelRetryPolicy(req.Channel, req.Currency)

    ledgerOpts := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts:    5,
            BackoffCoefficient: 2.0,
        },
    }

    notifyOpts := workflow.ActivityOptions{
        StartToCloseTimeout: 60 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts: 3,
        },
    }

    // -----------------------------------------------------------------
    // Signal channel for manual intervention on STUCK state
    // -----------------------------------------------------------------
    manualResolveCh := workflow.GetSignalChannel(ctx, "manual-resolve")

    // -----------------------------------------------------------------
    // Step 1: Validate Request
    // -----------------------------------------------------------------
    validateCtx := workflow.WithActivityOptions(ctx, validateOpts)
    var validationResult ValidationResult
    err := workflow.ExecuteActivity(validateCtx, ValidateRequest, req).Get(ctx, &validationResult)
    if err != nil {
        return nil, err
    }

    // -----------------------------------------------------------------
    // Step 2: Reserve Balance (compensatable)
    // -----------------------------------------------------------------
    reserveCtx := workflow.WithActivityOptions(ctx, reserveOpts)
    var reservation BalanceReservation
    err = workflow.ExecuteActivity(reserveCtx, ReserveBalance, ReserveBalanceInput{
        MerchantID:     req.MerchantID,
        DisbursementID: req.DisbursementID,
        Amount:         req.Amount,
        Currency:       req.Currency,
    }).Get(ctx, &reservation)
    if err != nil {
        return nil, err
    }

    // From this point, we must compensate on failure (release reservation)
    var sagaCompensations []func()
    sagaCompensations = append(sagaCompensations, func() {
        // Use a disconnected context: an activity scheduled on a cancelled
        // workflow context fails immediately, which would leave the
        // reservation held after a cancellation.
        releaseCtx, cancelRelease := workflow.NewDisconnectedContext(ctx)
        defer cancelRelease()
        releaseCtx = workflow.WithActivityOptions(releaseCtx, reserveOpts)

        releaseErr := workflow.ExecuteActivity(releaseCtx, ReleaseBalance, ReleaseBalanceInput{
            ReservationID:  reservation.ReservationID,
            DisbursementID: req.DisbursementID,
            Reason:         "saga_compensation",
        }).Get(releaseCtx, nil)
        if releaseErr != nil {
            // The compensation signature has no error return, so surface the
            // failure in the workflow log instead of dropping it.
            logger.Error("saga compensation failed: reservation not released",
                "disbursementID", req.DisbursementID,
                "reservationID", reservation.ReservationID,
                "error", releaseErr)
        }
    })

    // -----------------------------------------------------------------
    // Step 3: Submit to Channel
    // -----------------------------------------------------------------
    submitCtx := workflow.WithActivityOptions(ctx, channelOpts)
    var submitResult ChannelSubmitResult
    err = workflow.ExecuteActivity(submitCtx, SubmitToChannel, SubmitToChannelInput{
        DisbursementID: req.DisbursementID,
        BeneficiaryID:  req.BeneficiaryID,
        Amount:         req.Amount,
        Currency:       req.Currency,
        Channel:        req.Channel,
        TraceID:        req.TraceID,
    }).Get(ctx, &submitResult)
    if err != nil {
        // Saga compensation: release reserved balance
        executeSagaCompensations(sagaCompensations)
        return nil, err
    }

    // -----------------------------------------------------------------
    // Step 4: Poll Channel Status (with timeout and STUCK handling)
    // -----------------------------------------------------------------
    pollCtx := workflow.WithActivityOptions(ctx, channelOpts)
    var channelStatus ChannelStatusResult
    maxPollDuration := 30 * time.Minute
    pollInterval := 15 * time.Second

    pollDeadline := workflow.Now(ctx).Add(maxPollDuration)
    awaitManualResolution := false
    for {
        err = workflow.ExecuteActivity(pollCtx, PollChannelStatus, PollChannelInput{
            DisbursementID: req.DisbursementID,
            ChannelRef:     submitResult.ChannelRef,
            Channel:        req.Channel,
        }).Get(ctx, &channelStatus)

        if err != nil {
            if temporal.IsCanceledError(err) {
                return nil, err
            }
            // Histories recorded before this change fell through to the
            // ledger update after a poll error; keep replaying them that way.
            pollErrVersion := workflow.GetVersion(ctx, "poll-error-marks-stuck", workflow.DefaultVersion, 1)
            if pollErrVersion == workflow.DefaultVersion {
                break
            }
            // The payout was already submitted, so its outcome is unknown.
            // Neither committing nor releasing is safe: escalate to a human.
            logger.Error("channel status polling failed; escalating to manual resolution",
                "disbursementID", req.DisbursementID,
                "channelRef", submitResult.ChannelRef,
                "error", err)
            awaitManualResolution = true
            break
        }

        if channelStatus.IsFinal() {
            break
        }

        // Check if we've exceeded the poll deadline
        if workflow.Now(ctx).After(pollDeadline) {
            awaitManualResolution = true
            break
        }

        // Wait before next poll; Sleep only fails when the context is done.
        if err = workflow.Sleep(ctx, pollInterval); err != nil {
            return nil, err
        }
    }

    if awaitManualResolution {
        // Mark as STUCK and wait for manual intervention signal
        if stuckErr := workflow.ExecuteActivity(pollCtx, MarkAsStuck, req.DisbursementID).Get(ctx, nil); stuckErr != nil {
            // Still wait for the signal: the outcome must be resolved by a
            // human whether or not the STUCK marker was persisted.
            logger.Error("failed to mark disbursement as STUCK",
                "disbursementID", req.DisbursementID,
                "error", stuckErr)
        }

        // Wait for manual resolution signal (no timeout — human decides)
        var resolution ManualResolution
        manualResolveCh.Receive(ctx, &resolution)
        channelStatus = ChannelStatusResult{
            Status:     resolution.Status,
            ChannelRef: submitResult.ChannelRef,
        }
    }

    // Handle channel failure
    if channelStatus.Status == "FAILED" {
        executeSagaCompensations(sagaCompensations)
        return &DisbursementResult{
            DisbursementID: req.DisbursementID,
            Status:         "FAILED",
            ChannelRef:     submitResult.ChannelRef,
            CompletedAt:    workflow.Now(ctx),
        }, nil
    }

    // -----------------------------------------------------------------
    // Step 5: Update Ledger
    // -----------------------------------------------------------------
    ledgerCtx := workflow.WithActivityOptions(ctx, ledgerOpts)
    var ledgerResult LedgerUpdateResult
    err = workflow.ExecuteActivity(ledgerCtx, UpdateLedger, UpdateLedgerInput{
        DisbursementID: req.DisbursementID,
        ReservationID:  reservation.ReservationID,
        MerchantID:     req.MerchantID,
        Amount:         req.Amount,
        Currency:       req.Currency,
        ChannelRef:     submitResult.ChannelRef,
        Status:         channelStatus.Status,
    }).Get(ctx, &ledgerResult)
    if err != nil {
        // No compensation here: the channel status is not "FAILED", so the
        // reservation is not released; the error is returned to the caller.
        return nil, err
    }

    // -----------------------------------------------------------------
    // Step 6: Notify Merchant (webhook)
    // -----------------------------------------------------------------
    notifyCtx := workflow.WithActivityOptions(ctx, notifyOpts)
    notifyErr := workflow.ExecuteActivity(notifyCtx, NotifyMerchant, NotifyMerchantInput{
        MerchantID:     req.MerchantID,
        DisbursementID: req.DisbursementID,
        Status:         channelStatus.Status,
        Amount:         req.Amount,
        Currency:       req.Currency,
        ChannelRef:     submitResult.ChannelRef,
        CompletedAt:    workflow.Now(ctx),
    }).Get(ctx, nil)
    if notifyErr != nil {
        // Webhook failures are non-blocking — delivery retried by WebhookDeliveryWorkflow
        logger.Warn("merchant notification failed",
            "disbursementID", req.DisbursementID,
            "merchantID", req.MerchantID,
            "error", notifyErr)
    }

    return &DisbursementResult{
        DisbursementID: req.DisbursementID,
        Status:         channelStatus.Status,
        ChannelRef:     submitResult.ChannelRef,
        CompletedAt:    workflow.Now(ctx),
        Fee:            ledgerResult.Fee,
        NetAmount:      ledgerResult.NetAmount,
    }, nil
}

Batch Disbursement (Child Workflow Fan-Out)

// BatchDisbursementWorkflow runs each item of a batch as its own child
// DisbursementWorkflow and summarises the outcomes.
//
// Every child is started before any result is awaited. A child counts as a
// success only when it returns no error and its Status is "SUCCESS"; every
// other outcome counts as a failure. The batch Status is "COMPLETED" when
// nothing failed, "FAILED" when nothing succeeded, and "PARTIALLY_COMPLETED"
// otherwise. The returned error is always nil.
//
// Workflow ID: batch-{batchId}
// Task Queue:  payout-worker
func BatchDisbursementWorkflow(ctx workflow.Context, batch BatchDisbursementRequest) (*BatchResult, error) {
    // Fan out: one child workflow per item, keyed by its idempotency key.
    var pending []workflow.ChildWorkflowFuture
    for _, item := range batch.Items {
        childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
            WorkflowID: "disbursement-" + item.IdempotencyKey,
            TaskQueue:  "payout-worker",
            RetryPolicy: &temporal.RetryPolicy{
                MaximumAttempts: 1, // Individual items handle their own retries
            },
        })
        pending = append(pending, workflow.ExecuteChildWorkflow(childCtx, DisbursementWorkflow, item))
    }

    // Fan in: wait for each child in start order and tally the outcome.
    summary := &BatchResult{
        BatchID:    batch.BatchID,
        MerchantID: batch.MerchantID,
    }
    for _, child := range pending {
        var outcome DisbursementResult
        if err := child.Get(ctx, &outcome); err == nil && outcome.Status == "SUCCESS" {
            summary.SuccessCount++
        } else {
            summary.FailedCount++
        }
        summary.TotalProcessed++
    }

    // Derive the overall batch status from the tallies.
    switch {
    case summary.FailedCount == 0:
        summary.Status = "COMPLETED"
    case summary.SuccessCount == 0:
        summary.Status = "FAILED"
    default:
        summary.Status = "PARTIALLY_COMPLETED"
    }

    return summary, nil
}

Channel-Specific Retry Policies

// channelRetryPolicy picks the activity timeout and retry policy for a
// payout channel.
//
// Known channels are "mobile_wallet", "bank_transfer" and "agent_network";
// any other value gets a 2-minute timeout with 3 attempts and no explicit
// backoff settings. The currency argument is accepted but not used by the
// current implementation.
func channelRetryPolicy(channel, currency string) workflow.ActivityOptions {
    var (
        timeout time.Duration
        retry   temporal.RetryPolicy
    )

    switch channel {
    case "mobile_wallet":
        // Mobile wallets: fast but occasionally flaky
        timeout = 60 * time.Second
        retry = temporal.RetryPolicy{
            InitialInterval:    2 * time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    30 * time.Second,
            MaximumAttempts:    5,
        }
    case "bank_transfer":
        // Bank transfers: slower, more reliable
        timeout = 5 * time.Minute
        retry = temporal.RetryPolicy{
            InitialInterval:    5 * time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    2 * time.Minute,
            MaximumAttempts:    3,
        }
    case "agent_network":
        // Agent networks: variable latency
        timeout = 3 * time.Minute
        retry = temporal.RetryPolicy{
            InitialInterval:    3 * time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    1 * time.Minute,
            MaximumAttempts:    4,
        }
    default:
        // Unrecognised channel: only the attempt count is set explicitly.
        timeout = 2 * time.Minute
        retry = temporal.RetryPolicy{
            MaximumAttempts: 3,
        }
    }

    return workflow.ActivityOptions{
        StartToCloseTimeout: timeout,
        RetryPolicy:         &retry,
    }
}

Activities Summary

| Activity | Description | Idempotent | Compensating Action |
|---|---|---|---|
| ValidateRequest | Validates beneficiary, amount limits, channel availability | Yes | N/A |
| ReserveBalance | Holds funds in merchant balance | Yes | ReleaseBalance |
| SubmitToChannel | Sends disbursement to operator/bank channel | Yes (via idempotency key) | N/A (channel handles) |
| PollChannelStatus | Polls channel for terminal status | Yes | N/A |
| MarkAsStuck | Updates status to STUCK for manual intervention | Yes | N/A |
| UpdateLedger | Commits or releases reservation, records settlement | Yes | N/A |
| NotifyMerchant | Enqueues webhook delivery to merchant | Yes | N/A |
| ReleaseBalance | Releases reserved funds (saga compensation) | Yes | N/A |

Signals

| Signal | Payload | Purpose |
|---|---|---|
| manual-resolve | ManualResolution{Status, ResolvedBy, Notes} | Allows the operations team to resolve STUCK disbursements |

Failure Modes & Recovery

| Failure | Behaviour |
|---|---|
| Validation fails | Workflow returns error, no side effects |
| Balance reservation fails | Workflow returns error (e.g. insufficient funds) |
| Channel submission fails | Saga compensation releases reserved balance |
| Channel polling times out (30m) | Marked STUCK, awaits manual signal |
| Ledger update fails | Up to 5 attempts with exponential backoff; on exhaustion the workflow fails and an alert is raised |
| Webhook delivery fails | Non-blocking; retried by separate WebhookDeliveryWorkflow |