| { "name": "source-test", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "$MYSQL_HOST", "database.port": "3306", "database.user": "xxx", "database.password": "xxx", "database.server.name": "source-test", "database.include.list": "testdatabase", "table.exclude.list": "testdatabase.django_admin_log,testdatabase.django_content_type", "database.history.kafka.bootstrap.servers": "$KAFKAS", "time.precision.mode": "connect", "database.history.kafka.topic": "schema-changes.source-test", "max.queue.size": "81290", "max.batch.size": "20480", "snapshot.locking.mode": "none", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://kafka-connect-schema-registry.kafka-connect", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://kafka-connect-schema-registry.kafka-connect", "topic.creation.default.cleanup.policy": "delete", "topic.creation.default.replication.factor": 3, "topic.creation.default.partitions": 1, "topic.creation.default.retention.ms": 7776000000, "producer.override.compression.type": "zstd", "producer.override.buffer.memory": 67108864, "producer.override.max.request.size": 62914560, "producer.override.acks":"1" } }
创建源同步配置: curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://kafka-connect-base.kafka-connect/connectors/ -d '{"name":"source-test","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"$MYSQL_HOST","database.port":"3306","database.user":"xxx","database.password":"xxx","database.server.name":"source-test","database.include.list":"testdatabase","table.exclude.list":"testdatabase.django_admin_log,testdatabase.django_content_type","database.history.kafka.bootstrap.servers":"$KAFKAS","time.precision.mode":"connect","database.history.kafka.topic":"schema-changes.source-test","max.queue.size":"81290","max.batch.size":"20480","snapshot.locking.mode":"none","value.converter":"io.confluent.connect.avro.AvroConverter","value.converter.schema.registry.url":"http://kafka-connect-schema-registry.kafka-connect","key.converter":"io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url":"http://kafka-connect-schema-registry.kafka-connect","topic.creation.default.cleanup.policy":"delete","topic.creation.default.replication.factor":3,"topic.creation.default.partitions":1,"topic.creation.default.retention.ms":7776000000,"producer.override.compression.type":"zstd","producer.override.buffer.memory":67108864,"producer.override.max.request.size":62914560,"producer.override.acks":"1"}}'
查看结果
curl -H "Accept:application/json" http://kafka-connect-base.kafka-connect/connectors
./kafka-topics.sh --list --bootstrap-server $KAFKAS
kafka-console-consumer.sh --bootstrap-server $KAFKAS --topic source-test.testdatabase.customers --from-beginning
|