构建从分布式SQLite到Couchbase的CDC数据管道并用DVC管理其Schema演进


我们面临一个棘手的现实:数百个边缘节点上的应用各自将状态写入本地的SQLite文件。这些数据需要近实时地汇集到一个中央数据存储进行分析。直接的文件同步或定期的批量导出都无法满足时效性要求,并且在网络不稳定的环境下极其脆弱。我们需要的是一套针对分布式SQLite文件的变更数据捕获(Change Data Capture, CDC)机制。

传统的CDC方案依赖于数据库的事务日志(如MySQL的binlog或Postgres的WAL),但SQLite作为一个嵌入式文件数据库,并没有一个标准的、可供外部消费的日志流。这意味着我们必须从零开始构建捕获代理。

初步构想是利用SQLite的触发器(Triggers)。当INSERT, UPDATE, DELETE操作发生时,触发器可以将变更的行数据写入一个专门的outbox表。然后,一个独立的代理进程会轮询这个outbox表,将变更事件发送到消息队列。

这个方案的技术栈选型如下:

  • 代码组织: Monorepo。捕获代理、消息消费者、共享类型定义和部署脚本都放在一个仓库中,使用pnpm的workspace进行管理。这对于维护这样一个多组件系统至关重要,能保证接口定义的一致性和原子化的提交。
  • 消息中间件: RabbitMQ。它提供了可靠的消息投递(ACK机制)、持久化和灵活的路由,足以应对边缘网络不稳定的情况。
  • 目标数据库: Couchbase。它是一个分布式的NoSQL文档数据库,其灵活的JSON格式非常适合存储来自不同版本SQLite的、结构可能略有差异的CDC事件。其内置的缓存层也能提供高性能的读取。
  • Schema与配置版本控制: DVC (Data Version Control)。这是整个方案的关键。边缘应用的SQLite schema会随着版本迭代而演进。我们不能硬编码处理逻辑。DVC可以像Git一样管理数据和配置文件,但它更适合大文件和结构化数据。我们将用它来版本化每个SQLite版本的schema定义(.sql文件)以及关联的转换逻辑,确保整个数据管道的演进是可追溯和可复现的。

Monorepo项目结构搭建

整个项目的根目录结构清晰地反映了各个组件的职责。

# directory-structure
cdc-pipeline/
├── pnpm-workspace.yaml
├── package.json
├── packages/
│   ├── sqlite-capture-agent/    # 运行在边缘节点的捕获代理
│   │   ├── src/
│   │   ├── package.json
│   │   └── tsconfig.json
│   ├── mq-couchbase-consumer/     # 消费消息并写入Couchbase的服务
│   │   ├── src/
│   │   ├── package.json
│   │   └── tsconfig.json
│   └── shared-types/            # 共享的TypeScript类型定义
│       ├── src/
│       ├── package.json
│       └── tsconfig.json
└── schemas/                     # DVC管理的核心区域
    ├── v1/
    │   ├── user_schema.sql
    │   └── user_schema.sql.dvc  # DVC元数据文件
    └── v2/
        ├── user_schema.sql
        └── user_schema.sql.dvc

pnpm-workspace.yaml 文件内容很简单,它告诉pnpm去哪里寻找子包:

# pnpm-workspace.yaml
packages:
  - 'packages/*'

这样,在根目录运行 pnpm install 就会安装所有子包的依赖。

核心组件一:SQLite变更捕获代理

这是最关键的自研部分。代理需要完成三件事:

  1. 初始化目标SQLite数据库,确保outbox表和相关触发器存在。
  2. 轮询outbox表,读取未处理的变更。
  3. 将变更事件序列化后,可靠地发送到RabbitMQ。

数据库初始化与触发器

假设我们的业务表是users。我们需要为它的INSERT, UPDATE, DELETE操作都创建触发器。

// packages/sqlite-capture-agent/src/db-setup.ts
import Database from 'better-sqlite3';

const CREATE_USERS_TABLE = `
  CREATE TABLE IF NOT EXISTS users (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    email TEXT,
    version INTEGER DEFAULT 1,
    last_updated DATETIME DEFAULT CURRENT_TIMESTAMP
  );
`;

