【问题标题】:Parse NiFi Data Packet using spark使用 spark 解析 NiFi 数据包
【发布时间】:2017-05-23 19:29:26
【问题描述】:

我正在使用 Apache NiFi 和 Apache Spark 为大学做一个小项目。我想使用 NiFi 创建一个从 HDFS 读取 TSV 文件的工作流,并使用 Spark Streaming 我可以处理文件并将我需要的信息存储在 MySQL 中。我已经在 NiFi 中创建了我的工作流程,并且存储部分已经在工作。问题是我无法解析 NiFi 包,所以我可以使用它们。

文件包含这样的行:

linea1File1 TheReceptionist 653 Entertainment   424 13021   4.34    1305    744 DjdA-5oKYFQ NxTDlnOuybo c-8VuICzXtU

每个空格都是一个制表符(“\t”)

这是我在 Spark 中使用 Scala 编写的代码:

 val ssc = new StreamingContext(config, Seconds(10))
 val packet = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY))
 val file = packet.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))

在此之前,我可以在单个字符串中获取我的整个文件(7000 多行)......不幸的是,我无法将该字符串拆分为行。我需要按行获取整个文件,这样我就可以在一个对象中解析它,对其应用一些操作并存储我想要的内容

有人可以帮我吗?

【问题讨论】:

    标签: scala apache-spark streaming spark-streaming apache-nifi


    【解决方案1】:

    每个数据包将是来自 NiFi 的一个流文件的内容,因此如果 NiFi 从 HDFS 提取一个具有很多行的 TSV 文件,则所有这些行都将在一个数据包中。

    很难说没有看到您的 NiFi 流程,但您可能会使用行数为 1 的 SplitText 在 NiFi 中拆分您的 TSV,然后才能触发流式传输。

    【讨论】:

    • 非常感谢...这完全解决了我的问题...我从没想过用 NiFi 解决它...我专注于 Spark...
    猜你喜欢
    • 1970-01-01
    • 2021-11-03
    • 2018-07-24
    • 2021-09-24
    • 2017-07-23
    • 1970-01-01
    • 2015-12-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多