【问题标题】:kafka connect convertor vs transformskafka 连接转换器与转换
【发布时间】:2017-12-04 21:53:28
【问题描述】:

我正在尝试创建以下工作流

  1. nginx 日志被 kafka 连接器提取并上传到主题
  2. 然后 hdfs 同步连接器将这些日志放入 hdfs
  3. 在 hdfs 数据上使用 Hive 进行分析(例如,按 IP 地址分组的访问次数等)

虽然我可以按照 hive 元存储所需的格式排列 nginx 日志(仅限空格或逗号分隔的必填字段),但我想知道这是否可以在不触及 nginx 日志格式的情况下完成

  1. 使用类似于org.apache.kafka.connect.json.JsonConverter的转换器
  2. 使用单个消息转换

这两种方法都需要自定义实现,而且很少有文档说明如何做同样的事情。

哪一种方法是实现这一目标的正确方法?是否有任何示例可用于在使用 kafka connect 将其写入主题时解析 nginx 日志输出/任何源数据。我正在使用独立的文件连接器。

【问题讨论】:

    标签: nginx apache-kafka apache-kafka-connect


    【解决方案1】:

    Kafka Connect 源连接器负责将消息从源中的表示(例如,nginx 日志消息格式)转换为内存中表示调用 SourceRecord,它使用 Kafka Connect 的 StructSchema 数据结构.然后,Kafka Connect 使用它的 converters 将记录的内存表示转换为实际写入 Kafka 的 byte[] 表示。

    这种职责分离非常重要,因为它允许您混合和匹配功能。写入主题的序列化消息的确切性质可以独立于连接器进行更改。例如,一些开发人员更喜欢使用 JSON 编写数据。许多其他人更喜欢使用通用模式注册表使用 Avro 序列化消息,其组合可以确保所有消息使用特定模式,同时让该模式以向后兼容的方式随时间演变,以便生产者可以演变到新版本的模式而消费者可以在以后的某个时间点适应该模式。使用 Avro 架构和架构注册表提供 tremendous benefits

    底线:不要创建了解上游数据源的自定义转换器。您将陷入过多的耦合,因为byte[] 表示将是自定义的,并且只能由同样知道此特定表示的消费者和应用程序使用。

    如果您需要稍微调整内存中的消息结构,请使用现有的源连接器和Single Message Transforms。在这种情况下,最好创建一个自定义源连接器(可能专门针对现有的基于文件的源连接器),将 nginx 日志消息格式调整为结构化的内存表示。

    【讨论】:

      猜你喜欢
      • 2019-10-03
      • 1970-01-01
      • 2020-06-03
      • 1970-01-01
      • 2019-06-15
      • 2019-05-11
      • 2021-10-16
      • 2017-11-09
      • 2021-04-07
      相关资源
      最近更新 更多