【问题标题】:How to perform operations on kstreams with null keys?如何使用空键对 kstream 执行操作?
【发布时间】:2018-10-26 10:48:18
【问题描述】:

我有一个这样的 kafka 流对象:

[KSTREAM-SOURCE-0000000000]: null, {"id": 1, "name": "john", "age": 26, "updated_at": 1525774480752}
[KSTREAM-SOURCE-0000000000]: null, {"id": 2, "name": "jane", "age": 24, "updated_at": 1525774480784}
[KSTREAM-SOURCE-0000000000]: null, {"id": 3, "name": "julia", "age": 25, "updated_at": 1525774480827}

当我尝试执行地图操作时:

KStream<String, mysql> locationsStream = pgsql_users.map((k, v) -> new KeyValue<String, mysql>(k.toString(), new mysql ((Integer) v.get("id"), (String) v.get("name").toString(), (Integer) v.get("age")) ));

我遇到了一个例外:

[KSTREAM-SOURCE-0000000000]: null, {"id": 1, "name": "john", "age": 26, "updated_at": 1525774480752}

线程异常 “测试应用程序-0d15a00c-51dc-4c1a-8293-82f47fc7ef90-StreamThread-1” java.lang.NullPointerException 在 com.aail.kafka_stream.lambda$main$0(kafka_stream.java:86) 在 org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) 在 org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) 在 org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) 在 org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) 在 org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) 在 org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) 在 org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403) 在 org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317) 在 org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942) 在 org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) 在 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) 在 org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

我想我得到这个是因为密钥为空。谁能帮我解决这个问题。

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api apache-kafka-streams


    【解决方案1】:

    您正在使用k.toString(),但k 为空。

    您可能希望按原样使用k,在新密钥中再次使用null,或者如果您确实想要单词"null",则使用String.valueOf(k)(不推荐,因为这会将所有消息发送到单个分区)

    此外,由于键为空,您可以将byte[] 设置为流的键对象。无需使用反序列化器

    【讨论】:

      猜你喜欢
      • 2011-03-30
      • 2017-11-17
      • 1970-01-01
      • 2015-07-14
      • 1970-01-01
      • 2017-10-29
      • 1970-01-01
      • 2011-08-04
      • 1970-01-01
      相关资源
      最近更新 更多