【问题标题】:Apache Nifi 1.4.0 questions and issuesApache Nifi 1.4.0 问题和问题
【发布时间】:2018-03-31 12:29:44
【问题描述】:

我正在尝试将数据从 S3 复制到 HDFS,发现了几个问题并且有几个问题。

问题

  1. 处理器 ConvertJSONToAvro - 如果流文件不是有效的 JSON,则处理器会陷入无限循环并出现以下错误。

    ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] failed to process session due to java.lang.RuntimeException: Unexpected character ('"' (code 34)): was expecting comma to separate OBJECT entries
     at [Source: org.apache.nifi.controller.repository.io.FlowFileAccessInputStream@2ad7d50d; line: 8, column: 14]: Unexpected character ('"' (code 34)): was expecting comma to separate OBJECT entries
     at [Source: org.apache.nifi.controller.repository.io.FlowFileAccessInputStream@2ad7d50d; line: 8, column: 14]
    
    16:45:35 UTC
    WARNING
    c09f4c27-0160-1000-6c29-1a31afc5a8d4
    
    ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] Processor Administratively Yielded for 1 sec due to processing failure
    
  2. 处理器 FetchS3Object - 无论设置为“对象键”的值如何,它总是选择 ${filename} 的值。例如,如果“对象键”设置为“${Newfilename}”,它会忽略设置的值并仅选择 ${filename}。

问题

  1. 是否可以从以前的处理器中引用流文件?我的用例是 FetchS3Object(file1) -> EvaluateJsonPath -> FetchS3Object(file2) -> PutHDFS -> FetchS3Object(file1) -> PutHDFS。 在这种情况下,不是多次加载 file1,而是可以在整个流程中存储和引用它。

  2. 以上几点,文件file1和file2是一个单元。是否有任何选项可以复制两个文件或两者都失败

  3. ListS3 处理器根据时间戳加载文件。如果文件被加载并且在任何其他步骤中失败,则需要再次加载以进行重新处理。一个选项是更新文件的时间戳,以便在下次轮询期间可用于 ListS3。我们如何在 S3 中更新文件的时间戳?或者有任何其他选项来处理这样的用例。

【问题讨论】:

    标签: apache-nifi


    【解决方案1】:

    问题

    1. 某些处理器发出失败事件,您必须将其映射到失败关系。 EvaluateJSONPath 处理器等其他处理器进入消息的无限循环循环。这是一个与少数处理器有关的未解决问题。

      这里有一篇文章描述了构建自定义 NiFi 处理器的过程:https://community.hortonworks.com/articles/4318/build-custom-nifi-processor.html

      如果您想使用现有的处理器/代码并根据自己的目的进行修改,那么可以在此处找到许多标准处理器:https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard

    2. 请参阅此处FetchS3Object 的实现以获得更多说明。

    问题

    1. 使用 Nifi UpdateAtttribute 将属性添加到流文件。参考这个https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.attributes.UpdateAttribute/index.html

    2. 上面的 UpdateAttribute 处理器应该可以解决你的问题。

    3. 向 LoadS3 添加故障关系,然后将其传递给 ExecuteScript 处理器,进行相应的转换并反馈给 LoadS3 处理器。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-06-02
      • 1970-01-01
      • 1970-01-01
      • 2021-10-22
      • 2020-06-07
      相关资源
      最近更新 更多