【发布时间】:2021-03-04 13:36:25
【问题描述】:
我有一些 csv 数据需要放入具有唯一文件名和特定文件命名约定的位置。
流程顺序
-
合并记录 -> 在这里,csv 行将被合并并转发到更新属性处理器。
-
更新属性 -> 当合并的流文件内容(csv 行的集合)流经更新属性处理器时,具有以下语法的当前时间戳将被分配给“文件名”流文件属性。 语法:Test-${now():format("yyyyMMddHHmmssSSS", "IST")}.csv
-
PutSftp -> 现在在 putsftp 服务器上,无论接收到表单更新属性的流文件是否发布到远程服务器。
问题陈述:
我的远程服务器对文件名格式有限制,它应该具体定义如下: 语法:Test-${now():format("yyyyMMddHHmmssSSS", "IST")}.csv 鸡蛋:Test-202104041836555.csv
因此,在更新属性处理器中,多个流文件(合并的内容)被分配了相同的文件名,因为线程在同一时间戳实例中处理多个流文件,并且在放入 sftp 服务器时,csv 文件无法处理并放入远程服务器,因为已经存在同名文件。
注意: 我与 putsftp 处理器中的冲突解决策略无关,因为我的 sftp 服务器客户端对文件命名格式有严格的依赖。
处理器序列流参考图片:
更新属性处理器属性:
【问题讨论】:
-
为
PutSFTP设置Conflict Resolution=FAIL属性,添加ControlRate并将失败的文件从PutSFTP 提供给ControlRate。配置ControlRate等待一分钟后再反馈给ChangeFileName。基本上它是一种循环,只为失败的文件获取新的当前时间戳。 -
您的解决方案听起来不错,但这里又出现了一个极端情况-假设文件进入失败状态,然后再次进入更新属性(现在如果处理中也可能存在另一个流文件),再次出现这种情况可能会发生吧?
标签: apache-spark hadoop etl apache-nifi