AgentSkillsCN

clickhouse-cdc

在将数据从关系型数据库(PostgreSQL、MySQL、MongoDB)同步到ClickHouse时使用。涵盖使用Debezium、Airbyte或自定义触发器的数据变更捕获。包括处理模式演化、DELETE操作以及保持一致性。不适用于消息队列(参见clickhouse-streaming)或查询优化(参见clickhouse-patterns)。

SKILL.md
--- frontmatter
name: clickhouse-cdc
description: Use when syncing data FROM relational databases (PostgreSQL, MySQL, MongoDB) TO ClickHouse. Covers change data capture using Debezium, Airbyte, or custom triggers. Includes handling schema evolution, DELETE operations, and maintaining consistency. NOT for message queues (see clickhouse-streaming) or query optimization (see clickhouse-patterns).

ClickHouse CDC Patterns

Overview

Change Data Capture (CDC) replicates database changes to ClickHouse for analytics. Core principle: Capture changes at source, transform for column-storage, handle deletes gracefully.

Key challenge: ClickHouse isn't designed for updates/deletes. Use ReplacingMergeTree or append-only patterns.

When to Use

Symptoms:

  • Need to sync PostgreSQL/MySQL tables to ClickHouse
  • Want real-time analytics on transactional data
  • Schema changes breaking replication pipeline
  • Unsure how to handle DELETE operations

When NOT to use:

  • Streaming from Kafka/message queues → See clickhouse-streaming
  • One-time data migration → Use batch ETL
  • Query optimization → See clickhouse-patterns

Prerequisites

  • Understanding of source database (PostgreSQL triggers, MySQL binlog)
  • Basic ClickHouse knowledge → See clickhouse-patterns
  • Familiarity with ReplacingMergeTree engine

Quick Reference

CDC Method Selection

dot
digraph cdc_methods {
    rankdir=TD;
    node [shape=box, style=rounded];

    start [label="Choose CDC Method", shape=ellipse];
    volume [label="High volume?\n(>10k rows/sec)", shape=diamond];
    complexity [label="Complex schema?\n(many tables)", shape=diamond];

    debezium [label="Debezium\n(production-grade)"];
    airbyte [label="Airbyte\n(managed service)"];
    custom [label="Custom Triggers\n(simple setup)"];

    start -> volume;
    volume -> debezium [label="yes"];
    volume -> complexity [label="no"];
    complexity -> airbyte [label="yes"];
    complexity -> custom [label="no"];
}
MethodBest ForProsCons
DebeziumHigh volume, productionLog-based, no DB impactComplex setup
AirbyteMulti-source, managedUI, pre-built connectorsCost, less control
Custom TriggersSimple, low volumeEasy to understandDB overhead

Critical Patterns

PatternUse CaseKey Technique
Soft DeleteHandle DELETEsAdd is_deleted flag
ReplacingMergeTreeHandle UPDATEsUse version/timestamp column
Schema EvolutionAdd columnsNullable with defaults
BackfillInitial syncSnapshot then CDC

Pattern 1: PostgreSQL with Debezium

Architecture

code
PostgreSQL (WAL) → Debezium → Kafka → ClickHouse Kafka Engine

ClickHouse Setup

sql
-- 1. Kafka staging table
CREATE TABLE users_kafka (
    id UInt64,
    name String,
    email String,
    updated_at DateTime,
    _operation String  -- INSERT, UPDATE, DELETE
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'dbserver1.public.users',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow';

-- 2. Target table with dedup
CREATE TABLE users (
    id UInt64,
    name String,
    email String,
    updated_at DateTime,
    is_deleted UInt8 DEFAULT 0
) ENGINE = ReplacingMergeTree(updated_at)
ORDER BY id;

-- 3. Transform with materialized view
CREATE MATERIALIZED VIEW users_mv TO users AS
SELECT
    id, name, email, updated_at,
    if(_operation = 'DELETE', 1, 0) AS is_deleted
FROM users_kafka;

-- 4. Query active records
SELECT * FROM users FINAL WHERE is_deleted = 0;

Debezium Connector

json
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "replicator",
    "database.dbname": "mydb",
    "database.server.name": "dbserver1",
    "table.include.list": "public.users,public.orders",
    "plugin.name": "pgoutput"
  }
}

Pattern 2: PostgreSQL with Custom Triggers

Use Case

  • Low volume (< 1k rows/sec)
  • Simple schema (1-5 tables)
  • No Kafka infrastructure

PostgreSQL Trigger

sql
CREATE OR REPLACE FUNCTION notify_changes()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
        PERFORM pg_notify('table_changes', json_build_object(
            'table', TG_TABLE_NAME,
            'operation', TG_OP,
            'data', row_to_json(NEW)
        )::text);
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        PERFORM pg_notify('table_changes', json_build_object(
            'table', TG_TABLE_NAME,
            'operation', 'DELETE',
            'id', OLD.id
        )::text);
        RETURN OLD;
    END IF;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_notify_trigger
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION notify_changes();

