【问题标题】:merging of flow files in the specified order按指定顺序合并流文件
【发布时间】:2019-01-10 23:40:59
【问题描述】:

我是 nifi 新手(使用 1.8.0 版)。我需要使用 kafka 消息,其中包含每条消息的 lat,lon 形式的车辆位置。由于每条消息都将作为流文件到达,因此我需要合并所有这些流文件并制作一个 json 文件,其中包含车辆后跟的完整路径。我正在使用消耗 kafka 处理器来订阅消息,更新属性处理器(添加的属性是文件名:${getStateValue("seq")},seq:${getStateValue("seq"):plus(1)})添加一个序列号作为文件名(例如,文件名是 1,2,3 等)并放置文件处理器以将这些文件写入指定目录。我已经在上述处理器之间的所有成功关系上配置了FIFO优先级队列。曾经,我收到了我想要合并所有流文件的所有消息。为此,我知道我必须分别使用获取文件、强制执行顺序、合并内容(合并策略:bin 打包算法、合并格式:二进制连接)和放置文件处理器。我的方法正确吗?我应该如何确定文件的合并按其名称的顺序进行,因为文件名是一个序列号。我应该在强制订单处理器中放入什么订单属性?应该在组标识符中放入什么?执行订单处理器中是否要添加更多自定义字段?

【问题讨论】:

    标签: apache-nifi


    【解决方案1】:

    EnforceOrder 处理器documentation

    1.组标识符

    • 此属性评估您的案例使用的每个流文件 UpdateAttribute 处理器,添加 group_name 属性并使用相同的 ${group_name} 属性在组标识符属性值中。

    2.订单属性

    • 不支持表达语言。

    • 您可以使用 filename(或)在 UpdateAttribute 处理器并在您的 Order Attribute 属性值。

    对于强制执行命令处理器的参考/使用,请使用this 模板并上传到您的NiFi 实例。

    【讨论】:

      猜你喜欢
      • 2023-02-13
      • 1970-01-01
      • 2014-11-23
      • 2015-09-22
      • 2018-10-31
      • 2021-10-16
      • 1970-01-01
      • 1970-01-01
      • 2018-11-13
      相关资源
      最近更新 更多