// Package workflows includes the AML/KYC Workflow, which orchestrates
// anti-money laundering screening and know-your-customer verification for
// cross-border remittances. It runs sanctions screening against multiple
// watchlists, performs KYC verification with primary and fallback providers,
// executes enhanced due diligence (EDD) for high-risk corridors, and supports
// human-in-the-loop manual review with a 72-hour timeout that auto-escalates
// to the CDO.
package workflows
import (
"time"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
// AMLKYCRequest carries the data the AML/KYC workflow receives as its input
// for a single remittance.
type AMLKYCRequest struct {
	// Identifiers of the remittance, the merchant and the corridor.
	RemittanceID, MerchantID, CorridorID string

	// Country codes for the two ends of the transfer.
	SourceCountry, DestCountry string

	// The two parties that are screened.
	Sender, Receiver PersonInfo

	// Amount of the remittance.
	Amount decimal

	// Currency of Amount.
	Currency string

	// TraceID is copied into the screening, KYC, PEP and EDD activity inputs.
	TraceID string
}
// PersonInfo describes one party to a remittance with the identity details
// used for screening.
type PersonInfo struct {
	FullName string

	// DateOfBirth is an ISO 8601 date.
	DateOfBirth string

	// Nationality is an ISO 3166-1 alpha-2 code.
	Nationality string

	// IDDocumentType is one of NATIONAL_ID, PASSPORT, DRIVING_LICENCE or
	// VOTER_ID.
	IDDocumentType string

	// Number of the identity document and the country associated with it.
	IDDocumentNumber, IDDocumentCountry string

	// Contact details.
	Address, PhoneNumber string
}
// AMLKYCResult reports the final verification outcome of the AML/KYC
// workflow.
type AMLKYCResult struct {
	RemittanceID string

	// OverallStatus is APPROVED, REJECTED or ESCALATED.
	OverallStatus string

	// Outcome of the sanctions, KYC and PEP steps.
	SanctionsStatus, KYCStatus, PEPStatus string

	// EDDStatus and ManualReviewStatus are N/A if the step was not required.
	EDDStatus, ManualReviewStatus string

	// CompletedAt is the time at which the result was produced.
	CompletedAt time.Time
}
// ManualReviewSignal is the payload a compliance officer sends to resolve a
// case that is waiting on manual review.
type ManualReviewSignal struct {
	// ReviewedBy is the compliance officer's user ID.
	ReviewedBy string

	// Decision is APPROVED or REJECTED.
	Decision string

	Notes string

	// RiskRating is LOW, MEDIUM or HIGH.
	RiskRating string
}
// AMLKYCWorkflow orchestrates full AML/KYC verification for a remittance.
//
// Workflow ID: aml-kyc-{remittanceId}
// Task Queue: compliance-worker
//
// Steps, in order:
//  1. Sanctions screening of sender and receiver in parallel; a
//     MATCH_CONFIRMED on either side rejects immediately.
//  2. KYC of the receiver with the primary provider, falling back to the
//     secondary one; a FAILED status rejects immediately.
//  3. PEP check of sender and receiver in parallel.
//  4. Enhanced due diligence when isHighRiskCorridor reports true.
//  5. Manual review when any earlier step asked for it: waits up to 72 hours
//     for a "manual-review-decision" signal, then escalates to the CDO and
//     waits for the same signal without a timeout.
//  6. Final compliance record update.
//
// The current state can be read through the "verification-status" query.
//
// It returns a non-nil error only when a sanctions, PEP or EDD activity fails
// after its retries. Failures of the record-keeping and notification
// activities (UpdateComplianceRecord, NotifyComplianceTeam,
// RecordManualReview, EscalateToCDO) are logged and do not fail the workflow.
func AMLKYCWorkflow(ctx workflow.Context, req AMLKYCRequest) (*AMLKYCResult, error) {
	logger := workflow.GetLogger(ctx)

	// bestEffort waits for an activity whose failure must not abort the
	// workflow and logs the error instead of discarding it. Logging adds no
	// workflow commands, so the command sequence is unchanged.
	bestEffort := func(f workflow.Future, activity string) {
		if err := f.Get(ctx, nil); err != nil {
			logger.Error("Best-effort compliance activity failed",
				"remittanceId", req.RemittanceID,
				"activity", activity,
				"error", err,
			)
		}
	}

	// -----------------------------------------------------------------
	// Activity options
	// -----------------------------------------------------------------
	screeningOpts := workflow.ActivityOptions{
		StartToCloseTimeout: 30 * time.Second,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts:    3,
			InitialInterval:    2 * time.Second,
			BackoffCoefficient: 2.0,
		},
	}
	kycOpts := workflow.ActivityOptions{
		StartToCloseTimeout: 60 * time.Second,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts:    3,
			InitialInterval:    2 * time.Second,
			BackoffCoefficient: 2.0,
		},
	}
	kycFallbackOpts := workflow.ActivityOptions{
		StartToCloseTimeout: 90 * time.Second,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts:    2,
			InitialInterval:    5 * time.Second,
			BackoffCoefficient: 2.0,
		},
	}
	eddOpts := workflow.ActivityOptions{
		StartToCloseTimeout: 2 * time.Minute,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts:    3,
			BackoffCoefficient: 2.0,
		},
	}
	pepOpts := workflow.ActivityOptions{
		StartToCloseTimeout: 30 * time.Second,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts:    3,
			BackoffCoefficient: 2.0,
		},
	}
	// Shared by every record-keeping, notification and escalation activity.
	updateOpts := workflow.ActivityOptions{
		StartToCloseTimeout: 10 * time.Second,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts: 5,
		},
	}

	// -----------------------------------------------------------------
	// Workflow state
	// -----------------------------------------------------------------
	state := &AMLKYCState{
		RemittanceID: req.RemittanceID,
		Status:       "IN_PROGRESS",
	}

	// Query handler for verification status. It returns the live state
	// pointer, so callers always see the latest values.
	if err := workflow.SetQueryHandler(ctx, "verification-status", func() (*AMLKYCState, error) {
		return state, nil
	}); err != nil {
		// The workflow can still run without the query; surface the problem
		// rather than hiding it.
		logger.Error("Failed to register verification-status query handler",
			"remittanceId", req.RemittanceID,
			"error", err,
		)
	}

	// Signal channel for manual review resolution. The same channel is used
	// for the compliance officer's decision and, after a timeout, the CDO's.
	manualReviewCh := workflow.GetSignalChannel(ctx, "manual-review-decision")

	// applyDecision maps a reviewer's decision onto the workflow state.
	// Anything other than APPROVED is treated as a rejection.
	applyDecision := func(decision string) {
		state.ManualReviewStatus = decision
		if decision == "APPROVED" {
			state.Status = "APPROVED"
		} else {
			state.Status = "REJECTED"
		}
	}

	// -----------------------------------------------------------------
	// Step 1: Sanctions Screening (parallel for sender and receiver)
	// -----------------------------------------------------------------
	screenCtx := workflow.WithActivityOptions(ctx, screeningOpts)
	watchlists := []string{
		"OFAC_SDN",
		"UN_CONSOLIDATED",
		"EU_SANCTIONS",
		"UK_HMT",
		"LOCAL_PEP_" + req.DestCountry,
		"LOCAL_SANCTIONS_" + req.DestCountry,
	}

	// Both activities are started before either result is awaited so that
	// they run in parallel.
	senderScreenFuture := workflow.ExecuteActivity(screenCtx, ScreenSanctions, ScreenSanctionsInput{
		Person:     req.Sender,
		Watchlists: watchlists,
		Role:       "SENDER",
		TraceID:    req.TraceID,
	})
	receiverScreenFuture := workflow.ExecuteActivity(screenCtx, ScreenSanctions, ScreenSanctionsInput{
		Person:     req.Receiver,
		Watchlists: watchlists,
		Role:       "RECEIVER",
		TraceID:    req.TraceID,
	})

	var senderScreenResult, receiverScreenResult SanctionsResult
	if err := senderScreenFuture.Get(ctx, &senderScreenResult); err != nil {
		return nil, err
	}
	if err := receiverScreenFuture.Get(ctx, &receiverScreenResult); err != nil {
		return nil, err
	}

	// If either party has a confirmed sanctions match, reject immediately.
	if senderScreenResult.Status == "MATCH_CONFIRMED" || receiverScreenResult.Status == "MATCH_CONFIRMED" {
		state.SanctionsStatus = "BLOCKED"
		state.Status = "REJECTED"

		updateCtx := workflow.WithActivityOptions(ctx, updateOpts)
		bestEffort(workflow.ExecuteActivity(updateCtx, UpdateComplianceRecord, UpdateComplianceInput{
			RemittanceID:    req.RemittanceID,
			OverallStatus:   "REJECTED",
			SanctionsStatus: "BLOCKED",
			RiskLevel:       "CRITICAL",
		}), "UpdateComplianceRecord")

		return &AMLKYCResult{
			RemittanceID:    req.RemittanceID,
			OverallStatus:   "REJECTED",
			SanctionsStatus: "BLOCKED",
			CompletedAt:     workflow.Now(ctx),
		}, nil
	}

	// NOTE(review): SanctionsStatus becomes CLEAR even when a party came back
	// POTENTIAL_MATCH; only requiresManualReview records that. Confirm this
	// is the intended status for downstream consumers.
	state.SanctionsStatus = "CLEAR"
	requiresManualReview := senderScreenResult.Status == "POTENTIAL_MATCH" ||
		receiverScreenResult.Status == "POTENTIAL_MATCH"

	// -----------------------------------------------------------------
	// Step 2: KYC Verification (primary provider, with fallback)
	// -----------------------------------------------------------------
	// Only the receiver is submitted for KYC here.
	kycCtx := workflow.WithActivityOptions(ctx, kycOpts)
	var kycResult KYCResult
	err := workflow.ExecuteActivity(kycCtx, SubmitKYC, SubmitKYCInput{
		Person:   req.Receiver,
		Provider: "primary", // Configured per corridor
		Country:  req.DestCountry,
		TraceID:  req.TraceID,
	}).Get(ctx, &kycResult)

	// Fallback to secondary provider on timeout or failure.
	if err != nil || kycResult.Status == "TIMEOUT" || kycResult.Status == "PROVIDER_ERROR" {
		logger.Warn("Primary KYC provider failed, falling back to secondary",
			"remittanceId", req.RemittanceID,
			"primaryError", err,
		)

		fallbackCtx := workflow.WithActivityOptions(ctx, kycFallbackOpts)
		err = workflow.ExecuteActivity(fallbackCtx, FallbackKYC, FallbackKYCInput{
			Person:   req.Receiver,
			Provider: "secondary",
			Country:  req.DestCountry,
			TraceID:  req.TraceID,
		}).Get(ctx, &kycResult)
		if err != nil {
			// Both providers failed — escalate for manual review instead of
			// failing the workflow.
			kycResult = KYCResult{Status: "MANUAL_REVIEW_REQUIRED"}
			requiresManualReview = true
		}
	}

	state.KYCStatus = kycResult.Status
	if kycResult.Status == "FAILED" {
		state.Status = "REJECTED"

		updateCtx := workflow.WithActivityOptions(ctx, updateOpts)
		bestEffort(workflow.ExecuteActivity(updateCtx, UpdateComplianceRecord, UpdateComplianceInput{
			RemittanceID:    req.RemittanceID,
			OverallStatus:   "REJECTED",
			SanctionsStatus: "CLEAR",
			KYCStatus:       "FAILED",
			RiskLevel:       "HIGH",
		}), "UpdateComplianceRecord")

		return &AMLKYCResult{
			RemittanceID:    req.RemittanceID,
			OverallStatus:   "REJECTED",
			SanctionsStatus: "CLEAR",
			KYCStatus:       "FAILED",
			CompletedAt:     workflow.Now(ctx),
		}, nil
	}

	// -----------------------------------------------------------------
	// Step 3: PEP (Politically Exposed Person) Check
	// -----------------------------------------------------------------
	pepCtx := workflow.WithActivityOptions(ctx, pepOpts)
	senderPEPFuture := workflow.ExecuteActivity(pepCtx, PEPCheck, PEPCheckInput{
		Person:  req.Sender,
		Role:    "SENDER",
		Country: req.SourceCountry,
		TraceID: req.TraceID,
	})
	receiverPEPFuture := workflow.ExecuteActivity(pepCtx, PEPCheck, PEPCheckInput{
		Person:  req.Receiver,
		Role:    "RECEIVER",
		Country: req.DestCountry,
		TraceID: req.TraceID,
	})

	var senderPEP, receiverPEP PEPResult
	if err := senderPEPFuture.Get(ctx, &senderPEP); err != nil {
		return nil, err
	}
	if err := receiverPEPFuture.Get(ctx, &receiverPEP); err != nil {
		return nil, err
	}

	state.PEPStatus = "CLEAR"
	if senderPEP.IsPEP || receiverPEP.IsPEP {
		state.PEPStatus = "PEP_IDENTIFIED"
		requiresManualReview = true
	}

	// -----------------------------------------------------------------
	// Step 4: Enhanced Due Diligence (for high-risk corridors)
	// -----------------------------------------------------------------
	corridorRequiresEDD := isHighRiskCorridor(req.CorridorID, req.DestCountry)
	state.EDDStatus = "N/A"
	if corridorRequiresEDD {
		eddCtx := workflow.WithActivityOptions(ctx, eddOpts)
		var eddResult EDDResult
		err = workflow.ExecuteActivity(eddCtx, EnhancedDueDiligence, EDDInput{
			RemittanceID:  req.RemittanceID,
			Sender:        req.Sender,
			Receiver:      req.Receiver,
			Amount:        req.Amount,
			Currency:      req.Currency,
			CorridorID:    req.CorridorID,
			SourceCountry: req.SourceCountry,
			DestCountry:   req.DestCountry,
			TraceID:       req.TraceID,
		}).Get(ctx, &eddResult)
		if err != nil {
			return nil, err
		}

		state.EDDStatus = eddResult.Status
		if eddResult.RiskLevel == "HIGH" || eddResult.RiskLevel == "CRITICAL" {
			requiresManualReview = true
		}
	}

	// -----------------------------------------------------------------
	// Step 5: Manual Review (human-in-the-loop, if required)
	// -----------------------------------------------------------------
	state.ManualReviewStatus = "N/A"
	if requiresManualReview {
		state.Status = "AWAITING_MANUAL_REVIEW"
		state.ManualReviewStatus = "PENDING"
		logger.Info("Escalating to manual review",
			"remittanceId", req.RemittanceID,
			"sanctions", state.SanctionsStatus,
			"kyc", state.KYCStatus,
			"pep", state.PEPStatus,
			"edd", state.EDDStatus,
		)

		// Notify compliance team.
		notifyCtx := workflow.WithActivityOptions(ctx, updateOpts)
		bestEffort(workflow.ExecuteActivity(notifyCtx, NotifyComplianceTeam, NotifyComplianceInput{
			RemittanceID: req.RemittanceID,
			MerchantID:   req.MerchantID,
			Reason:       buildReviewReason(state),
			Priority:     determinePriority(state),
		}), "NotifyComplianceTeam")

		// Wait for manual review signal with 72-hour timeout. The timer runs
		// on its own cancellable context so that a decision can stop it.
		manualReviewTimeout := 72 * time.Hour
		timerCtx, cancelTimer := workflow.WithCancel(ctx)
		timerFuture := workflow.NewTimer(timerCtx, manualReviewTimeout)

		resolved := false
		var reviewSignal ManualReviewSignal

		selector := workflow.NewSelector(ctx)

		// Manual review signal from compliance officer.
		selector.AddReceive(manualReviewCh, func(ch workflow.ReceiveChannel, more bool) {
			ch.Receive(ctx, &reviewSignal)
			resolved = true
			cancelTimer()
		})

		// 72-hour timeout — auto-escalate to CDO. Select is called once, so
		// this callback only runs when the signal callback did not.
		selector.AddFuture(timerFuture, func(f workflow.Future) {
			logger.Warn("Manual review timed out after 72h, escalating to CDO",
				"remittanceId", req.RemittanceID,
			)
		})

		selector.Select(ctx)

		if resolved {
			// Compliance officer made a decision.
			applyDecision(reviewSignal.Decision)

			// Record the review.
			updateCtx := workflow.WithActivityOptions(ctx, updateOpts)
			bestEffort(workflow.ExecuteActivity(updateCtx, RecordManualReview, RecordManualReviewInput{
				RemittanceID: req.RemittanceID,
				ReviewedBy:   reviewSignal.ReviewedBy,
				Decision:     reviewSignal.Decision,
				Notes:        reviewSignal.Notes,
				RiskRating:   reviewSignal.RiskRating,
			}), "RecordManualReview")
		} else {
			// Timeout: auto-escalate to CDO.
			state.ManualReviewStatus = "ESCALATED_TO_CDO"
			state.Status = "ESCALATED"

			// If this activity fails the workflow still waits below, so the
			// failure is logged to make the stuck case visible.
			escalateCtx := workflow.WithActivityOptions(ctx, updateOpts)
			bestEffort(workflow.ExecuteActivity(escalateCtx, EscalateToCDO, EscalateToCDOInput{
				RemittanceID: req.RemittanceID,
				MerchantID:   req.MerchantID,
				Reason:       "Manual review timed out after 72 hours",
				State:        state,
			}), "EscalateToCDO")

			// Wait for CDO signal (no timeout — CDO must decide).
			var cdoSignal ManualReviewSignal
			manualReviewCh.Receive(ctx, &cdoSignal)
			applyDecision(cdoSignal.Decision)

			// NOTE(review): unlike the officer path, the CDO decision is not
			// passed to RecordManualReview. Adding that call would introduce
			// a new workflow command and should be gated with
			// workflow.GetVersion — confirm whether it is wanted.
		}
	} else {
		// All automated checks passed.
		state.Status = "APPROVED"
	}

	// -----------------------------------------------------------------
	// Step 6: Update Final Compliance Record
	// -----------------------------------------------------------------
	updateCtx := workflow.WithActivityOptions(ctx, updateOpts)
	bestEffort(workflow.ExecuteActivity(updateCtx, UpdateComplianceRecord, UpdateComplianceInput{
		RemittanceID:    req.RemittanceID,
		OverallStatus:   state.Status,
		SanctionsStatus: state.SanctionsStatus,
		KYCStatus:       state.KYCStatus,
		PEPStatus:       state.PEPStatus,
		EDDStatus:       state.EDDStatus,
		ManualReview:    state.ManualReviewStatus,
		RiskLevel:       determineRiskLevel(state),
	}), "UpdateComplianceRecord")

	return &AMLKYCResult{
		RemittanceID:       req.RemittanceID,
		OverallStatus:      state.Status,
		SanctionsStatus:    state.SanctionsStatus,
		KYCStatus:          state.KYCStatus,
		PEPStatus:          state.PEPStatus,
		EDDStatus:          state.EDDStatus,
		ManualReviewStatus: state.ManualReviewStatus,
		CompletedAt:        workflow.Now(ctx),
	}, nil
}
// isHighRiskCorridor reports whether a remittance requires Enhanced Due
// Diligence. Only destCountry is consulted; corridorID is accepted but not
// currently used.
func isHighRiskCorridor(corridorID, destCountry string) bool {
	switch destCountry {
	case "IQ": // Iraq — elevated risk per FATF
		return true
	default:
		// Additional logic may check corridor-specific configuration
		return false
	}
}