Kafka Connect如何从文件系统读取XML并推送到Topic

11次阅读

需借助第三方组件实现kafka Connect读取xml:一、用FilePulse Connector配合Confluent XML Converter,配置XPath解析与jsON转换;二、开发自定义Source Connector,基于dom/SAX解析并构造SourceRecord;三、以Logstash为中间层,通过XML Filter提取字段后推送至Kafka。

Kafka Connect如何从文件系统读取XML并推送到Topic

如果您希望使用Kafka Connect从本地或网络文件系统读取XML格式数据并将其写入Kafka Topic,则需借助支持XML解析的Connector。Kafka Connect原生不支持XML解析,必须通过第三方转换器或自定义Source Connector实现。以下是可行的操作路径:

一、使用Confluent XML Converter配合FilePulse Connector

FilePulse Connector是开源的通用文件源连接器,支持多种格式解析,配合Confluent提供的XML Converter可将XML内容结构化为json Schema格式记录。

1、下载FilePulse Connector JAR包并放入Kafka Connect插件目录(如connect-plugins/)。

2、下载confluentinc/kafka-connect-xml并确保其JAR文件位于同一插件路径下。

3、启动Connect分布式集群后,提交以下REST配置:

4、在POST请求体中指定"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"

5、设置"tasks.max": "1""fs.scan.Directory.path": "/path/to/xml/files"指向XML所在目录。

6、配置"file.filter.Regex.pattern": ".*\.xml$"仅匹配XML文件。

7、启用XML解析:"format.type": "xml",并设置"xml.xpath.expression": "/root/record"提取每条记录节点。

8、指定"transforms": "unwrap""transforms.unwrap.type": "org.apache.kafka.connect.transforms.Flatten$Value"展平嵌套字段。

9、将"key.converter": "org.apache.kafka.connect.storage.StringConverter""value.converter": "io.confluent.connect.xml.XmlConverter"设为对应转换器类。

10、设置"value.converter.schema.registry.url": "http://schema-registry:8081"(若启用Schema Registry)。

二、编写自定义XML Source Connector

当标准Connector无法满足复杂XML结构(如混合文本/属性/命名空间)时,需开发继承SourceConnectorSourceTaskjava实现,直接解析DOM/SAX/StAX流并构造SourceRecord对象

1、创建maven模块,依赖kafka-connect-apiwoodstox-core(用于稳健XML解析)。

2、在XmlSourceConnector中重写configDef()方法,暴露xml.input.pathxml.record.xpath等配置项。

3、于XmlSourceTask中初始化XMLInputFactory,逐文件打开输入流。

4、使用XPath定位目标记录节点,对每个匹配节点调用node.toString()或序列化为map结构。

5、将每条解析结果封装Struct,依据预定义Schema生成SourceRecord实例。

6、在poll()方法中返回记录列表,并设置offset以支持断点续传(如记录文件名+行号或最后修改时间戳)。

7、打包为fat JAR,放入Connect插件目录并重启worker进程。

8、通过rest api注册该Connector,填入"connector.class": "com.example.kafka.connect.xml.XmlSourceConnector"

9、配置"xml.input.path": "/data/inbound""topic": "xml-ingest-topic"

10、确认status返回"RUNNING"且日志中出现"Processed N records from file X.xml"

三、使用Logstash + Kafka Output作为替代管道

若Kafka Connect部署受限或需快速验证流程,可用Logstash作为中间解析层:它内置XML filter插件,支持XPath提取、字段映射与嵌套展开,再通过kafka output插件推送至Topic。

1、安装Logstash并确保logstash-filter-xmllogstash-output-kafka已加载。

2、创建配置文件xml-to-kafka.conf,在input段设置file插件,指定path => "/var/log/xml/*.xml"

3、添加start_position => "beginning"sincedb_path => "/dev/NULL"确保首次读取全部内容。

4、在filter段加入xml插件,设置source => "message"target => "parsed"

5、配置xpath => { "/root/item/text()" => "item_text" }提取关键字段。

6、使用mutate插件重命名或删除冗余字段,如remove_field => ["message", "@version"]

7、在output段配置kafka插件,指定bootstrap_servers => "kafka:9092"topic_id => "xml-topic"

8、设置codec => json确保消息体为JSON格式。

9、执行logstash -f xml-to-kafka.conf启动管道。

10、监控Kafka Topic是否接收到结构化JSON消息,字段与XPath提取一致。

text=ZqhQzanResources