【发布时间】:2019-01-10 23:40:59
【问题描述】:
我是 nifi 新手(使用 1.8.0 版)。我需要使用 kafka 消息,其中包含每条消息的 lat,lon 形式的车辆位置。由于每条消息都将作为流文件到达,因此我需要合并所有这些流文件并制作一个 json 文件,其中包含车辆后跟的完整路径。我正在使用消耗 kafka 处理器来订阅消息,更新属性处理器(添加的属性是文件名:${getStateValue("seq")},seq:${getStateValue("seq"):plus(1)})添加一个序列号作为文件名(例如,文件名是 1,2,3 等)并放置文件处理器以将这些文件写入指定目录。我已经在上述处理器之间的所有成功关系上配置了FIFO优先级队列。曾经,我收到了我想要合并所有流文件的所有消息。为此,我知道我必须分别使用获取文件、强制执行顺序、合并内容(合并策略:bin 打包算法、合并格式:二进制连接)和放置文件处理器。我的方法正确吗?我应该如何确定文件的合并按其名称的顺序进行,因为文件名是一个序列号。我应该在强制订单处理器中放入什么订单属性?应该在组标识符中放入什么?执行订单处理器中是否要添加更多自定义字段?
【问题讨论】:
标签: apache-nifi