Trong Phần 5: The Self-Reflection Critique Loop - Kỹ Thuật Ngăn Chặn Hallucination, chúng ta đã xây dựng thành công bộ kiểm duyệt câu trả lời tự động để đảm bảo độ chính xác logic. Tuy nhiên, khi đưa hệ thống Agentic Search này lên môi trường production quy mô lớn phục vụ hàng triệu người dùng, bạn sẽ lập tức đối mặt với những thách thức vận hành thực tế:

  1. Chi phí vận hành (Unit Economics): Mỗi lượt người dùng tìm kiếm phải đi qua nhiều lượt gọi LLM (từ sinh câu trả lời, gọi tool, đến tự phê bình) sẽ làm tăng vọt hóa đơn API.
  2. Độ trễ phản hồi (Latency): Khách hàng không thể kiên nhẫn chờ đợi 5-10 giây để nhận toàn bộ câu trả lời hoàn chỉnh.
  3. Giám sát hệ thống (Observability): Làm thế nào để trace được một yêu cầu đi qua những node nào, tiêu tốn bao nhiêu token và gặp lỗi ở đâu?

Bài viết cuối cùng trong series này sẽ hướng dẫn bạn giải quyết triệt để các bài toán trên bằng cách tích hợp Semantic Caching (Redis), Deterministic Model Routing, Server-Sent Events (SSE) Streaming, và OpenTelemetry Tracing vào framework Eino (CloudWeGo).


1. Semantic Caching Với Redis (Bộ Nhớ Đệm Ngũ Nghĩa)

Khái Niệm & Sự Khác Biệt

Khác với bộ nhớ đệm truyền thống (Key-Value Cache chỉ khớp chính xác từng ký tự), Semantic Caching lưu trữ cặp câu hỏi - câu trả lời dưới dạng vector embeddings. Khi người dùng gửi câu hỏi mới:

  1. Hệ thống sinh vector embedding cho câu hỏi đó.
  2. Thực hiện tìm kiếm láng giềng gần nhất (KNN Vector Search) trên Redis để tìm các câu hỏi tương tự đã có trong bộ đệm.
  3. Nếu khoảng cách cosine (Cosine Distance) nhỏ hơn một ngưỡng quy định (ví dụ: Cosine Distance < 0.15 hay Similarity > 0.85), hệ thống sẽ trả về câu trả lời đã lưu trong cache ngay lập tức, bỏ qua hoàn toàn việc gọi LLM.

Cấu Hình Kết Nối go-redis/v9

Để tương thích với tìm kiếm vector (FT.SEARCH) trên Redis Stack, client go-redis/v9 cần được cấu hình sử dụng giao thức Protocol 2 và kích hoạt cờ UnstableResp3.

Dưới đây là mã nguồn khởi tạo bộ nạp dữ liệu (Retriever) của Eino tích hợp với Redis:

package cache

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
	"github.com/cloudwego/eino-ext/components/retriever/redis"
)

// InitRedisRetriever thiết lập kết nối và khởi tạo Eino Redis Retriever
func InitRedisRetriever(ctx context.Context) (*redis.Retriever, error) {
	// 1. Khởi tạo go-redis client dùng Protocol 2 để tương thích FT.SEARCH
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // Điền mật khẩu nếu có
		DB:       0,
		Protocol: 2, 
	})

	// Kích hoạt UnstableResp3 để hỗ trợ phân tích định dạng phản hồi phức tạp từ RediSearch
	client.Options().UnstableResp3 = true

	// Kiểm tra kết nối tới Redis Server
	if err := client.Ping(ctx).Err(); err != nil {
		return nil, fmt.Errorf("không thể kết nối Redis: %w", err)
	}

	// 2. Khởi tạo Retriever cấu hình tìm kiếm vector của eino-ext
	retriever, err := redis.NewRetriever(ctx, &redis.RetrieverConfig{
		Client:       client,
		Index:        "semantic_cache_idx", // Tên chỉ mục vector trên Redis
		VectorField:  "query_vector",       // Trường lưu trữ embedding
		EmbeddingKey: "query_text",         // Trường lưu trữ câu hỏi thô
		TopK:         1,                    // Chỉ lấy kết quả tương tự nhất
	})
	if err != nil {
		return nil, fmt.Errorf("khởi tạo Eino Redis Retriever thất bại: %w", err)
	}

	return retriever, nil
}

2. Deterministic LLM Model Routing (Phân Luồng Mô Hình)

Không phải câu hỏi nào của người dùng cũng cần đến các mô hình ngôn ngữ lớn đắt đỏ và chậm chạp.

  • Câu hỏi đơn giản: “Xin chào”, “Shop ở đâu vậy?” -> Định tuyến đến mô hình giá rẻ, tốc độ cao (ví dụ: gpt-4o-mini).
  • Câu hỏi phức tạp: “So sánh Asus ROG với MSI Cyborg và lọc các máy còn hàng tại Quận 1” -> Định tuyến đến mô hình nâng cao (ví dụ: Gemini 1.5 Pro hoặc gpt-4o).

