【问题标题】:Reading peek topic from kafka streams从 kafka 流中读取 peek 主题
【发布时间】:2020-06-11 16:54:25
【问题描述】:

我有一个主题名称 push-processing-KSTREAM-PEEK-0000000014-repartition,这是 kafka 的内部主题。我没有创建这个主题,我在重新分区后使用.peek() 方法并使用 peek 方法 3-4 次。

我的问题是我可以阅读主题topic read push-processing-KSTREAM-PEEK-0000000014-repartition,但是当我说topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning 时我无法阅读。

这个内部话题是因为peek 方法而创建的,对吧?

或者它与其他重新分区流代码有关,但它的名字是KSTREEAM-PEEK

它有 50 个分区。因为peek是无状态操作,所以不应该创建内部主题,但是为什么名称和peek相关,为什么我不能从头开始阅读?

有什么建议/

这是第一个拓扑:

   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [appconnect_deviceIds_exported_for_push])
      --> KSTREAM-FLATMAP-0000000004
    Processor: KSTREAM-FLATMAP-0000000004 (stores: [])
      --> KSTREAM-PEEK-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000005 (stores: [])
      --> KSTREAM-FILTER-0000000007
      <-- KSTREAM-FLATMAP-0000000004
    Processor: KSTREAM-FILTER-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000005
    Sink: KSTREAM-SINK-0000000006 (topic: KSTREAM-PEEK-0000000005-repartition)
      <-- KSTREAM-FILTER-0000000007

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000008 (topics: [KSTREAM-PEEK-0000000005-repartition])
      --> KSTREAM-JOIN-0000000009
    Source: KSTREAM-SOURCE-0000000028 (topics: [KSTREAM-PEEK-0000000025-repartition])
      --> KSTREAM-JOIN-0000000029
    Processor: KSTREAM-JOIN-0000000009 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
      --> KSTREAM-MAP-0000000010
      <-- KSTREAM-SOURCE-0000000008
    Processor: KSTREAM-JOIN-0000000029 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
      --> KSTREAM-PEEK-0000000030
      <-- KSTREAM-SOURCE-0000000028
    Processor: KSTREAM-MAP-0000000010 (stores: [])
      --> KSTREAM-PEEK-0000000011
      <-- KSTREAM-JOIN-0000000009
    Processor: KSTREAM-PEEK-0000000030 (stores: [])
      --> KSTREAM-MAP-0000000031
      <-- KSTREAM-JOIN-0000000029
    Processor: KSTREAM-MAP-0000000031 (stores: [])
      --> KSTREAM-SINK-0000000032
      <-- KSTREAM-PEEK-0000000030
    Processor: KSTREAM-PEEK-0000000011 (stores: [])
      --> KSTREAM-SINK-0000000012
      <-- KSTREAM-MAP-0000000010
    Source: KSTREAM-SOURCE-0000000002 (topics: [appconnect_device_stream])
      --> KTABLE-SOURCE-0000000003
    Sink: KSTREAM-SINK-0000000012 (topic: appconnect_devices_exported_for_push)
      <-- KSTREAM-PEEK-0000000011
    Sink: KSTREAM-SINK-0000000032 (topic: appconnect_devices_exported_for_push)
      <-- KSTREAM-MAP-0000000031
    Processor: KTABLE-SOURCE-0000000003 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
      --> none
      <-- KSTREAM-SOURCE-0000000002

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000013 (topics: [appconnect_userIds_exported_for_push])
      --> KSTREAM-FLATMAP-0000000017
    Processor: KSTREAM-FLATMAP-0000000017 (stores: [])
      --> KSTREAM-PEEK-0000000018
      <-- KSTREAM-SOURCE-0000000013
    Processor: KSTREAM-PEEK-0000000018 (stores: [])
      --> KSTREAM-FILTER-0000000020
      <-- KSTREAM-FLATMAP-0000000017
    Processor: KSTREAM-FILTER-0000000020 (stores: [])
      --> KSTREAM-SINK-0000000019
      <-- KSTREAM-PEEK-0000000018
    Sink: KSTREAM-SINK-0000000019 (topic: KSTREAM-PEEK-0000000018-repartition)
      <-- KSTREAM-FILTER-0000000020

  Sub-topology: 3
    Source: KSTREAM-SOURCE-0000000021 (topics: [KSTREAM-PEEK-0000000018-repartition])
      --> KSTREAM-JOIN-0000000022
    Processor: KSTREAM-JOIN-0000000022 (stores: [appconnect_user_stream-STATE-STORE-0000000014])
      --> KSTREAM-PEEK-0000000023
      <-- KSTREAM-SOURCE-0000000021
    Processor: KSTREAM-PEEK-0000000023 (stores: [])
      --> KSTREAM-MAP-0000000024
      <-- KSTREAM-JOIN-0000000022
    Processor: KSTREAM-MAP-0000000024 (stores: [])
      --> KSTREAM-PEEK-0000000025
      <-- KSTREAM-PEEK-0000000023
    Processor: KSTREAM-PEEK-0000000025 (stores: [])
      --> KSTREAM-FILTER-0000000027
      <-- KSTREAM-MAP-0000000024
    Processor: KSTREAM-FILTER-0000000027 (stores: [])
      --> KSTREAM-SINK-0000000026
      <-- KSTREAM-PEEK-0000000025
    Source: KSTREAM-SOURCE-0000000015 (topics: [appconnect_user_stream])
      --> KTABLE-SOURCE-0000000016
    Sink: KSTREAM-SINK-0000000026 (topic: KSTREAM-PEEK-0000000025-repartition)
      <-- KSTREAM-FILTER-0000000027
    Processor: KTABLE-SOURCE-0000000016 (stores: [appconnect_user_stream-STATE-STORE-0000000014])
      --> none
      <-- KSTREAM-SOURCE-0000000015

