【问题标题】:Sending bigquery data by email -airflow通过电子邮件发送 bigquery 数据 -airflow
【发布时间】:2021-12-10 22:48:28
【问题描述】:
【问题讨论】:
标签:
airflow
google-cloud-composer
【解决方案1】:
在 Airflow 中,使用现有的运算符以及编写自己的运算符同样容易。这都是 Python。 Airflow 对外部服务有两层方法——它有 Operator(每个 Operator 执行一个操作)和 Hooks(这是一个超级易用的接口,提供 API 与外部服务通信。
在您的情况下,您应该通过使用多个钩子来创建自己的运算符,而不是组合现有的运算符。例如,一个 Hook 可以将数据读取到 pandas 框架,然后使用一些 Python 代码以可以附加到邮件的形式提取数据,然后使用 util 中的“send_email”发送电子邮件(没有单独的 Hook用于发送电子邮件,因为发送电子邮件也是 Airflow Core 的标准功能)。您可以查看 EmailOperator 代码以了解 send_email 的使用方式以及 BigQueryOperators 以了解如何使用 BigQueryHook。
您可以通过两种方式做到这一点:
- 经典 - 将您自己的运算符定义为对象(您可以在 youd DAG 文件中执行此操作并在您的 DAG 中使用它)。
class MyOperator(BaseOperator):
__init__.....
def execute():
bq_hook = BigQueryHook(.....)
... do stuff ...
send_email(....)
- 任务流 API(更 Python 化/功能性更强,样板更少):
@dag(...)
def my_dag():
@task()
def read_data_and_send_email():
bq_hook = BigQueryHook(.....)
... do stuff ...
send_email(....)
我认为任务流程更适合您的需求:请参阅http://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html