Chúng ta sử dụng compose.NewGraphBranch kết hợp với compose.ProcessState để kiểm tra độ phức tạp của câu truy vấn dựa trên từ khóa và độ dài chuỗi ký tự, từ đó quyết định rẽ nhánh đồ thị:

package routing

import (
	"context"
	"strings"

	"github.com/cloudwego/eino/compose"
	"github.com/cloudwego/eino/schema"
)

// QueryState lưu trữ thông tin câu truy vấn để ra quyết định định tuyến
type QueryState struct {
	Query     string
	IsComplex bool
}

// ModelRouterBranch thực hiện định tuyến động dựa trên trạng thái câu hỏi
var ModelRouterBranch = compose.NewGraphBranch(func(ctx context.Context, input *schema.Message) (string, error) {
	var nextNode string
	
	err := compose.ProcessState[*QueryState](ctx, func(ctx context.Context, state *QueryState) error {
		queryLower := strings.ToLower(state.Query)
		
		// Ràng buộc định tuyến: Nếu câu truy vấn dài (> 80 ký tự) hoặc chứa các từ khóa so sánh/phân tích phức tạp
		if len(state.Query) > 80 || 
			strings.Contains(queryLower, "so sánh") || 
			strings.Contains(queryLower, "phân tích") || 
			strings.Contains(queryLower, "tại sao") {
			
			state.IsComplex = true
			nextNode = "advanced_llm_node"
		} else {
			state.IsComplex = false
			nextNode = "cheap_llm_node"
		}
		return nil
	})
	
	return nextNode, err
}, map[string]bool{
	"cheap_llm_node":    true, // Node gpt-4o-mini
	"advanced_llm_node": true, // Node Gemini 1.5 Pro
})

3. Server-Sent Events (SSE) Streaming HTTP Handler

Độ trễ khởi đầu (Time-to-First-Token - TTFT) rất quan trọng trong trải nghiệm trò chuyện AI. Bằng cách sử dụng Server-Sent Events (SSE), backend có thể đẩy từng token do LLM sinh ra về trình duyệt ngay lập tức thông qua giao thức HTTP chuẩn.

Đoạn mã Go dưới đây thiết lập một handler SSE chuẩn nhận luồng Stream từ Eino và đảm bảo giải phóng tài nguyên bằng cách gọi defer streamReader.Close() nhằm tránh rò rỉ Goroutine:

package sse

import (
	"context"
	"fmt"
	"net/http"

	"github.com/cloudwego/eino/compose"
	"github.com/cloudwego/eino/schema"
)

// StreamSSEHandler xử lý yêu cầu HTTP và đẩy dữ liệu dạng Server-Sent Events
func StreamSSEHandler(w http.ResponseWriter, r *http.Request, runnable compose.Runnable[[]*schema.Message, *schema.StreamReader[*schema.Message]]) {
	// 1. Thiết lập các HTTP Header bắt buộc cho SSE
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Transfer-Encoding", "chunked")

	// Đảm bảo Web Server hỗ trợ Streaming (Flusher)
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Trình duyệt hoặc máy chủ không hỗ trợ Response Streaming", http.StatusInternalServerError)
		return
	}

	query := r.URL.Query().Get("q")
	if query == "" {
		http.Error(w, "Tham số truy vấn 'q' không được trống", http.StatusBadRequest)
		return
	}

	input := []*schema.Message{schema.UserMessage(query)}

	// 2. Kích hoạt Eino Stream để đọc dữ liệu từng phần
	streamReader, err := runnable.Stream(r.Context(), input)
	if err != nil {
		http.Error(w, fmt.Sprintf("Không thể khởi tạo luồng Stream: %v", err), http.StatusInternalServerError)
		return
	}
	// QUAN TRỌNG: Luôn Close streamReader để giải phóng goroutine chạy ngầm của Eino
	defer streamReader.Close() 

	// 3. Vòng lặp nhận dữ liệu và đẩy xuống Client
	for {
		msg, err := streamReader.Recv()
		if err != nil {
			// Nhận tín hiệu EOF khi stream kết thúc tự nhiên
			break
		}
		
		// Ghi dữ liệu theo định dạng chuẩn SSE (data: <nội dung>\n\n)
		_, _ = fmt.Fprintf(w, "data: %s\n\n", msg.Content)
		flusher.Flush() // Đẩy dữ liệu ra mạng ngay lập tức
	}

	// Gửi sự kiện kết thúc luồng SSE để Frontend đóng kết nối
	_, _ = fmt.Fprint(w, "event: done\ndata: [DONE]\n\n")
	flusher.Flush()
}