这是第二步,

   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000017 (topics: [KSTREAM-PEEK-0000000014-repartition])
      --> KSTREAM-JOIN-0000000018
    Processor: KSTREAM-JOIN-0000000018 (stores: [appconnect_push_processing_submissions-STATE-STORE-0000000000])
      --> KSTREAM-FILTER-0000000019
      <-- KSTREAM-SOURCE-0000000017
    Processor: KSTREAM-FILTER-0000000019 (stores: [])
      --> KSTREAM-SINK-0000000020
      <-- KSTREAM-JOIN-0000000018
    Source: KSTREAM-SOURCE-0000000001 (topics: [appconnect_push_processing_submissions])
      --> KTABLE-SOURCE-0000000002
    Sink: KSTREAM-SINK-0000000020 (topic: appconnect_push_send_bulk)
      <-- KSTREAM-FILTER-0000000019
    Processor: KTABLE-SOURCE-0000000002 (stores: [appconnect_push_processing_submissions-STATE-STORE-0000000000])
      --> none
      <-- KSTREAM-SOURCE-0000000001

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000003 (topics: [appconnect_devices_exported_for_push])
      --> KSTREAM-MAP-0000000007
    Processor: KSTREAM-MAP-0000000007 (stores: [])
      --> KSTREAM-PEEK-0000000008
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-PEEK-0000000008 (stores: [])
      --> KSTREAM-FILTER-0000000010
      <-- KSTREAM-MAP-0000000007
    Processor: KSTREAM-FILTER-0000000010 (stores: [])
      --> KSTREAM-SINK-0000000009
      <-- KSTREAM-PEEK-0000000008
    Sink: KSTREAM-SINK-0000000009 (topic: KSTREAM-PEEK-0000000008-repartition)
      <-- KSTREAM-FILTER-0000000010

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000011 (topics: [KSTREAM-PEEK-0000000008-repartition])
      --> KSTREAM-LEFTJOIN-0000000012
    Processor: KSTREAM-LEFTJOIN-0000000012 (stores: [appconnect_user_stream-STATE-STORE-0000000004])
      --> KSTREAM-KEY-SELECT-0000000013
      <-- KSTREAM-SOURCE-0000000011
    Processor: KSTREAM-KEY-SELECT-0000000013 (stores: [])
      --> KSTREAM-PEEK-0000000014
      <-- KSTREAM-LEFTJOIN-0000000012
    Processor: KSTREAM-PEEK-0000000014 (stores: [])
      --> KSTREAM-FILTER-0000000016
      <-- KSTREAM-KEY-SELECT-0000000013
    Processor: KSTREAM-FILTER-0000000016 (stores: [])
      --> KSTREAM-SINK-0000000015
      <-- KSTREAM-PEEK-0000000014
    Source: KSTREAM-SOURCE-0000000005 (topics: [appconnect_user_stream])
      --> KTABLE-SOURCE-0000000006
    Sink: KSTREAM-SINK-0000000015 (topic: KSTREAM-PEEK-0000000014-repartition)
      <-- KSTREAM-FILTER-0000000016
    Processor: KTABLE-SOURCE-0000000006 (stores: [appconnect_user_stream-STATE-STORE-0000000004])
      --> none
      <-- KSTREAM-SOURCE-0000000005