const CREATE_OUTBOX_TABLE = `
  CREATE TABLE IF NOT EXISTS cdc_outbox (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    event_id TEXT NOT NULL UNIQUE,
    table_name TEXT NOT NULL,
    operation TEXT NOT NULL,
    payload_before TEXT, -- JSON format
    payload_after TEXT,  -- JSON format
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    processed BOOLEAN DEFAULT 0
  );
`;

const CREATE_USER_INSERT_TRIGGER = `
  CREATE TRIGGER IF NOT EXISTS t_users_insert
  AFTER INSERT ON users
  BEGIN
    INSERT INTO cdc_outbox (event_id, table_name, operation, payload_after)
    VALUES (
      lower(hex(randomblob(16))),
      'users',
      'INSERT',
      json_object('id', NEW.id, 'name', NEW.name, 'email', NEW.email, 'version', NEW.version)
    );
  END;
`;

// UPDATE 和 DELETE 的触发器类似,分别记录 NEW 和 OLD 的值
// ... 省略UPDATE和DELETE触发器代码 ...

export function initializeDatabase(dbPath: string): Database.Database {
  const db = new Database(dbPath, { verbose: console.log });
  db.pragma('journal_mode = WAL'); // WAL模式对并发读写更友好

  // 使用事务确保原子性
  const setupTx = db.transaction(() => {
    db.exec(CREATE_USERS_TABLE);
    db.exec(CREATE_OUTBOX_TABLE);
    db.exec(CREATE_USER_INSERT_TRIGGER);
    // ... 执行其他触发器创建语句 ...
  });
  
  setupTx();
  console.log('Database initialized with CDC triggers.');
  return db;
}

这里的坑在于,触发器内的json_object函数需要SQLite 3.9.0以上版本。在资源受限的边缘设备上,需要确认SQLite版本。

代理轮询与发送逻辑

代理的核心循环需要健壮,能处理网络中断和程序崩溃。我们将通过processed字段和数据库事务来保证事件至少被处理一次。

// packages/sqlite-capture-agent/src/agent.ts
import Database from 'better-sqlite3';
import amqplib from 'amqplib';
import { v4 as uuidv4 } from 'uuid';

const BATCH_SIZE = 100;
const POLLING_INTERVAL_MS = 2000;

interface OutboxEvent {
  id: number;
  event_id: string;
  table_name: string;
  operation: string;
  payload_before: string | null;
  payload_after: string | null;
}

export class CaptureAgent {
  private db: Database.Database;
  private mqChannel: amqplib.Channel | null = null;
  private timer: NodeJS.Timeout | null = null;
  private isProcessing = false;

  constructor(private dbPath: string, private mqUrl: string, private queueName: string) {
    // 实际项目中,数据库连接的创建应包含重试逻辑
    this.db = new Database(dbPath);
  }
  
  public async start() {
    await this.connectMq();
    this.timer = setInterval(() => this.pollAndProcess(), POLLING_INTERVAL_MS);
    console.log('Capture agent started.');
  }

  private async connectMq() {
    try {
      const connection = await amqplib.connect(this.mqUrl);
      this.mqChannel = await connection.createChannel();
      await this.mqChannel.assertQueue(this.queueName, { durable: true });
      console.log('Connected to RabbitMQ.');
    } catch (error) {
      console.error('Failed to connect to RabbitMQ, retrying in 5s...', error);
      setTimeout(() => this.connectMq(), 5000);
    }
  }

