Chuỗi bài (Phần 3 của 8): Bài này xây dựng trên nền tảng ACID transactions từ Phần 2. Chúng ta sẽ thiết kế ledger sử dụng Event Sourcing — giải pháp mà Monzo, Starling Bank và nhiều neo-bank lớn đang dùng để scale.

Event Sourcing & CQRS Trong Fintech Là Gì?

Các hệ thống microservices fintech sử dụng Event Sourcing và CQRS patterns để giữ tính nhất quán dữ liệu phân tán mà không cần distributed locks. Để tránh lỗi dual-write, Transactional Outbox pattern được áp dụng kết hợp với CDC tools như Debezium. Pre-calculated CQRS balance lookups đạt <1ms, trong khi on-the-fly SUM() aggregates tăng từ 2ms đến 200ms theo $O(N)$ với account history.


Tại Sao Ledger Đã Là Event Sourcing Từ Đầu?

Kế toán kép (double-entry bookkeeping) — được phát minh từ thế kỷ 15 — thực chất đã là Event Sourcing:

  • Traditional approach: Lưu current state (balance hiện tại) → mất history
  • Event Sourcing: Lưu chuỗi events bất biến → current state là kết quả replay
Traditional:  accounts.balance = 500,000 VNĐ   (không biết tại sao)

Event Sourcing:
  Event 1: AccountOpened        → balance = 0
  Event 2: MoneyDeposited(1M)   → balance = 1,000,000
  Event 3: MoneyWithdrawn(200K) → balance = 800,000
  Event 4: InterestAccrued(50K) → balance = 850,000
  Event 5: FeeCharged(350K)     → balance = 500,000

Đây chính xác là cách sổ cái kế toán hoạt động — mọi bút toán là một event không thể xóa. Số dư = replay tất cả events từ đầu (hoặc từ snapshot gần nhất).


Event Store Schema: PostgreSQL Production Design

Core Event Store Table

-- Event Store: Bảng trung tâm lưu toàn bộ events của hệ thống
CREATE TABLE event_store (
    event_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id       UUID NOT NULL,         -- Account/Entity ID (aggregate boundary)
    sequence_number BIGINT NOT NULL,       -- Monotonic counter PER stream
    event_type      VARCHAR(100) NOT NULL, -- 'MoneyDeposited', 'MoneyWithdrawn', etc.
    event_data      JSONB NOT NULL,        -- Payload của event
    metadata        JSONB,                 -- correlation_id, causation_id, user_id, etc.
    created_at      TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,

    -- Quan trọng: Ngăn concurrent race conditions — mỗi stream có sequence riêng
    CONSTRAINT uq_stream_sequence UNIQUE (stream_id, sequence_number)
);

-- Index cho event replay per account
CREATE INDEX idx_event_store_stream ON event_store (stream_id, sequence_number ASC);
-- Index cho CDC/Outbox polling
CREATE INDEX idx_event_store_created ON event_store (created_at ASC);

sequence_number là chìa khóa của Optimistic Concurrency Control (OCC):

// Append event với OCC — prevent concurrent writes lên cùng stream
func appendEvent(db *sql.DB, streamID uuid.UUID, expectedSeq int64, event Event) error {
    query := `
        INSERT INTO event_store (stream_id, sequence_number, event_type, event_data, metadata)
        VALUES ($1, $2, $3, $4, $5)
    `
    // sequence_number = expectedSeq + 1
    // Nếu sequence đã tồn tại → UNIQUE constraint violation → conflict detected
    _, err := db.Exec(query,
        streamID,
        expectedSeq+1,
        event.Type,
        event.Data,
        event.Metadata,
    )
    if isUniqueViolation(err) {
        return ErrConcurrentModification // Retry hoặc return conflict
    }
    return err
}

Event Snapshots: Tránh O(N) Replay

Với tài khoản có lịch sử hàng triệu transactions, replay toàn bộ event store sẽ trở nên cực kỳ chậm. Giải pháp: snapshot định kỳ.

