【问题标题】:Kafka Connect S3 Sink Connector Partitioning large topics by id fieldKafka Connect S3 Sink Connector 按 id 字段对大型主题进行分区
【发布时间】:2021-03-16 18:25:11
【问题描述】:

过去几周,我们一直致力于将 Kafka Connect 添加到我们的数据平台,并认为这将是一种将数据从 Kafka 提取到 S3 数据湖中的有用方法。我们已经使用 FieldPartitioner 和 TimeBasePartitioner 并看到了一些相当不错的结果。

我们还需要按用户 ID 进行分区 - 但尝试在用户 ID 字段上使用 FieldPartitioner 后,连接器速度非常慢 - 尤其是与按日期分区等相比。我知道按 ID 分区会产生很多输出分区的数量,因此不会那么快 - 这很好,但它需要能够跟上生产者的步伐。

到目前为止,我们已经尝试增加内存和堆 - 但我们通常不会看到任何内存问题,除非我们将 flush.size 提高到一个很大的数字。我们还尝试了小刷新大小、非常小和大的 rotate.schedule.interval.ms 配置。我们还研究了网络,但这似乎很好 - 使用其他分区器网络保持正常。

在可能浪费大量时间之前,是否有人尝试或成功地使用 S3 Sink 连接器按 id 字段进行分区,尤其是在较大的主题上?或者有没有人在配置或设置方面有任何建议,可能是一个不错的地方?

【问题讨论】:

  • 你有多少任务和工人?我假设问题是每个任务都在处理高基数的 id(与只有几个分区或日期分区窗口内顺序增加的日期相比),并且没有很快达到任何给定 ID 分区的刷新大小
  • 嘿@OneCricketeer,感谢您的评论。在一个主题上,我们有三个分区,我将 max.tasks 设置为 3。我们还有 3 个处于分布式模式的连接工作人员。我怀疑刷新大小相同,并尝试将刷新大小设置为 1,但仍然看到吞吐量问题。
  • 那么,哪一部分实际上很慢?上传文件还是消费?换句话说,您是否在监控消费者滞后?从我从具有非常大的生产者数量和大刷新大小的连接器中看到的是,连接批处理数据一段时间,然后提交数千个偏移量,但所有数据在 S3 中似乎都很好;有时到达那里很慢
  • 我所说的慢是指消费者似乎永远无法跟上生产者的步伐。消费者很难通过当前主题中的数据,并且随着时间的推移滞后缓慢增长。与同一主题相比,使用 TimeBasedPartitioner 我们观察到延迟在几个小时内稳步减少。你是什​​么意思它有时到达那里很慢?欣赏 cmets!
  • 数据上传到 S3 很慢。但消费者确实在工作。只有在实际上传文件后才会提交偏移量(因此延迟只会减少)。这是仅向 S3 交付一次的副产品

标签: apache-kafka apache-kafka-connect s3-kafka-connector


【解决方案1】:

我不习惯 Kafka 的连接器,但我至少会尝试提供帮助。

我不知道您是否可以将连接器配置为 kafka 主题的分区级别;我假设这里有一些方法可以做到这一点。

一种可能的方法是关注您的客户向 Kafka 代理生产的步骤。我的建议是实现您自己的Partitioner,以便“进一步”控制您希望在 kafka 方面发送数据的位置。

这是您的自定义分区器的示例/简化。例如,您的生产者发送的key 具有以下格式:id_name_date。此自定义分区器尝试提取第一个元素 (id),然后选择所需的分区。

public class IdPartitioner implements Partitioner 
{       
   @Override
   public int partition(String topic, Object key, byte[] kb, 
                        Object v, byte[] vb, Cluster cl) 
   {
       try 
       {
         String pKey= (String) key;
         int id = Integer.parseInt(pKey.substring(0,pKey.indexOf("_")));
        
          /* getPartitionForId would decide which partition number corresponds
           for the received ID.You could also implement the logic directly here.*/

         return getPartitionForId(id);
       }
       catch (Exception e)
       {return 0;}
   }

   @Override
   public void close() 
   {
     //maybe some work here if needed
   }
}

即使您可能需要在KafkaConnect 方面进行更多调整,我相信此选项可能会有所帮助。假设您有一个包含 5 个分区的主题,并且 getPartitionForId 只检查 ID 的第一个数字以确定分区(为简化目的,最小 ID 为 100,最大 ID 为 599) .

所以如果接收到的key是f.e:123_tempdata_20201203,那么partition方法会返回0,也就是第一个partition。

(图片显示的是 P1 而不是 P0,因为我认为这样的示例看起来更自然,但请注意,第一个分区实际上定义为 partition 0 。老实说,我忘记了 P0 而画了这个并没有保存模板,所以我只好找借口,比如:看起来更自然)。

这基本上是在 S3 上传之前的预调整住宿

我知道这可能不是理想的答案,因为我不知道您系统的确切规格。 我的猜测是有可能将主题分区直接指向 s3 位置

如果没有可能这样做,至少我希望这能给你一些进一步的想法。干杯!

【讨论】:

  • 默认 S3 分区器使用 TopicPartitions
  • 嘿@aran!欣赏详细的帖子,并将对此进行研究。
猜你喜欢
  • 2021-10-27
  • 2017-09-13
  • 1970-01-01
  • 2021-02-02
  • 2022-07-12
  • 2019-09-21
  • 2020-06-03
  • 2021-07-15
  • 2021-07-26
相关资源
最近更新 更多