【问题标题】:How to maintain unique timestamp at PUTFILE processor in NIFI如何在 NIFI 中的 PUTFILE 处理器上维护唯一时间戳
【发布时间】:2021-03-04 13:36:25
【问题描述】:

我有一些 csv 数据需要放入具有唯一文件名和特定文件命名约定的位置。

流程顺序

  1. 合并记录 -> 在这里,csv 行将被合并并转发到更新属性处理器。

  2. 更新属性 -> 当合并的流文件内容(csv 行的集合)流经更新属性处理器时,具有以下语法的当前时间戳将被分配给“文件名”流文件属性。 语法:Test-${now():format("yyyyMMddHHmmssSSS", "IST")}.csv

  3. PutSftp -> 现在在 putsftp 服务器上,无论接收到表单更新属性的流文件是否发布到远程服务器。

问题陈述:

我的远程服务器对文件名格式有限制,它应该具体定义如下: 语法:Test-${now():format("yyyyMMddHHmmssSSS", "IST")}.csv 鸡蛋:Test-202104041836555.csv

因此,在更新属性处理器中,多个流文件(合并的内容)被分配了相同的文件名,因为线程在同一时间戳实例中处理多个流文件,并且在放入 sftp 服务器时,csv 文件无法处理并放入远程服务器,因为已经存在同名文件。

注意: 我与 putsftp 处理器中的冲突解决策略无关,因为我的 sftp 服务器客户端对文件命名格式有严格的依赖。

处理器序列流参考图片:

更新属性处理器属性:

【问题讨论】:

  • PutSFTP 设置Conflict Resolution=FAIL 属性,添加ControlRate 并将失败的文件从PutSFTP 提供给ControlRate。配置ControlRate等待一分钟后再反馈给ChangeFileName。基本上它是一种循环,只为失败的文件获取新的当前时间戳。
  • 您的解决方案听起来不错,但这里又出现了一个极端情况-假设文件进入失败状态,然后再次进入更新属性(现在如果处理中也可能存在另一个流文件),再次出现这种情况可能会发生吧?

标签: apache-spark hadoop etl apache-nifi


【解决方案1】:

我通过引入 ControrRate 处理器找到了解决方案。 ControlRate 处理器的配置:

通过在一个实例/时间点仅将一个流文件抽取到 udpate 属性处理器,能够为每个流文件分配唯一的时间戳。

更新流程图:

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-10-26
    • 1970-01-01
    • 2016-04-21
    • 2010-12-12
    相关资源
    最近更新 更多