【问题标题】:Need to send records individually using Apache NiFi to Kafka需要使用 Apache NiFi 将记录单独发送到 Kafka
【发布时间】:2019-08-28 22:34:18
【问题描述】:

我正在从 nifi 用户日志文件本身读取流/批处理格式的记录以用于测试场景,并使用 ConvertRecord nifi 处理器转换为 Avro 格式,然后使用 PublishKafka_2_0 处理器发布到 kafka 主题。我还在使用 Schema 注册表进行模式构造。因此,现在当记录转储到 kafka 时,它是作为单个数组而不是单个记录完成的。有没有办法将记录分解为单个实体。我已经尝试过拆分记录处理器,它似乎没有达到预期的效果。有没有其他方法可以实现。

--更新

【问题讨论】:

  • 您在 Avro 转换之前要拆分吗?您的架构是数组吗?
  • 我使用了 SplitRecord 处理器,在拆分和覆盖中都已完成,因为我使用 grok 阅读器读取并使用 AvroRecordSetWrite 在 Avro 中写入......但它没有用。
  • 您使用哪个 Kafka 处理器发布记录?
  • 另外请描述您的数据的原始格式以及如何将其转换为 Avro 以及架构是什么。
  • 我已经根据你的 cmets 更新了答案,这里是用于相同的模式:{ "type": "record", "name": "nifi_logs", "namespace" :“xyz.abc”,“字段”:[{“名称”:“时间戳”,“类型”:“字符串”},{“名称”:“级别”,“类型”:“字符串”},{“名称”:“线程”,“类型”:“字符串”},{“名称”:“类”,“类型”:“字符串”},{“名称”:“消息”,“类型”:“字符串” }, { "name": "stackTrace", "type": "string" } ] }

标签: apache-kafka schema apache-nifi avro


【解决方案1】:

如需适当的反馈,请分享重现问题的流程,但以下步骤有望满足您的需求:

  • 在发送到Kafka之前直接拆分​​你的消息,并确认拆分确实成功
  • 尝试发布 Kafka 记录 2.0 而不是发布 Kafka 2.0
  • 将最大消息大小设置为 1 字节而不是 1MB?!

【讨论】:

    猜你喜欢
    • 2021-01-14
    • 2018-04-26
    • 2022-09-27
    • 2018-11-07
    • 2019-03-31
    • 2020-04-06
    • 1970-01-01
    • 1970-01-01
    • 2020-04-21
    相关资源
    最近更新 更多