ETL/ELT Pipeline Standards¶
Standard ID: STD-DATA-052 Version: 1.0 Last Updated: 2026-04-03 Owner: Data Team Status: Active
Purpose¶
Define standards for all extract-transform-load (ETL) and extract-load-transform (ELT) pipelines at Simpaisa. Pipelines move data between operational systems, analytical stores, and external partners. With 270M+ transactions processed annually across six markets, pipeline reliability directly affects reconciliation, settlement, regulatory reporting, and business intelligence.
Current State¶
- No pipeline orchestration: Data movement jobs are ad hoc cron scripts (Python, bash) running on application servers. There is no visibility into job status, no retry logic, and no dependency management.
- No schema management: When upstream schemas change (MySQL column additions, channel file format changes), downstream pipelines break silently. Failures are discovered when reports are wrong or missing.
- No idempotency: Re-running a failed pipeline risks duplicate records in the target. Engineers must manually verify and deduplicate.
- No data quality checks: Data enters analytical stores and reports without validation. Incorrect data (null amounts, invalid currencies, orphaned records) propagates unchecked.
- No monitoring: Pipeline failures are discovered manually, often hours or days after occurrence.
Target State¶
All data pipelines are orchestrated by Temporal, are idempotent, support backward-compatible schema evolution, include data quality checks, and are monitored with alerting.
Pipeline Architecture¶
Source (MySQL, SurrealDB, Channel Files, Bank Statements)
→ Extraction (Temporal Activity)
→ Transformation (Temporal Activity, idempotent)
→ Quality Checks (Temporal Activity)
→ Load (Temporal Activity, idempotent)
→ Verification (Temporal Activity)
Each pipeline is a Temporal workflow with discrete activities for each stage. Temporal provides retry, timeout, dependency management, and observability natively.
Orchestration Standards¶
| Rule | Requirement |
|---|---|
| Orchestrator | All pipelines MUST use Temporal for orchestration. No cron jobs, no ad hoc scripts |
| Workflow naming | etl-{source}-{target}-{frequency} (e.g., etl-mysql-analytics-hourly) |
| Idempotency | Every pipeline run MUST be idempotent. Re-running with the same input produces the same output without duplicates. Use upsert semantics or deduplication keys |
| Atomicity | Pipeline runs are atomic at the batch level. A failed batch does not leave partial data in the target. Use staging tables or write-ahead patterns |
| Concurrency | No two instances of the same pipeline run concurrently. Temporal's workflow ID uniqueness enforces this |
| Timeout | Every activity MUST have a timeout. No unbounded operations. Default: 30 minutes per activity, 4 hours per workflow |
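The sketch below shows how the naming, idempotency, and concurrency rules fit together when starting a run: the workflow ID is derived deterministically from the pipeline name and its schedule window, so Temporal rejects a second submission for the same window while a run is still open. The task queue name, the window format, and the ETLParams field are illustrative assumptions, not part of this standard.
import (
    "context"
    "fmt"
    "time"

    "go.temporal.io/sdk/client"
)

// startHourlyRun submits one run of etl-mysql-analytics-hourly for a given schedule
// window. The deterministic workflow ID prevents two concurrent runs of the same
// pipeline and makes accidental double submission of a window harmless.
func startHourlyRun(ctx context.Context, c client.Client, window time.Time) error {
    opts := client.StartWorkflowOptions{
        // Workflow naming rule plus the window being processed.
        ID:        fmt.Sprintf("etl-mysql-analytics-hourly-%s", window.UTC().Format("2006-01-02T15")),
        TaskQueue: "etl-pipelines", // assumed task queue name
    }
    _, err := c.ExecuteWorkflow(ctx, opts, ETLWorkflow, ETLParams{Window: window})
    // A "workflow execution already started" error means a run for this window is
    // already in flight; the caller can treat it as success.
    return err
}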
Schema Evolution¶
All pipeline schemas (source and target) MUST follow backward-compatible evolution rules:
| Allowed Change | Example |
|---|---|
| Add a new optional field | Add channel_reference to transaction export |
| Add a new table/collection | Add settlement_exceptions table |
| Widen a field type | INT → BIGINT, VARCHAR(50) → VARCHAR(100) |
| Forbidden Change | Reason |
|---|---|
| Remove a field | Downstream consumers may depend on it |
| Rename a field | Equivalent to remove + add; breaks consumers |
| Change field type (narrowing) | BIGINT → INT loses data |
| Change field semantics | Reusing a field for a different purpose |
Schema changes MUST be versioned and documented. Breaking changes require a new pipeline version and migration period.
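As an illustration of an allowed change, the hypothetical transaction export record below gains channel_reference as an optional field: payloads produced before the change still decode, and consumers that do not know the field can simply ignore it.
// Hypothetical transaction export record after adding the optional field.
// Removing or renaming any of the existing fields would be a breaking change
// and would require a new pipeline version and a migration period instead.
type TransactionExport struct {
    TransactionID    string  `json:"transaction_id"`
    MerchantID       string  `json:"merchant_id"`
    AmountMinor      int64   `json:"amount_minor"` // minor currency units
    Currency         string  `json:"currency"`
    ChannelReference *string `json:"channel_reference,omitempty"` // new optional field
}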
Data Quality Checks¶
Every pipeline MUST include quality checks at data boundaries (after extraction, after transformation, before load):
| Check Type | Description | Action on Failure |
|---|---|---|
| Null check | Mandatory fields are not null | Reject record, log error |
| Type check | Fields match expected types | Reject record, log error |
| Range check | Amounts > 0, dates within reasonable range | Reject record, log error |
| Referential check | Foreign keys resolve (merchant exists, channel valid) | Reject record, log warning |
| Volume check | Record count within expected range (±50% of historical average) | Alert, do not auto-reject |
| Freshness check | Source data timestamp within expected window | Alert, pause pipeline if data is stale |
| Duplicate check | No duplicate records in batch (by primary key) | Deduplicate, log warning |
Rejected records are written to a dead letter table (DLT) with the rejection reason. The DLT is reviewed daily by the data team.
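A minimal sketch of the per-record checks, assuming illustrative Record and DeadLetterWriter types (plus the standard library log package): records that fail mandatory checks are written to the dead letter table with a reason, in-batch duplicates are dropped with a warning, and only clean records reach the load stage.
// checkBatch applies null, range, and duplicate checks to one batch. Rejected
// records go to the dead letter table with the rejection reason; the returned
// slice is what the load stage receives.
func checkBatch(batch []Record, dlt DeadLetterWriter) []Record {
    seen := make(map[string]bool) // primary keys already accepted in this batch
    clean := make([]Record, 0, len(batch))
    for _, r := range batch {
        switch {
        case r.TransactionID == "" || r.Currency == "":
            dlt.Write(r, "null check: mandatory field missing")
        case r.AmountMinor <= 0:
            dlt.Write(r, "range check: amount must be > 0")
        case seen[r.TransactionID]:
            log.Printf("duplicate check: dropping duplicate %s", r.TransactionID)
        default:
            seen[r.TransactionID] = true
            clean = append(clean, r)
        }
    }
    return clean
}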
Monitoring and Alerting¶
| Metric | Threshold | Alert |
|---|---|---|
| Pipeline lag (time since last successful run) | >2x scheduled frequency | PagerDuty alert |
| Error rate (rejected records / total records) | >1% | Slack notification |
| Pipeline duration | >2x historical average | Slack notification |
| Throughput (records/second) | <50% of historical average | Slack notification |
| Dead letter table size | >100 records in 24 hours | PagerDuty alert |
All pipeline metrics are exported via OpenTelemetry to Grafana for dashboard visibility.
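A sketch of how a pipeline run might emit the counters behind the error-rate and throughput alerts using the OpenTelemetry Go API (go.opentelemetry.io/otel); the metric names and attribute keys are illustrative, and a real pipeline would create the instruments once rather than on every run.
import (
    "context"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/metric"
)

// reportRun records processed and rejected record counts for one pipeline run;
// the configured OpenTelemetry exporter ships them to Grafana.
func reportRun(ctx context.Context, pipeline string, processed, rejected int64) error {
    meter := otel.Meter("etl-pipelines")
    processedCtr, err := meter.Int64Counter("etl.records.processed")
    if err != nil {
        return err
    }
    rejectedCtr, err := meter.Int64Counter("etl.records.rejected")
    if err != nil {
        return err
    }
    attrs := metric.WithAttributes(attribute.String("pipeline", pipeline))
    processedCtr.Add(ctx, processed, attrs)
    rejectedCtr.Add(ctx, rejected, attrs)
    return nil
}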
Implementation Guidelines¶
Go Pipeline Template¶
All pipelines are implemented in Go using the Temporal Go SDK. The template below assumes each stage activity accepts and returns a shared Batch record set:
// Workflow definition (imports: "time", "go.temporal.io/sdk/workflow"). Each stage runs as
// a Temporal activity; Get blocks on the activity's Future so one stage's output feeds the next.
func ETLWorkflow(ctx workflow.Context, params ETLParams) error {
    ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Minute, // default per-activity timeout from this standard
    })
    var in interface{} = params
    for _, stage := range []interface{}{ExtractActivity, TransformActivity, QualityCheckActivity, LoadActivity, VerifyActivity} {
        var out Batch // the record set handed from one stage to the next
        if err := workflow.ExecuteActivity(ctx, stage, in).Get(ctx, &out); err != nil {
            return err
        }
        in = out
    }
    return nil
}
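To run the template, the workflow and its stage activities must be registered on a worker polling the pipeline task queue. A minimal sketch follows; the task queue name and single-binary layout are assumptions, not requirements of this standard.
import (
    "log"

    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
)

// main wires the ETL workflow and its stage activities to a worker so Temporal
// can schedule pipeline runs on the etl-pipelines task queue.
func main() {
    c, err := client.Dial(client.Options{}) // connects to localhost:7233 by default
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    w := worker.New(c, "etl-pipelines", worker.Options{})
    w.RegisterWorkflow(ETLWorkflow)
    for _, a := range []interface{}{ExtractActivity, TransformActivity, QualityCheckActivity, LoadActivity, VerifyActivity} {
        w.RegisterActivity(a)
    }
    if err := w.Run(worker.InterruptCh()); err != nil {
        log.Fatal(err)
    }
}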
Extraction Patterns¶
| Source Type | Pattern |
|---|---|
| MySQL (legacy) | CDC via Debezium → NSQ → pipeline consumer |
| SurrealDB | LIVE SELECT subscription or scheduled query |
| Channel settlement files | SFTP poll or webhook trigger → file parse activity |
| Bank statements | SFTP poll → file parse activity |
| API sources | HTTP client with retry and circuit breaker |
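For the file-based sources, parsing typically runs as an ordinary activity after the SFTP poll has fetched the file. The sketch below parses a settlement file already on local disk; the SettlementRow type and the column layout (reference, amount in minor units, currency) are assumptions, since each channel has its own format.
import (
    "context"
    "encoding/csv"
    "os"
    "strconv"
)

// ParseSettlementFileActivity reads a downloaded channel settlement file and
// returns one record per data row. Malformed rows fail the whole batch in this
// sketch; per-record rejection belongs to the quality check stage.
func ParseSettlementFileActivity(ctx context.Context, path string) ([]SettlementRow, error) {
    f, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    rows, err := csv.NewReader(f).ReadAll()
    if err != nil {
        return nil, err
    }
    if len(rows) == 0 {
        return nil, nil // empty file: nothing to load
    }
    out := make([]SettlementRow, 0, len(rows)-1)
    for _, r := range rows[1:] { // skip the header row
        amount, err := strconv.ParseInt(r[1], 10, 64)
        if err != nil {
            return nil, err
        }
        out = append(out, SettlementRow{Reference: r[0], AmountMinor: amount, Currency: r[2]})
    }
    return out, nil
}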
Transformation Rules¶
- Transformations MUST be pure functions: given the same input, they produce the same output, with no side effects.
- Currency amounts MUST be handled as integers (minor units) during transformation to avoid floating-point errors. Convert to decimal only at the presentation layer.
- Timestamps MUST be normalised to UTC during transformation. Local timezone is stored as a separate field where needed.
- PII MUST be masked or tokenised per PII-HANDLING-STANDARD.md before loading into analytical stores.
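The sketch below illustrates these rules with assumed SourceTxn and AnalyticsTxn record shapes: it is a pure function, keeps the amount in integer minor units, and normalises the timestamp to UTC while retaining the local timezone as a separate field.
// transformTransaction maps a raw source row to the analytical record. It is a
// pure function: no I/O, no clock reads, no mutation of its input. PII fields
// are tokenised per PII-HANDLING-STANDARD.md before load (not shown here).
func transformTransaction(src SourceTxn) AnalyticsTxn {
    return AnalyticsTxn{
        TransactionID: src.ID,
        MerchantID:    src.MerchantID,
        AmountMinor:   src.AmountMinor, // integer minor units throughout; never floats
        Currency:      src.Currency,
        OccurredAt:    src.OccurredAt.UTC(),               // normalised to UTC
        LocalTimezone: src.OccurredAt.Location().String(), // local zone kept as its own field
    }
}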
Actions¶
- Deprecate cron pipelines: Audit and inventory all existing cron-based data jobs. Create Temporal workflow equivalents. Decommission cron jobs within 3 months.
- Pipeline registry: Maintain a registry of all pipelines in SurrealDB: name, source, target, frequency, owner, last run status. Queryable via admin API.
- Template repository: Publish a Go pipeline template with Temporal workflow, activities, quality checks, and monitoring instrumentation. All new pipelines start from this template.
- Data quality dashboard: Build a Grafana dashboard showing pipeline health, error rates, dead letter volumes, and data freshness across all pipelines.
- Runbook: Document operational procedures for common pipeline failures: stale source data, schema drift, target unavailability, dead letter overflow.