【问题标题】:How to discover and filter out duplicate records in Kafka Streams如何发现和过滤 Kafka Streams 中的重复记录
【发布时间】:2018-09-26 21:53:01
【问题描述】:

假设你有一个带有空键的主题,值为

{id:1, name:Chris, age:99}

假设您想按姓名计算人数。您可以执行以下操作:

nameStream.groupBy((key,value) -> value.getName())
           .count();

现在让我们说它是有效的,你可以获得重复的记录,你可以根据 id 判断它是重复的。

例如:

{id:1, name:Chris, age:99}
{id:1, name:Chris, age:xx}

应该导致计数为 1 和

   {id:1, name:Chris, age:99}
   {id:2, name:Chris, age:xx}

应该导致计数为 2。

您将如何实现这一目标?我认为 reduce 会起作用,但误解了它的工作原理。

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    您可以使用多个属性进行分组。通过连接创建自定义键并作为键传递:

    KTable<String,String> modifiedTable =  nameStream.groupBy((key,value) -> value.getName()+value.getId()).reduce((aggVal,newval) -> aggVal);
    

    KTable 上方将为具有给定名称和 ID 的任何记录提供更新状态。 所以对于{id:1,name:Chris.....},它在KTable中只有一条记录:

    虽然在以下情况下,两条记录都会出现:

    <Chris1,  {id:1, name:Chris, age:99}> 
    <Chris2,   {id:2, name:Chris, age:xx}> 
    

    现在您想使用名称属性进行计数操作。因此,将键更改为 name 并重新分组表并执行 count()。

    KTable countTable = modifiedTable.groupBy((k,v)-> KeyValue.pair(v.getName(), v)).count();
    

    这里 count() 将在 KTable 之上执行。 KTable 是任何给定 ID 的更新视图。
    因此,对于以下输入,modifiedTable 一次将有 1 条记录作为键“Chris1”的更新值,您将获得 count=>1

    <Chris,1> // Here key will be Chris1
    

    以下输入将产生 **count=>2

    {id:1, name:Chris, age:99}  // Here key was be Chris1
    {id:2, name:Chris, age:xx}  // Here key was be Chris2
    

    【讨论】:

    • 谢谢尼修。为什么我们需要reduce?此外,如果我们有多个分区/线程,我不确定这是否可行。 Chris1 和 Chris2 现在可以散列到不同的线程,导致计数不正确。
    • 您可以使用 aggregate() 或 reduce()。由于未修改值类型,因此 reduce() 将起作用。如果需要修改值类型,请使用aggregate()。没错,Chris1 和 Chris2 可以位于不同的分区中。但在执行 count() 之前,有一个名为 name 属性的 groupBy() 操作。这将确保您从所有分区中获取给定键的所有事件。
    • 是的,这实际上是我所做的。 :-) 开始明白了。
    • 他们的键(不是双关语)是使用名称 id 和主题键的组合。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-12-07
    • 2019-09-05
    • 1970-01-01
    • 2020-02-18
    • 2017-10-31
    • 2015-11-20
    相关资源
    最近更新 更多