【问题标题】:NiFi Flowfile Attributes from KafkaConsumer来自 KafkaConsumer 的 NiFi 流文件属性
【发布时间】:2017-11-11 21:59:00
【问题描述】:

我一直在尝试从 Spark Streaming 中的 Kafka 消息中访问 NiFi Flowfile 属性。我使用 Java 作为语言。

场景是 NiFI 使用 GetSFTP 处理器从 FTP 位置读取二进制文件,并使用 publishKafka 处理器将 byte[] 消息发布到 Kafka。使用 Spark Streaming 作业将这些 byte[] 属性转换为 ASCII 数据,并将这些解码后的 ASCII 写入 Kafka 进行进一步处理,并使用 NiFi 处理器保存到 HDFS。

我的问题是我无法跟踪二进制文件名和解码的 ASCII 文件。我必须在解码的 ASCII 中添加一个标题部分(用于文件名、文件大小、记录数等),但我无法弄清楚如何从 KafkaConsumer 对象的 NiFi Flowfile 访问文件名。有没有办法可以使用标准的 NiFi 处理器来做到这一点?或者请分享任何其他建议以实现此功能。谢谢。

【问题讨论】:

    标签: java apache-spark apache-kafka kafka-consumer-api apache-nifi


    【解决方案1】:

    所以你的数据流是:

    FTP -> NiFi -> Kafka -> Spark Streaming -> Kafka -> NiFi -> HDFS ?

    目前 Kafka 在每条消息上都没有元数据属性(尽管我相信这可能会在 Kafka 0.11 中出现),因此当 NiFi 向主题发布消息时,它目前无法通过流文件属性传递消息。

    您必须构建某种类型的包装数据格式(可能是 JSON 或 Avro),其中包含原始内容 + 您需要的附加属性,以便您可以将整个内容作为一条消息的内容发布到 Kafka。

    另外,我不知道您在 Spark 流式传输工作中到底在做什么,但您是否有理由不能只在 NiFi 中做这部分工作?这听起来并不像涉及窗口或连接的任何复杂操作,因此您可以稍微简化一些事情并让 NiFi 进行解码,然后让 NiFi 将其写入 Kafka 并写入 HDFS。

    【讨论】:

    • 是的,这是数据流。我希望元数据属性在 Kafka 0.11 中可用。我会考虑整理数据。关于 Spark Streaming 作业,我将执行窗口化以触发一些事件。我是 NiFi 的新手,你能指导我如何在 NiFi 中包装我的 byte[] 吗?
    • 最简单的方法可能是编写一个 Groovy 或 Jython 脚本并通过 ExecuteScript 处理器运行它。它不漂亮,但也许您可以读取流文件字节并 Base64 对其进行编码,然后生成一个 JSON 文档,如 { "data" : , "filename" : "the-flow-file-name" }
    猜你喜欢
    • 1970-01-01
    • 2018-01-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多