【问题标题】:what is the best way to pass data from mysql to kafka with springboot使用spring boot将数据从mysql传递到kafka的最佳方法是什么
【发布时间】:2020-07-22 01:06:32
【问题描述】:

将数据从 mysql 传递到 kafka 产品的最佳方法是什么

我知道有很好的 kafka 连接器。 但是我想使用springboot框架。因为我需要转换数据并与其他 http 结果合并然后我想将数据生成到 kafka。

那么有什么好的例子吗,github,blog wahtever~?

【问题讨论】:

  • Kafka Connect 将数据导入 Kafka,然后使用 Kafka Streams / ksqlDB 进行处理。试图用 Spring 在一个地方完成这一切通常不是这样做的。

标签: spring-boot apache-kafka producer


【解决方案1】:

最简单的方法是使用 Camel 和 camel-debezium 组件。文档中的一个示例是 following。在推送到 Kafka 主题之前,您可以使用所有 Camel 功能进行任何类型的转换或丰富。我认为如果 mysql 中的源数据不代表您的真实来源并且您需要在推送到 Kafka 之前进行过滤/转换,那么这可能是一个合法的选择。

监听事件:

from("debezium-mysql:dbz-test-1?offsetStorageFileName=/usr/offset-file-1.dat&databaseHostName=localhost&databaseUser=debezium&databasePassword=dbz&databaseServerName=my-app-connector&databaseHistoryFileName=/usr/history-file-1.dat")
    .log("Event received from Debezium : ${body}")
    .log("    with this identifier ${headers.CamelDebeziumIdentifier}")
    .log("    with these source metadata ${headers.CamelDebeziumSourceMetadata}")
    .log("    the event occured upon this operation '${headers.CamelDebeziumSourceOperation}'")
    .log("    on this database '${headers.CamelDebeziumSourceMetadata[db]}' and this table '${headers.CamelDebeziumSourceMetadata[table]}'")
    .log("    with the key ${headers.CamelDebeziumKey}")
    .log("    the previous value is ${headers.CamelDebeziumBefore}")

Produce 阅读/转换后发送给 Kafka:

.to("kafka:{{topic}}")

另外,请查看Spring boot guide

<dependency>
  <groupId>org.apache.camel.springboot</groupId>
  <artifactId>camel-debezium-mysql-starter</artifactId>
  <version>x.x.x</version>
  <!-- use the same version as your Camel core version -->
</dependency>

包含 Camel 转换的详细 github 示例是 here

【讨论】:

    猜你喜欢
    • 2019-01-30
    • 1970-01-01
    • 2018-10-09
    • 1970-01-01
    • 2018-06-12
    • 1970-01-01
    • 2023-03-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多