Python Debezium + Kafka 的 CDC 实践

2次阅读

debezium connectors 显示 unassigned 的根本原因是 kafka connect 集群未正常协调,通常由 group.id、converter 配置不一致、内部 topic 缺失、mysql binlog 设置错误、权限不足、jvm metaspace 不足或配置项隐式依赖导致。

Python Debezium + Kafka 的 CDC 实践

Debezium 启动后 connectors 一直显示 UNASSIGNED

这是最常见的卡点:Kafka Connect 集群没真正“连上”,或者 Worker 配置没对齐。根本原因通常是 group.idkey.converter/value.converter 在 Connect 配置和 Debezium connector 配置里不一致,导致 Worker 拒绝加入协调组。

实操建议:

立即学习Python免费学习笔记(深入)”;

  • 检查 connect-distributed.properties 中的 group.id 是否唯一(别和 Kafka consumer 冲突),且所有 Worker 实例用同一值
  • key.convertervalue.converter 必须设为 org.apache.kafka.connect.json.JsonConverter,且 schemas.enable=true —— Debezium 的 schema 信息依赖这个开关
  • 确认 offset.storage.topic 已手动创建(比如 25 分区 + 3 副本),且 topic 名和配置中完全一致;否则 Worker 日志里会静默失败,只报 UNASSIGNED
  • 启动后立刻查 kafka-topics.sh --list --bootstrap-server localhost:9092,看是否出现了 connect-offsetsconnect-configsconnect-status 这三个内部 topic,缺一个就说明配置没生效

MySQL binlog 配置不满足 Debezium 要求,connector 立即失败

Debezium 不是“连上 MySQL 就能跑”,它对 binlog 格式、row image、server_id 都有硬性要求。错一条,日志里就报 Failed to start connector 或直接 Error Task threw an uncaught and unrecoverable exception

实操建议:

立即学习Python免费学习笔记(深入)”;

  • MySQL 必须开启 binlog_format=ROWbinlog_row_image=FULL —— MINIMALNOBLOB 会导致字段缺失或解析失败
  • server_id 必须是非零整数(不能是字符串或 0),且每个 MySQL 实例唯一;如果用 docker,注意不要多个容器共用同一个 server_id
  • 账号权限必须包含:selectRELOADSHOW databaseSREPLICATION SLAVEREPLICATION CLIENT;少 REPLICATION SLAVE 会报 access denied; you need (at least one of) the SUPER or SYSTEM_VARIABLES_ADMIN privilege(s)
  • 首次启动前,执行 FLUSH LOGS,确保 Debezium 从最新 binlog 开始读;否则可能因 position 偏移错乱而卡住

python 应用消费 Kafka CDC 数据时,json.loads()JSONDecodeError

不是数据坏了,是 Debezium 默认输出的 value 是嵌套结构,外层带 schemapayload 字段,直接 json.loads(value) 解的是整个消息体,不是业务数据本身。

实操建议:

立即学习Python免费学习笔记(深入)”;

  • 先用 kafka-console-consumer.sh 看一眼原始消息:./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mysql.inventory.products --from-beginning --max-messages 1 --value-deserializer org.apache.kafka.common.serialization.StringDeserializer,确认结构是不是 {"schema":{...},"payload":{...}}
  • Python 里正确解法是:先 json.loads(value),再取 data = parsed["payload"];如果 payloadNULL(比如 delete 事件),要判空,否则 KeyError
  • 如果想跳过 schema 直接消费 payload,可在 connector 配置加 "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",但要注意这会丢失操作类型(op 字段)和时间戳,得靠 "transforms.unwrap.delete.handling.mode": "rewrite" 补 DELETE 场景

Kafka Connect JVM 内存不足,OutOfMemoryError: Metaspace 频发

Debezium connector 加载大量 class(尤其 MySQL connector 依赖多),默认 JVM 参数撑不住,Worker 会反复 crash,日志里出现 java.lang.OutOfMemoryError: Metaspace,而不是内存溢出。

实操建议:

立即学习Python免费学习笔记(深入)”;

  • 在启动 Connect 的脚本里显式加大 Metaspace:加 JVM 参数 -XX:MetaspaceSize=512m -XX:MaxMetaspaceSize=1g,别只调 -Xmx
  • 避免在一个 Worker 上部署过多 connector(尤其不同数据库类型混跑),每个 connector 都会加载独立类加载器,Metaspace 消耗翻倍
  • MySQL connector 升级到 2.4+ 后,可启用 "database.history.skip.initial.state": "true" 减少启动时的 schema 扫描压力,间接降低类加载量
  • 如果用 Confluent Platform,注意 cp-kafka-connect 镜像默认没开 JMX,加 KAFKA_JMX_OPTS 方便后续用 jstat 查 Metaspace 使用率

真正麻烦的不是配错哪一项,而是几个配置项之间存在隐含依赖——比如 transforms.unwrap 开了,但没配 delete.handling.mode,DELETE 事件就会丢;又比如 offset.storage.topic 创建了但副本数小于 offset.storage.replication.factor,Worker 就永远卡在 REBALANCING。这些坑不会明说,只在日志里埋个 WARN,等数据对不上才回头翻。

text=ZqhQzanResources