【问题标题】:Read JMS message as json and not text by using Kafka ActiveMQ Source Connector使用 Kafka ActiveMQ 源连接器将 JMS 消息读取为 json 而不是文本
【发布时间】:2020-11-26 12:29:40
【问题描述】:

过去几个月我一直在使用 Kafka Connect 最近我包含了 ActiveMQ 源插件,以便读取一些 JMS 主题消息,其中包含一个 json 文件,将它们放在 kafka 主题中,然后在 Ksqldb 中创建一个流/表,将 json 文件的一些键用作列. 问题是该插件将 JMS 消息作为带双引号的文本插入,因此在 Ksqldb 中无法正确识别。 我在配置中尝试了各种方法来修复它,但到目前为止没有任何效果。 我还想在 kafka connect 中使用 json 格式而不是 Avro(也没有运行模式注册表)。 出于测试目的,我还尝试通过将标头内容指定为“application/json”来发送 JMS 消息,但仍然没有运气。

这是我的 ActiveMQ 插件的样子

 "config": {"connector.class":"ActiveMQSourceConnector", "tasks.max":"1", "kafka.topic":"activemq", "activemq.url":"tcp://localhost:61616","activemq.username":"admin","activemq.password":"admin","jms.destination.name":"topic.2","jms.destination.type":"topic","jms.message.format":"json","jms.message.converter":"org.apache.kafka.connect.json.JsonConverter","confluent.license":"","confluent.topic.bootstrap.servers":"localhost:9092"}}

这是我的 Kafka 连接配置的样子

bootstrap.servers=localhost:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1


config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000

plugin.path=/opt/kafka_2.13-2.5.0/plugins

这里还有一个 Kafka 如何使用消息的示例

{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": "{\"widget\": {     \"debug\": \"on\",    \"window\": {        \"title\": \"Sample Konfabulator Widget\",        \"name\": \"main_window\",        \"width\": 500,        \"height\": 500    },    \"image\": {        \"src\": \"Images/Sun.png\",        \"name\": \"sun1\",        \"hOffset\": 250,        \"vOffset\": 250,        \"alignment\": \"center\"    },    \"text\": {        \"data\": \"Click Here\",        \"size\": 36,        \"style\": \"bold\",        \"name\": \"text1\",        \"hOffset\": 250,        \"vOffset\": 100,        \"alignment\": \"center\",        \"onMouseUp\": \"sun1.opacity = 39\"} }}\n"
}

如果需要任何其他信息,请告诉我 任何帮助将不胜感激。

更新:最终最好的解决方案是能够以某种方式将连接器配置为转义有效负载中的引号。 同样不幸的是,转义引号是从activeMQ本身生成的,不是初始消息的一部分

所以消息看起来像这样

{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": {
   "widget": {
    "debug": "on",
    "window": {
      "title": "Sample Konfabulator Widget",
      "name": "main_window",
      "width": 500,
      "height": 500
    },
    "image": {
     "src": "Images/Sun.png",
     "name": "sun1",
     "hOffset": 250,
     "vOffset": 250,
     "alignment": 
     "center"
    }

}

【问题讨论】:

  • 您可能想在Confluent community slack 上宣传您的问题。有一个#connect 频道。也许在 SO 寻求帮助时发布一个指向您的问题的链接

标签: apache-kafka activemq apache-kafka-connect ksqldb


【解决方案1】:

欢迎 Elen1no1Yami!

在我看来,问题在于消息的 text 字段是一个包含您感兴趣的 JSON 有效负载的字符串,但该有效负载的双引号用 \ 字符转义。

我假设 ActiveMQ 中的数据本身没有 \ 字符,但如果你能澄清这一点会很好。

我认为解决此问题的方法是:

  1. 能够将连接器配置为不对负载中的引号进行转义。使消息看起来更像:
{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": {
   "widget": {
    "debug": "on",
    "window": {
      "title": "Sample Konfabulator Widget",
      "name": "main_window",
      "width": 500,
      "height": 500
    },
    "image": {
     "src": "Images/Sun.png",
     "name": "sun1",
     "hOffset": 250,
     "vOffset": 250,
     "alignment": 
     "center"
    },
    ... etc
}
  1. 或以某种方式让 ksqlDB 处理该消息,因为它仍然可以访问 text 字段中的 JSON。

这是否概括了您正在寻找的内容?如果是这样,请更新您的问题以反映这一点。 (最好在您的问题中包含此类详细信息,以便清楚您在问什么。

至于答案……

  1. 我不是 Connect 专家,因此无法真正发表评论,也无法真正看到 connector's config 的详细信息中可能允许您更改 text 的内容的任何内容。其他对 Connect 有更多了解的人可能会提供更多帮助。

  2. 为了能够访问 ksqlDB 中嵌入/转义的 JSON,您首先需要删除转义。有关使用 ksqlDB 执行此操作的方法,请参见下文

使用 ksqlDB 访问转义的 JSON

在我们可以访问text 中的 JSON 文档之前,我们必须删除转义。

我能想到两种方法:

编写自定义 UDF

最好的方法是 write a custom UDF 'unescape_json` 可以消除转义。

-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
  message STRING
 ) WITH (
  KAFKA_TOPIC=<something>,
  VALUE_FORMAT='KAFKA'
 );


-- Use custom UDF to process this and write it back as a properly formatted JSON document:
CREATE STREAM JSONIFIED AS
  SELECT MY_CUSTOM_UDF(message) FROM RAW;

如果编写正确,自定义 UDF 方法将不会遭受基于 REPLACE 的解决方案所遭受的潜在数据损坏问题。

使用REPLACE 去除转义

注意:这个解决方案很脆弱:字符替换可以匹配和替换不应该匹配的内容,具体取决于您的消息内容!

让我们使用更简单的测试数据来解释需要什么,例如我们要转换:

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": "{\"widget\": 10}"
}

收件人:

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": {"widget": 10}
}

这需要三件事:

  1. 将开口"text": "{ 替换为"text": {
  2. 将所有\" 替换为"
  3. 将关闭}" 替换为}

我们可以使用REPLACE 函数来做到这一点,或者REGEXP_REPLACE 函数:

-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
  message STRING
 ) WITH (
  KAFKA_TOPIC=<something>,
  VALUE_FORMAT='KAFKA'
 );


-- Use REPLACE to remove reformat:
CREATE STREAM JSONIFIED AS 
  SELECT 
    REPLACE(
      REPLACE(
        REPLACE(message, 
          '"text": "{', '"text": {'), 
          '\"', '"'), 
          '"}', '}')
  FROM RAW;

当然,如果您的数据包含以下任何搜索词:"text": "{\""},则此解决方案可能会损坏您的数据,例如

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": "{\"widget\": \"hello \\\"} world\"}"
}

会被错误地转换为

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": {"widget": "hello \\}world"}
}

这就是为什么自定义 UDF 更可取的原因。

一旦您更正了输入内容(并将其写入新主题),您就可以照常导入数据:

CREATE STREAM DATA (
   messageId STRING,
   text STRUCT<Widget INT>
 ) WITH (
   kafka_topic='JSONIFIED',
   value_format='JSON'
 );

【讨论】:

  • 感谢您的回复。我更新了我的问题,以便更清楚地了解我想要什么。解决方案1)是最可取的,但我也不知道该怎么做。我也在连接器的配置页面中搜索过,但不幸的是没有找到任何有用的东西。如果没有其他方法,我将尝试解决方案 2)。
猜你喜欢
  • 2011-12-25
  • 2018-12-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-12-24
  • 2015-06-16
  • 1970-01-01
  • 2013-01-24
相关资源
最近更新 更多