【问题标题】:Sending bigquery data by email -airflow通过电子邮件发送 bigquery 数据 -airflow
【发布时间】:2021-12-10 22:48:28
【问题描述】:

我在 BQ 有一个项目,包括每天发送 csv 格式的报告。 这些报告是对 bigquery 的查询结果,然后将其压缩为 csv 并通过邮件发送。

使用以下问题的实现来解决我的问题。 How to run a BigQuery query and then send the output CSV to Google Cloud Storage in Apache Airflow?

现在,我正在尝试更改该实现。

原因是因为:

  1. 主要是因为我不喜欢创建临时表来导出结果的想法。我还没有找到导出查询结果的运算符。

  2. 我不需要将数据存储起来,尤其是如果我还是要将其下载到本地气流目录时。

尝试使用“get_pandas_df” bigquery_hook,然后通过 xcom 将结果传递给另一个负责压缩到 csv 的任务。由于 Dataframe 的重量,这是不可能的。

你知道如何直接做吗?

【问题讨论】:

    标签: airflow google-cloud-composer


    【解决方案1】:

    在 Airflow 中,使用现有的运算符以及编写自己的运算符同样容易。这都是 Python。 Airflow 对外部服务有两层方法——它有 Operator(每个 Operator 执行一个操作)和 Hooks(这是一个超级易用的接口,提供 API 与外部服务通信。

    在您的情况下,您应该通过使用多个钩子来创建自己的运算符,而不是组合现有的运算符。例如,一个 Hook 可以将数据读取到 pandas 框架,然后使用一些 Python 代码以可以附加到邮件的形式提取数据,然后使用 util 中的“send_email”发送电子邮件(没有单独的 Hook用于发送电子邮件,因为发送电子邮件也是 Airflow Core 的标准功能)。您可以查看 EmailOperator 代码以了解 send_email 的使用方式以及 BigQueryOperators 以了解如何使用 BigQueryHook。

    您可以通过两种方式做到这一点:

    1. 经典 - 将您自己的运算符定义为对象(您可以在 youd DAG 文件中执行此操作并在您的 DAG 中使用它)。
    class MyOperator(BaseOperator):
         __init__.....
         def execute():
              bq_hook = BigQueryHook(.....)
              ... do stuff ...
              send_email(....)
    
    1. 任务流 API(更 Python 化/功能性更强,样板更少):
    @dag(...)
    def my_dag():
        @task()
        def read_data_and_send_email():
              bq_hook = BigQueryHook(.....)
              ... do stuff ...
              send_email(....)
    

    我认为任务流程更适合您的需求:请参阅http://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-12-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-11-10
      • 2012-01-07
      • 2012-04-27
      相关资源
      最近更新 更多