【问题标题】:pull xcom in BigQueryOperator在 BigQueryOperator 中拉取 xcom
【发布时间】:2019-04-01 19:26:34
【问题描述】:

我正在尝试运行一个 BigQueryOperator,该参数基于先前使用 xcom 的任务(我设法使用带有 xcom_push=True 的 BashOperator 推送它)

我认为使用以下方法可以解决问题

def get_next_run_date(**context):
    last_date = context['task_instance'].xcom_pull(task_ids=['get_autoplay_last_run_date'])[0].rstrip()
    last_date = datetime.strptime(last_date, "%Y%m%d").date()
    return last_date + timedelta(days=1)

t3 = BigQueryOperator(
    task_id='autoplay_calc',
    bql='autoplay_calc.sql',
    params={
            "env" : deployment
            ,"region" : region
            ,"partition_start_date" : get_next_run_date()
            },
    bigquery_conn_id='gcp_conn',
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    allow_large_results=True,
    #provide_context=True,
    destination_dataset_table=reporting_project + '.pa_reporting_public_batch.autoplay_calc',
    dag=dag
    )` 

但使用上述内容会为我提供带有“task_instance”错误的 Broken Dag 错误。

【问题讨论】:

    标签: google-bigquery airflow


    【解决方案1】:

    您是否尝试过使用 context['ti'].xcom_pull()?

    【讨论】:

      【解决方案2】:

      你用错了。

      您不能在params 中使用xcom。您需要在bql/sql 参数中使用它。你的 sql 文件,autoplay_calc.sql 可以包含类似

      select * from XYZ where date == "{{xcom_pull(task_ids=['get_autoplay_last_run_date'])[0].rstrip() }}"
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-05-03
        • 1970-01-01
        • 2022-10-02
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-03-28
        相关资源
        最近更新 更多