【发布时间】:2018-10-29 05:17:45
【问题描述】:
我正在尝试加入一个
- KStream:从主题创建,主题具有JSON值。我使用
两个归因于价值。示例值(json 的 sn-p)。我创建了一个自定义 pojo 类并使用自定义 serdes.
{"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}
键映射为:
map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value))
我查看了 KStream 并打印了两个密钥 以及我使用的属性。 看起来都不错。
- KTable:我从一个主题创建一个 ktable,我正在使用 python 脚本写入主题,主题的键是
KYZ1ifHCInOctets,设备名称和指标名称的组合(来自上面)。 我做一个 toStream 然后查看生成的流。键和值似乎 很好。
现在,当我进行内部连接并查看或通过/查看某个主题时,我看到键和值不匹配。加入似乎不起作用,
KStream<String, MyPojoClass> joined= datastream.join(table,
(data,table)->data
,Joined.with(Serdes.String(),myCustomSerde,Serdes.String())
);
key = XYZ1s1_TotalDiscards
Value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}
我通过 ksql 也有同样的事情,但想做我自己的流应用程序。
【问题讨论】:
-
“看到键值不匹配”是什么意思?另外,您是否会遇到时间戳同步问题?请注意,KTable 是根据更改日志记录时间戳构建的,并且构建表的进度与处理流的时间进度同步。没有“首先加载表”然后开始处理的概念。 (在即将发布的 2.1 版本中对此进行了改进)
-
键是从值派生的(设备和指标名称的组合)。所以初始流中的数据类似于 Key=XYZ1s1_TotalDiscards, device = XYZ1, indicator =s1_TotalDiscards。如果您注意到关键是设备和指示器的串联。但在加入后,结果流不匹配,数据类似于 key = XYZ1s1_totalHCOctets, device = XYZ1, indicator =ifOutDiscards
-
@MatthiasJ.Sax 我解释了吗?
-
是的。如果潜在的对象重用,我想到的唯一想法。您可以尝试通过
(data,table)->""+data复制您在连接中计算的输出吗?老实说,这是一个黑暗的镜头...... -
@MatthiasJ.Sax 我注意到在流中重新键入数据后看起来一切都很好,但是当流被写入主题并重新读取键和值时不再匹配。加入也是如此。不确定这可能是什么,如果底层 kafka 是版本 0.11.0.2 而 kafka 流代码是使用 kafka 2.0 编写的。
标签: apache-kafka apache-kafka-streams ksqldb