Node.js CDC Service

typescript
import { Client } from 'pg';
import { ClickHouse } from 'clickhouse';

const pgClient = new Client({ connectionString: process.env.PG_URL });
const clickhouse = new ClickHouse({ url: process.env.CH_URL });

async function startCDC() {
  await pgClient.connect();
  await pgClient.query('LISTEN table_changes');

  pgClient.on('notification', async (msg) => {
    const { table, operation, data, id } = JSON.parse(msg.payload);

    if (operation === 'INSERT' || operation === 'UPDATE') {
      await clickhouse.insert(table, [{ ...data, is_deleted: 0 }]);
    } else if (operation === 'DELETE') {
      await clickhouse.query(`
        ALTER TABLE ${table} UPDATE is_deleted = 1 WHERE id = ${id}
      `).toPromise();
    }
  });
}

Pattern 3: MySQL Binlog

json
{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.user": "debezium",
    "database.server.id": "184054",
    "database.server.name": "mysql-server",
    "table.include.list": "mydb.orders,mydb.customers"
  }
}

Key differences: Uses binlog (not WAL), requires server.id, schema changes tracked separately.

Handling DELETE Operations

Option 1: Soft Delete (Recommended)

sql
CREATE TABLE orders (
    id UInt64,
    user_id UInt64,
    amount Decimal(10, 2),
    is_deleted UInt8 DEFAULT 0
) ENGINE = ReplacingMergeTree(updated_at)
ORDER BY id;

SELECT * FROM orders FINAL WHERE is_deleted = 0;

Option 2: CollapsingMergeTree

sql
CREATE TABLE orders (
    id UInt64,
    amount Decimal(10, 2),
    sign Int8  -- 1 = active, -1 = deleted
) ENGINE = CollapsingMergeTree(sign)
ORDER BY id;

-- Insert
INSERT INTO orders VALUES (1, 50.00, 1);
-- Delete (insert negative)
INSERT INTO orders VALUES (1, 50.00, -1);
-- Query
SELECT * FROM orders FINAL WHERE sign = 1;

Schema Evolution

Add Columns as Nullable

sql
ALTER TABLE users ADD COLUMN phone String DEFAULT '';
ALTER TABLE users ADD COLUMN country LowCardinality(String) DEFAULT 'UNKNOWN';

Schema Registry Pattern

typescript
const schemaRegistry = {
  'users': { version: 2, columns: ['id', 'name', 'email', 'phone'] }
};

function transformRow(table: string, row: any) {
  return {
    ...row,
    phone: row.phone || '',
    country: row.country || 'UNKNOWN'
  };
}

Backfill Strategy

typescript
// 1. Snapshot existing data
async function backfillTable(tableName: string) {
  const stream = pgClient.query(`COPY (SELECT * FROM ${tableName}) TO STDOUT`);
  await pipeline(stream, csvParser(), clickhouse.insert(tableName).stream());
}

// 2. Start CDC from snapshot position
const lsn = await pgClient.query(`SELECT pg_current_wal_lsn()`);

Common Mistakes

MistakeWhy It FailsFix
No deduplicationDuplicate eventsUse ReplacingMergeTree
Hard deletesNot supported wellSoft delete with flag
Ignoring orderOut-of-order updatesUse version/timestamp
No backfillMissing historical dataSnapshot before CDC

Performance Tips

sql
-- Batch Kafka consumption
SETTINGS kafka_max_block_size = 65536, kafka_poll_timeout_ms = 1000;

-- Async inserts
SET async_insert = 1, wait_for_async_insert = 0;

Monitoring

sql
-- Replication lag
SELECT table, max(synced_at) AS last_sync,
       now() - max(synced_at) AS lag_seconds
FROM system.parts WHERE active GROUP BY table;

-- Kafka consumer errors
SELECT database, table, exceptions, last_exception_time
FROM system.kafka_consumers;

Best Practices

Design:

  • Use ReplacingMergeTree for tables with updates
  • Always include updated_at/version column
  • Implement soft deletes, not hard deletes

Operations:

  • Monitor replication lag (< 1 minute target)
  • Alert on Kafka consumer errors
  • Test schema changes in staging

Performance:

  • Batch Kafka consumption (kafka_max_block_size)
  • Use async inserts
  • Partition by time

Red Flags

  • ❌ "ALTER TABLE DELETE" → Extremely slow
  • ❌ "Updates without version" → Race conditions
  • ❌ "No lag monitoring" → Stale analytics
  • ❌ "Hard deletes" → Data inconsistency

When to Escalate

  • Replication lag > 5 minutes consistently
  • Kafka consumer crashes
  • Schema changes breaking pipeline
  • Multi-datacenter replication needed

Resources: Debezium docs (https://debezium.io), ClickHouse Kafka engine docs

Related Skills

  • ClickHouse fundamentals: See clickhouse-patterns
  • Streaming from queues: See clickhouse-streaming

Remember: CDC is about consistency. Always handle deletes, monitor lag, and test schema changes thoroughly.