ETL/ELT Pipeline Standards

Standard ID: STD-DATA-052 | Version: 1.0 | Last Updated: 2026-04-03 | Owner: Data Team | Status: Active

Purpose

Define standards for all extract-transform-load (ETL) and extract-load-transform (ELT) pipelines at Simpaisa. Pipelines move data between operational systems, analytical stores, and external partners. At 270M+ transactions annually across six markets, pipeline reliability directly impacts reconciliation, settlement, regulatory reporting, and business intelligence.

Current State

  • No pipeline orchestration: Data movement jobs are ad hoc cron scripts (Python, bash) running on application servers. There is no visibility into job status, no retry logic, and no dependency management.
  • No schema management: When upstream schemas change (MySQL column additions, channel file format changes), downstream pipelines break silently. Failures are discovered when reports are wrong or missing.
  • No idempotency: Re-running a failed pipeline risks duplicate records in the target. Engineers must manually verify and deduplicate.
  • No data quality checks: Data enters analytical stores and reports without validation. Incorrect data (null amounts, invalid currencies, orphaned records) propagates unchecked.
  • No monitoring: Pipeline failures are discovered manually, often hours or days after occurrence.

Target State

All data pipelines are orchestrated by Temporal, are idempotent, support backward-compatible schema evolution, include data quality checks, and are monitored with alerting.

Pipeline Architecture

Source (MySQL, SurrealDB, Channel Files, Bank Statements)
  → Extraction (Temporal Activity)
    → Transformation (Temporal Activity, idempotent)
      → Quality Checks (Temporal Activity)
        → Load (Temporal Activity, idempotent)
          → Verification (Temporal Activity)

Each pipeline is a Temporal workflow with discrete activities for each stage. Temporal provides retry, timeout, dependency management, and observability natively.

Orchestration Standards

Rule | Requirement
Orchestrator | All pipelines MUST use Temporal for orchestration. No cron jobs, no ad hoc scripts
Workflow naming | etl-{source}-{target}-{frequency} (e.g., etl-mysql-analytics-hourly)
Idempotency | Every pipeline run MUST be idempotent. Re-running with the same input produces the same output without duplicates. Use upsert semantics or deduplication keys
Atomicity | Pipeline runs are atomic at the batch level. A failed batch does not leave partial data in the target. Use staging tables or write-ahead patterns
Concurrency | No two instances of the same pipeline run concurrently. Temporal's workflow ID uniqueness enforces this
Timeout | Every activity MUST have a timeout. No unbounded operations. Default: 30 minutes per activity, 4 hours per workflow
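
As an illustration of the naming, concurrency, and timeout rules above, a minimal sketch of starting one run of etl-mysql-analytics-hourly with the Temporal Go SDK client. Deriving the workflow ID from the pipeline name and processing window enforces the concurrency rule, since Temporal allows only one open workflow per ID; the task queue name and ETLParams contents are assumptions, and ETLWorkflow is the template workflow defined under Implementation Guidelines below.

import (
    "context"
    "time"

    "go.temporal.io/sdk/client"
)

// startHourlyRun starts one run of the MySQL → analytics pipeline for the
// given processing window.
func startHourlyRun(ctx context.Context, c client.Client, params ETLParams, window time.Time) error {
    opts := client.StartWorkflowOptions{
        // One workflow ID per pipeline and window: a second start for the same
        // window is rejected while the first run is still open.
        ID:                       "etl-mysql-analytics-hourly-" + window.UTC().Format("2006-01-02T15"),
        TaskQueue:                "etl-pipelines", // assumed task queue name
        WorkflowExecutionTimeout: 4 * time.Hour,   // workflow-level cap from the table above
    }
    _, err := c.ExecuteWorkflow(ctx, opts, ETLWorkflow, params)
    return err
}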

Schema Evolution

All pipeline schemas (source and target) MUST follow backward-compatible evolution rules:

Allowed Change | Example
Add a new optional field | Add channel_reference to transaction export
Add a new table/collection | Add settlement_exceptions table
Widen a field type | INT → BIGINT, VARCHAR(50) → VARCHAR(100)

Forbidden Change | Reason
Remove a field | Downstream consumers may depend on it
Rename a field | Equivalent to remove + add; breaks consumers
Change field type (narrowing) | BIGINT → INT loses data
Change field semantics | Reusing a field for a different purpose

Schema changes MUST be versioned and documented. Breaking changes require a new pipeline version and migration period.
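
As a concrete illustration of the "add a new optional field" change from the table above, a hypothetical Go export record gaining channel_reference; the type and field names are examples only, not the real export schema.

// TransactionExport is an illustrative export record. ChannelReference was
// added as an optional field: existing consumers ignore it, and new consumers
// treat a missing value as "not provided". Removing or renaming the other
// fields, or narrowing AmountMinorUnits to int32, would be a breaking change.
type TransactionExport struct {
    TransactionID    string  `json:"transaction_id"`
    AmountMinorUnits int64   `json:"amount_minor_units"`
    Currency         string  `json:"currency"`
    ChannelReference *string `json:"channel_reference,omitempty"` // added in a later schema version
}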

Data Quality Checks

Every pipeline MUST include quality checks at data boundaries (after extraction, after transformation, before load):

Check Type | Description | Action on Failure
Null check | Mandatory fields are not null | Reject record, log error
Type check | Fields match expected types | Reject record, log error
Range check | Amounts > 0, dates within reasonable range | Reject record, log error
Referential check | Foreign keys resolve (merchant exists, channel valid) | Reject record, log warning
Volume check | Record count within expected range (±50% of historical average) | Alert, do not auto-reject
Freshness check | Source data timestamp within expected window | Alert, pause pipeline if data is stale
Duplicate check | No duplicate records in batch (by primary key) | Deduplicate, log warning

Rejected records are written to a dead letter table with the rejection reason. The DLT is reviewed daily by the data team.
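
A minimal sketch of the quality-check stage applying a few of the checks above. The Batch/record shape and the writeDeadLetter helper are illustrative assumptions; the real checks and dead letter table schema live in the pipeline template.

import "context"

// QualityCheckActivity applies null, range, and duplicate checks. Rejected
// records are written to the dead letter table with a reason; accepted
// records are returned for LoadActivity.
func QualityCheckActivity(ctx context.Context, in Batch) (Batch, error) {
    var out Batch
    seen := make(map[string]bool) // duplicate check keyed on primary key
    for _, r := range in.Records {
        switch {
        case r.TransactionID == "" || r.Currency == "":
            writeDeadLetter(ctx, r, "null check: mandatory field missing") // hypothetical helper
        case r.AmountMinorUnits <= 0:
            writeDeadLetter(ctx, r, "range check: amount must be > 0")
        case seen[r.TransactionID]:
            writeDeadLetter(ctx, r, "duplicate check: repeated primary key in batch")
        default:
            seen[r.TransactionID] = true
            out.Records = append(out.Records, r)
        }
    }
    return out, nil
}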

Monitoring and Alerting

Metric | Threshold | Alert
Pipeline lag (time since last successful run) | >2x scheduled frequency | PagerDuty alert
Error rate (rejected records / total records) | >1% | Slack notification
Pipeline duration | >2x historical average | Slack notification
Throughput (records/second) | <50% of historical average | Slack notification
Dead letter table size | >100 records in 24 hours | PagerDuty alert

All pipeline metrics are exported via OpenTelemetry to Grafana for dashboard visibility.
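
A sketch of the counters behind the error-rate alert, using the OpenTelemetry Go metric API; the meter name, metric names, and attribute keys are placeholders rather than an agreed naming scheme.

import (
    "context"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/metric"
)

var meter = otel.Meter("simpaisa/pipelines") // placeholder instrumentation scope

// recordBatchMetrics records processed and rejected counts for one batch.
// The error rate (rejected / processed) is then derived in Grafana.
func recordBatchMetrics(ctx context.Context, pipeline string, processed, rejected int64) {
    // In production the instruments would be created once, not on every call.
    processedCtr, _ := meter.Int64Counter("pipeline.records.processed")
    rejectedCtr, _ := meter.Int64Counter("pipeline.records.rejected")
    attrs := metric.WithAttributes(attribute.String("pipeline", pipeline))
    processedCtr.Add(ctx, processed, attrs)
    rejectedCtr.Add(ctx, rejected, attrs)
}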

Implementation Guidelines

Go Pipeline Template

All pipelines are implemented in Go using the Temporal Go SDK (go.temporal.io/sdk):

// Workflow definition. Each stage runs as a Temporal activity: ExecuteActivity
// returns a Future, and Get blocks until the stage completes or fails the run.
func ETLWorkflow(ctx workflow.Context, params ETLParams) error {
    ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Minute, // per-activity default from Orchestration Standards
    })
    var extracted, transformed Batch // Batch stands in for the pipeline's batch type
    if err := workflow.ExecuteActivity(ctx, ExtractActivity, params).Get(ctx, &extracted); err != nil {
        return err
    }
    if err := workflow.ExecuteActivity(ctx, TransformActivity, extracted).Get(ctx, &transformed); err != nil {
        return err
    }
    // QualityCheckActivity, LoadActivity, and VerifyActivity chain on in the same way.
    return nil
}
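
The workflow and its activities are then registered on a Temporal worker polling the pipeline task queue. A minimal sketch; the task queue name and connection options are assumptions.

import (
    "log"

    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
)

func main() {
    c, err := client.Dial(client.Options{}) // defaults to a local Temporal server
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    w := worker.New(c, "etl-pipelines", worker.Options{}) // assumed task queue name
    w.RegisterWorkflow(ETLWorkflow)
    w.RegisterActivity(ExtractActivity)
    w.RegisterActivity(TransformActivity)
    w.RegisterActivity(QualityCheckActivity)
    w.RegisterActivity(LoadActivity)
    w.RegisterActivity(VerifyActivity)

    if err := w.Run(worker.InterruptCh()); err != nil {
        log.Fatal(err)
    }
}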

Extraction Patterns

Source Type | Pattern
MySQL (legacy) | CDC via Debezium → NSQ → pipeline consumer
SurrealDB | LIVE SELECT subscription or scheduled query
Channel settlement files | SFTP poll or webhook trigger → file parse activity
Bank statements | SFTP poll → file parse activity
API sources | HTTP client with retry and circuit breaker
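
For the MySQL CDC row above, a sketch of an NSQ consumer for Debezium change events using github.com/nsqio/go-nsq; the topic, channel, and handleChangeEvent helper are assumptions.

import "github.com/nsqio/go-nsq"

// consumeCDC subscribes to Debezium change events published to NSQ and hands
// each event to the extraction stage. Returning an error from the handler
// makes NSQ requeue the message.
func consumeCDC(lookupdAddr string) error {
    consumer, err := nsq.NewConsumer("mysql.transactions.cdc", "etl-consumer", nsq.NewConfig())
    if err != nil {
        return err
    }
    consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
        return handleChangeEvent(m.Body) // hypothetical: parse the event and feed the pipeline
    }))
    return consumer.ConnectToNSQLookupd(lookupdAddr)
}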

Transformation Rules

  • Transformations MUST be pure functions: given the same input, they produce the same output, with no side effects (see the sketch after this list).
  • Currency amounts MUST be handled as integers (minor units) during transformation to avoid floating-point errors. Convert to decimal only at the presentation layer.
  • Timestamps MUST be normalised to UTC during transformation. Local timezone is stored as a separate field where needed.
  • PII MUST be masked or tokenised per PII-HANDLING-STANDARD.md before loading into analytical stores.
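
A sketch of a pure transformation honouring these rules, with illustrative input and output types standing in for the real pipeline schema:

import "time"

// RawRecord and Record are illustrative types, not the production schema.
type RawRecord struct {
    TransactionID    string
    Currency         string
    AmountMinorUnits int64
    OccurredAt       time.Time // source-local timestamp
}

type Record struct {
    TransactionID    string
    Currency         string
    AmountMinorUnits int64 // integer minor units end to end; no floats
    OccurredAtUTC    time.Time
    LocalTimezone    string
}

// transform is a pure function: no I/O, no clock reads, no mutation of input.
func transform(in RawRecord) Record {
    return Record{
        TransactionID:    in.TransactionID,
        Currency:         in.Currency,
        AmountMinorUnits: in.AmountMinorUnits,
        OccurredAtUTC:    in.OccurredAt.UTC(),               // normalise to UTC
        LocalTimezone:    in.OccurredAt.Location().String(), // local timezone kept as its own field
    }
}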

Actions

  1. Deprecate cron pipelines: Audit and inventory all existing cron-based data jobs. Create Temporal workflow equivalents. Decommission cron jobs within 3 months.
  2. Pipeline registry: Maintain a registry of all pipelines in SurrealDB: name, source, target, frequency, owner, last run status. Queryable via admin API.
  3. Template repository: Publish a Go pipeline template with Temporal workflow, activities, quality checks, and monitoring instrumentation. All new pipelines start from this template.
  4. Data quality dashboard: Build a Grafana dashboard showing pipeline health, error rates, dead letter volumes, and data freshness across all pipelines.
  5. Runbook: Document operational procedures for common pipeline failures: stale source data, schema drift, target unavailability, dead letter overflow.