【发布时间】:2020-11-26 12:29:40
【问题描述】:
过去几个月我一直在使用 Kafka Connect 最近我包含了 ActiveMQ 源插件,以便读取一些 JMS 主题消息,其中包含一个 json 文件,将它们放在 kafka 主题中,然后在 Ksqldb 中创建一个流/表,将 json 文件的一些键用作列. 问题是该插件将 JMS 消息作为带双引号的文本插入,因此在 Ksqldb 中无法正确识别。 我在配置中尝试了各种方法来修复它,但到目前为止没有任何效果。 我还想在 kafka connect 中使用 json 格式而不是 Avro(也没有运行模式注册表)。 出于测试目的,我还尝试通过将标头内容指定为“application/json”来发送 JMS 消息,但仍然没有运气。
这是我的 ActiveMQ 插件的样子
"config": {"connector.class":"ActiveMQSourceConnector", "tasks.max":"1", "kafka.topic":"activemq", "activemq.url":"tcp://localhost:61616","activemq.username":"admin","activemq.password":"admin","jms.destination.name":"topic.2","jms.destination.type":"topic","jms.message.format":"json","jms.message.converter":"org.apache.kafka.connect.json.JsonConverter","confluent.license":"","confluent.topic.bootstrap.servers":"localhost:9092"}}
这是我的 Kafka 连接配置的样子
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/opt/kafka_2.13-2.5.0/plugins
这里还有一个 Kafka 如何使用消息的示例
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": "{\"widget\": { \"debug\": \"on\", \"window\": { \"title\": \"Sample Konfabulator Widget\", \"name\": \"main_window\", \"width\": 500, \"height\": 500 }, \"image\": { \"src\": \"Images/Sun.png\", \"name\": \"sun1\", \"hOffset\": 250, \"vOffset\": 250, \"alignment\": \"center\" }, \"text\": { \"data\": \"Click Here\", \"size\": 36, \"style\": \"bold\", \"name\": \"text1\", \"hOffset\": 250, \"vOffset\": 100, \"alignment\": \"center\", \"onMouseUp\": \"sun1.opacity = 39\"} }}\n"
}
如果需要任何其他信息,请告诉我 任何帮助将不胜感激。
更新:最终最好的解决方案是能够以某种方式将连接器配置为不转义有效负载中的引号。 同样不幸的是,转义引号是从activeMQ本身生成的,不是初始消息的一部分
所以消息看起来像这样
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": {
"widget": {
"debug": "on",
"window": {
"title": "Sample Konfabulator Widget",
"name": "main_window",
"width": 500,
"height": 500
},
"image": {
"src": "Images/Sun.png",
"name": "sun1",
"hOffset": 250,
"vOffset": 250,
"alignment":
"center"
}
}
【问题讨论】:
-
您可能想在Confluent community slack 上宣传您的问题。有一个#connect 频道。也许在 SO 寻求帮助时发布一个指向您的问题的链接
标签: apache-kafka activemq apache-kafka-connect ksqldb