mirrormaker是kafka自带的topic复制工具,可以让两个集群之间的数据互相导入同步。 很多时候,我们需要把其中一个集群的某个topic数据,复制到另一个集群,但使用不同名字的topic。
官方好像并没有提供这样功能,但留了一个message.handler的入口,我们可以通过额外的插件来实现这样的功能。
首先放上这个插件地址: https://github.com/opencore/mirrormaker_topic_rename.git
使用方法:
直接下载编译好的jar文件:
1
2wget https://github.com/opencore/mirrormaker_topic_rename/files/2024649/mirrormaker_topic_rename.zip
unzip mirrormaker_topic_rename.zip添加环境变量:
1
2
3
4
5
6
7
8# 由于执行mirrormaker可能需要导出比较大的数据量,所以建议适当加些内存。这个根据需要来调试。
export KAFKA_HEAP_OPTS="-Xmx3G"
# 2.这个必写,很关键
export CLASSPATH=/mirrormaker_topic_rename/target/mmchangetopic-1.0-SNAPSHOT.jar
# 3. 源kafka集群 后面均用其来代替
export SOURCE_KAFKAS=xxx:9092
# 4. 目标kafka集群 后面均用其来代替
export SOURCE_KAFKAS=xxx:9092在目标kafka集群 提前创建好topic
1
./kafka-topics.sh --create --topic test_sink_topic --bootstrap-server $SINK_KAFKAS --partitions 3 --replication-factor 3 --config retention.ms=86400000
执行同步命令
1
kafka-mirror-maker.sh --consumer.config /consumer.properties --producer.config producer.properties --whitelist test_source_topic --message.handler com.opencore.RenameTopicHandler --message.handler.args 'test_source_topic,test_sink_topic'
consumer.properties 和 producer.properties
这两个文件需要提前自己创建出来,我发现其实官方好像文档写的也不全。 我这里先贴出来我自己用的。
consumer.properties
1 | bootstrap.servers=$SOURCE_KAFKAS(这个地方需要看着改改) |
producer.properties
1 | bootstrap.servers=$SOURCE_KAFKAS(这个地方需要看着改改) |