Skip to content

AML/KYC Verification Workflow

Temporal Workflow Definition · Service: compliance-svc · Markets: PK, BD, NP, IQ, EG · Scale: 270M+ transactions, $1B+ processed


Overview

The AML/KYC Workflow orchestrates anti-money-laundering (AML) screening and know-your-customer (KYC) verification for cross-border remittances. It runs sanctions screening against multiple watchlists, performs KYC verification with primary and fallback providers, executes enhanced due diligence (EDD) for high-risk corridors, and supports human-in-the-loop manual review with a 72-hour timeout, after which the case is automatically escalated to the Chief Digital Officer (CDO).


Workflow Definition

package workflows

import (
    "time"

    "go.temporal.io/sdk/temporal"
    "go.temporal.io/sdk/workflow"
)

// AMLKYCRequest is the input to the AML/KYC workflow. It describes one
// remittance together with both parties to be screened.
type AMLKYCRequest struct {
    // RemittanceID identifies the remittance being verified.
    RemittanceID string
    // MerchantID identifies the merchant the remittance belongs to.
    MerchantID string
    // CorridorID identifies the remittance corridor.
    CorridorID string
    // SourceCountry is the country the funds are sent from.
    SourceCountry string
    // DestCountry is the country the funds are sent to.
    DestCountry string
    // Sender holds the identity details of the sending party.
    Sender PersonInfo
    // Receiver holds the identity details of the receiving party.
    Receiver PersonInfo
    // Amount is the remittance amount, expressed in Currency.
    // NOTE(review): the type `decimal` is not declared in the visible code —
    // confirm which decimal type/package this refers to.
    Amount decimal
    // Currency is the currency of Amount.
    Currency string
    // TraceID is a correlation identifier forwarded to every activity.
    TraceID string
}

// PersonInfo holds the identity details of one party (sender or receiver)
// that are submitted for screening.
type PersonInfo struct {
    // FullName is the person's full name.
    FullName string
    // DateOfBirth is the date of birth in ISO 8601 format.
    DateOfBirth string
    // Nationality is an ISO 3166-1 alpha-2 country code.
    Nationality string
    // IDDocumentType is one of NATIONAL_ID, PASSPORT, DRIVING_LICENCE, VOTER_ID.
    IDDocumentType string
    // IDDocumentNumber is the number printed on the identity document.
    IDDocumentNumber string
    // IDDocumentCountry is the country associated with the identity document.
    IDDocumentCountry string
    // Address is the person's postal address.
    Address string
    // PhoneNumber is the person's contact phone number.
    PhoneNumber string
}

// AMLKYCResult is the final verification outcome returned by the AML/KYC
// workflow.
type AMLKYCResult struct {
    // RemittanceID identifies the remittance that was verified.
    RemittanceID string
    // OverallStatus is one of APPROVED, REJECTED, ESCALATED.
    OverallStatus string
    // SanctionsStatus is the outcome of sanctions screening.
    SanctionsStatus string
    // KYCStatus is the outcome of KYC verification.
    KYCStatus string
    // PEPStatus is the outcome of the politically-exposed-person check.
    PEPStatus string
    // EDDStatus is the enhanced due diligence outcome; N/A if not required.
    EDDStatus string
    // ManualReviewStatus is the manual review outcome; N/A if not required.
    ManualReviewStatus string
    // CompletedAt is the time at which the result was produced.
    CompletedAt time.Time
}

// ManualReviewSignal is the payload a compliance officer sends to resolve a
// case that is awaiting manual review.
type ManualReviewSignal struct {
    // ReviewedBy is the compliance officer's user ID.
    ReviewedBy string
    // Decision is one of APPROVED, REJECTED.
    Decision string
    // Notes carries the reviewer's free-text notes.
    Notes string
    // RiskRating is one of LOW, MEDIUM, HIGH.
    RiskRating string
}

