【问题标题】:NiFi How to get the current processor Name and Processor group name through the custom processor using (Java)NiFi如何使用(Java)通过自定义处理器获取当前处理器名称和处理器组名称
【发布时间】:2020-08-19 18:14:21
【问题描述】:

我正在使用 Java 创建 NiFi 自定义处理器, 其中一项要求是使用 java 代码获取先前的处理器名称和处理器组(如面包屑)。

【问题讨论】:

    标签: apache-nifi


    【解决方案1】:

    之前的处理器名称和进程组名称不会立即(也不意味着)可供处理器使用,您能否详细解释一下您的用例?您也许可以使用 SiteToSiteProvenanceReportingTask 将出处信息发送回您自己的 NiFi 实例(例如输入端口)并找到对应于 FlowFiles 进入您的自定义处理器的事件,这些事件应该具有源(前一个)处理器和目标(您的自定义)处理器。

    如果您使用带有 Groovy 的 InvokeScriptedProcessor 编写自定义处理器,那么您可以“改变规则”并获取以前的处理器名称等,因为 Groovy 允许访问私有成员并且您可以假设实现onTrigger 中的 ProcessContext 是 StandardProcessContext 的一个实例,因此您可以获取它的成员,其中包括上游连接以及前一个处理器。不过,对于特定的 FlowFile,我不确定您是否可以使用这种方法来了解它来自哪个上游处理器。

    或者,您可以在每个“前一个处理器”之后添加一个 UpdateAttribute,以使用有关该处理器的信息设置属性,但这必须进行硬编码并应用于流程的每个相应部分。

    【讨论】:

    • Matty,我认为绝对需要添加通过上下文访问处理器名称和其他属性的能力。它对于异常跟踪和其他情况可能非常有用。它可能是标准属性之一,例如 filename
    • NiFi所基于的基于流的编程(FBP)的概念与此截然相反。只要传入的数据满足组件的要求,根据定义,源就无关紧要。考虑 *nix 命令行工具的概念——head 将对来自lscat 等的数据进行操作,无论来源如何。 FBP 的目的是将这些组件相互解耦,并消除它们之间的依赖关系。
    【解决方案2】:

    我前段时间遇到过这个问题。我使用InvokeHTTP 处理器并使用nifi-api/process-groups/${process_group_id} Web Service

    这就是我的实现方式:

    1. 确定应该在其中进行错误处理的进程组。 [行动组]
    2. 在操作组旁边创建一个新的进程组 [错误处理组],并添加将文件传输到错误处理组的关系。
    3. 使用InvokeHTTP 处理器并将HTTP Method 设置为GET
    4. Remote URL 设置为http://{nifi-instance}:{port}/nifi-api/process-groups/${action_group_process_group_id}
    5. 您将收到 JSON 格式的响应,您必须根据自己的需要进行自定义

    如果您需要我正在使用的 XML 文件,请告诉我。我可以分享。它对我来说很好用

    【讨论】:

    • 感谢@Malik 的回复,但我只能通过 JAVA 代码使用自定义处理器,而不是使用任何其他处理器
    • 酷,然后在您的 JAVA 代码中调用该 Web 服务。我确信这是可能的。但这是您应该使用的网络服务。
    猜你喜欢
    • 2021-10-27
    • 2013-10-11
    • 1970-01-01
    • 2019-08-17
    • 1970-01-01
    • 2011-07-22
    • 1970-01-01
    • 2021-12-16
    • 1970-01-01
    相关资源
    最近更新 更多