Khi dọn nhà rời khỏi Magento 2, chướng ngại vật đầu tiên luôn luôn là cái database schema. Magento đéo thèm lưu dữ liệu thành các hàng phẳng phiu sạch sẽ — nó xài một mô hình gọi là Entity-Attribute-Value (EAV), băm vằm dữ liệu rải rác ra hàng chục cái bảng khác nhau kèm theo tính năng kế thừa theo cấp độ store (store-scope inheritance). Việc hiểu được mớ bòng bong này trước khi cắm đầu vào viết SQL sẽ cứu rỗi bạn vài ngày thanh xuân.

Bài viết này bao quát 2 bài toán trích xuất: export đơn hàng (ca dễ thở hơn) và export catalog sản phẩm (ca này mới thực sự chua chát), tiếp nối bằng một pipeline Node.js chuẩn production để nạp (ingest) mớ dữ liệu đó vào các database của service mới.

Phần 1: Export Đơn hàng (Orders)

Dữ liệu đơn hàng nằm rải rác ở các bảng sales_order, sales_order_address, sales_order_payment, và sales_order_item. Không giống như cái catalog sản phẩm, đám này chỉ xài khóa ngoại (foreign-key joins) tiêu chuẩn — chứ không dính tới vụ pivot của EAV.

Export trọn gói Đơn hàng + Thanh toán + Giao hàng

SELECT
    so.entity_id            AS order_id,
    so.increment_id         AS magento_order_number,
    so.status               AS order_status,
    so.grand_total          AS total_amount,
    so.base_currency_code   AS currency,
    so.created_at           AS order_created_at,
    so.customer_email,
    so.customer_firstname   AS customer_first,
    so.customer_lastname    AS customer_last,

    -- Địa chỉ giao hàng (đã được khử chuẩn)
    soa.street              AS ship_street,
    soa.city                AS ship_city,
    soa.region              AS ship_region,
    soa.postcode            AS ship_postcode,
    soa.country_id          AS ship_country,
    soa.telephone           AS ship_phone,

    -- Phương thức thanh toán
    sop.method              AS payment_method,
    sop.last_trans_id       AS payment_transaction_id,

    -- Giao vận (Sẽ là NULL nếu chưa có mã vận đơn)
    sos.entity_id           AS shipment_id,
    sos.created_at          AS shipped_at

FROM sales_order so
LEFT JOIN sales_order_address soa
    ON soa.parent_id = so.entity_id AND soa.address_type = 'shipping'
LEFT JOIN sales_order_payment sop
    ON sop.parent_id = so.entity_id
LEFT JOIN sales_shipment sos
    ON sos.order_id = so.entity_id

WHERE so.status NOT IN ('canceled', 'fraud')
  AND so.created_at >= '2022-01-01 00:00:00'
ORDER BY so.created_at ASC;

Danh sách Sản phẩm trong Đơn (Lần quét thứ 2)

SELECT
    soi.order_id,
    soi.sku,
    soi.name                AS product_name,
    soi.qty_ordered,
    soi.qty_shipped,
    soi.qty_refunded,
    soi.price               AS unit_price,
    soi.row_total,
    soi.product_type,
    soi.parent_item_id      -- Dòng này sẽ khác null nếu là dòng con của loại hàng configurable

FROM sales_order_item soi

WHERE soi.parent_item_id IS NULL  -- Bỏ qua mấy dòng con ảo bóng ma của hàng configurable
ORDER BY soi.order_id ASC, soi.item_id ASC;

Trong cái script nạp liệu của bạn, cứ đem cục này JOIN với order_id là bạn sẽ dựng lại được hoàn chỉnh cái object đơn hàng.


Phần 2: Export Catalog Sản phẩm (Ca Khó Nhằn)

Đây là chỗ mà đa số các kỹ sư migration ăn hành vì đánh giá sai độ khó. Catalog sản phẩm xài mô hình EAV kết hợp với kế thừa theo Store Scope: một giá trị ở store_id = 0 (Admin/Global) là giá trị mặc định; một giá trị nằm ở một store_id cụ thể sẽ ghi đè lên giá trị mặc định đó dành riêng cho cái store view tương ứng. Một câu SELECT * ngây thơ sẽ trả về mớ rác bị lỗi hoặc thiếu dữ liệu nghiêm trọng.

Cách tiếp cận đúng đắn là một quy trình 2 bước.

Bước 1: Vật chất hóa các ID Thuộc tính (Attribute IDs)

Các giá trị attribute_id mang tính đặc thù theo môi trường — tụi nó đéo giống nhau giữa các con web Magento khác nhau. Hãy chạy cục query này 1 lần duy nhất rồi lấy kết quả đó để phang vào câu query export chính của bạn:

SELECT attribute_id, attribute_code, backend_type
FROM eav_attribute
WHERE entity_type_id = (
    SELECT entity_type_id FROM eav_entity_type
    WHERE entity_type_code = 'catalog_product'
)
AND attribute_code IN (
    'name', 'url_key', 'description', 'short_description',
    'price', 'special_price', 'status', 'visibility', 'weight'
);

Bước 2: Lấy dữ liệu Sản phẩm dạng phẳng có kèm Fallback theo Store-Scope

Cục query này sẽ export các sản phẩm thuộc store store_id = 1. Đối với mỗi thuộc tính, nó sẽ ưu tiên lấy cái giá trị đặc thù của store đó trước, nếu rỗng thì nó lùi về (fallback) lấy giá trị mặc định của hệ thống (store_id = 0). Hãy nhớ thay mấy cái attribute_id bên dưới bằng con số bạn lấy được từ Bước 1:

SELECT
    e.entity_id,
    e.sku,
    e.type_id                                           AS product_type,
    e.created_at,

    -- Tên (varchar): ưu tiên lấy theo store, fallback về global
    COALESCE(v_name_s.value, v_name_g.value)            AS name,
    COALESCE(v_url_s.value, v_url_g.value)              AS url_key,

    -- Trạng thái: 1=Bật, 2=Tắt (int)
    COALESCE(i_status_s.value, i_status_g.value)        AS status,
    -- Độ hiển thị: 1=Ẩn, 4=Hiện ở Catalog+Search (int)
    COALESCE(i_vis_s.value, i_vis_g.value)              AS visibility,

    -- Giá (decimal — trong Magento thằng này mặc định là global)
    d_price.value                                       AS price,
    d_special.value                                     AS special_price,
    d_weight.value                                      AS weight

FROM catalog_product_entity e

-- === VARCHAR: name ===
LEFT JOIN catalog_product_entity_varchar v_name_s
    ON v_name_s.entity_id = e.entity_id AND v_name_s.attribute_id = 73 AND v_name_s.store_id = 1
LEFT JOIN catalog_product_entity_varchar v_name_g
    ON v_name_g.entity_id = e.entity_id AND v_name_g.attribute_id = 73 AND v_name_g.store_id = 0

-- === VARCHAR: url_key ===
LEFT JOIN catalog_product_entity_varchar v_url_s
    ON v_url_s.entity_id = e.entity_id AND v_url_s.attribute_id = 120 AND v_url_s.store_id = 1
LEFT JOIN catalog_product_entity_varchar v_url_g
    ON v_url_g.entity_id = e.entity_id AND v_url_g.attribute_id = 120 AND v_url_g.store_id = 0

-- === INT: status ===
LEFT JOIN catalog_product_entity_int i_status_s
    ON i_status_s.entity_id = e.entity_id AND i_status_s.attribute_id = 96 AND i_status_s.store_id = 1
LEFT JOIN catalog_product_entity_int i_status_g
    ON i_status_g.entity_id = e.entity_id AND i_status_g.attribute_id = 96 AND i_status_g.store_id = 0

-- === INT: visibility ===
LEFT JOIN catalog_product_entity_int i_vis_s
    ON i_vis_s.entity_id = e.entity_id AND i_vis_s.attribute_id = 99 AND i_vis_s.store_id = 1
LEFT JOIN catalog_product_entity_int i_vis_g
    ON i_vis_g.entity_id = e.entity_id AND i_vis_g.attribute_id = 99 AND i_vis_g.store_id = 0

-- === DECIMAL: price, special_price, weight (chỉ có global) ===
LEFT JOIN catalog_product_entity_decimal d_price
    ON d_price.entity_id = e.entity_id AND d_price.attribute_id = 77 AND d_price.store_id = 0