// AMLKYCWorkflow orchestrates full AML/KYC verification for a remittance.
//
// Workflow ID: aml-kyc-{remittanceId}
// Task Queue:  compliance-worker
//
// Steps, in order:
//  1. Sanctions screening of sender and receiver in parallel. A confirmed
//     match rejects immediately; a potential match forces manual review.
//  2. KYC on the receiver via the primary provider, falling back to the
//     secondary provider. If neither provider yields a usable answer the case
//     goes to manual review; a FAILED verdict rejects immediately.
//  3. PEP check of sender and receiver in parallel. An identified PEP, or a
//     PEP check that could not be completed, forces manual review.
//  4. Enhanced due diligence when the corridor is high-risk or a PEP was
//     identified. HIGH/CRITICAL EDD risk forces manual review.
//  5. Manual review, if required: waits up to 72 hours for a
//     "manual-review-decision" signal, then escalates to the CDO and waits
//     (without timeout) for the CDO's signal.
//  6. Final compliance record update.
//
// The current state can be inspected through the "verification-status" query.
//
// It returns the verification outcome, or an error when sanctions screening
// or EDD cannot be completed, when a compliance/audit record cannot be
// persisted, or when the workflow is cancelled. Failures to deliver
// notifications (compliance team, CDO) are logged but do not fail the
// workflow, so the case stays resolvable by signal.
func AMLKYCWorkflow(ctx workflow.Context, req AMLKYCRequest) (*AMLKYCResult, error) {
    logger := workflow.GetLogger(ctx)

    // -----------------------------------------------------------------
    // Activity options
    // -----------------------------------------------------------------
    screeningOpts := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts:    3,
            InitialInterval:    2 * time.Second,
            BackoffCoefficient: 2.0,
        },
    }

    kycOpts := workflow.ActivityOptions{
        StartToCloseTimeout: 60 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts:    3,
            InitialInterval:    2 * time.Second,
            BackoffCoefficient: 2.0,
        },
    }

    kycFallbackOpts := workflow.ActivityOptions{
        StartToCloseTimeout: 90 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts:    2,
            InitialInterval:    5 * time.Second,
            BackoffCoefficient: 2.0,
        },
    }

    eddOpts := workflow.ActivityOptions{
        StartToCloseTimeout: 2 * time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts:    3,
            BackoffCoefficient: 2.0,
        },
    }

    pepOpts := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts:    3,
            BackoffCoefficient: 2.0,
        },
    }

    updateOpts := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts: 5,
        },
    }

    // -----------------------------------------------------------------
    // Workflow state
    // -----------------------------------------------------------------
    state := &AMLKYCState{
        RemittanceID: req.RemittanceID,
        Status:       "IN_PROGRESS",
    }

    // Query handler for verification status
    if err := workflow.SetQueryHandler(ctx, "verification-status", func() (*AMLKYCState, error) {
        return state, nil
    }); err != nil {
        return nil, err
    }

    // Signal channel for manual review resolution
    manualReviewCh := workflow.GetSignalChannel(ctx, "manual-review-decision")

    // All record-keeping and notification activities share the same options.
    updateCtx := workflow.WithActivityOptions(ctx, updateOpts)

    // persistRecord writes the current state to the compliance record. A
    // failure is returned to the caller: a verdict must never be reported
    // without its compliance record having been written.
    persistRecord := func(riskLevel string) error {
        return workflow.ExecuteActivity(updateCtx, UpdateComplianceRecord, UpdateComplianceInput{
            RemittanceID:    req.RemittanceID,
            OverallStatus:   state.Status,
            SanctionsStatus: state.SanctionsStatus,
            KYCStatus:       state.KYCStatus,
            PEPStatus:       state.PEPStatus,
            EDDStatus:       state.EDDStatus,
            ManualReview:    state.ManualReviewStatus,
            RiskLevel:       riskLevel,
        }).Get(ctx, nil)
    }

    // applyDecision folds a reviewer's decision (compliance officer or CDO)
    // into the workflow state and persists it for audit. Anything other than
    // an explicit APPROVED is treated as a rejection (fail closed).
    applyDecision := func(sig ManualReviewSignal) error {
        state.ManualReviewStatus = sig.Decision
        if sig.Decision == "APPROVED" {
            state.Status = "APPROVED"
        } else {
            state.Status = "REJECTED"
        }
        return workflow.ExecuteActivity(updateCtx, RecordManualReview, RecordManualReviewInput{
            RemittanceID: req.RemittanceID,
            ReviewedBy:   sig.ReviewedBy,
            Decision:     sig.Decision,
            Notes:        sig.Notes,
            RiskRating:   sig.RiskRating,
        }).Get(ctx, nil)
    }

    // kycProviderFailed reports whether a KYC status means the provider could
    // not produce a verdict (as opposed to a pass/fail verdict).
    kycProviderFailed := func(status string) bool {
        return status == "TIMEOUT" || status == "PROVIDER_ERROR"
    }

    // -----------------------------------------------------------------
    // Step 1: Sanctions Screening (parallel for sender and receiver)
    // -----------------------------------------------------------------
    screenCtx := workflow.WithActivityOptions(ctx, screeningOpts)

    watchlists := []string{
        "OFAC_SDN",
        "UN_CONSOLIDATED",
        "EU_SANCTIONS",
        "UK_HMT",
        "LOCAL_PEP_" + req.DestCountry,
        "LOCAL_SANCTIONS_" + req.DestCountry,
    }

    // Start both screenings before waiting on either so they run in parallel.
    senderScreenFuture := workflow.ExecuteActivity(screenCtx, ScreenSanctions, ScreenSanctionsInput{
        Person:     req.Sender,
        Watchlists: watchlists,
        Role:       "SENDER",
        TraceID:    req.TraceID,
    })

    receiverScreenFuture := workflow.ExecuteActivity(screenCtx, ScreenSanctions, ScreenSanctionsInput{
        Person:     req.Receiver,
        Watchlists: watchlists,
        Role:       "RECEIVER",
        TraceID:    req.TraceID,
    })

    var senderScreenResult, receiverScreenResult SanctionsResult
    if err := senderScreenFuture.Get(ctx, &senderScreenResult); err != nil {
        return nil, err
    }
    if err := receiverScreenFuture.Get(ctx, &receiverScreenResult); err != nil {
        return nil, err
    }

    // If either party has a confirmed sanctions match, reject immediately
    if senderScreenResult.Status == "MATCH_CONFIRMED" || receiverScreenResult.Status == "MATCH_CONFIRMED" {
        state.SanctionsStatus = "BLOCKED"
        state.Status = "REJECTED"

        if err := persistRecord("CRITICAL"); err != nil {
            return nil, err
        }

        return &AMLKYCResult{
            RemittanceID:    req.RemittanceID,
            OverallStatus:   "REJECTED",
            SanctionsStatus: "BLOCKED",
            CompletedAt:     workflow.Now(ctx),
        }, nil
    }

    // A potential match is not a clean result: record it as such so that the
    // review reason, the compliance record and the final result do not claim
    // the screening was clear.
    state.SanctionsStatus = "CLEAR"
    requiresManualReview := false
    if senderScreenResult.Status == "POTENTIAL_MATCH" || receiverScreenResult.Status == "POTENTIAL_MATCH" {
        state.SanctionsStatus = "POTENTIAL_MATCH"
        requiresManualReview = true
    }

    // -----------------------------------------------------------------
    // Step 2: KYC Verification (primary provider, with fallback)
    // -----------------------------------------------------------------
    kycCtx := workflow.WithActivityOptions(ctx, kycOpts)
    var kycResult KYCResult
    primaryErr := workflow.ExecuteActivity(kycCtx, SubmitKYC, SubmitKYCInput{
        Person:   req.Receiver,
        Provider: "primary", // Configured per corridor
        Country:  req.DestCountry,
        TraceID:  req.TraceID,
    }).Get(ctx, &kycResult)

    // Fallback to secondary provider on timeout or failure
    if primaryErr != nil || kycProviderFailed(kycResult.Status) {
        // A cancelled workflow is not a provider failure; stop here.
        if err := ctx.Err(); err != nil {
            return nil, err
        }

        logger.Warn("Primary KYC provider failed, falling back to secondary",
            "remittanceId", req.RemittanceID,
            "primaryError", primaryErr,
        )

        // Decode into a fresh value so nothing from the primary attempt leaks
        // into the fallback verdict.
        var fallbackResult KYCResult
        fallbackCtx := workflow.WithActivityOptions(ctx, kycFallbackOpts)
        fallbackErr := workflow.ExecuteActivity(fallbackCtx, FallbackKYC, FallbackKYCInput{
            Person:   req.Receiver,
            Provider: "secondary",
            Country:  req.DestCountry,
            TraceID:  req.TraceID,
        }).Get(ctx, &fallbackResult)

        if fallbackErr != nil || kycProviderFailed(fallbackResult.Status) {
            if err := ctx.Err(); err != nil {
                return nil, err
            }
            // Both providers failed — escalate for manual review. This also
            // covers the secondary provider reporting TIMEOUT/PROVIDER_ERROR
            // without an activity error, which must not count as a pass.
            logger.Warn("Secondary KYC provider failed, escalating to manual review",
                "remittanceId", req.RemittanceID,
                "fallbackError", fallbackErr,
            )
            kycResult = KYCResult{Status: "MANUAL_REVIEW_REQUIRED"}
            requiresManualReview = true
        } else {
            kycResult = fallbackResult
        }
    }

    state.KYCStatus = kycResult.Status
    if kycResult.Status == "FAILED" {
        state.Status = "REJECTED"

        if err := persistRecord("HIGH"); err != nil {
            return nil, err
        }

        return &AMLKYCResult{
            RemittanceID:    req.RemittanceID,
            OverallStatus:   "REJECTED",
            SanctionsStatus: state.SanctionsStatus,
            KYCStatus:       "FAILED",
            CompletedAt:     workflow.Now(ctx),
        }, nil
    }

    // -----------------------------------------------------------------
    // Step 3: PEP (Politically Exposed Person) Check
    // -----------------------------------------------------------------
    pepCtx := workflow.WithActivityOptions(ctx, pepOpts)

    senderPEPFuture := workflow.ExecuteActivity(pepCtx, PEPCheck, PEPCheckInput{
        Person:  req.Sender,
        Role:    "SENDER",
        Country: req.SourceCountry,
        TraceID: req.TraceID,
    })

    receiverPEPFuture := workflow.ExecuteActivity(pepCtx, PEPCheck, PEPCheckInput{
        Person:  req.Receiver,
        Role:    "RECEIVER",
        Country: req.DestCountry,
        TraceID: req.TraceID,
    })

    var senderPEP, receiverPEP PEPResult
    senderPEPErr := senderPEPFuture.Get(ctx, &senderPEP)
    receiverPEPErr := receiverPEPFuture.Get(ctx, &receiverPEP)
    if err := ctx.Err(); err != nil {
        return nil, err
    }

    switch {
    case senderPEP.IsPEP || receiverPEP.IsPEP:
        state.PEPStatus = "PEP_IDENTIFIED"
        requiresManualReview = true
    case senderPEPErr != nil || receiverPEPErr != nil:
        // Retries exhausted: the check is inconclusive, so a human decides
        // instead of the whole verification being aborted.
        logger.Warn("PEP check failed, escalating to manual review",
            "remittanceId", req.RemittanceID,
            "senderError", senderPEPErr,
            "receiverError", receiverPEPErr,
        )
        state.PEPStatus = "MANUAL_REVIEW_REQUIRED"
        requiresManualReview = true
    default:
        state.PEPStatus = "CLEAR"
    }

    // -----------------------------------------------------------------
    // Step 4: Enhanced Due Diligence (high-risk corridors, PEP involvement)
    // -----------------------------------------------------------------
    // NOTE(review): the high-value (>$10K equivalent) EDD trigger is not
    // evaluated here; doing so needs an amount comparison/FX conversion that
    // is not available in this function — confirm where it is enforced.
    requiresEDD := isHighRiskCorridor(req.CorridorID, req.DestCountry) ||
        state.PEPStatus == "PEP_IDENTIFIED"
    state.EDDStatus = "N/A"

    if requiresEDD {
        eddCtx := workflow.WithActivityOptions(ctx, eddOpts)
        var eddResult EDDResult
        err := workflow.ExecuteActivity(eddCtx, EnhancedDueDiligence, EDDInput{
            RemittanceID:  req.RemittanceID,
            Sender:        req.Sender,
            Receiver:      req.Receiver,
            Amount:        req.Amount,
            Currency:      req.Currency,
            CorridorID:    req.CorridorID,
            SourceCountry: req.SourceCountry,
            DestCountry:   req.DestCountry,
            TraceID:       req.TraceID,
        }).Get(ctx, &eddResult)
        if err != nil {
            // Cannot proceed without EDD once it is required.
            return nil, err
        }

        state.EDDStatus = eddResult.Status
        if eddResult.RiskLevel == "HIGH" || eddResult.RiskLevel == "CRITICAL" {
            requiresManualReview = true
        }
    }

    // -----------------------------------------------------------------
    // Step 5: Manual Review (human-in-the-loop, if required)
    // -----------------------------------------------------------------
    state.ManualReviewStatus = "N/A"

    if requiresManualReview {
        state.Status = "AWAITING_MANUAL_REVIEW"
        state.ManualReviewStatus = "PENDING"

        logger.Info("Escalating to manual review",
            "remittanceId", req.RemittanceID,
            "sanctions", state.SanctionsStatus,
            "kyc", state.KYCStatus,
            "pep", state.PEPStatus,
            "edd", state.EDDStatus,
        )

        // Notify compliance team. Best effort: if the alert cannot be
        // delivered the case is still pending and will reach the CDO after
        // the timeout, so log loudly and keep waiting.
        if err := workflow.ExecuteActivity(updateCtx, NotifyComplianceTeam, NotifyComplianceInput{
            RemittanceID: req.RemittanceID,
            MerchantID:   req.MerchantID,
            Reason:       buildReviewReason(state),
            Priority:     determinePriority(state),
        }).Get(ctx, nil); err != nil {
            logger.Error("Failed to notify compliance team of pending manual review",
                "remittanceId", req.RemittanceID,
                "error", err,
            )
        }

        // Wait for manual review signal with 72-hour timeout
        manualReviewTimeout := 72 * time.Hour
        timerCtx, cancelTimer := workflow.WithCancel(ctx)
        timerFuture := workflow.NewTimer(timerCtx, manualReviewTimeout)

        resolved := false
        var reviewSignal ManualReviewSignal

        selector := workflow.NewSelector(ctx)

        // Manual review signal from compliance officer
        selector.AddReceive(manualReviewCh, func(ch workflow.ReceiveChannel, more bool) {
            ch.Receive(ctx, &reviewSignal)
            resolved = true
        })

        // 72-hour timeout; handled below once Select returns.
        selector.AddFuture(timerFuture, func(workflow.Future) {})

        selector.Select(ctx)

        // Release the timer on every path; cancelling a fired timer is a no-op.
        cancelTimer()

        // The timer future also completes when the workflow itself is
        // cancelled; that must not be mistaken for a review timeout.
        if err := ctx.Err(); err != nil {
            return nil, err
        }

        if resolved {
            // Compliance officer made a decision
            if err := applyDecision(reviewSignal); err != nil {
                return nil, err
            }
        } else {
            // Timeout: auto-escalate to CDO
            logger.Warn("Manual review timed out after 72h, escalating to CDO",
                "remittanceId", req.RemittanceID,
            )
            state.ManualReviewStatus = "ESCALATED_TO_CDO"
            state.Status = "ESCALATED"

            // Best effort for the same reason as the compliance-team alert:
            // the case must stay open for a decision signal.
            if err := workflow.ExecuteActivity(updateCtx, EscalateToCDO, EscalateToCDOInput{
                RemittanceID: req.RemittanceID,
                MerchantID:   req.MerchantID,
                Reason:       "Manual review timed out after 72 hours",
                State:        state,
            }).Get(ctx, nil); err != nil {
                logger.Error("Failed to deliver CDO escalation",
                    "remittanceId", req.RemittanceID,
                    "error", err,
                )
            }

            // Wait for CDO signal (no timeout — CDO must decide)
            var cdoSignal ManualReviewSignal
            manualReviewCh.Receive(ctx, &cdoSignal)

            // The CDO's decision is audited exactly like an officer's.
            if err := applyDecision(cdoSignal); err != nil {
                return nil, err
            }
        }
    } else {
        // All automated checks passed
        state.Status = "APPROVED"
    }

    // -----------------------------------------------------------------
    // Step 6: Update Final Compliance Record
    // -----------------------------------------------------------------
    if err := persistRecord(determineRiskLevel(state)); err != nil {
        return nil, err
    }

    return &AMLKYCResult{
        RemittanceID:       req.RemittanceID,
        OverallStatus:      state.Status,
        SanctionsStatus:    state.SanctionsStatus,
        KYCStatus:          state.KYCStatus,
        PEPStatus:          state.PEPStatus,
        EDDStatus:          state.EDDStatus,
        ManualReviewStatus: state.ManualReviewStatus,
        CompletedAt:        workflow.Now(ctx),
    }, nil
}

