Trong Phần 1: The Paradigm Shift - Kiến Trúc Agentic & Sức Mạnh Điều Phối Của Golang, chúng ta đã thiết lập bộ não điều phối (Orchestration Engine) bằng Golang và Eino. Tuy nhiên, một bộ não thông minh đến đâu cũng sẽ trở nên vô dụng nếu nó được tiếp nạp thông tin sai lệch, thiếu cấu trúc hoặc bị cắt vụn.
Trong bài toán e-commerce, dữ liệu catalog sản phẩm thay đổi liên tục từng giây: giá cả biến động, tồn kho cập nhật, sản phẩm mới được thêm vào. Đồng thời, việc chia nhỏ (chunking) dữ liệu sản phẩm để đưa vào Vector Database (Qdrant) hoàn toàn khác biệt so với việc chia nhỏ một tài liệu PDF hay một bài báo.
Nếu bạn áp dụng phương pháp chia nhỏ theo số lượng ký tự (Character-based Split) truyền thống, hệ thống tìm kiếm của bạn chắc chắn sẽ gặp hiện tượng LLM Hallucination (LLM tự vẽ ra thông tin) do các thông số SKU, thuộc tính kỹ thuật hoặc giá bán bị cắt làm đôi.
Phần này sẽ hướng dẫn bạn cách thiết lập một pipeline đồng bộ dữ liệu thời gian thực (Real-time Ingestion Pipeline) sử dụng Kafka CDC và thiết kế chiến lược Atomic Chunking chuẩn e-commerce bằng Golang.
1. Pipeline CDC (Change Data Capture) Thực Chiến
Để đồng bộ dữ liệu từ database chính (như PostgreSQL) sang Vector Database (Qdrant) mà không làm ảnh hưởng đến hiệu năng của database phục vụ giao dịch trực tuyến (OLTP), chúng ta sử dụng kiến trúc Change Data Capture (CDC) qua Kafka.
Debezium Outbox Pattern
Để đảm bảo tính nhất quán (Atomic Dual-Writes), ta áp dụng Outbox Pattern. Thay vì ghi đồng thời vào Product Table và bắn event sang Kafka (dễ lỗi không đồng nhất khi một bên fail), microservice quản lý catalog sẽ ghi thông tin sản phẩm và một event tương ứng vào bảng Outbox trong cùng một transaction của PostgreSQL.
Debezium sẽ đọc file WAL (Write-Ahead Log) của PostgreSQL, bắt các thay đổi của bảng Outbox này và đẩy thành event dạng JSON/Avro vào Kafka topic product-updates.
graph TD
DB[PostgreSQL <br/> Product & Outbox Table] -- WAL log --> DEB[Debezium Connector]
DEB --> KAFKA[Kafka Topic: product-updates]
KAFKA --> GO[Golang Consumer Sarama]
GO -- Batching & Embedding --> QDRANT[(Qdrant Vector DB)]
Triển khai Go Kafka Consumer bằng sarama
Trong Go, chúng ta cần viết một Consumer hiệu năng cao. Để tránh việc gọi API Embedding (như OpenAI hay Gemini) cho từng sản phẩm đơn lẻ (gây nghẽn cổ chai mạng và vượt giới hạn Rate Limit), ta phải gom các event từ Kafka thành các batch (từ 256 đến 1000 items) trước khi xử lý.
Dưới đây là cách triển khai Go Kafka consumer sử dụng github.com/IBM/sarama bọc trong cơ chế Buffer Channel để gom Batch:
package ingestion
import (
"context"
"encoding/json"
"log"
"time"
"github.com/IBM/sarama"
)
type ProductEvent struct {
ProductID string `json:"product_id"`
Op string `json:"op"` // CREATE, UPDATE, DELETE
Payload map[string]interface{} `json:"payload"`
}
type IngestionWorker struct {
batchSize int
flushInterval time.Duration
eventChan chan ProductEvent
qdrantClient *QdrantIngestClient // Client tùy biến ghi vào Qdrant
}
func NewIngestionWorker(batchSize int, interval time.Duration, client *QdrantIngestClient) *IngestionWorker {
return &IngestionWorker{
batchSize: batchSize,
flushInterval: interval,
eventChan: make(chan ProductEvent, batchSize*2),
qdrantClient: client,
}
}
// StartLoop gom batch theo số lượng hoặc thời gian định kỳ
func (w *IngestionWorker) StartLoop(ctx context.Context) {
var batch []ProductEvent
ticker := time.NewTicker(w.flushInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
w.flushBatch(context.Background(), batch)
}
return
case event := <-w.eventChan:
batch = append(batch, event)
if len(batch) >= w.batchSize {
w.flushBatch(ctx, batch)
batch = make([]ProductEvent, 0, w.batchSize)
ticker.Reset(w.flushInterval) // Reset timer
}
case <-ticker.C:
if len(batch) > 0 {
w.flushBatch(ctx, batch)
batch = make([]ProductEvent, 0, w.batchSize)
}
}
}
}
func (w *IngestionWorker) flushBatch(ctx context.Context, batch []ProductEvent) {
log.Printf("Flushing batch of %d events to Qdrant...", len(batch))
err := w.qdrantClient.UpsertBatch(ctx, batch)
if err != nil {
log.Printf("Failed to upsert batch: %v", err)
// Thực hiện logic Retry hoặc đẩy vào Dead Letter Queue (DLQ) ở đây
}
}
// Triển khai ConsumerGroupHandler của Sarama
type ConsumerHandler struct {
worker *IngestionWorker
}
func (h *ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (h *ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (h *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case message, ok := <-claim.Messages():
if !ok {
return nil
}
var event ProductEvent
if err := json.Unmarshal(message.Value, &event); err != nil {
log.Printf("Error unmarshaling kafka message: %v", err)
continue
}
h.worker.eventChan <- event
session.MarkMessage(message, "")
case <-session.Context().Done():
return nil
}
}
}
2. Chiến Lược "Atomic Chunking" Cho Sản Phẩm E-commerce
Khi áp dụng RAG (Retrieval-Augmented Generation) cho văn bản, ta thường dùng RecursiveCharacterTextSplitter cắt tài liệu thành từng khối 500-1000 tokens kèm overlap. Với sản phẩm e-commerce, đây là một sai lầm chết người.
Hãy tưởng tượng sản phẩm sau:
- Tên: Laptop ASUS ROG Zephyrus G14
- SKU: ROG-G14-2026
- Thông số: CPU Ryzen 9, RAM 32GB, GPU RTX 5060, Price 45.000.000 VND.
Nếu cắt nửa chừng, Chunk 1 chứa "ASUS ROG Zephyrus G14 SKU ROG-G14", Chunk 2 chứa "-2026 RAM 32GB GPU RTX 5060 Price 45.000.000".
- Khi tìm kiếm theo mã
ROG-G14-2026, Vector search sẽ lệch do mã bị bẻ đôi. - Khi LLM đọc Chunk 2, nó không biết RAM 32GB hay giá 45 triệu này là của sản phẩm nào!
Giải pháp: Atomic Chunking (Phân mảnh nguyên tử)
Nguyên tắc vàng của chunking e-commerce là giữ nguyên tính toàn vẹn (context) của thực thể sản phẩm. Chúng ta chia nhỏ sản phẩm theo các trường logic cấu trúc (Logical Fields) thay vì số lượng ký tự:
- Product Document (Atomic Entity): Mỗi SKU sản phẩm là một thực thể độc lập.
- Structural Assembly: Gom toàn bộ dữ liệu cấu trúc thành một văn bản thống nhất có chứa Tên, Danh mục, Thương hiệu, Thông số kỹ thuật, và các câu hỏi thường gặp hoặc review tiêu biểu.
- SKU và Ràng buộc cứng: Không nhét SKU trực tiếp vào dense vector để embedding (vì vector model không học được các ký tự mã hóa ngẫu nhiên của SKU). Thay vào đó, đưa SKU và các trường lọc (Giá, Tồn kho, ID Cửa hàng) vào phần Payload Metadata để thực hiện Hybrid Search (kết hợp lọc cứng hoặc tìm kiếm sparse BM25 mà chúng ta sẽ làm ở Phần 3).
type ProductChunk struct {
ProductID string `json:"product_id"`
SKU string `json:"sku"`
StoreFields struct {
StoreID int32 `json:"store_id"`
CategoryID int32 `json:"category_id"`
Price float64 `json:"price"`
InStock bool `json:"in_stock"`
}
// Văn bản thô được định dạng chuẩn hóa để sinh vector embedding
EmbeddingText string
}
func BuildEmbeddingText(product map[string]interface{}) string {
// Sử dụng Builder để tránh cấp phát bộ nhớ liên tục
var sb strings.Builder
sb.WriteString("Tên sản phẩm: " + fmt.Sprintf("%v", product["title"]) + "\n")
sb.WriteString("Thương hiệu: " + fmt.Sprintf("%v", product["brand"]) + "\n")
sb.WriteString("Mô tả ngắn: " + fmt.Sprintf("%v", product["short_description"]) + "\n")
if specs, ok := product["specifications"].(map[string]interface{}); ok {
sb.WriteString("Thông số kỹ thuật:\n")
for k, v := range specs {
sb.WriteString("- " + k + ": " + fmt.Sprintf("%v", v) + "\n")
}
}
return sb.String()
}
3. Thiết Kế Multitenancy & Upsert Batch Vào Qdrant
Với các hệ thống thương mại điện tử lớn hoặc mô hình SaaS phục vụ nhiều nhãn hàng (multi-tenant), việc tạo mỗi cửa hàng/nhãn hàng một Collection riêng trong Qdrant sẽ tàn phá RAM hệ thống. Nguyên nhân là vì mỗi Collection sẽ khởi tạo cấu trúc đồ thị HNSW riêng biệt, ngốn cực kỳ nhiều bộ nhớ đệm.
Payload-Based Partitioning (Phân mảnh trên thuộc tính)
Giải pháp tối ưu là sử dụng một Collection duy nhất cho toàn bộ hệ thống, và thực hiện phân chia dữ liệu khách hàng bằng trường store_id (hoặc tenant_id) nằm trong phần Payload. Khi truy vấn, ta luôn bắt buộc truyền filter bộ lọc store_id để Qdrant thu hẹp không gian tìm kiếm.
Go Code: Upsert Batch và tắt HNSW tạm thời
Khi đưa lượng lớn dữ liệu ban đầu vào Qdrant (Bulk Load), việc cập nhật đồ thị HNSW liên tục sau mỗi điểm dữ liệu mới sẽ làm tốc độ ingest chậm đi hàng chục lần.
Quy tắc tối ưu: Tắt chỉ mục HNSW (set m: 0 ở collection config) trước khi thực hiện bulk insert, sau đó bật lại khi tải xong để Qdrant build đồ thị một lần duy nhất.
Dưới đây là đoạn code Golang thực thi nạp dữ liệu tối ưu vào Qdrant sử dụng Batch API gRPC:
package ingestion
import (
"context"
"fmt"
"github.com/qdrant/go-client/qdrant"
)
type QdrantIngestClient struct {
client *qdrant.Client
collectionName string
}
func (q *QdrantIngestClient) UpsertBatch(ctx context.Context, events []ProductEvent) error {
points := make([]*qdrant.PointStruct, 0, len(events))
for _, event := range events {
product := event.Payload
// 1. Tạo text để embed và gọi API lấy vector (ví dụ giả lập vector 384 chiều)
embeddingText := BuildEmbeddingText(product)
vector, err := GetVectorEmbedding(ctx, embeddingText)
if err != nil {
return fmt.Errorf("embedding error: %w", err)
}
// 2. Chuyển đổi ID sản phẩm sang định dạng UUID/Uint64 của Qdrant
pointID := qdrant.NewIDUUID(fmt.Sprintf("%v", event.ProductID))
// 3. Khởi tạo payload chứa các ràng buộc cứng cho Hybrid Search
payload := qdrant.NewValueMap(map[string]interface{}{
"store_id": int32(product["store_id"].(float64)),
"category_id": int32(product["category_id"].(float64)),
"sku": fmt.Sprintf("%v", product["sku"]),
"price": product["price"].(float64),
"in_stock": product["in_stock"].(bool),
"text_source": embeddingText,
})
points = append(points, &qdrant.PointStruct{
Id: pointID,
Vectors: qdrant.NewVectors(vector),
Payload: payload,
})
}
// 4. Gọi Batch API của Qdrant
_, err := q.client.Upsert(ctx, &qdrant.UpsertPoints{
CollectionName: q.collectionName,
Points: points,
Wait: qdrant.PtrBool(false), // Không block luồng để tăng throughput
})
return err
}
Tóm tắt & Bài học rút ra từ Phần 2
- Dùng CDC thay vì API đồng bộ: Sử dụng Debezium và Kafka đảm bảo dữ liệu catalog từ PostgreSQL sang Qdrant luôn đồng nhất và không bị mất mát (Outbox Pattern).
- Tuyệt đối không cắt chuỗi ký tự trên sản phẩm: Sử dụng chiến lược Atomic Chunking, giữ nguyên cấu trúc thuộc tính để tránh việc LLM đọc hiểu sai ngữ cảnh sản phẩm.
- Tách biệt Vector và Dữ liệu lọc: Các thuộc tính cứng (SKU, giá, tồn kho) đưa vào Payload Metadata. Tuyệt đối không nhét trực tiếp các mã hóa ngẫu nhiên như SKU vào Dense Vector Embedding.
- Tối ưu Ingestion: Gom batch từ 256-1000 items, tắt HNSW build tạm thời khi bulk load và sử dụng cấu hình Partitioning Payload dựa trên
store_idđể tiết kiệm RAM.
Bây giờ, chúng ta đã đưa dữ liệu catalog sản phẩm vào Qdrant một cách sạch sẽ, đúng cấu trúc. Nhưng làm sao để tìm kiếm nó hiệu quả khi người dùng nhập: "iphone 15 promax màu titan tự nhiên giá dưới 25 triệu"?
Nếu chỉ dùng Vector Search (Dense), hệ thống sẽ không hiểu được bộ lọc giá < 25.000.000 và từ khóa chính xác iphone 15 promax.
Trong Phần 3: Làm Chủ Qdrant Hybrid Search (Dense + BM25), chúng ta sẽ giải bài toán nhức nhối này bằng cách thiết lập Hybrid Search và cơ chế Filterable HNSW trong Qdrant bằng Golang.