LEFT JOIN catalog_product_entity_decimal d_special
    ON d_special.entity_id = e.entity_id AND d_special.attribute_id = 78 AND d_special.store_id = 0
LEFT JOIN catalog_product_entity_decimal d_weight
    ON d_weight.entity_id = e.entity_id AND d_weight.attribute_id = 80 AND d_weight.store_id = 0

-- Chỉ nhổ ra mấy thằng sản phẩm đang được bật
WHERE COALESCE(i_status_s.value, i_status_g.value) = 1
ORDER BY e.entity_id ASC;

Hiệu năng: Nếu cái catalog có tầm 25,000+ SKUs trở lên, câu query này sẽ lết mỏi mệt. Hãy chạy EXPLAIN ANALYZE trước, đảm bảo bạn đã tạo các composite indexes trên cụm (entity_id, attribute_id, store_id) cho từng cái bảng giá trị EAV, và nhớ chia lô theo cụm entity_id (WHERE e.entity_id BETWEEN 1 AND 5000) để không khóa cứng cmn database production của người ta.


Phần 3: Ingestion Pipeline bằng Node.js chuẩn Production

Sau khi dữ liệu đã được ói ra file CSV, bạn cần một streaming pipeline đủ đô để nuốt hàng gigabyte mà không bị trào bộ nhớ (OOM). Nó phải biết cách chia lô (batching), có logic thử lại (retry), đảm bảo tính lũy đẳng (idempotency), và có một thùng rác (dead-letter queue) để nhốt mấy dòng bị lỗi.

Kiến trúc Pipeline

File CSV → Readable Stream → csv-parse → Bộ gom lô (Batch) → Upsert DB (kèm retry)
                                                          ↓ (nếu tịt sau nhiều lần thử)
                                                    File Thùng rác DLQ (JSONL)

Triển khai code

// migrate.js — Pipeline hút dữ liệu Magento → PostgreSQL chuẩn Production
const { pipeline, Transform } = require('stream');
const { promisify } = require('util');
const { parse } = require('csv-parse');
const fs = require('fs');
const db = require('./db'); // connection pool pg của bạn

const pipe = promisify(pipeline);

const BATCH_SIZE = 500;
const MAX_RETRIES = 3;
const RETRY_BASE_MS = 500;

const dlqStream = fs.createWriteStream('./failed-rows.jsonl', { flags: 'a' });
let processed = 0, failed = 0;
const startTime = Date.now();

// Tính năng Exponential backoff retry (thử lại với độ trễ lũy thừa)
async function withRetry(fn, label) {
    for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        try {
            return await fn();
        } catch (err) {
            if (attempt === MAX_RETRIES) throw err;
            const delay = RETRY_BASE_MS * Math.pow(2, attempt - 1);
            console.warn(`\n⚠ Lỗi đoạn ${label} (lần ${attempt}). Thử lại sau ${delay}ms…`);
            await new Promise(r => setTimeout(r, delay));
        }
    }
}

// Upsert theo lô — đảm bảo lũy đẳng thông qua magento_order_id
async function upsertBatch(batch) {
    const client = await db.connect();
    try {
        await client.query('BEGIN');
        for (const row of batch) {
            await client.query(`
                INSERT INTO orders (
                    magento_order_id, magento_increment_id, status,
                    total_amount, currency, customer_email, created_at
                ) VALUES ($1,$2,$3,$4,$5,$6,$7)
                ON CONFLICT (magento_order_id) DO UPDATE SET
                    status       = EXCLUDED.status,
                    total_amount = EXCLUDED.total_amount,
                    updated_at   = NOW()
            `, [
                row.order_id, row.magento_order_number, row.order_status,
                parseFloat(row.total_amount) || 0, row.currency,
                row.customer_email, row.order_created_at
            ]);
        }
        await client.query('COMMIT');
    } catch (err) {
        await client.query('ROLLBACK');
        throw err;
    } finally {
        client.release();
    }
}