// isHighRiskCorridor reports whether a remittance to destCountry requires
// Enhanced Due Diligence.
//
// Only the destination country is consulted at present; corridorID is accepted
// so that corridor-specific configuration can be checked here later. The match
// on destCountry is exact (upper-case ISO code).
func isHighRiskCorridor(corridorID, destCountry string) bool {
    switch destCountry {
    case "IQ": // Iraq — elevated risk per FATF
        return true
    default:
        return false
    }
}

Activities Summary

| Activity | Description | Timeout | Idempotent |
|---|---|---|---|
| ScreenSanctions | Screens person against OFAC, UN, EU, UK HMT, local watchlists | 30s | Yes |
| SubmitKYC | Submits KYC verification to primary provider | 60s | Yes |
| FallbackKYC | Submits KYC to secondary provider on primary failure/timeout | 90s | Yes |
| PEPCheck | Checks if person is a Politically Exposed Person | 30s | Yes |
| EnhancedDueDiligence | Runs EDD checks for high-risk corridors (source of funds, etc.) | 2m | Yes |
| NotifyComplianceTeam | Sends alert to compliance team for manual review | 10s | Yes |
| RecordManualReview | Persists compliance officer's review decision | 10s | Yes |
| EscalateToCDO | Escalates unresolved case to Chief Digital Officer | 10s | Yes |
| UpdateComplianceRecord | Updates final compliance status in SurrealDB | 10s | Yes |

