【问题标题】:kafka stream to ktable joinkafka 流到 ktable 连接
【发布时间】:2018-10-29 05:17:45
【问题描述】:

我正在尝试加入一个

  • KStream:从主题创建,主题具有JSON值。我使用 两个归因于价值。示例值(json 的 sn-p)。我创建了一个自定义 pojo 类并使用自定义 serdes.{"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}

键映射为:

map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value))

我查看了 KStream 并打印了两个密钥 以及我使用的属性。 看起来都不错。

  • KTable:我从一个主题创建一个 ktable,我正在使用 python 脚本写入主题,主题的键是KYZ1ifHCInOctets,设备名称和指标名称的组合(来自上面)。 我做一个 toStream 然后查看生成的流。键和值似乎 很好。

现在,当我进行内部连接并查看或通过/查看某个主题时,我看到键和值不匹配。加入似乎不起作用,

  KStream<String, MyPojoClass> joined= datastream.join(table, 
          (data,table)->data
          ,Joined.with(Serdes.String(),myCustomSerde,Serdes.String())
          );

key = XYZ1s1_TotalDiscards
Value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}

我通过 ksql 也有同样的事情,但想做我自己的流应用程序。

【问题讨论】:

  • “看到键值不匹配”是什么意思?另外,您是否会遇到时间戳同步问题?请注意,KTable 是根据更改日志记录时间戳构建的,并且构建表的进度与处理流的时间进度同步。没有“首先加载表”然后开始处理的概念。 (在即将发布的 2.1 版本中对此进行了改进)
  • 键是从值派生的(设备和指标名称的组合)。所以初始流中的数据类似于 Key=XYZ1s1_TotalDiscards, device = XYZ1, indicator =s1_TotalDiscards。如果您注意到关键是设备和指示器的串联。但在加入后,结果流不匹配,数据类似于 key = XYZ1s1_totalHCOctets, device = XYZ1, indicator =ifOutDiscards
  • @MatthiasJ.Sax 我解释了吗?
  • 是的。如果潜在的对象重用,我想到的唯一想法。您可以尝试通过(data,table)-&gt;""+data 复制您在连接中计算的输出吗?老实说,这是一个黑暗的镜头......
  • @MatthiasJ.Sax 我注意到在流中重新键入数据后看起来一切都很好,但是当流被写入主题并重新读取键和值时不再匹配。加入也是如此。不确定这可能是什么,如果底层 kafka 是版本 0.11.0.2 而 kafka 流代码是使用 kafka 2.0 编写的。

标签: apache-kafka apache-kafka-streams ksqldb


【解决方案1】:

现在听起来很愚蠢,错误是什么,我的 PoJo 类几乎没有静态属性:-(,导致错误的键。

【讨论】:

  • 很高兴你明白了:)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-11-04
  • 2017-08-13
  • 1970-01-01
  • 2021-03-13
  • 2021-12-03
  • 2019-03-10
相关资源
最近更新 更多