  private async pollAndProcess() {
    if (this.isProcessing || !this.mqChannel) {
      return;
    }

    this.isProcessing = true;
    
    // 查询并锁定一批未处理的事件
    const selectStmt = this.db.prepare(
      `SELECT * FROM cdc_outbox WHERE processed = 0 ORDER BY id ASC LIMIT ?`
    );
    const updateStmt = this.db.prepare(
      `UPDATE cdc_outbox SET processed = 1 WHERE id = ?`
    );

    const processTx = this.db.transaction((events: OutboxEvent[]) => {
      for (const event of events) {
        const message = {
          // 在消息中加入元数据,例如schema版本
          metadata: {
            eventId: event.event_id,
            sourceNodeId: 'edge-node-001', // 从配置中读取
            schemaVersion: 'v1', // 关键!后续会用DVC管理
            timestamp: new Date().toISOString()
          },
          tableName: event.table_name,
          operation: event.operation,
          before: event.payload_before ? JSON.parse(event.payload_before) : null,
          after: event.payload_after ? JSON.parse(event.payload_after) : null,
        };

        const successfullySent = this.mqChannel!.sendToQueue(
          this.queueName,
          Buffer.from(JSON.stringify(message)),
          { persistent: true }
        );

        if (!successfullySent) {
          // RabbitMQ的Node.js客户端会在缓冲区满时返回false
          // 这是一种背压机制。我们应该停止处理并等待 drain 事件
          // 为简化示例,这里我们直接抛出错误回滚事务
          console.warn('RabbitMQ buffer is full. Pausing sending.');
          throw new Error('MQ_BUFFER_FULL');
        }
        
        updateStmt.run(event.id);
      }
    });

    try {
      const eventsToProcess = selectStmt.all(BATCH_SIZE) as OutboxEvent[];
      if (eventsToProcess.length > 0) {
        console.log(`Processing ${eventsToProcess.length} events.`);
        processTx(eventsToProcess);
      }
    } catch (error) {
      console.error('Failed to process outbox batch. Will retry.', error);
      // 事务回滚,事件的 processed 字段仍为 0,下次轮询会重试
    } finally {
      this.isProcessing = false;
    }
  }
}

这个实现的关键点是事务。我们将消息发送和更新processed字段包裹在同一个SQLite事务中。如果消息发送失败(例如,RabbitMQ缓冲区满),事务会回滚,processed字段不变,保证了数据不会丢失。

核心组件二:DVC管理Schema演进

随着业务发展,users表可能需要增加一个department字段。这就是Schema演进。如果不加管理,消费者服务很快就会因为遇到不认识的字段而崩溃。

我们使用DVC来追踪schemas/目录。

  1. 初始化DVC
    在项目根目录运行 dvc init

  2. 追踪第一个版本的Schema
    schemas/v1/user_schema.sql 文件内容:

    CREATE TABLE users (
      id TEXT PRIMARY KEY,
      name TEXT NOT NULL,
      email TEXT,
      version INTEGER,
      last_updated DATETIME
    );

    使用DVC添加追踪:

    dvc add schemas/v1/user_schema.sql

    这会生成一个.dvc文件,并把user_schema.sql的哈希值记录下来。然后将.dvc文件和.gitignore中自动添加的/schemas/v1/user_schema.sql提交到Git。

  3. 演进到第二版Schema
    创建schemas/v2/user_schema.sql

    CREATE TABLE users (
      id TEXT PRIMARY KEY,
      name TEXT NOT NULL,
      email TEXT,
      department TEXT, -- 新增字段
      version INTEGER,
      last_updated DATETIME
    );

    再次添加追踪:

    dvc add schemas/v2/user_schema.sql

    现在,Git仓库的每次提交都精确对应一个完整的、可复现的Schema版本集合。CI/CD流水线可以利用dvc pull拉取指定Git版本的Schema文件,用于测试或部署。

捕获代理发送的消息中包含schemaVersion字段。消费端就可以根据这个版本号来决定如何解析和处理消息。

核心组件三:消息消费者与写入Couchbase

消费者的职责是:

  1. 从RabbitMQ接收消息。
  2. 根据消息元数据中的schemaVersion,选择合适的逻辑来解析和转换数据。
  3. 将转换后的数据写入Couchbase。
// packages/mq-couchbase-consumer/src/consumer.ts
import amqplib from 'amqplib';
import couchbase from 'couchbase';

// 实际项目中,这些配置应来自环境变量或配置文件
const MQ_URL = 'amqp://localhost';
const QUEUE_NAME = 'cdc_queue';
const COUCHBASE_CONN_STR = 'couchbase://localhost';
const COUCHBASE_BUCKET = 'cdc-events';
const COUCHBASE_USER = 'admin';
const COUCHBASE_PASS = 'password';

export class Consumer {
  private mqChannel: amqplib.Channel | null = null;
  private couchbaseCluster: couchbase.Cluster | null = null;
  private couchbaseBucket: couchbase.Bucket | null = null;

  async start() {
    await this.connectAll();
    this.listen();
  }

