Skip to content

Events Schema

Owner Classification Review Date Status
CDO Office Internal April 2027 Active
-- ============================================================================
-- Simpaisa Event Sourcing SurrealDB Schema
-- Domain: Event Store, Event Snapshots, CloudEvents Format
-- Target: SurrealDB 2.x | SurrealQL
-- Markets: PK, BD, NP, IQ
-- Scale: 270M+ transactions, $1B+ processed
--
-- DESIGN NOTES:
--   This schema implements an append-only event store following the
--   CloudEvents v1.0 specification (https://cloudevents.io). All domain
--   events from pay-ins, pay-outs, remittances, and cards flow into this
--   store via SurrealDB DEFINE EVENT triggers on their respective tables.
--
--   The event_snapshot table provides periodic aggregate state snapshots
--   to accelerate replay — avoiding full event replay from genesis.
--
--   Events are immutable: no UPDATE or DELETE permissions are granted.
-- ============================================================================

-- ============================================================================
-- TABLE: event_store
-- Append-only event log following CloudEvents v1.0 specification
-- ============================================================================

DEFINE TABLE event_store SCHEMAFULL
  PERMISSIONS
    FOR select WHERE $scope = 'service'
    FOR create WHERE $scope = 'service'
    FOR update NONE
    FOR delete NONE;

-- CloudEvents v1.0 required attributes
DEFINE FIELD id                ON event_store TYPE string
  COMMENT 'CloudEvents: Unique event identifier (ULID recommended)';
DEFINE FIELD specversion       ON event_store TYPE string
  DEFAULT '1.0'
  COMMENT 'CloudEvents: Specification version';
DEFINE FIELD type              ON event_store TYPE string
  COMMENT 'CloudEvents: Event type (e.g. com.simpaisa.payin.success)';
DEFINE FIELD source            ON event_store TYPE string
  COMMENT 'CloudEvents: Event source URI (e.g. /payin-svc/v1)';
DEFINE FIELD time              ON event_store TYPE datetime
  DEFAULT time::now()
  COMMENT 'CloudEvents: Timestamp of occurrence';

-- CloudEvents v1.0 optional attributes
DEFINE FIELD subject           ON event_store TYPE option<string>
  COMMENT 'CloudEvents: Subject of the event (e.g. transaction ID)';
DEFINE FIELD datacontenttype   ON event_store TYPE string
  DEFAULT 'application/json'
  COMMENT 'CloudEvents: Content type of data';
DEFINE FIELD dataschema        ON event_store TYPE option<string>
  COMMENT 'CloudEvents: URI of the data schema';

-- Domain-specific fields (mapped to CloudEvents extensions)
DEFINE FIELD aggregateId       ON event_store TYPE string
  ASSERT $value != NONE
  COMMENT 'Aggregate root identifier';
DEFINE FIELD aggregateType     ON event_store TYPE string
  ASSERT $value != NONE
  COMMENT 'Aggregate type (e.g. pay_in_transaction, disbursement, remittance)';
DEFINE FIELD eventType         ON event_store TYPE string
  ASSERT $value != NONE
  COMMENT 'Domain event type (e.g. PAYIN_SUCCESS, DISBURSEMENT_FAILED)';
DEFINE FIELD eventData         ON event_store TYPE object
  COMMENT 'CloudEvents: Event payload (the "data" attribute)';
DEFINE FIELD sequenceNumber    ON event_store TYPE datetime
  COMMENT 'Monotonic sequence for ordering within an aggregate';
DEFINE FIELD correlationId     ON event_store TYPE option<string>
  COMMENT 'Distributed trace correlation ID';
DEFINE FIELD causationId       ON event_store TYPE option<string>
  COMMENT 'ID of the event that caused this event';
DEFINE FIELD merchantId        ON event_store TYPE option<string>
  COMMENT 'Tenant identifier for partitioned queries';
DEFINE FIELD country           ON event_store TYPE option<string>
  COMMENT 'Market identifier (PK, BD, NP, IQ)';
DEFINE FIELD schemaVersion     ON event_store TYPE int
  DEFAULT 1
  COMMENT 'Payload schema version for backwards-compatible evolution';

-- Timestamp (using CloudEvents time as canonical)
DEFINE FIELD createdAt         ON event_store TYPE datetime
  DEFAULT time::now();

-- ============================================================================
-- Indexes for event replay and query patterns
-- ============================================================================

-- Primary replay index: all events for an aggregate in order
DEFINE INDEX idx_event_aggregate_sequence
  ON event_store FIELDS aggregateId, sequenceNumber
  COMMENT 'Primary replay: fetch all events for an aggregate in order';

-- Type-based queries: all events of a given type
DEFINE INDEX idx_event_type
  ON event_store FIELDS eventType, createdAt
  COMMENT 'Query by event type (e.g. all PAYIN_SUCCESS events)';

-- Aggregate type queries: all events for a class of aggregates
DEFINE INDEX idx_event_aggregate_type
  ON event_store FIELDS aggregateType, createdAt
  COMMENT 'Query by aggregate type (e.g. all disbursement events)';

-- Correlation-based queries: trace a distributed transaction
DEFINE INDEX idx_event_correlation
  ON event_store FIELDS correlationId
  COMMENT 'Trace events across a distributed transaction';

-- Tenant-scoped queries: events for a specific merchant
DEFINE INDEX idx_event_merchant
  ON event_store FIELDS merchantId, createdAt
  COMMENT 'Tenant-scoped event queries';

-- Time-based queries: events within a time range
DEFINE INDEX idx_event_created
  ON event_store FIELDS createdAt
  COMMENT 'Time-range queries for analytics and replay';

-- Country-based queries: events for a specific market
DEFINE INDEX idx_event_country
  ON event_store FIELDS country, eventType, createdAt
  COMMENT 'Market-specific event queries';

-- CloudEvents type + source combination
DEFINE INDEX idx_event_cloudevents_type_source
  ON event_store FIELDS type, source, time
  COMMENT 'CloudEvents: query by type and source';

-- Causation chain: find events caused by a specific event
DEFINE INDEX idx_event_causation
  ON event_store FIELDS causationId
  COMMENT 'Trace causal event chains';

-- ============================================================================
-- TABLE: event_snapshot
-- Periodic aggregate state snapshots to accelerate replay
-- ============================================================================

DEFINE TABLE event_snapshot SCHEMAFULL
  PERMISSIONS
    FOR select WHERE $scope = 'service'
    FOR create, update WHERE $scope = 'service'
    FOR delete WHERE $scope = 'service';

DEFINE FIELD id                ON event_snapshot TYPE string;
DEFINE FIELD aggregateId       ON event_snapshot TYPE string
  ASSERT $value != NONE
  COMMENT 'Aggregate root identifier';
DEFINE FIELD aggregateType     ON event_snapshot TYPE string
  ASSERT $value != NONE
  COMMENT 'Type of aggregate (e.g. pay_in_transaction, disbursement)';
DEFINE FIELD snapshotData      ON event_snapshot TYPE object
  COMMENT 'Serialised aggregate state at snapshot time';
DEFINE FIELD lastEventId       ON event_snapshot TYPE string
  COMMENT 'ID of the last event included in this snapshot';
DEFINE FIELD lastSequence      ON event_snapshot TYPE datetime
  COMMENT 'Sequence number of the last event included';
DEFINE FIELD eventCount        ON event_snapshot TYPE int
  COMMENT 'Total number of events up to this snapshot';
DEFINE FIELD schemaVersion     ON event_snapshot TYPE int
  DEFAULT 1
  COMMENT 'Snapshot schema version for migration';
DEFINE FIELD createdAt         ON event_snapshot TYPE datetime
  DEFAULT time::now();

-- Indexes
DEFINE INDEX idx_snapshot_aggregate
  ON event_snapshot FIELDS aggregateId, createdAt
  COMMENT 'Find latest snapshot for an aggregate';
DEFINE INDEX idx_snapshot_type
  ON event_snapshot FIELDS aggregateType, createdAt
  COMMENT 'Find snapshots by aggregate type';

-- ============================================================================
-- HELPER COMMENTS: Event Replay Patterns
-- ============================================================================
--
-- Full replay for an aggregate:
--   SELECT * FROM event_store
--   WHERE aggregateId = $id
--   ORDER BY sequenceNumber ASC;
--
-- Replay from snapshot:
--   LET $snap = (SELECT * FROM event_snapshot
--     WHERE aggregateId = $id
--     ORDER BY createdAt DESC LIMIT 1);
--   SELECT * FROM event_store
--   WHERE aggregateId = $id
--     AND sequenceNumber > $snap.lastSequence
--   ORDER BY sequenceNumber ASC;
--
-- Fan-out projection (all events of a type since timestamp):
--   SELECT * FROM event_store
--   WHERE eventType = 'PAYIN_SUCCESS'
--     AND createdAt > $since
--   ORDER BY createdAt ASC;
--
-- Cross-aggregate correlation trace:
--   SELECT * FROM event_store
--   WHERE correlationId = $traceId
--   ORDER BY createdAt ASC;
-- ============================================================================