-- Snapshot Table: Lưu pre-computed state tại một điểm sequence
CREATE TABLE event_snapshots (
    stream_id            UUID PRIMARY KEY,
    last_sequence_number BIGINT NOT NULL,
    state                JSONB NOT NULL,  -- Pre-computed balance tại điểm này
    updated_at           TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

Pattern đọc balance với snapshot:

// 1. Load snapshot gần nhất
snapshot, err := loadSnapshot(db, accountID)

// 2. Load chỉ events SAU snapshot
events, err := loadEventsAfter(db, accountID, snapshot.LastSequenceNumber)

// 3. Apply events vào snapshot state
balance := snapshot.State.Balance
for _, event := range events {
    balance = applyEvent(balance, event)
}

// Thay vì replay 5 triệu events → chỉ cần replay N events sau snapshot

Quy tắc snapshot: Tạo snapshot mỗi 100-1000 events (tùy throughput). Background job có thể tự động tạo snapshots cho “hot accounts”.


Monzo’s Event Sourcing Architecture

Monzo Engineering đã publish chi tiết kiến trúc của họ:

  • Write Path: Go microservices ghi ledger postings vào PostgreSQL (primary source of truth)
  • Distribution: Kafka pub/sub phân phối events sang các read models
  • Read Models:
    • Cassandra: Primary read database, optimized for time-series lookups
    • Elasticsearch: Full-text search, transaction search
    • BigQuery: Analytics và reporting
  • Consistency: Offline reconciliation systems kiểm tra định kỳ

Monzo Transaction Flow (simplified):

Mobile App Request
       │
       ▼
Account Service (Go)
       │
  ┌────┴─────────────────────────────┐
  │  PostgreSQL Transaction          │
  │  1. INSERT into event_store      │
  │  2. INSERT into outbox_events    │
  └────┬─────────────────────────────┘
       │ commit
       ▼
Debezium CDC Connector
       │ reads WAL
       ▼
Apache Kafka
       │
  ┌────┼─────────────────────────────┐
  │    │                             │
  ▼    ▼                             ▼
Cassandra  Elasticsearch          BigQuery
(balance)  (search)               (analytics)

CQRS Latency: <1ms vs O(N) SUM()

CQRS (Command Query Responsibility Segregation) tách write path (commands) khỏi read path (queries):

On-the-fly Aggregation: O(N) Disaster

-- BAD: Tính balance bằng SUM() trực tiếp từ ledger
SELECT SUM(CASE WHEN direction = 'CREDIT' THEN amount ELSE -amount END) AS balance
FROM entries
WHERE account_id = 'acc-001';

-- Latency: 2ms với 1K entries → 50ms với 100K → 200ms với 1M entries

CQRS Pre-computed Read Model: <1ms

-- GOOD: Đọc pre-computed balance từ materialized view / Redis
SELECT balance, available_balance, last_updated_at
FROM account_balances  -- CQRS read model
WHERE account_id = 'acc-001';

-- Latency: <1ms (point lookup, indexed)
-- Redis: <0.5ms (in-memory)

CQRS Write/Read Flow:

WRITE SIDE (Command)                READ SIDE (Query)
────────────────────────            ──────────────────────────
POST /transfers            →        account_balances table
POST /accounts             →        Elasticsearch index
PUT /loans/repay           →        Redis balance cache

↓ Event Published ↓                ↑ Subscribe & Update ↑
         └──────────────────────────┘
               (Kafka event stream)

Transactional Outbox Pattern: Giải Quyết Dual-Write

Vấn Đề Dual-Write

❌ WRONG — Không atomic:
1. db.Update(account)     ← SUCCESS
2. kafka.Publish(event)   ← FAIL (network error)
→ Database updated nhưng downstream services không nhận được event
→ Balance sai, notification không gửi, read models stale

Outbox Pattern Solution

Ghi event vào cùng database transaction với business logic, dùng background worker để publish lên Kafka:

-- PostgreSQL Transactional Outbox Table
CREATE TABLE outbox_events (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type  VARCHAR(100) NOT NULL,  -- 'Account', 'Transfer', 'Loan'
    aggregate_id    VARCHAR(100) NOT NULL,  -- Entity ID
    event_type      VARCHAR(100) NOT NULL,  -- 'MoneyTransferred', 'AccountOpened'
    payload         JSONB NOT NULL,
    status          VARCHAR(20) NOT NULL DEFAULT 'PENDING',  -- PENDING, PUBLISHED
    created_at      TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    published_at    TIMESTAMP WITH TIME ZONE
);

CREATE INDEX idx_outbox_status_created ON outbox_events (status, created_at ASC);

Application code — trong cùng một DB transaction:

func (s *AccountService) Transfer(ctx context.Context, req TransferRequest) error {
    return s.db.WithTransaction(ctx, func(tx *sql.Tx) error {
        // 1. Business logic: Ghi ledger entries
        if err := insertLedgerEntries(tx, req); err != nil {
            return err
        }

        // 2. CÙNG transaction: Ghi outbox event
        outboxPayload, _ := json.Marshal(map[string]interface{}{
            "from_account": req.FromAccount,
            "to_account":   req.ToAccount,
            "amount":       req.Amount,
            "currency":     req.Currency,
        })
        _, err := tx.Exec(`
            INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
            VALUES ($1, $2, $3, $4)
        `, "Account", req.FromAccount, "MoneyTransferred", outboxPayload)
        return err
        // Nếu commit thành công: BOTH ledger AND outbox event được ghi
        // Nếu rollback: NEITHER được ghi → perfect atomicity
    })
}

Debezium CDC Connector đọc PostgreSQL WAL và forward events lên Kafka:

// Debezium connector config (connector.json)
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres.internal",
    "database.port": "5432",
    "database.user": "debezium",
    "database.dbname": "core_banking",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.field.event.type": "event_type"
  }
}