  private async connectAll() {
    // 省略MQ和Couchbase的连接与错误处理代码...
    this.couchbaseCluster = await couchbase.connect(COUCHBASE_CONN_STR, {
      username: COUCHBASE_USER,
      password: COUCHBASE_PASS,
    });
    this.couchbaseBucket = this.couchbaseCluster.bucket(COUCHBASE_BUCKET);
  }

  private listen() {
    if (!this.mqChannel) return;

    this.mqChannel.consume(QUEUE_NAME, async (msg) => {
      if (!msg) {
        return;
      }

      try {
        const content = JSON.parse(msg.content.toString());
        const { metadata, tableName, operation, after } = content;

        // 根据Schema版本进行数据转换
        const transformedData = this.transform(after, metadata.schemaVersion);

        if (operation === 'INSERT' || operation === 'UPDATE') {
          // 使用记录的ID作为Couchbase文档的ID,实现upsert
          const docId = `${tableName}::${transformedData.id}`;
          await this.couchbaseBucket!.defaultCollection().upsert(docId, transformedData);
        } else if (operation === 'DELETE') {
          const docId = `${tableName}::${content.before.id}`;
          await this.couchbaseBucket!.defaultCollection().remove(docId);
        }

        // 消息处理成功,进行ACK
        this.mqChannel!.ack(msg);

      } catch (error) {
        console.error('Error processing message, sending to NACK for requeue.', error);
        // 处理失败,nack(msg, false, true)表示不处理,并让MQ重新投递
        // 在生产环境中,需要有更复杂的死信队列(DLQ)机制
        this.mqChannel!.nack(msg, false, true);
      }
    });
  }

  // 这里的转换逻辑是核心
  private transform(payload: any, schemaVersion: string): any {
    if (!payload) return null;
    
    // 单元测试应覆盖所有版本的转换逻辑
    switch (schemaVersion) {
      case 'v1':
        // v1版本的数据已经是我们想要的格式
        return {
          id: payload.id,
          name: payload.name,
          email: payload.email,
          data_schema_version: 'v1'
        };
      case 'v2':
        // v2版本多了department字段
        return {
          id: payload.id,
          name: payload.name,
          email: payload.email,
          department: payload.department || 'UNKNOWN', // 提供默认值,增强健壮性
          data_schema_version: 'v2'
        };
      default:
        // 未知版本,抛出错误让消息进入重试或死信队列
        throw new Error(`Unsupported schema version: ${schemaVersion}`);
    }
  }
}

架构图

整个系统的流程可以通过下面的图来概括:

graph TD
    subgraph Monorepo
        direction LR
        DVC[DVC: Manages Schemas]
        AgentCode[sqlite-capture-agent]
        ConsumerCode[mq-couchbase-consumer]
        SharedTypes[shared-types]
    end

    subgraph Edge Node
        direction TB
        App[Application] -- Writes --> SQLite
        SQLite -- Triggers --> OutboxTable[Outbox Table]
        Agent[Capture Agent] -- Polls --> OutboxTable
    end

    subgraph Central Services
        direction TB
        MQ[RabbitMQ]
        Consumer[MQ Consumer]
        CB[Couchbase Cluster]
    end

    DVC --> AgentCode & ConsumerCode
    Agent -- Sends Event --> MQ
    MQ -- Delivers Event --> Consumer
    Consumer -- Writes Document --> CB

这个架构虽然组件多,但职责单一,每个部分都可以独立测试和部署。Monorepo和DVC的引入,解决了在演进过程中最头疼的协同和一致性问题。

方案的局限性与未来展望

此方案并非没有缺点。首先,基于触发器和轮询的捕获机制会给源SQLite数据库带来额外开销,并且存在毫秒到秒级的延迟。对于需要更低延迟的场景,可能需要探索直接解析SQLite的WAL日志文件,但这会带来巨大的复杂性。

其次,消费端的transform函数目前是硬编码的switch语句。当schema版本增多时,这里会变得臃肿。未来的一个优化方向是,将转换逻辑也作为脚本(例如JS或Lua脚本)和schema文件一起用DVC进行版本化管理。消费者在启动时加载所有版本的转换脚本,运行时动态调用,实现真正的配置驱动。

最后,错误处理机制虽然考虑了重试,但一个完善的系统还需要一个死信队列(Dead Letter Queue)和配套的监控告警,以便于开发者能及时发现并处理那些持续失败的“毒丸”消息。


  目录