Series (Part 7 of 8): The final technical post before the QA installment. We will build a real-time fraud detection pipeline with a <100ms scoring SLA, where the latency budget is shared among CEP pattern matching, state lookup, and ML model inference.

Real-time fraud detection systems rely on stream processing frameworks such as Apache Flink and state backends such as RocksDB to meet processing SLAs of 50-100ms. By applying Complex Event Processing (CEP) and async I/O for ML inference, these systems can reduce false positives by up to 80% compared with legacy static rule engines, whose false positive rates reach 85-99%. When fraud detection triggers an account block, combine it with the Saga Pattern to orchestrate the compensation flow and with FAPI 2.0 API Security to protect the fraud scoring API endpoint.


Fraud Detection SLA Architecture

Latency Budget Breakdown

Total end-to-end authorization latency for a credit card transaction: 100-300ms. The components itemized below account for 85-170ms of that budget.

| Component | Allocated Budget | Typical Actual |
|---|---|---|
| Network (client → gateway) | 10-20ms | 5-15ms |
| API Gateway routing | 5-10ms | 2-5ms |
| Fraud scoring (Flink/ML) | 50-100ms | 30-80ms |
| Core Banking authorization | 10-20ms | 5-10ms |
| Network (gateway → client) | 10-20ms | 5-15ms |
| Total | 85-170ms | 47-125ms |

Source: Redis Fraud Detection Brief, Feedzai AI Report.
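
As a quick sanity check on the budget table, the sketch below sums the per-component figures in Python. The numbers are copied from the table; the dictionary keys and the `total` helper are illustrative names, not part of any library.

```python
# Per-component latency in milliseconds:
# (allocated_min, allocated_max, actual_min, actual_max), copied from the table.
BUDGET_MS = {
    "network_in": (10, 20, 5, 15),
    "api_gateway": (5, 10, 2, 5),
    "fraud_scoring": (50, 100, 30, 80),
    "core_banking": (10, 20, 5, 10),
    "network_out": (10, 20, 5, 15),
}


def total(column: int) -> int:
    """Sum one column of the budget table across all components."""
    return sum(row[column] for row in BUDGET_MS.values())


allocated = (total(0), total(1))
actual = (total(2), total(3))
# Share of the worst-case allocated budget consumed by fraud scoring alone.
scoring_share = BUDGET_MS["fraud_scoring"][1] / allocated[1]

print(f"allocated: {allocated[0]}-{allocated[1]}ms, actual: {actual[0]}-{actual[1]}ms")
print(f"fraud scoring share of worst-case budget: {scoring_share:.0%}")
```

Fraud scoring takes roughly 59% of the worst-case allocated budget, which is why it receives most of the tuning attention in this post.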


Apache Flink provides a CEP library for detecting complex event sequences in real-time streams.

CEP Pattern: 3 Failed Logins + High-Value Transaction

Detect when a user has ≥3 failed logins within 5 minutes, followed shortly after by a transaction over 1,000,000 VND:

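import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;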

// Event types
record Event(String userId, String type, double amount, long timestamp) {}
record AlertEvent(String userId, String reason, List<Event> matchedEvents) {}

// === CEP Pattern Definition ===
Pattern<Event, ?> fraudPattern = Pattern.<Event>begin("failed_login")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return "login_failed".equals(value.type());
        }
    })
    .timesOrMore(3)        // ≥3 failed logins
    .consecutive()         // Strict contiguity: no other event between the failed logins
    .followedByAny("high_value_transaction")  // Relaxed contiguity: other events may occur before the high-value transaction
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return "transaction".equals(value.type())
                && value.amount() > 1_000_000.0;  // >1M VND
        }
    })
    .within(Time.minutes(5));  // Entire sequence within 5 minutes

// === Apply Pattern to Stream ===
DataStream<Event> eventStream = ...; // Kafka source

PatternStream<Event> patternStream = CEP.pattern(
    eventStream.keyBy(Event::userId),  // Partition by user
    fraudPattern
);

// === Process Matches ===
DataStream<AlertEvent> alerts = patternStream.process(
    new PatternProcessFunction<Event, AlertEvent>() {
        @Override
        public void processMatch(
            Map<String, List<Event>> match,
            Context ctx,
            Collector<AlertEvent> out
        ) throws Exception {
            List<Event> failedLogins = match.get("failed_login");
            List<Event> highValueTxs = match.get("high_value_transaction");
            
            out.collect(new AlertEvent(
                failedLogins.get(0).userId(),
                "3+ failed logins followed by high-value transaction",
                Stream.concat(failedLogins.stream(), highValueTxs.stream())
                      .collect(Collectors.toList())
            ));
        }
    }
);
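
To make the pattern's semantics concrete, here is a minimal plain-Python sketch of the same rule, evaluated over one user's time-ordered events. It is a simplification under my own assumptions, not Flink CEP's actual NFA: it emits at most one alert per qualifying transaction, whereas the Flink pattern can emit several matches for the same transaction. The `Event` tuple and the `detect` function are hypothetical names.

```python
from typing import List, NamedTuple

WINDOW_MS = 5 * 60 * 1000        # mirrors .within(Time.minutes(5))
MIN_FAILED_LOGINS = 3            # mirrors .timesOrMore(3)
HIGH_VALUE_VND = 1_000_000.0


class Event(NamedTuple):
    user_id: str
    type: str          # "login_failed", "transaction", ...
    amount: float
    timestamp: int     # epoch milliseconds


def detect(events: List[Event]) -> List[Event]:
    """Return high-value transactions preceded by a run of >=3 consecutive
    failed logins, with the whole sequence inside the 5-minute window.
    `events` must belong to one user and be sorted by timestamp."""
    alerts: List[Event] = []
    run: List[Event] = []            # current run of consecutive failed logins
    armed: List[List[Event]] = []    # finished runs long enough to match later
    for e in events:
        if e.type == "login_failed":
            run.append(e)
            continue
        # Any non-login event ends the current run (strict contiguity inside the loop).
        if len(run) >= MIN_FAILED_LOGINS:
            armed.append(run)
        run = []
        if e.type == "transaction" and e.amount > HIGH_VALUE_VND:
            for logins in armed:
                # Keep only the logins that are still inside the window at this transaction.
                recent = [f for f in logins if e.timestamp - f.timestamp < WINDOW_MS]
                if len(recent) >= MIN_FAILED_LOGINS:
                    alerts.append(e)
                    break
    return alerts


sample = [
    Event("user-001", "login_failed", 0.0, 0),
    Event("user-001", "login_failed", 0.0, 1_000),
    Event("user-001", "login_failed", 0.0, 2_000),
    Event("user-001", "transaction", 5_000_000.0, 120_000),
]
print(len(detect(sample)))
```

The sample prints 1: three failed logins within two seconds, then a 5,000,000 VND transaction two minutes later. The sketch never prunes `armed`, which is acceptable for illustration but would leak memory in a long-running process.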

Sliding Window: Velocity Checks

Check the number of transactions per user over the past hour:

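// Velocity check: >10 transactions within 1 hour for the same user
DataStream<Alert> velocityAlerts = eventStream
    .filter(e -> "transaction".equals(e.type()))
    .keyBy(Event::userId)
    .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
    // CountAggregator yields the per-window count; VelocityAlertFunction is assumed to wrap it
    // in an Alert exposing count(), so the threshold filter operates on Alert, not a raw number
    .aggregate(new CountAggregator(), new VelocityAlertFunction())
    .filter(alert -> alert.count() > 10);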

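// Geographic velocity: transactions from 2 locations >500km apart within <1 hour
// (assumes Event is extended with a location() accessor, which the record above does not define)
Pattern<Event, ?> geoVelocityPattern = Pattern.<Event>begin("tx1")
    .where(new SimpleCondition<Event>() {   // where() takes a condition object, not a lambda
        @Override
        public boolean filter(Event value) {
            return "transaction".equals(value.type());
        }
    })
    .followedBy("tx2")
    .where(new IterativeCondition<Event>() {
        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
            if (!"transaction".equals(value.type())) {
                return false;  // only transactions carry a location to compare
            }
            // getEventsForPattern declares a checked exception, hence "throws Exception"
            Iterator<Event> tx1Events = ctx.getEventsForPattern("tx1").iterator();
            if (tx1Events.hasNext()) {
                Event tx1 = tx1Events.next();
                double distance = haversineDistance(tx1.location(), value.location());
                return distance > 500_000; // 500km in meters
            }
            return false;
        }
    })
    .within(Time.hours(1));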
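
The geographic check calls a haversine helper that the post never defines. A minimal Python sketch follows, assuming locations are "lat,lng" strings and a mean Earth radius of 6,371 km. `parse_location`, `haversine_m`, and `is_impossible_travel` are hypothetical names, and the two sample coordinates are approximate values for Ho Chi Minh City and Hanoi.

```python
import math

EARTH_RADIUS_M = 6_371_000.0   # mean Earth radius in meters (approximation)


def parse_location(loc: str) -> tuple:
    """Parse a "lat,lng" string such as "10.8231,106.6297" into two floats."""
    lat, lng = loc.split(",")
    return float(lat), float(lng)


def haversine_m(loc_a: str, loc_b: str) -> float:
    """Great-circle distance in meters between two "lat,lng" strings."""
    lat1, lng1 = map(math.radians, parse_location(loc_a))
    lat2, lng2 = map(math.radians, parse_location(loc_b))
    dlat, dlng = lat2 - lat1, lng2 - lng1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlng / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def is_impossible_travel(loc_a: str, ts_a_ms: int, loc_b: str, ts_b_ms: int) -> bool:
    """True when two transactions are >500 km apart and <1 hour apart."""
    within_hour = abs(ts_b_ms - ts_a_ms) < 3_600_000
    return within_hour and haversine_m(loc_a, loc_b) > 500_000


hcmc, hanoi = "10.8231,106.6297", "21.0278,105.8342"
print(round(haversine_m(hcmc, hanoi) / 1000), "km")
print(is_impossible_travel(hcmc, 0, hanoi, 30 * 60 * 1000))
```

Returning meters keeps the helper consistent with the `500_000` threshold in the Java condition.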

Async ML Inference: Maintaining the 50ms SLA

Traditional synchronous ML inference calls block the processing thread and destroy throughput. Flink’s AsyncDataStream addresses this:

// Async ML inference: does not block the main stream
DataStream<ScoredTransaction> scoredStream = AsyncDataStream.unorderedWait(
    txStream,
    new AsyncMLInferenceFunction(),
    100,              // Timeout: 100ms — SLA hard limit
    TimeUnit.MILLISECONDS,
    1000              // Max 1000 concurrent async requests
);

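// ML inference function
class AsyncMLInferenceFunction
    extends RichAsyncFunction<Event, ScoredTransaction> {
    
    // transient: the function is serialized at deploy time; the client is created in open()
    private transient OkHttpClient httpClient;
    
    @Override
    public void open(Configuration parameters) {
        httpClient = new OkHttpClient.Builder()
            .connectTimeout(20, TimeUnit.MILLISECONDS)
            .readTimeout(80, TimeUnit.MILLISECONDS)
            .build();
    }
    
    @Override
    public void asyncInvoke(Event input, ResultFuture<ScoredTransaction> resultFuture) {
        // Fire an async HTTP request to TensorFlow Serving
        Request request = new Request.Builder()
            .url("http://ml-serving:8501/v1/models/fraud_model:predict")
            .post(buildFeatureVector(input))
            .build();
        
        httpClient.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                // HTTP timeout or ML service down → default score (do not block the transaction)
                resultFuture.complete(Collections.singletonList(
                    new ScoredTransaction(input, 0.5, "ml_timeout")
                ));
            }
            
            @Override
            public void onResponse(Call call, Response response) {
                // try-with-resources closes the response body; a parse or HTTP error
                // falls back to the default score so the future is always completed
                try (Response r = response) {
                    double score = parseFraudScore(r.body().string());
                    resultFuture.complete(Collections.singletonList(
                        new ScoredTransaction(input, score, "ml_success")
                    ));
                } catch (Exception e) {
                    resultFuture.complete(Collections.singletonList(
                        new ScoredTransaction(input, 0.5, "ml_error")
                    ));
                }
            }
        });
    }
    
    @Override
    public void timeout(Event input, ResultFuture<ScoredTransaction> resultFuture) {
        // Flink's own 100ms timeout fired: emit the default score instead of failing the job
        resultFuture.complete(Collections.singletonList(
            new ScoredTransaction(input, 0.5, "ml_timeout")
        ));
    }
}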
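
The timeout-and-fallback behavior can be illustrated without Flink. The sketch below uses Python's `asyncio` as a stand-in: `score_with_fallback` waits at most 100ms for a model call and returns the default score 0.5 otherwise. `fake_model` is a hypothetical stub for the ML serving endpoint, not a real client, and the latencies are made-up values chosen to trigger both paths.

```python
import asyncio

SLA_TIMEOUT_S = 0.100      # 100ms hard limit, mirroring the Flink async timeout
DEFAULT_SCORE = 0.5        # neutral score used when the model cannot answer in time


async def fake_model(latency_s: float, score: float) -> float:
    """Hypothetical stand-in for the ML serving call; sleeps to simulate latency."""
    await asyncio.sleep(latency_s)
    return score


async def score_with_fallback(latency_s: float, score: float) -> tuple:
    """Return (score, status); fall back to the default score on timeout."""
    try:
        result = await asyncio.wait_for(fake_model(latency_s, score), timeout=SLA_TIMEOUT_S)
        return result, "ml_success"
    except asyncio.TimeoutError:
        return DEFAULT_SCORE, "ml_timeout"


async def main() -> list:
    # Both requests run concurrently; the slow one does not delay the fast one.
    return await asyncio.gather(
        score_with_fallback(0.010, 0.97),   # fast model response
        score_with_fallback(0.500, 0.97),   # too slow: falls back to the default
    )


results = asyncio.run(main())
print(results)
```

The design choice is the same as in the Flink function: a slow or failed model call degrades to a neutral score rather than stalling the authorization path.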

Scoring decision logic:

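// Combine CEP rule alerts + ML score → final decision
// (assumes rulesDescriptor is a MapStateDescriptor<String, AlertEvent> visible to the combiner)
DataStream<FraudDecision> finalDecision = scoredStream
    .connect(alerts.broadcast(rulesDescriptor))
    .process(new RuleAndMLCombiner());

class RuleAndMLCombiner extends BroadcastProcessFunction<ScoredTransaction, AlertEvent, FraudDecision> {
    @Override
    public void processElement(ScoredTransaction tx, ReadOnlyContext ctx, Collector<FraudDecision> out) {
        FraudDecision decision;
        
        // The thresholds below use the ML score only; CEP alerts kept in broadcast state
        // can be read here via ctx.getBroadcastState(rulesDescriptor)
        if (tx.mlScore() > 0.95) {
            // High confidence fraud → auto-block
            decision = FraudDecision.block(tx, "ml_high_confidence");
        } else if (tx.mlScore() > 0.75) {
            // Medium confidence → step-up authentication (OTP required)
            decision = FraudDecision.stepUp(tx, "ml_medium_confidence");
        } else {
            // Low risk → approve
            decision = FraudDecision.approve(tx);
        }
        
        out.collect(decision);
    }
    
    // Required by BroadcastProcessFunction: keep the latest CEP alert per user in broadcast state
    @Override
    public void processBroadcastElement(AlertEvent alert, Context ctx, Collector<FraudDecision> out) throws Exception {
        ctx.getBroadcastState(rulesDescriptor).put(alert.userId(), alert);
    }
}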
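
The threshold logic is easy to check in isolation. The following Python sketch mirrors the three bands with the same strict `>` comparisons; `decide` and the action strings are illustrative names.

```python
BLOCK_THRESHOLD = 0.95     # above this: auto-block
STEP_UP_THRESHOLD = 0.75   # above this: require step-up authentication (OTP)


def decide(ml_score: float) -> str:
    """Map an ML fraud score in [0, 1] to an action, using strict '>' comparisons."""
    if ml_score > BLOCK_THRESHOLD:
        return "block"
    if ml_score > STEP_UP_THRESHOLD:
        return "step_up"
    return "approve"


for score in (0.99, 0.95, 0.80, 0.50):
    print(score, decide(score))
```

Two boundary effects are worth noticing: a score of exactly 0.95 yields step-up rather than block, and the 0.5 default score used when the ML call times out lands in the approve band.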

RocksDB State Backend: Production Configuration

Per the Apache Flink docs, RocksDB lets state exceed available RAM by spilling to SSD:

# flink-conf.yaml — Production RocksDB configuration

# === State Backend ===
state.backend: rocksdb
state.backend.rocksdb.localdir: /mnt/nvme-ssd/flink-state  # NVMe local SSD
state.backend.incremental: true                              # Incremental checkpoints

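# === Memory Tuning ===
# Block cache: 2GB, keeps hot user profiles in memory
# (only honored when state.backend.rocksdb.memory.managed is false; with managed memory,
#  the default, Flink sizes the cache from the managed memory budget)
state.backend.rocksdb.block.cache-size: 2147483648  # 2 GB

# Block size: 64KB, optimized for sequential fraud profile reads
state.backend.rocksdb.block.blocksize: 65536  # 64 KB

# Write buffers: 256MB MemTable, buffers writes before flush
state.backend.rocksdb.writebuffer.size: 268435456  # 256 MB

# Background threads: 8, for parallel flush and compaction
state.backend.rocksdb.thread.num: 8

# Level-based compaction: minimize read amplification
state.backend.rocksdb.compaction.style: LEVEL

# Direct I/O: bypass the filesystem cache, since RocksDB manages its own cache.
# Note: this is a RocksDB DBOptions setting (setUseDirectIoForFlushAndCompaction); if your Flink
# version does not recognize this key, apply it through a custom RocksDBOptionsFactory instead.
state.backend.rocksdb.use-direct-io-for-flush-and-compaction: true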

# === Checkpointing ===
execution.checkpointing.interval: 30000  # 30 seconds
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 120000  # 2 minutes timeout

# State TTL: auto-expire stale user profiles
# Set per operator in code
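
The raw byte values in the configuration are easy to mistype. The small Python helper below converts human-readable sizes to bytes using binary units, so the figures can be verified; `to_bytes` and `UNITS` are hypothetical names used only for this check.

```python
UNITS = {"KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def to_bytes(size: str) -> int:
    """Convert a size such as "64KB" or "2GB" into bytes (binary units)."""
    size = size.strip().upper()
    for suffix, factor in UNITS.items():
        if size.endswith(suffix):
            return int(size[: -len(suffix)]) * factor
    return int(size)   # plain number: already in bytes


for label in ("2GB", "64KB", "256MB"):
    print(f"{label:>6} = {to_bytes(label)} bytes")
```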

Write-Stall Prevention

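Write stalls occur when memtable flushes cannot keep up with the write rate, at which point RocksDB pauses all writes. Bounding state growth with a TTL keeps flush and compaction work manageable:

// Flink operator: configure state TTL to prevent unbounded state growth
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))          // Expire user profiles 24h after the last write (org.apache.flink.api.common.time.Time)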
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<UserFraudProfile> descriptor =
    new ValueStateDescriptor<>("user-fraud-profile", UserFraudProfile.class);
descriptor.enableTimeToLive(ttlConfig);

ValueState<UserFraudProfile> userProfileState = getRuntimeContext().getState(descriptor);
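
The two TTL settings used here can be pictured with a toy store. The Python sketch below is an illustration of the semantics only, not how Flink implements TTL: writes refresh the timestamp (OnCreateAndWrite), reads do not, and expired values are never returned (NeverReturnExpired). `TtlValueStore` is a hypothetical class, and time is passed in explicitly to keep the example deterministic.

```python
from typing import Any, Dict, Optional, Tuple

TTL_MS = 24 * 60 * 60 * 1000   # 24 hours, as in the StateTtlConfig example


class TtlValueStore:
    """Toy key-value store mimicking OnCreateAndWrite + NeverReturnExpired."""

    def __init__(self, ttl_ms: int = TTL_MS) -> None:
        self.ttl_ms = ttl_ms
        self._data: Dict[str, Tuple[Any, int]] = {}   # key -> (value, last_write_ms)

    def put(self, key: str, value: Any, now_ms: int) -> None:
        # OnCreateAndWrite: every write refreshes the timestamp.
        self._data[key] = (value, now_ms)

    def get(self, key: str, now_ms: int) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, written_ms = entry
        if now_ms - written_ms >= self.ttl_ms:
            # NeverReturnExpired: hide the value; this toy store also drops it eagerly.
            del self._data[key]
            return None
        return value   # reads do not extend the TTL


store = TtlValueStore()
store.put("user-001", {"txLast1h": 3}, now_ms=0)
print(store.get("user-001", now_ms=TTL_MS - 1))   # still visible
print(store.get("user-001", now_ms=TTL_MS))       # expired
```

Because reads do not refresh the timestamp, a profile that is only looked up but never updated still expires 24 hours after its last write.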

Checkpoint size: with RocksDB incremental checkpoints, only changed SSTable files are uploaded to S3 rather than the entire state. Checkpoint latency: 100ms to 5s, depending on state size and throughput.


// User fraud profile stored in Flink RocksDB state
public class UserFraudProfile implements Serializable {
    public String userId;
    
    // Velocity counters
    public int txLast1h;          // Number of transactions in the last hour
    public int txLast24h;         // Number of transactions in the last 24 hours
    public double totalAmountLast1h;
    
    // Failed auth tracking
    public int failedLoginLast5m; // Failed logins in the last 5 minutes
    public long lastFailedLoginTs;
    
    // Geographic data
    public String lastLocation;   // "10.8231,106.6297" (lat,lng)
    public long lastTxTimestamp;
    
    // Device fingerprint
    public String lastDeviceHash;
    public int newDeviceCountLast7d;
    
    // Risk scores history
    public double avgRiskScore30d;
    public boolean isHighRiskUser;
}

ML vs Rules Engine: False Positive Analysis

Source: Feedzai AI Report

| Approach | False Positive Rate | False Negative Rate | Latency |
|---|---|---|---|
| Static Rules Engine | 85-99% | 15-30% | <5ms |
| Hybrid (Rules + ML) | 20-45% | 5-15% | 30-80ms |
| Pure ML | 10-25% | 3-8% | 50-100ms |

Business Impact of False Positives:

  • Each false positive = a blocked legitimate transaction = revenue loss + customer friction
  • A bank with 1M transactions/day and 1% of them falsely flagged blocks 10,000 good transactions/day
  • ML cutting false positives by 80% reduces that from 10,000 to 2,000 blocked good transactions/day
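
The arithmetic behind these bullets, as a short Python sketch. The three inputs come from the bullets; the variable names are illustrative.

```python
daily_transactions = 1_000_000
false_positive_share = 0.01     # 1% of transactions are falsely flagged
ml_reduction = 0.80             # ML removes 80% of those false positives

blocked_before = round(daily_transactions * false_positive_share)
blocked_after = round(blocked_before * (1 - ml_reduction))
recovered = blocked_before - blocked_after

print(f"blocked good transactions/day before ML: {blocked_before}")
print(f"blocked good transactions/day after ML:  {blocked_after}")
print(f"good transactions recovered per day:     {recovered}")
```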

Feature Engineering for the Fraud ML Model

# Feature extraction for training data
def extract_features(transaction: Transaction, user_history: UserHistory) -> dict:
    return {
        # Amount features
        "amount_usd": transaction.amount / 23000,  # Convert VND to USD at a fixed, approximate rate
        "amount_zscore": (
            (transaction.amount - user_history.avg_amount) / user_history.std_amount
            if user_history.std_amount else 0.0  # avoid division by zero for flat histories
        ),
        
        # Velocity features
        "tx_count_1h": user_history.tx_count_1h,
        "tx_count_24h": user_history.tx_count_24h,
        "amount_sum_1h": user_history.amount_sum_1h,
        
        # Behavioral features
        "hour_of_day": transaction.timestamp.hour,
        "is_weekend": transaction.timestamp.weekday() >= 5,
        "days_since_account_open": user_history.account_age_days,
        
        # Geographic features
        "distance_from_last_tx_km": haversine_distance(
            transaction.location, user_history.last_tx_location
        ),
        "is_new_country": transaction.country != user_history.home_country,
        
        # Device features
        "is_new_device": transaction.device_hash not in user_history.known_devices,
        "new_device_count_7d": user_history.new_device_count_7d,
        
        # Auth features
        "failed_auth_count_5m": user_history.failed_auth_5m,
    }
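
As a worked example of the amount z-score feature, the sketch below computes it from a list of past amounts using the standard library. It assumes the sample standard deviation and returns 0.0 when the history is too short or has no variance; `amount_zscore` and the sample history are hypothetical.

```python
import statistics
from typing import Sequence


def amount_zscore(amount: float, past_amounts: Sequence[float]) -> float:
    """How many standard deviations `amount` lies from the user's mean spend."""
    if len(past_amounts) < 2:
        return 0.0                      # not enough history for a deviation
    mean = statistics.mean(past_amounts)
    std = statistics.stdev(past_amounts)
    if std == 0:
        return 0.0                      # identical past amounts: avoid dividing by zero
    return (amount - mean) / std


history = [100_000, 200_000, 300_000]   # past transaction amounts in VND
print(amount_zscore(5_000_000, history))
```

With a mean of 200,000 VND and a sample standard deviation of 100,000 VND, a 5,000,000 VND transaction sits 48 standard deviations above the user's norm.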

QA & SDET Testing Strategy

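Test 1: Operator Test Harness

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;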

@Test
public void testFraudDetectorOperator() throws Exception {
    // Setup test harness — no real Kafka/Flink cluster needed
    KeyedOneInputStreamOperatorTestHarness<String, Event, AlertEvent> testHarness =
        new KeyedOneInputStreamOperatorTestHarness<>(
            new FraudDetectorOperator(),
            Event::userId,
            Types.STRING
        );
    
    testHarness.open();
    
    long baseTime = System.currentTimeMillis();
    
    // Inject 3 failed logins
    for (int i = 0; i < 3; i++) {
        testHarness.processElement(
            new Event("user-001", "login_failed", 0.0, baseTime + i * 1000),
            baseTime + i * 1000
        );
    }
    
    // Inject high-value transaction WITHIN 5 minutes
    testHarness.processElement(
        new Event("user-001", "transaction", 5_000_000.0, baseTime + 120_000),
        baseTime + 120_000
    );
    
    // Advance watermark to trigger timer
    testHarness.processWatermark(baseTime + 300_001);
    
    // Verify alert was generated
    List<StreamRecord<? extends AlertEvent>> output = testHarness.extractOutputStreamRecords();
    assertEquals(1, output.size(), "Expected exactly 1 fraud alert");
    assertEquals("user-001", output.get(0).getValue().userId());
    
    testHarness.close();
}

Test 2: MiniCluster Integration Test

@Test
public void testFraudPipelineEndToEnd() throws Exception {
    // Embedded Flink cluster — no external dependencies
    MiniClusterWithClientResource flinkCluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(4)
            .setNumberTaskManagers(1)
            .build()
    );
    flinkCluster.before();
    
    // Run full pipeline with test data
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(1000); // 1 second checkpoints
    
    List<Event> testEvents = generateFraudScenario("user-001");
    DataStream<Event> source = env.fromCollection(testEvents);
    
    // Connect full pipeline
    DataStream<FraudDecision> results = buildFraudPipeline(source);
    
    // Collect up to 1,000 results and verify (the no-arg executeAndCollect() returns an iterator, not a List)
    List<FraudDecision> collected = results.executeAndCollect(1_000);
    
    assertTrue(collected.stream().anyMatch(d -> d.action() == BLOCK),
        "Fraud scenario must trigger a BLOCK decision");
    
    flinkCluster.after();
}

Test 3: Latency SLA Validation

func TestFraudScoringLatencySLA(t *testing.T) {
    const (
        targetP50  = 50 * time.Millisecond
        targetP99  = 100 * time.Millisecond
        sampleSize = 10000
    )
    
    var latencies []time.Duration
    
    for i := 0; i < sampleSize; i++ {
        start := time.Now()
        _ = fraudScoringService.Score(generateTestTransaction())
        latencies = append(latencies, time.Since(start))
    }
    
    sort.Slice(latencies, func(i, j int) bool {
        return latencies[i] < latencies[j]
    })
    
    p50 := latencies[len(latencies)*50/100]
    p99 := latencies[len(latencies)*99/100]
    
    assert.LessOrEqual(t, p50, targetP50, "P50 must be ≤ 50ms")
    assert.LessOrEqual(t, p99, targetP99, "P99 must be ≤ 100ms")
}
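
The percentile selection in this test is simple index arithmetic on a sorted slice. The Python sketch below reproduces it with the same `len * p / 100` indexing and shows how a small share of slow requests decides the P99 outcome. `percentile` and `meets_sla` are hypothetical helpers, and the latency samples are synthetic.

```python
from typing import List

TARGET_P50_MS = 50.0
TARGET_P99_MS = 100.0


def percentile(sorted_values: List[float], p: int) -> float:
    """Pick the element at index len * p // 100, as the Go test does."""
    index = min(len(sorted_values) * p // 100, len(sorted_values) - 1)
    return sorted_values[index]


def meets_sla(latencies_ms: List[float]) -> bool:
    """True when both P50 and P99 are within their targets."""
    ordered = sorted(latencies_ms)
    return (percentile(ordered, 50) <= TARGET_P50_MS
            and percentile(ordered, 99) <= TARGET_P99_MS)


print(meets_sla([20.0] * 995 + [150.0] * 5))    # 0.5% slow requests
print(meets_sla([20.0] * 980 + [150.0] * 20))   # 2% slow requests
```

With 0.5% of requests at 150ms the P99 stays at 20ms and the check passes; at 2% the P99 becomes 150ms and it fails, even though the median is unchanged.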

FAQ

RocksDB vs HashMapStateBackend: when should you use which?

  • HashMapStateBackend: state lives on the JVM heap. Faster (~2-5x), but limited by JVM heap size. Suitable when state size is <10GB.
  • RocksDB: state lives on local SSD. Slower, but scales to terabytes. Suitable for fraud profiles of millions of users.

Do exactly-once semantics matter for fraud detection?

Yes. With a Kafka → Flink → Kafka pipeline in EXACTLY_ONCE mode and a transactional Kafka sink, each fraud alert is committed exactly once despite checkpoint failures and task restarts, provided downstream consumers read with isolation.level=read_committed. Without exactly-once, duplicate alerts can lead to duplicate account freezes and customer complaints.

How do you reduce fraud scoring latency?

  1. Increase state.backend.rocksdb.block.cache-size so hot data stays in memory.
  2. Use unordered async I/O (unorderedWait), which does not wait for ML responses in order.
  3. Place Flink TaskManagers close to the ML serving nodes on the network.
  4. Enable state.backend.rocksdb.use-direct-io-for-flush-and-compaction: true.
  5. Checkpoint less often (increase the checkpoint interval) if you do not need frequent checkpoints; each checkpoint adds I/O overhead.

Next: Part 8, QA & SDET Handbook: a roundup of testing strategies for distributed financial systems, covering split-brain, clock skew, double-submit, and chaos engineering.