Signals

| Signal | Payload | Purpose |
|---|---|---|
| manual-review-decision | ManualReviewSignal{ReviewedBy, Decision, Notes, RiskRating} | Compliance officer or CDO submits their review decision |

Query Handlers

| Query | Response | Purpose |
|---|---|---|
| verification-status | AMLKYCState{RemittanceID, Status, SanctionsStatus, KYCStatus, PEPStatus, EDDStatus, ManualReviewStatus} | Check verification progress from compliance dashboard |

Watchlists Screened

| Watchlist | Source | Coverage |
|---|---|---|
| OFAC SDN | US Treasury | Global |
| UN Consolidated | United Nations | Global |
| EU Sanctions | European Union | Global |
| UK HMT | HM Treasury | Global |
| Local PEP lists | Per-country databases | PK, BD, NP, IQ, EG |
| Local Sanctions | Per-country regulators | PK, BD, NP, IQ, EG |

Timeout & Escalation Chain

Sanctions + KYC + PEP + EDD (automated)
  │
  ├─ All clear → APPROVED (seconds)
  │
  └─ Review needed → Manual Review (compliance team)
                        │
                        ├─ Decision within 72h → APPROVED / REJECTED
                        │
                        └─ No decision after 72h → Auto-escalate to CDO
                                                     │
                                                     └─ CDO decides → APPROVED / REJECTED

State Machine

IN_PROGRESS → APPROVED
            → REJECTED
            → AWAITING_MANUAL_REVIEW → APPROVED
                                     → REJECTED
                                     → ESCALATED → APPROVED
                                                 → REJECTED

Risk Corridor Configuration

| Corridor | EDD Required | Notes |
|---|---|---|
| *→IQ | Yes | Iraq — FATF elevated monitoring |
| High-value (>$10K equivalent) | Yes | All corridors, per internal policy |
| PEP-involved | Yes | Automatic EDD when PEP identified |

Failure Modes

| Failure | Behaviour |
|---|---|
| Sanctions provider unavailable | Retried 3 times; workflow fails if all attempts exhausted |
| Primary KYC provider timeout | Automatic fallback to secondary provider |
| Both KYC providers fail | Escalated to manual review |
| PEP check fails | Retried 3 times; on exhaustion, escalated to manual review |
| EDD check fails | Workflow fails (cannot proceed without EDD for high-risk corridors) |
| Manual review not completed in 72h | Auto-escalated to CDO with full case context |
| CDO escalation | No timeout — CDO must provide a decision |