Event Versioning: Xử Lý Schema Evolution

Event store là immutable — không thể thay đổi schema của events cũ. Giải pháp là versioning:

// Upcaster: Convert event v1 → v2 khi reading
type MoneyDepositedV1 struct {
    AccountID string  `json:"account_id"`
    Amount    float64 `json:"amount"`  // v1 dùng float (SAI)
}

type MoneyDepositedV2 struct {
    AccountID string `json:"account_id"`
    AmountCents int64  `json:"amount_cents"` // v2 dùng integer (ĐÚNG)
    Currency    string `json:"currency"`
}

func upcaster(eventType string, version int, data json.RawMessage) (interface{}, error) {
    switch {
    case eventType == "MoneyDeposited" && version == 1:
        var v1 MoneyDepositedV1
        json.Unmarshal(data, &v1)
        return MoneyDepositedV2{
            AccountID:   v1.AccountID,
            AmountCents: int64(v1.Amount * 100), // Convert
            Currency:    "VND",                   // Default
        }, nil
    // ...
    }
}

QA & SDET Testing Strategy

Test 1: Event Replay Consistency

// Scenario: Drop read model → replay từ event store → verify balance match
func TestEventReplayConsistency(t *testing.T) {
    ctx := context.Background()
    
    // 1. Lấy "live" balance từ read model TRƯỚC
    liveBalance := getReadModelBalance(ctx, "account-001")
    
    // 2. Drop và rebuild read model từ event store
    dropAccountBalancesTable(ctx)
    replayAllEventsFromEventStore(ctx)
    
    // 3. Lấy balance sau replay
    replayedBalance := getReadModelBalance(ctx, "account-001")
    
    // 4. Phải khớp chính xác
    assert.Equal(t, liveBalance, replayedBalance,
        "Replayed balance phải khớp chính xác với live balance")
}

Test 2: Outbox Atomicity Under Failure

// Inject failure GIỮA database commit và Kafka publish
func TestOutboxAtomicityUnderFailure(t *testing.T) {
    // Mock Kafka publisher để fail
    mockKafka := &FailingKafkaPublisher{}
    
    // Execute transfer
    err := transferService.Transfer(ctx, TransferRequest{
        From: "acc-A", To: "acc-B", Amount: 1000000,
    })
    
    // Transfer vẫn thành công (DB committed)
    assert.NoError(t, err)
    
    // Outbox event vẫn ở status PENDING (Kafka failed)
    pendingEvents := countOutboxPending()
    assert.Greater(t, pendingEvents, 0)
    
    // Sau khi Kafka recover, outbox worker retry và publish thành công
    fixKafka()
    waitForOutboxProcessing()
    
    // Balance của cả hai accounts phải chính xác
    assert.Equal(t, expectedBalanceA, getBalance("acc-A"))
    assert.Equal(t, expectedBalanceB, getBalance("acc-B"))
}

📚 Xem thêm: Saga Pattern — Saga Pattern để handle distributed failures

FAQ

Event Sourcing có làm query phức tạp hơn không?

Đúng — Event Sourcing tối ưu cho writes và audit, nhưng phức tạp hơn cho reads. Đó chính xác là lý do CQRS tồn tại. Write side lưu events; read side build materialized views tối ưu cho queries. Không nên dùng Event Sourcing đơn thuần mà không có CQRS read models.

Debezium có thể xử lý được PostgreSQL WAL volume lớn không?

Có, nhưng cần monitor LAG (Debezium lag sau WAL position). Với volume >10,000 TPS, nên dedicate một PostgreSQL replica chỉ cho Debezium để không ảnh hưởng primary.

Snapshot nên tạo mỗi bao nhiêu events?

Phụ thuộc vào average event size và acceptable replay time. Rule of thumb: snapshot mỗi 500 events với event size trung bình 1KB → snapshot file ~500KB. Replay từ snapshot (0 events) đến max (500 events) không bao giờ quá vài chục milliseconds.


Tiếp theo: Phần 4 — Saga Pattern — Choreography vs Orchestration Saga, failure transition matrices, và implementation với Temporal workflow engine.