【发布时间】:2018-03-22 01:28:40
【问题描述】:
你好 Internet Hive Mind!
我需要使用 nifi 查询 AWS Athena,但是我需要为每个发送的查询更改暂存目录(将保存结果的 S3 存储桶和文件夹)。
但是必须在 DBCPConnectionPool 控制器服务上设置 s3_staging_dir 属性。 如何更改每个不同流文件的该属性的值? 显然不能单独用表达式语言来获取。
谢谢!
【问题讨论】:
标签: apache-nifi
你好 Internet Hive Mind!
我需要使用 nifi 查询 AWS Athena,但是我需要为每个发送的查询更改暂存目录(将保存结果的 S3 存储桶和文件夹)。
但是必须在 DBCPConnectionPool 控制器服务上设置 s3_staging_dir 属性。 如何更改每个不同流文件的该属性的值? 显然不能单独用表达式语言来获取。
谢谢!
【问题讨论】:
标签: apache-nifi
我不确定每个查询都依赖于不同暂存目录的流程的性质,但有几点需要牢记。
DBCPConnectionPool 控制器服务确实允许评估表达式语言的动态属性,但是当控制器服务启用时会执行表达式语言评估,因此每次启动/停止“一次”。 来自Apache NiFi DBCPConnectionPool documentation:
动态属性:
动态属性允许用户指定名称和值 属性。
...
指定要在 JDBC 上设置的属性名称和值 连接。如果使用表达式语言,评估将是 在启用控制器服务时执行。注意没有流量 文件输入(例如属性)可用于表达式 这些属性的语言结构。 支持表达式 语言:真
由于您要求每个请求的 S3 暂存目录都不同,我认为在这种情况下,您需要采用以下选项之一:
DBCPConnectionPool 不支持您的用例)AthenaConnectionPool 控制器服务扩展DBCPConnectionPool 控制器服务。构建自己的 NiFi 组件的教程有很多,但 NiFi Developer Guide > Developing Controller Services 是最好的起点。您可以创建一个控制器服务,在执行表达式语言执行时评估传入的流文件属性,但您需要手动触发它,因为控制器服务在其生命周期中没有@OnTrigger 阶段。如果您还编写了自定义处理器,您可以从处理器的onTrigger() 方法调用控制器服务中的一些“重新评估”方法,但现有处理器不会调用此方法。相反,您可以使用执行程序 theoretically put a high frequency refresher in the controller service 本身,但这肯定会影响性能DBCPConnectionPool 实例和 SQL 处理器(按 1 - 3 的顺序可行,否则很糟糕)ExecuteStreamCommand 处理器和awscli 使用命令行工具执行查询。这使您无法使用 NiFi 原生 SQL 工具,但允许在每次调用时进行自定义查询,因为 ExecuteStreamCommand 可以解释流文件特定的属性并在查询中使用它们【讨论】:
您不必在DBCPConnectionPool 中设置属性。您在 SQL 处理器中设置的查询会将 Athena 的结果作为流文件输出。您可以将 SQL 处理器连接到 PutS3Object 并指定存储桶名称和其他必要的属性。这会将您的 SQL 查询结果写入 S3 暂存目录。
【讨论】: