Series (Part 7 of 8): the last technical installment before the QA part. We will build a real-time fraud detection pipeline with a sub-100ms scoring SLA, where the latency budget is shared among CEP pattern matching, state lookup, and ML model inference.
What Is a Flink Fraud Detection Architecture?
Real-time fraud detection systems rely on stream processing frameworks such as Apache Flink and state backends such as RocksDB to meet a processing SLA of 50-100ms. By applying Complex Event Processing (CEP) and asynchronous I/O for ML inference, these systems can cut false positives by up to 80% compared with legacy static rule engines, whose false positive rates run as high as 85-99%. When fraud detection triggers an account block, combine it with the Saga Pattern to orchestrate the compensation flow and with FAPI 2.0 API Security to protect the fraud scoring API endpoint.
Fraud Detection SLA Architecture
Latency Budget Breakdown
Total end-to-end authorization latency for a credit card transaction: 100-300ms
| Component | Allocated Budget | Typical Actual |
|---|---|---|
| Network (client → gateway) | 10-20ms | 5-15ms |
| API Gateway routing | 5-10ms | 2-5ms |
| Fraud scoring (Flink/ML) | 50-100ms | 30-80ms |
| Core Banking authorization | 10-20ms | 5-10ms |
| Network (gateway → client) | 10-20ms | 5-15ms |
| Total | 85-170ms | 47-125ms |
Source: Redis Fraud Detection Brief, Feedzai AI Report.
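The budget table can be checked mechanically. The sketch below is a minimal illustration, assuming the allocated ranges from the table; the component keys and the measured values are hypothetical names and numbers chosen for the example, not part of any real monitoring API.

```python
# Allocated latency budget per component in milliseconds: (low, high),
# copied from the "Allocated Budget" column of the table.
BUDGET_MS = {
    "network_in": (10, 20),
    "api_gateway": (5, 10),
    "fraud_scoring": (50, 100),
    "core_banking": (10, 20),
    "network_out": (10, 20),
}


def total_budget(budget):
    """Sum the low and high ends of every component's allocation."""
    low = sum(lo for lo, _ in budget.values())
    high = sum(hi for _, hi in budget.values())
    return low, high


def over_budget(measured_ms, budget):
    """Return the components whose measured latency exceeds the allocated upper bound."""
    return {name: ms for name, ms in measured_ms.items() if ms > budget[name][1]}


# Hypothetical measurements for one request.
measured = {
    "network_in": 12,
    "api_gateway": 4,
    "fraud_scoring": 120,
    "core_banking": 8,
    "network_out": 11,
}

print(total_budget(BUDGET_MS))           # (85, 170)
print(over_budget(measured, BUDGET_MS))  # {'fraud_scoring': 120}
```

A per-component check like this makes it visible which stage consumed the budget when the end-to-end number is breached.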
Apache Flink: Complex Event Processing (CEP)
Apache Flink provides a CEP library for detecting complex event sequences in real-time streams.
CEP Pattern: 3 Failed Logins + High-Value Transaction
Detect when a user has ≥3 failed logins within 5 minutes, followed shortly afterwards by a transaction above 1,000,000 VND:
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
// Event types
record Event(String userId, String type, double amount, long timestamp) {}
record AlertEvent(String userId, String reason, List<Event> matchedEvents) {}
// === CEP Pattern Definition ===
Pattern<Event, ?> fraudPattern = Pattern.<Event>begin("failed_login")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return "login_failed".equals(value.type());
        }
    })
    .timesOrMore(3) // ≥3 failed logins
    .consecutive() // Strict contiguity: no other event between the failed logins
    // Non-deterministic relaxed contiguity: other events may sit in between,
    // and every qualifying transaction after the logins produces a match
    .followedByAny("high_value_transaction")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return "transaction".equals(value.type())
                && value.amount() > 1_000_000.0; // >1M VND
        }
    })
    .within(Time.minutes(5)); // Entire sequence within 5 minutes
// === Apply Pattern to Stream ===
DataStream<Event> eventStream = ...; // Kafka source
PatternStream<Event> patternStream = CEP.pattern(
    eventStream.keyBy(Event::userId), // Partition by user
    fraudPattern
);
// === Process Matches ===
DataStream<AlertEvent> alerts = patternStream.process(
    new PatternProcessFunction<Event, AlertEvent>() {
        @Override
        public void processMatch(
            Map<String, List<Event>> match,
            Context ctx,
            Collector<AlertEvent> out
        ) throws Exception {
            List<Event> failedLogins = match.get("failed_login");
            List<Event> highValueTxs = match.get("high_value_transaction");
            out.collect(new AlertEvent(
                failedLogins.get(0).userId(),
                "3+ failed logins followed by high-value transaction",
                Stream.concat(failedLogins.stream(), highValueTxs.stream())
                    .collect(Collectors.toList())
            ));
        }
    }
);
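To make the rule easier to reason about outside Flink, here is a simplified plain-Python model of it. This is a sketch under stated assumptions, not a reimplementation of Flink CEP semantics: it treats any event other than a failed login as ending the current run of failures, which is stricter than `followedByAny`. The `Event` dataclass, `detect`, and the constant names are illustrative.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    user_id: str
    type: str
    amount: float
    timestamp_ms: int


WINDOW_MS = 5 * 60 * 1000   # 5-minute window from the rule
MIN_FAILED = 3              # minimum number of failed logins
HIGH_VALUE = 1_000_000.0    # VND threshold for a high-value transaction


def detect(events):
    """Return (user_id, timestamp_ms) for each high-value transaction that directly
    follows a run of at least MIN_FAILED failed logins, all within WINDOW_MS."""
    runs = {}    # user_id -> timestamps of the current run of failed logins
    alerts = []
    for e in sorted(events, key=lambda ev: ev.timestamp_ms):
        run = runs.setdefault(e.user_id, [])
        if e.type == "login_failed":
            run.append(e.timestamp_ms)
            continue
        if e.type == "transaction" and e.amount > HIGH_VALUE:
            # Only failed logins inside the window count towards the rule.
            recent = [t for t in run if e.timestamp_ms - t <= WINDOW_MS]
            if len(recent) >= MIN_FAILED:
                alerts.append((e.user_id, e.timestamp_ms))
        run.clear()  # simplification: any other event ends the run of failures
    return alerts


events = [
    Event("user-001", "login_failed", 0.0, 0),
    Event("user-001", "login_failed", 0.0, 1_000),
    Event("user-001", "login_failed", 0.0, 2_000),
    Event("user-001", "transaction", 5_000_000.0, 120_000),
]
print(detect(events))  # [('user-001', 120000)]
```

A model like this is handy for generating expected results when testing the real pattern, as long as the test scenarios avoid the cases where the two semantics differ.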
Sliding Window: Velocity Checks
Check the number of transactions per user over the past hour:
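// Velocity check: more than 10 transactions within 1 hour for the same user
DataStream<Alert> velocityAlerts = eventStream
    .filter(e -> "transaction".equals(e.type()))
    .keyBy(Event::userId)
    .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
    .aggregate(new CountAggregator(), new VelocityAlertFunction())
    // The stream carries Alert objects at this point, so filter on the alert's count.
    // Alert.count() is an assumed accessor; Alert is not defined in this article.
    .filter(alert -> alert.count() > 10);
// Geographic velocity: transactions from 2 locations more than 500km apart within 1 hour.
// Assumes Event also exposes location(); the record defined earlier has no such field.
Pattern<Event, ?> geoVelocityPattern = Pattern.<Event>begin("tx1")
    // where() expects a condition object, not a bare lambda
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) {
            return "transaction".equals(value.type());
        }
    })
    .followedBy("tx2")
    .where(new IterativeCondition<Event>() {
        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
            if (!"transaction".equals(value.type())) {
                return false;
            }
            Iterator<Event> tx1Events = ctx.getEventsForPattern("tx1").iterator();
            if (tx1Events.hasNext()) {
                Event tx1 = tx1Events.next();
                double distance = haversineDistance(tx1.location(), value.location());
                return distance > 500_000; // 500km in meters
            }
            return false;
        }
    })
    .within(Time.hours(1));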
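The geographic rule depends on a great-circle distance helper (`haversineDistance` above), which the article does not show. A minimal sketch of the standard haversine formula follows, assuming locations are stored as "lat,lng" strings in decimal degrees; the function names and the sample coordinates are illustrative.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius in meters


def parse_latlng(text):
    """Parse a 'lat,lng' string such as '10.8231,106.6297' into a (lat, lng) tuple."""
    lat, lng = text.split(",")
    return float(lat), float(lng)


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two points given in decimal degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def is_geo_velocity_violation(loc1, loc2, threshold_m=500_000):
    """True when two (lat, lng) locations are farther apart than threshold_m."""
    return haversine_m(*loc1, *loc2) > threshold_m


HCMC = parse_latlng("10.8231,106.6297")
HANOI = (21.0285, 105.8542)

print(is_geo_velocity_violation(HCMC, HANOI))          # True: well over 500 km apart
print(is_geo_velocity_violation(HCMC, (10.9, 106.7)))  # False: a few kilometers apart
```

Haversine assumes a spherical Earth, which is accurate enough for a 500km threshold; the time constraint (less than 1 hour) is enforced by the pattern's window, not by this helper.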
Async ML Inference: Maintaining the 50ms SLA
Traditional synchronous ML inference calls block the processing thread and destroy throughput. Flink's AsyncDataStream addresses this:
// Async ML inference: does not block the main stream
DataStream<ScoredTransaction> scoredStream = AsyncDataStream.unorderedWait(
    txStream,
    new AsyncMLInferenceFunction(),
    100, // Timeout: 100ms, the SLA hard limit
    TimeUnit.MILLISECONDS,
    1000 // Max 1000 concurrent async requests
);
// ML inference function
class AsyncMLInferenceFunction
        extends RichAsyncFunction<Event, ScoredTransaction> {
    // enqueue() runs calls on OkHttp's own dispatcher threads, so the operator thread is not blocked
    private transient OkHttpClient httpClient;
    @Override
    public void open(Configuration parameters) {
        httpClient = new OkHttpClient.Builder()
            .connectTimeout(20, TimeUnit.MILLISECONDS)
            .readTimeout(80, TimeUnit.MILLISECONDS)
            .build();
    }
    @Override
    public void asyncInvoke(Event input, ResultFuture<ScoredTransaction> resultFuture) {
        // Fire an async HTTP request to TensorFlow Serving
        Request request = new Request.Builder()
            .url("http://ml-serving:8501/v1/models/fraud_model:predict")
            .post(buildFeatureVector(input))
            .build();
        httpClient.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                // Timeout or ML service down: fall back to a default score rather than blocking the transaction
                resultFuture.complete(Collections.singletonList(
                    new ScoredTransaction(input, 0.5, "ml_timeout")
                ));
            }
            @Override
            public void onResponse(Call call, Response response) {
                // Close the body and always complete the future, even on a bad response
                try (ResponseBody body = response.body()) {
                    if (!response.isSuccessful() || body == null) {
                        throw new IOException("Unexpected HTTP status " + response.code());
                    }
                    double score = parseFraudScore(body.string());
                    resultFuture.complete(Collections.singletonList(
                        new ScoredTransaction(input, score, "ml_success")
                    ));
                } catch (Exception e) {
                    resultFuture.complete(Collections.singletonList(
                        new ScoredTransaction(input, 0.5, "ml_error")
                    ));
                }
            }
        });
    }
    @Override
    public void timeout(Event input, ResultFuture<ScoredTransaction> resultFuture) {
        // Flink's default timeout handler completes exceptionally; return the fallback score instead
        resultFuture.complete(Collections.singletonList(
            new ScoredTransaction(input, 0.5, "ml_timeout")
        ));
    }
}
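The same timeout-with-fallback idea can be shown in a few lines of plain Python. This is a sketch of the pattern only, assuming `asyncio` in place of Flink's async operator; `fast_model` and `slow_model` are hypothetical stand-ins for the model-serving call, and the 0.5 fallback score and 100ms limit are taken from the article.

```python
import asyncio

FALLBACK_SCORE = 0.5   # neutral score used when the model is unavailable
TIMEOUT_S = 0.100      # 100 ms hard limit


async def score_with_fallback(call_model, tx, timeout_s=TIMEOUT_S):
    """Await call_model(tx); on timeout or error return the fallback score instead of failing."""
    try:
        score = await asyncio.wait_for(call_model(tx), timeout=timeout_s)
        return score, "ml_success"
    except asyncio.TimeoutError:
        return FALLBACK_SCORE, "ml_timeout"
    except Exception:
        return FALLBACK_SCORE, "ml_error"


async def fast_model(tx):
    """Hypothetical model call that answers well within the limit."""
    await asyncio.sleep(0.01)
    return 0.9


async def slow_model(tx):
    """Hypothetical model call that exceeds the limit."""
    await asyncio.sleep(1.0)
    return 0.9


async def main():
    # Both requests run concurrently; the slow one is cut off after 100 ms.
    return await asyncio.gather(
        score_with_fallback(fast_model, {"id": 1}),
        score_with_fallback(slow_model, {"id": 2}),
    )


results = asyncio.run(main())
print(results)  # [(0.9, 'ml_success'), (0.5, 'ml_timeout')]
```

Tagging each result with its origin ("ml_success", "ml_timeout", "ml_error") lets downstream logic and dashboards distinguish a genuinely neutral score from a degraded one.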
Scoring decision logic:
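// Combine CEP rule alerts + ML score → final decision
// Broadcast state holding the latest CEP alert per user (the descriptor name is illustrative)
MapStateDescriptor<String, AlertEvent> rulesDescriptor =
    new MapStateDescriptor<>("cep-alerts", String.class, AlertEvent.class);
DataStream<FraudDecision> finalDecision = scoredStream
    .connect(alerts.broadcast(rulesDescriptor))
    .process(new RuleAndMLCombiner(rulesDescriptor));
class RuleAndMLCombiner extends BroadcastProcessFunction<ScoredTransaction, AlertEvent, FraudDecision> {
    private final MapStateDescriptor<String, AlertEvent> rulesDescriptor;
    RuleAndMLCombiner(MapStateDescriptor<String, AlertEvent> rulesDescriptor) {
        this.rulesDescriptor = rulesDescriptor;
    }
    @Override
    public void processBroadcastElement(AlertEvent alert, Context ctx, Collector<FraudDecision> out) throws Exception {
        // Required by BroadcastProcessFunction: remember that this user has a rule alert
        ctx.getBroadcastState(rulesDescriptor).put(alert.userId(), alert);
    }
    @Override
    public void processElement(ScoredTransaction tx, ReadOnlyContext ctx, Collector<FraudDecision> out) throws Exception {
        // tx.event() is an assumed accessor for the original Event inside ScoredTransaction
        boolean hasRuleAlert = ctx.getBroadcastState(rulesDescriptor).contains(tx.event().userId());
        FraudDecision decision;
        if (tx.mlScore() > 0.95) {
            // High confidence fraud → auto-block
            decision = FraudDecision.block(tx, "ml_high_confidence");
        } else if (tx.mlScore() > 0.75) {
            // Medium confidence → step-up authentication (OTP required)
            decision = FraudDecision.stepUp(tx, "ml_medium_confidence");
        } else if (hasRuleAlert) {
            // Assumption: a CEP rule alert escalates an otherwise low-risk score to step-up
            decision = FraudDecision.stepUp(tx, "cep_rule_alert");
        } else {
            // Low risk → approve
            decision = FraudDecision.approve(tx);
        }
        out.collect(decision);
    }
}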
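The threshold logic is easy to unit-test when isolated as a pure function. The sketch below uses the 0.95 and 0.75 thresholds from the article; the `has_rule_alert` escalation is an assumption added for illustration, since the article does not spell out how a CEP alert should change the outcome.

```python
BLOCK_THRESHOLD = 0.95     # above this: auto-block
STEP_UP_THRESHOLD = 0.75   # above this: step-up authentication (OTP)


def decide(ml_score, has_rule_alert=False):
    """Map an ML fraud score (and an optional CEP rule alert) to an action."""
    if ml_score > BLOCK_THRESHOLD:
        return "BLOCK"
    if ml_score > STEP_UP_THRESHOLD or has_rule_alert:
        # Assumption: a rule alert alone is enough to require step-up authentication.
        return "STEP_UP"
    return "APPROVE"


print(decide(0.97))        # BLOCK
print(decide(0.80))        # STEP_UP
print(decide(0.50))        # APPROVE
print(decide(0.50, True))  # STEP_UP
```

Note that the 0.5 fallback score used when the ML call times out lands in the APPROVE branch, so an ML outage fails open unless a rule alert is present. Whether that is acceptable is a business decision worth making explicitly.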
RocksDB State Backend: Production Configuration
Per the Apache Flink docs, RocksDB lets state exceed available RAM by spilling to SSD:
# flink-conf.yaml: production RocksDB configuration
# === State Backend ===
state.backend: rocksdb
state.backend.rocksdb.localdir: /mnt/nvme-ssd/flink-state # NVMe local SSD
state.backend.incremental: true # Incremental checkpoints
# === Memory Tuning ===
# Note: with Flink's managed memory for RocksDB enabled (the default), the block cache is sized
# from the managed memory budget and the explicit value below may not apply;
# set state.backend.rocksdb.memory.managed: false to control sizes manually
# Block cache: 2GB, keeps hot user profiles in memory
state.backend.rocksdb.block.cache-size: 2147483648 # 2 GB
# Block size: 64KB, optimized for sequential fraud profile reads
state.backend.rocksdb.block.blocksize: 65536 # 64 KB
# Write buffer: 256MB MemTable, buffers writes before flush
state.backend.rocksdb.writebuffer.size: 268435456 # 256 MB
# Compaction threads: 8, for parallel background compaction
state.backend.rocksdb.thread.num: 8
# Level-based compaction, to minimize read amplification
state.backend.rocksdb.compaction.style: LEVEL
# Direct I/O: bypass the filesystem cache, since RocksDB manages its own cache
# Verify this key against your Flink version; the underlying RocksDB option is
# use_direct_io_for_flush_and_compaction and may have to be set through a custom RocksDBOptionsFactory
state.backend.rocksdb.use-direct-io-for-flush-and-compaction: true
# === Checkpointing ===
execution.checkpointing.interval: 30000 # 30 seconds
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 120000 # 2 minutes timeout
# State TTL: auto-expire stale user profiles (set per operator in code)
Write-Stall Prevention
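Write stalls occur when memtable flushes cannot keep up with the write rate, at which point RocksDB pauses all writes. Bounding state growth with a TTL reduces the volume that has to be flushed and compacted: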
// Flink operator: configure state TTL to keep state size from growing without bound
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24)) // Expire user profiles 24h after their last write
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor<UserFraudProfile> descriptor =
    new ValueStateDescriptor<>("user-fraud-profile", UserFraudProfile.class);
descriptor.enableTimeToLive(ttlConfig);
ValueState<UserFraudProfile> userProfileState = getRuntimeContext().getState(descriptor);
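The two TTL settings used here have precise meanings: OnCreateAndWrite restarts the timer only on writes, and NeverReturnExpired hides a value as soon as it expires even if it has not been physically cleaned up yet. The sketch below models those semantics with a plain dictionary and an injectable clock; `TtlStore` is an illustrative class, not a Flink API.

```python
import time


class TtlStore:
    """Per-key state whose TTL restarts on every write and which never returns expired values."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._data = {}  # key -> (value, time of last write)

    def put(self, key, value):
        # Creating or overwriting a value restarts its TTL.
        self._data[key] = (value, self._clock())

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, written_at = entry
        if self._clock() - written_at >= self._ttl:
            del self._data[key]  # lazily drop expired state on access
            return None
        return value  # reads do not extend the TTL


# A fake clock makes the behaviour deterministic for the example.
now = [0.0]
store = TtlStore(ttl_seconds=24 * 3600, clock=lambda: now[0])
store.put("user-001", {"tx_last_1h": 3})

now[0] = 23 * 3600
print(store.get("user-001"))  # {'tx_last_1h': 3}

now[0] = 25 * 3600
print(store.get("user-001"))  # None
```

Because reads do not refresh the timer, a profile that is only looked up and never updated still expires 24 hours after its last write.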
Checkpoint size: with RocksDB incremental checkpoints, only changed SSTable files are uploaded to S3 rather than the entire state. Checkpoint latency: 100ms to 5s, depending on state size and throughput.
Flink State: User Profile Schema
// User fraud profile stored in Flink RocksDB state
public class UserFraudProfile implements Serializable {
    public String userId;
    // Velocity counters
    public int txLast1h; // Number of transactions in the last hour
    public int txLast24h; // Number of transactions in the last 24 hours
    public double totalAmountLast1h;
    // Failed auth tracking
    public int failedLoginLast5m; // Failed logins in the last 5 minutes
    public long lastFailedLoginTs;
    // Geographic data
    public String lastLocation; // "10.8231,106.6297" (lat,lng)
    public long lastTxTimestamp;
    // Device fingerprint
    public String lastDeviceHash;
    public int newDeviceCountLast7d;
    // Risk scores history
    public double avgRiskScore30d;
    public boolean isHighRiskUser;
}
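Counters such as txLast1h have to forget old transactions as time moves on. One simple way to maintain such a trailing count is a queue of timestamps, sketched below. It assumes timestamps arrive in order for a given user; `VelocityCounter` is an illustrative name, and the limit of 10 comes from the velocity rule earlier in the article.

```python
from collections import deque


class VelocityCounter:
    """Counts events inside a trailing time window (one instance per user)."""

    def __init__(self, window_ms=3_600_000):
        self.window_ms = window_ms
        self._timestamps = deque()

    def record(self, ts_ms):
        """Record a transaction at ts_ms and return the count inside the trailing window."""
        self._timestamps.append(ts_ms)
        self._evict(ts_ms)
        return len(self._timestamps)

    def _evict(self, now_ms):
        # Drop timestamps that have fallen out of the window.
        while self._timestamps and now_ms - self._timestamps[0] >= self.window_ms:
            self._timestamps.popleft()


counter = VelocityCounter()
counts = [counter.record(minute * 60_000) for minute in range(11)]  # 11 transactions, 1 minute apart
print(counts[-1])       # 11
print(counts[-1] > 10)  # True: exceeds the velocity limit

two_hours_later = 10 * 60_000 + 2 * 3_600_000
print(counter.record(two_hours_later))  # 1: all earlier transactions have left the window
```

Storing raw timestamps costs memory proportional to the number of transactions in the window; bucketed counters are a common trade-off when per-user volumes are high.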
ML vs Rules Engine: False Positive Analysis
Source: Feedzai AI Report
| Approach | False Positive Rate | False Negative Rate | Latency |
|---|---|---|---|
| Static Rules Engine | 85-99% | 15-30% | <5ms |
| Hybrid (Rules + ML) | 20-45% | 5-15% | 30-80ms |
| Pure ML | 10-25% | 3-8% | 50-100ms |
Business impact of false positives:
- Each false positive = a blocked legitimate transaction = revenue loss + customer friction
- A bank with 1M transactions/day and a 1% false positive rate blocks 10,000 good transactions/day
- ML cutting false positives by 80% brings that down from 10,000 to 2,000 blocked good transactions/day
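The arithmetic behind these bullets is worth writing down so it can be rerun with your own volumes. The sketch below simply reproduces the numbers above; the function names are illustrative, and the 1% figure is treated as the share of all daily transactions that are wrongly blocked.

```python
def blocked_good_transactions(tx_per_day, false_positive_share):
    """Legitimate transactions blocked per day, given the share of all transactions wrongly flagged."""
    return round(tx_per_day * false_positive_share)


def after_reduction(blocked, reduction):
    """Blocked legitimate transactions remaining after a relative reduction in false positives."""
    return round(blocked * (1 - reduction))


baseline = blocked_good_transactions(1_000_000, 0.01)
improved = after_reduction(baseline, 0.80)

print(baseline)             # 10000
print(improved)             # 2000
print(baseline - improved)  # 8000 fewer blocked good transactions per day
```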
Feature Engineering cho Fraud ML Model
# Feature extraction for training data
def extract_features(transaction: Transaction, user_history: UserHistory) -> dict:
    # Guard against a zero standard deviation (e.g. a user with a single past transaction)
    std_amount = user_history.std_amount or 1.0
    return {
        # Amount features
        "amount_usd": transaction.amount / 23000,  # Convert VND to USD at a fixed 23,000 VND/USD
        "amount_zscore": (transaction.amount - user_history.avg_amount) / std_amount,
        # Velocity features
        "tx_count_1h": user_history.tx_count_1h,
        "tx_count_24h": user_history.tx_count_24h,
        "amount_sum_1h": user_history.amount_sum_1h,
        # Behavioral features
        "hour_of_day": transaction.timestamp.hour,
        "is_weekend": transaction.timestamp.weekday() >= 5,
        "days_since_account_open": user_history.account_age_days,
        # Geographic features
        "distance_from_last_tx_km": haversine_distance(
            transaction.location, user_history.last_tx_location
        ),
        "is_new_country": transaction.country != user_history.home_country,
        # Device features
        "is_new_device": transaction.device_hash not in user_history.known_devices,
        "new_device_count_7d": user_history.new_device_count_7d,
        # Auth features
        "failed_auth_count_5m": user_history.failed_auth_5m,
    }
QA & SDET Testing Strategy
Test 1: Flink Operator State Testing (TestHarness)
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.List;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.Test;
@Test
public void testFraudDetectorOperator() throws Exception {
    // Setup test harness: no real Kafka/Flink cluster needed
    KeyedOneInputStreamOperatorTestHarness<String, Event, AlertEvent> testHarness =
        new KeyedOneInputStreamOperatorTestHarness<>(
            new FraudDetectorOperator(),
            Event::userId,
            Types.STRING
        );
    testHarness.open();
    long baseTime = System.currentTimeMillis();
    // Inject 3 failed logins
    for (int i = 0; i < 3; i++) {
        testHarness.processElement(
            new Event("user-001", "login_failed", 0.0, baseTime + i * 1000),
            baseTime + i * 1000
        );
    }
    // Inject high-value transaction WITHIN 5 minutes
    testHarness.processElement(
        new Event("user-001", "transaction", 5_000_000.0, baseTime + 120_000),
        baseTime + 120_000
    );
    // Advance watermark to trigger timer
    testHarness.processWatermark(baseTime + 300_001);
    // Verify alert was generated
    List<StreamRecord<? extends AlertEvent>> output = testHarness.extractOutputStreamRecords();
    assertEquals(1, output.size(), "Expected exactly 1 fraud alert");
    assertEquals("user-001", output.get(0).getValue().userId());
    testHarness.close();
}
Test 2: MiniCluster Integration Test
@Test
public void testFraudPipelineEndToEnd() throws Exception {
    // Embedded Flink cluster: no external dependencies
    MiniClusterWithClientResource flinkCluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(4)
            .setNumberTaskManagers(1)
            .build()
    );
    flinkCluster.before();
    try {
        // Run full pipeline with test data
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000); // 1 second checkpoints
        List<Event> testEvents = generateFraudScenario("user-001");
        DataStream<Event> source = env.fromCollection(testEvents);
        // Connect full pipeline
        DataStream<FraudDecision> results = buildFraudPipeline(source);
        // Collect and verify; the limit overload returns a List (the no-arg version returns an iterator)
        List<FraudDecision> collected = results.executeAndCollect(100);
        assertTrue(collected.stream().anyMatch(d -> d.action() == BLOCK),
            "Fraud scenario must trigger a BLOCK decision");
    } finally {
        // Shut the embedded cluster down even when an assertion fails
        flinkCluster.after();
    }
}
Test 3: Latency SLA Validation
import (
    "sort"
    "testing"
    "time"
    // assert is assumed to be testify's assert package
    "github.com/stretchr/testify/assert"
)
func TestFraudScoringLatencySLA(t *testing.T) {
    const (
        targetP50  = 50 * time.Millisecond
        targetP99  = 100 * time.Millisecond
        sampleSize = 10000
    )
    // Preallocate so slice growth does not disturb the measurements
    latencies := make([]time.Duration, 0, sampleSize)
    for i := 0; i < sampleSize; i++ {
        start := time.Now()
        _ = fraudScoringService.Score(generateTestTransaction())
        latencies = append(latencies, time.Since(start))
    }
    sort.Slice(latencies, func(i, j int) bool {
        return latencies[i] < latencies[j]
    })
    p50 := latencies[len(latencies)*50/100]
    p99 := latencies[len(latencies)*99/100]
    assert.LessOrEqual(t, p50, targetP50, "P50 must be <= 50ms")
    assert.LessOrEqual(t, p99, targetP99, "P99 must be <= 100ms")
}
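Percentile definitions differ slightly between tools, so it helps to pin one down when asserting on an SLA. The sketch below uses the nearest-rank definition (the smallest sample with at least p% of samples at or below it); `percentile` and `meets_sla` are illustrative helpers, and the 50ms/100ms targets are the ones from the test above.

```python
import math


def percentile(samples, p):
    """Nearest-rank percentile: the smallest sample with at least p% of samples <= it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


def meets_sla(latencies_ms, p50_target_ms=50, p99_target_ms=100):
    """Check measured latencies against the P50 and P99 targets."""
    return (percentile(latencies_ms, 50) <= p50_target_ms
            and percentile(latencies_ms, 99) <= p99_target_ms)


latencies_ms = list(range(1, 101))   # synthetic samples: 1ms, 2ms, ..., 100ms
print(percentile(latencies_ms, 50))  # 50
print(percentile(latencies_ms, 99))  # 99
print(meets_sla(latencies_ms))       # True
```

With small sample counts the choice of definition can shift the reported P99 by one sample, which matters when the measured value sits close to the target.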
FAQ
RocksDB vs HashMapStateBackend: when should you use which?
- HashMapStateBackend: state lives on the JVM heap. Faster (~2-5x), but limited by JVM heap size. Suitable when state size is <10GB.
- RocksDB: state lives on local SSD. Slower, but scales to terabytes. Suitable for fraud profiles of millions of users.
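A back-of-the-envelope estimate is usually enough to make this choice. The sketch below applies the roughly 10GB rule of thumb from the list above; the user counts and bytes-per-profile figures are hypothetical and should be replaced with measurements of your own serialized state.

```python
HEAP_BACKEND_LIMIT_BYTES = 10 * 1024**3  # the ~10 GB rule of thumb


def estimate_state_bytes(num_keys, bytes_per_key):
    """Rough state size: number of keyed entries times average serialized size."""
    return num_keys * bytes_per_key


def suggest_backend(num_keys, bytes_per_key):
    """Suggest a state backend from the estimated state size."""
    size = estimate_state_bytes(num_keys, bytes_per_key)
    return "hashmap" if size < HEAP_BACKEND_LIMIT_BYTES else "rocksdb"


# Hypothetical workloads.
print(suggest_backend(5_000_000, 512))    # hashmap: about 2.4 GiB of state
print(suggest_backend(50_000_000, 1024))  # rocksdb: about 48 GiB of state
```

The estimate ignores JVM object overhead, which can make heap-resident state noticeably larger than its serialized size, so leave generous headroom before settling on the heap backend.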
Do exactly-once semantics matter for fraud detection?
Yes. With a Kafka → Flink → Kafka pipeline in EXACTLY_ONCE mode (which requires a transactional Kafka sink and downstream consumers reading with isolation.level=read_committed), each fraud alert is emitted exactly once even across checkpoint failures and task restarts. Without exactly-once, you can get duplicate alerts → duplicate account freezes → customer complaints.
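As a complementary safeguard (not a replacement for exactly-once delivery), the consumer that freezes accounts can be made idempotent, so a duplicated alert has no second effect. The sketch below assumes every alert carries a stable unique ID; `AccountFreezer` and its in-memory set are illustrative, and a real service would persist the seen IDs.

```python
class AccountFreezer:
    """Applies a freeze at most once per alert ID, so duplicate alerts are harmless."""

    def __init__(self):
        self._seen_alert_ids = set()
        self.frozen_accounts = []  # freeze actions actually performed, in order

    def handle(self, alert_id, user_id):
        """Return True if the freeze was applied, False if the alert was a duplicate."""
        if alert_id in self._seen_alert_ids:
            return False
        self._seen_alert_ids.add(alert_id)
        self.frozen_accounts.append(user_id)
        return True


freezer = AccountFreezer()
print(freezer.handle("alert-42", "user-001"))  # True: first delivery freezes the account
print(freezer.handle("alert-42", "user-001"))  # False: redelivery is ignored
print(freezer.frozen_accounts)                 # ['user-001']
```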
How do you tune Flink to reach <100ms P99?
- Increase state.backend.rocksdb.block.cache-size so hot data stays in memory.
- Use unordered async I/O (unorderedWait), which does not wait for ML responses in order.
- Place Flink TaskManagers close to the ML serving nodes on the network.
- Enable state.backend.rocksdb.use-direct-io-for-flush-and-compaction: true.
- Checkpoint less often (use a longer checkpoint interval) if you do not need frequent checkpoints; every checkpoint adds I/O overhead.
Next: Part 8, the QA & SDET Handbook, which brings together the full testing strategy for distributed financial systems: split-brain, clock skew, double-submit, and chaos engineering.