// Transform stream: gom row thành từng cục, flush từ từ với cơ chế backpressure
function createBatchCollector(batchSize, onBatch) {
    let buffer = [];

    const flush = async (rows, callback) => {
        try {
            await withRetry(() => onBatch(rows), `Lô từ dòng số ${processed}`);
            processed += rows.length;
            process.stdout.write(
                `\r✓ Đã cắn ${processed.toLocaleString()} dòng | ✗ Xịt ${failed} dòng | ` +
                `Tốn ${((Date.now() - startTime) / 1000).toFixed(0)}s`
            );
        } catch (err) {
            failed += rows.length;
            console.error(`\n✗ Cả lô này bó tay: ${err.message}`);
            rows.forEach(r => dlqStream.write(JSON.stringify(r) + '\n'));
        }
        callback();
    };

    return new Transform({
        objectMode: true,
        async transform(row, _enc, callback) {
            buffer.push(row);
            if (buffer.length >= batchSize) {
                const toFlush = buffer.splice(0, batchSize);
                await flush(toFlush, callback);
            } else {
                callback();
            }
        },
        async flush(callback) {
            if (buffer.length > 0) await flush(buffer, callback);
            else callback();
        }
    });
}

async function migrate(csvPath) {
    console.log(`\nĐang hút: ${csvPath} | Size mỗi mẻ: ${BATCH_SIZE} | Số lần Retry: ${MAX_RETRIES}\n`);
    await pipe(
        fs.createReadStream(csvPath, { encoding: 'utf8' }),
        parse({ columns: true, skip_empty_lines: true, trim: true }),
        createBatchCollector(BATCH_SIZE, upsertBatch)
    );
    dlqStream.end();
    const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
    console.log(`\n\n✅ Xong phim trong ${elapsed}s — Đã bú ${processed.toLocaleString()} dòng | Vứt sọt rác ${failed} dòng DLQ`);
    if (failed > 0) console.log(`   Check rác tại: ./failed-rows.jsonl`);
}

migrate(process.argv[2] || './orders.csv').catch(err => {
    console.error('\n✗ Tạch nguyên cái pipeline:', err.message);
    process.exit(1);
});

Các Quyết định Thiết kế Sống còn

Tính lũy đẳng (ON CONFLICT DO UPDATE): Cái pipeline này có thể được bấm chạy lại bất kỳ lúc nào một cách an toàn. Nếu nó sập nguồn ở dòng thứ 47,000, thì những dòng từ 1–47,000 khi bạn chạy lại script sẽ chỉ đơn giản là tự update lại thành y chang giá trị đó. Không có chuyện đẻ ra dữ liệu rác nhân bản (duplicates).

Thùng rác Dead-Letter Queue (DLQ): Những cái lô nào mà retry khô máu rồi vẫn tịt thì sẽ được ghi thẳng vào file failed-rows.jsonl. Sau khi chạy xong quá trình migration, hãy mở cái file đó ra soi, sửa cái lỗi nguyên nhân gốc rễ, rồi xách cái script đó chạy lại nhưng trỏ vào chính cái file DLQ.

Tính năng kìm hãm Backpressure: Cái callback() nằm trong Transform stream sẽ không được thét lên chừng nào hàm upsertBatch chưa nhai xong. Node.js tự động bóp nghẹt tốc độ đọc file (readable stream) khi thấy database đang trào đờm — bạn không cần phải gọi ba trò pause()/resume() bằng tay.

stream.pipeline: Việc xài cái hàm pipeline đã được promisify (bọc trong Promise) thay vì xài .pipe() thủ công nối đuôi nhau sẽ đảm bảo một điều: nếu bất kỳ đoạn stream nào trong cái xích đó bị quăng lỗi, thì tất cả những khúc stream còn lại sẽ bị tự động bóp chết và mọi file handles (cổng kết nối file) sẽ được nhả ra sạch sẽ.

# Chạy migration
node migrate.js ./exports/magento-orders.csv

# Chạy lại mớ row bị tịt
node migrate.js ./failed-rows.jsonl

Để hiểu rõ bức tranh toàn cảnh về kiến trúc, xem việc nhồi mớ dữ liệu này vào hệ sinh thái microservice để làm gì, hãy đọc bài Vì sao bạn nên Migrate từ Magento sang MicroservicesBản vẽ Zero-Downtime: Di chuyển từ Magento sang Microservices.


🤝 Kết nối với tôi

Bạn đang gặp phải những thách thức tương tự về kiến trúc hệ thống, mở rộng quy mô (scaling) hay dịch chuyển (migration)? Hãy kết nối với tôi trên LinkedIn, theo dõi GitHub của tôi, hoặc gửi một email để trao đổi nhé.