【问题标题】:How can I pass a flowfile attribute to a controller service?如何将流文件属性传递给控制器​​服务?
【发布时间】:2018-03-22 01:28:40
【问题描述】:

你好 Internet Hive Mind!

我需要使用 nifi 查询 AWS Athena,但是我需要为每个发送的查询更改暂存目录(将保存结果的 S3 存储桶和文件夹)。

但是必须在 DBCPConnectionPool 控制器服务上设置 s3_staging_dir 属性。 如何更改每个不同流文件的该属性的值? 显然不能单独用表达式语言来获取。

谢谢!

【问题讨论】:

    标签: apache-nifi


    【解决方案1】:

    我不确定每个查询都依赖于不同暂存目录的流程的性质,但有几点需要牢记。

    1. DBCPConnectionPool 控制器服务确实允许评估表达式语言的动态属性,但是当控制器服务启用时会执行表达式语言评估,因此每次启动/停止“一次”。
    2. 控制器服务上的动态属性评估流文件属性。

    来自Apache NiFi DBCPConnectionPool documentation

    动态属性:

    动态属性允许用户指定名称和值 属性。

    ...

    指定要在 JDBC 上设置的属性名称和值 连接。如果使用表达式语言,评估将是 在启用控制器服务时执行。注意没有流量 文件输入(例如属性)可用于表达式 这些属性的语言结构。 支持表达式 语言:真

    由于您要求每个请求的 S3 暂存目录都不同,我认为在这种情况下,您需要采用以下选项之一:

    1. File a Jira 在 NiFi 中请求原生 Athena 支持(彻底解释为什么现有的 DBCPConnectionPool 不支持您的用例)
    2. 使用您自己的AthenaConnectionPool 控制器服务扩展DBCPConnectionPool 控制器服务。构建自己的 NiFi 组件的教程有很多,但 NiFi Developer Guide > Developing Controller Services 是最好的起点。您可以创建一个控制器服务,在执行表达式语言执行时评估传入的流文件属性,但您需要手动触发它,因为控制器服务在其生命周期中没有@OnTrigger 阶段。如果您还编写了自定义处理器,您可以从处理器的onTrigger() 方法调用控制器服务中的一些“重新评估”方法,但现有处理器不会调用此方法。相反,您可以使用执行程序 theoretically put a high frequency refresher in the controller service 本身,但这肯定会影响性能
    3. 为每个暂存目录创建多个 DBCPConnectionPool 实例和 SQL 处理器(按 1 - 3 的顺序可行,否则很糟糕)
    4. 使用ExecuteStreamCommand 处理器和awscli 使用命令行工具执行查询。这使您无法使用 NiFi 原生 SQL 工具,但允许在每次调用时进行自定义查询,因为 ExecuteStreamCommand 可以解释流文件特定的属性并在查询中使用它们
    5. 重新评估您的流程设计,看看是否有一种方法可以执行查询,而不允许在单个查询执行时使用任意 S3 暂存目录

    【讨论】:

    • 使用#4。似乎是最好的解决方案。
    【解决方案2】:

    您不必在DBCPConnectionPool 中设置属性。您在 SQL 处理器中设置的查询会将 Athena 的结果作为流文件输出。您可以将 SQL 处理器连接到 PutS3Object 并指定存储桶名称和其他必要的属性。这会将您的 SQL 查询结果写入 S3 暂存目录。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-05-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多