【问题标题】:KafkaStreams Left Join DSL: inserting on outer null valueKafkaStreams Left Join DSL:插入外部空值
【发布时间】:2018-06-13 16:01:29
【问题描述】:

我有一个混合匹配的 DSL-PAPI 拓扑。 DSL 部分将页面浏览量(“页面浏览量”主题)与这些页面浏览量的用户(“用户”主题)连接起来。我想加入两者,所以如果用户是新用户,则从 pvs 信息中创建一个新的“用户”到“用户”主题中,否则什么都不做。

所以我试图在页面浏览量和用户之间进行左连接,如果用户为空,这意味着尚未使用此键创建用户,所以在这种情况下我创建一个。

在代码中,我将页面浏览量作为流和用户作为表,当用户在加入中为空时加入它们产生新用户,然后过滤并将这些新用户发送给“用户”。

    val builder = new StreamsBuilder()
    val pageviewsTopic: KStream[Key, Pageview] = builder.stream("pageviews")
      .map((muipk, pageview) => (new MerchantUserPartitionKey(muipk.merchantSiteId, muipk.uid) -> pageview))

    val usersTopic: KTable[MerchantUserPartitionKey, user] = builder.table("users")

    val joinedPageviewsWithUsers: KStream[MerchantUserPartitionKey, User] =
      pageviewsTopic.leftJoin(
        usersTopic,
        new ValueJoiner[Pageview, User, User] {
          override def apply(pageview: Pageview, user: User): User = {
            logger.info("JOIN PAGEVIEW-user")
            if (user == null) {
              new User(UUIDUtils.generateRandomId(), pageview.uid /*, some other data */)
            } else {
              logger.info("user already created.")
              null
            }
          }
        })
    // Generate users.
    joinedPageviewsWithUsers.
      filter((key, user) => user != null ).
      to("users")

生成的 DSL 拓扑如下所示:

  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [pageviews])
      --> KSTREAM-MAP-0000000001
    Processor: KSTREAM-MAP-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000006
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000006 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KSTREAM-MAP-0000000001
    Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-MAP-0000000001-repartition)
      <-- KSTREAM-FILTER-0000000006
  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-MAP-0000000001-repartition])
      --> KSTREAM-LEFTJOIN-0000000008
    Processor: KSTREAM-LEFTJOIN-0000000008 (stores: [users-STATE-STORE-0000000002])
      --> KSTREAM-FILTER-0000000009
      <-- KSTREAM-SOURCE-0000000007
    Processor: KSTREAM-FILTER-0000000009 (stores: [])
      --> KSTREAM-SINK-0000000010
      <-- KSTREAM-LEFTJOIN-0000000008
    Source: KSTREAM-SOURCE-0000000003 (topics: [users])
      --> KTABLE-SOURCE-0000000004
    Sink: KSTREAM-SINK-0000000010 (topic: users)
      <-- KSTREAM-FILTER-0000000009
    Processor: KTABLE-SOURCE-0000000004 (stores: [user-STATE-STORE-0000000002])
      --> none
      <-- KSTREAM-SOURCE-0000000003

但是,当对具有相同键的多个综合浏览量运行此操作时,“用户”会创建新用户,但它总是以“空”连接。因此,看起来商店没有使用“用户”主题中新生成的数据进行更新,即使它使用user-STATE-STORE-0000000002 显示也是如此。

您是否需要做一些额外的事情才能将数据导入存储?这是否是某种 KafkaStreams 反模式(写入您之前加入的主题)?

更新更多信息:

  • 键不为空
  • ValueJoiner 代码已执行(显示打印输出),只有该用户值始终为空。
  • 用户被写入“用户”主题(在这种情况下,根据逻辑,它每次进入 ValueJoiner 时都会这样做,因为它总是发现外部值为空,因此会将用户插入“用户”)

【问题讨论】:

  • 您有任何null 密钥吗?你确认apply 被调用了吗?您是否验证数据是否写入主题“用户”?
  • 在更新中回答澄清(基本上是“否”、“是”、“是”)
  • 你能检查一下主题“用户”的滞后——KTable 赶上了吗?您可以使用交互式查询来检查表是否已更新?
  • 抱歉,已经重建集群并重新使用 PAPI。但是,根据您的回答,我知道这是支持的,是吗?用户主题滞后:在检查应用程序消费者组时,该主题向 StateStore 提供的消费如何显示?另外,我在这里没有使用 IQ。你的假设是什么?
  • 是的,这应该可以按预期工作。不确定atm可能是什么问题。如果您将某个主题读取为 KTable,则可以像监视任何其他主题(application.id == group.id)一样监控滞后。

标签: scala apache-kafka apache-kafka-streams rocksdb


【解决方案1】:

当一个流在一个子拓扑中查找另一个子拓扑中的表时,可能会涉及到常规的消费/生产延迟。例如,当您直接从主题定义流或表时,就会发生这种情况。如果您可以使用更有意义的指令,例如 through(写入主题但让拓扑知道它仍将用于此拓扑),它将帮助 KafkaStreams 了解这种关系是如何存在的。

【讨论】:

    猜你喜欢
    • 2020-10-11
    • 2012-08-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-13
    • 1970-01-01
    • 2021-06-20
    • 1970-01-01
    相关资源
    最近更新 更多