【问题标题】:getting running job id from BigQueryOperator using xcom使用 xcom 从 BigQueryOperator 获取正在运行的作业 ID
【发布时间】:2019-06-20 10:01:52
【问题描述】:

我想从 BigQueryOperator 获取 BigQuery 的作业 ID。

我在 bigquery_operator.py 文件中看到以下行:

context['task_instance'].xcom_push(key='job_id', value=job_id)

我不知道这是气流的作业 ID 还是 BigQuery 作业 ID,如果是 BigQuery 作业 ID,我如何使用下游任务中的 xcom 获取它?。

我尝试在下游 Pythonoperator 中执行以下操作:

def write_statistics(**kwargs):
  job_id = kwargs['templates_dict']['job_id']
  print('tamir')
  print(kwargs['ti'].xcom_pull(task_ids='create_tmp_big_query_table',key='job_id'))
  print(kwargs['ti'])
  print(job_id)

t3 = BigQueryOperator(
        task_id='create_tmp_big_query_table',
        bigquery_conn_id='bigquery_default',
        destination_dataset_table= DATASET_TABLE_NAME,
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        sql = """
        #standardSQL...

【问题讨论】:

    标签: google-bigquery airflow


    【解决方案1】:

    UI 非常适合检查 XCom 是否已写入,我建议您在尝试在单独的任务中引用它之前执行此操作,这样您就不必担心是否正在获取正确与否。单击您的create_tmp_big_query_table 任务 -> 任务实例详细信息 -> XCom。如下所示:

    在您的情况下,代码在我看来是正确的,但我猜您的 Airflow 版本没有将保存作业 ID 添加到 XCom 中的更改。此功能是在 https://github.com/apache/airflow/pull/5195 中添加的,目前仅在 master 上,并且目前不包含在最新的稳定版本 (1.10.3) 中。在BigQueryOperator 的 1.10.3 版本中亲自查看。

    您的选择是等待它发布(...有时需要一段时间),使用该更改运行 master 的版本,或者临时复制较新版本的运算符作为自定义运算符.在最后一种情况下,我建议将其命名为 BigQueryOperatorWithXcom 之类的名称,并在发布后将其替换为内置运算符。

    【讨论】:

      【解决方案2】:

      bigquery_operator.py 中的 JOB ID 是 BQ JOB ID。看前面几行就可以理解了:

       if isinstance(self.sql, str):
                  job_id = self.bq_cursor.run_query(
                      sql=self.sql,
                      destination_dataset_table=self.destination_dataset_table,
                      write_disposition=self.write_disposition,
                      allow_large_results=self.allow_large_results,
                      flatten_results=self.flatten_results,
                      udf_config=self.udf_config,
                      maximum_billing_tier=self.maximum_billing_tier,
                      maximum_bytes_billed=self.maximum_bytes_billed,
                      create_disposition=self.create_disposition,
                      query_params=self.query_params,
                      labels=self.labels,
                      schema_update_options=self.schema_update_options,
                      priority=self.priority,
                      time_partitioning=self.time_partitioning,
                      api_resource_configs=self.api_resource_configs,
                      cluster_fields=self.cluster_fields,
                      encryption_configuration=self.encryption_configuration
                  )
              elif isinstance(self.sql, Iterable):
                  job_id = [
                      self.bq_cursor.run_query(
                          sql=s,
                          destination_dataset_table=self.destination_dataset_table,
                          write_disposition=self.write_disposition,
                          allow_large_results=self.allow_large_results,
                          flatten_results=self.flatten_results,
                          udf_config=self.udf_config,
                          maximum_billing_tier=self.maximum_billing_tier,
                          maximum_bytes_billed=self.maximum_bytes_billed,
                          create_disposition=self.create_disposition,
                          query_params=self.query_params,
                          labels=self.labels,
                          schema_update_options=self.schema_update_options,
                          priority=self.priority,
                          time_partitioning=self.time_partitioning,
                          api_resource_configs=self.api_resource_configs,
                          cluster_fields=self.cluster_fields,
                          encryption_configuration=self.encryption_configuration
                      )
                      for s in self.sql]
      

      最终,run_with_configuration 方法从 BQ 返回 self.running_job_id

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-08-16
        • 1970-01-01
        • 2014-06-15
        • 1970-01-01
        • 1970-01-01
        • 2020-01-22
        相关资源
        最近更新 更多