而且我所有的这些操作都使用相同的 KEY。对于所有主题,我有 5 个代理和 50 个分区。我有 10 个并发,我将我的应用程序扩展到 5。但就像我说的那样,我正在对同一个键进行 3-4 次重新分区和传输数据。这意味着我所有与 flatMap 相关的值,映射操作都转到同一个分区。 1 或 2 次我使用不同的密钥,因此消息分发到不同的分区,只有 1-2 次。这会影响我的表现吗?或者我绝对应该分布在不同的分区上以提高我的性能。

因此,基本上 kafka 在仅使用主题之间的分区执行 3-4 次连接或重新分区操作时表现出更好的性能,因为 kafka 将仅从一个分区读取,并且实际上知道在哪里读取并立即读取所有数据因为磁盘上物理并行的数据(我的意思是ssd或hdd)。或者我的第二种情况;我肯定应该使用更多的分区来在分区之间并行读取?

而且我还认为使用 peek 会减慢我的进程。

【问题讨论】:

  • 抱歉,我已经编辑了我的问题并添加了拓扑

标签: apache-kafka apache-kafka-streams


【解决方案1】:

peek() 操作无关。查看您发布的程序的拓扑描述(部分)如下:

KStream inputUser = builder.stream().flatMap().peek().filter();
KStream inputDevice = builder.stream().flatMap().peek().filter();
inputUser.join(inputDevice,...)

(如果您也将代码发布在问题中会更容易)。

因为您调用flatMap() Kafka Streams 假定您更改了密钥,因此调用join() 会触发数据重新分区。重新分区主题名称由上游 operatore 生成(我不是 100% 确定为什么选择 PEEK 而不是 FILTER 是公平的。)

而且我所有这些操作都使用相同的 KEY。

对于这种情况,您可能希望使用flatMapValues() 而不是flatMap()。对于这种情况,Kafka Streams 知道密钥没有改变,因此它不会创建重新分区主题。

同样,如果密钥未更改以避免不必要的重新分区,您可能希望使用 mapValues() 而不是 map()

我的问题是我可以阅读主题“主题阅读推送处理-KSTREAM-PEEK-0000000014-repartition”,但当我说“主题阅读推送处理-KSTREAM-PEEK-0000000014-repartition--从头开始”

我不确定你的意思。是什么

当我说“主题读取 push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning”时

是什么意思?你是指命令行工具bin/kafka-consumer.sh吗?一般来说,是的,您可以从重新分区主题中读取,但我不确定为什么这会有用?

【讨论】:

  • 是的,我的意思是 bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic test --from-beginning。我无法从一开始就阅读内部主题。根据我的逻辑,我不能使用 flatMapValues 代替 flatMap。我的操作是这样的。 .flatMap().peek().join().map().peek().to(y topic)...然后我正在使用 kafka-streams 听那个 y 主题,就像听 y 主题和做.peek().leftJoin().selectKey().peek().join().filter().to() 我的操作就完成了。我担心所有选择新键操作的人都使用相同的键。
  • 我可以将我的消息(大约 2-3 百万条记录)分发到不同的分区,但仅使用分区会更快吗?我对 RocksDB 和 kafka 磁盘操作以及它们如何针对 1 个分区或多个分区工作感到好奇。根据我的拓扑;我的所有数据都进入同一个分区,使用 1 个分区时 KStream 是否做得更好,或者我应该明确使用分布式分区?因为我正在做 3-4 次重新分区并且只选择 1 个相同的键。所以我是否通过不分散我的数据来阻止这个过程。我不确定哪种策略更快
  • I can't read internal topics -from -beginning -- 究竟是什么问题?你得到什么错误?你为什么要开始这样做? -- 为什么不能使用flatMapValues()? -- 如果你说“相同的键”,你的意思是你不修改输入键吗?还是您的意思是所有消息都映射到一个键?
  • 所有消息都映射到一个键,所以它们都去同一个分区,大约有 2-3 百万条记录。我必须做一个连接操作,所以我没有太多选择。问题是当我说“bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic push-processing-KSTREAM-PEEK-0000000014-repartition”时没关系,但是当我添加 - - 从一开始我就无法阅读。我很好奇为什么我不能从内部主题开始阅读。
  • yes sir my question is, does kafka speed up my process if I distribute my data to different partitions -- 如果所有键都相同,则不会发生这种情况,而且这样做也是不正确的。 -- 如果您创建的主题有超过 1 个分区,则只会使用 1 个分区,因此您不会看到任何性能提升
猜你喜欢
  • 1970-01-01
  • 2016-02-28
  • 2015-08-24
  • 2019-06-24
  • 2023-03-18
  • 2021-03-01
  • 1970-01-01
  • 2021-09-12
相关资源
最近更新 更多