4. Giám Sát Với OpenTelemetry (OTel Telemetry Callbacks)

Framework Eino sở hữu kiến trúc hướng khía cạnh (Aspect-Oriented) cho phép can thiệp vào vòng đời thực thi của các thành phần thông qua cơ chế Callback. Mặc dù gói tích hợp OpenTelemetry chính thức đang được thảo luận (Xem Eino Issue #1028), chúng ta hoàn toàn có thể tự triển khai một callbacks.Handler tùy biến để ghi nhận vết thực thi (Spans) và đo lường số lượng Token tiêu thụ.

Đoạn mã dưới đây sử dụng OpenTelemetry Go SDK để trace thời gian chạy của từng Node và đính kèm thông tin token dựa trên đặc tả Semantic Conventions:

package telemetry

import (
	"context"

	"github.com/cloudwego/eino/callbacks"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

var tracer = otel.Tracer("eino-search-agent")

type spanCtxKey struct{}

// NewOTelCallbackHandler tạo bộ xử lý sự kiện tùy biến phục vụ giám sát
func NewOTelCallbackHandler() callbacks.Handler {
	return callbacks.NewHandlerBuilder().
		// Kích hoạt khi bắt đầu chạy một Node trong đồ thị
		OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
			// Bắt đầu một Span mới dựa trên tên của Node tương ứng
			ctx, span := tracer.Start(ctx, info.ComponentName, trace.WithSpanKind(trace.SpanKindInternal))
			span.SetAttributes(
				attribute.String("eino.component.type", string(info.ComponentType)),
				attribute.String("eino.component.name", info.ComponentName),
			)
			
			// Lưu Span vào context để Node tiếp theo hoặc callback kết thúc có thể truy cập
			return context.WithValue(ctx, spanCtxKey{}, span)
		}).
		// Kích hoạt khi Node hoàn thành xử lý thành công
		OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
			if span, ok := ctx.Value(spanCtxKey{}).(trace.Span); ok {
				// Ghi nhận số lượng token tiêu thụ theo đặc tả chuẩn OpenTelemetry Semantic Conventions
				if usage, ok := output.Config["token_usage"].(map[string]int); ok {
					span.SetAttributes(
						attribute.Int("gen_ai.usage.input_tokens", usage["input"]),
						attribute.Int("gen_ai.usage.output_tokens", usage["output"]),
					)
				}
				span.End() // Đóng Span
			}
			return ctx
		}).
		// Kích hoạt khi Node xảy ra lỗi trong quá trình thực thi
		OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
			if span, ok := ctx.Value(spanCtxKey{}).(trace.Span); ok {
				span.RecordError(err) // Đánh dấu lỗi trong Span
				span.End()
			}
			return ctx
		}).
		Build()
}

Tổng Kết Chuỗi Bài Viết: Hành Trình Agentic Search Engine

Trải qua 6 phần chuyên sâu, chúng ta đã đi từ những khái niệm kiến trúc cơ bản cho tới giải pháp tối ưu vận hành thực tế cho một hệ thống tìm kiếm trợ lý AI:

  1. Phần 1: The Paradigm Shift: Tìm hiểu lý do tại sao kiến trúc AI Agent trên nền tảng Golang mang lại hiệu suất vượt trội so với Python nhờ cơ chế Concurrency và compile-time safety.
  2. Phần 2: Data Ingestion & Chunking: Thiết kế quy trình xử lý dữ liệu sản phẩm thô, phân tách thông minh để giữ nguyên cấu trúc phân cấp và mối liên hệ ngữ nghĩa.
  3. Phần 3: Làm Chủ Qdrant Hybrid Search: Kết hợp sức mạnh của Vector Search (Dense) cùng bộ lọc thuộc tính cứng để giải quyết bài toán lọc sản phẩm chính xác theo thời gian thực.
  4. Phần 4: Active RAG & Strict Tool Calling: Biến LLM tĩnh thành tác tử động có khả năng gọi API kiểm tra trạng thái kho hàng và chương trình khuyến mãi thực tế.
  5. Phần 5: Self-Reflection Critique Loop: Thiết lập chu trình tự đánh giá và sửa lỗi lặp lại nhằm kiểm soát chất lượng đầu ra, triệt tiêu lỗi ảo giác (hallucination).
  6. Phần 6: Production Operations: Hoàn thiện bài toán chi phí, độ trễ và khả năng giám sát hệ thống với Semantic Cache, Model Routing, SSE và OpenTelemetry.

Đây chính là Bản thiết kế kiến trúc (Architecture Blueprint) hoàn chỉnh giúp bạn tự tin xây dựng và vận hành hệ thống AI Search thế hệ mới trên Go. Chúc các bạn ứng dụng thành công vào dự án thực tế của mình!