【问题标题】:How to query Google Big Query in Apache Airflow and return results as a Pandas Dataframe?如何在 Apache Airflow 中查询 Google Big Query 并将结果作为 Pandas Dataframe 返回?
【发布时间】:2019-11-02 12:03:48
【问题描述】:

我正在尝试将 bigquery 查询保存到自定义 Airflow 运算符中的数据框。

我尝试过使用 airflow.contrib.hooks.bigquery_hook 和 get_pandas_df 方法。该任务卡在身份验证上,因为它希望我手动访问一个 url 进行身份验证。

因此,我在身份验证中进行了硬编码。这可行,但绝对不理想。

工作但不理想(凭证是硬编码的):

def execute(self, context):
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'my-file-location.json'
        client = bigquery.Client()

        job_config = bigquery.QueryJobConfig()

        df = client.query(
            self.query,
            location="US",
            job_config=job_config,).to_dataframe()

不工作:

def execute(self, context):
    bq  = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, delegate_to=None,use_legacy_sql=True, location='US')
    df = bq.get_pandas_df(self.query)

此代码无法进行身份验证。这是日志:[2019-06-19 12:56:05,526] {logging_mixin.py:95} INFO - 请访问此 URL 以授权此应用程序。

【问题讨论】:

    标签: google-bigquery airflow


    【解决方案1】:

    似乎没有为钩子指定服务帐户或密钥路径。

    这是设置 GCP 连接的指南。 https://github.com/apache/airflow/blob/1.10.3/docs/howto/connection/gcp.rst

    在您的气流配置文件中设置AIRFLOW_CONN_BIGQUERY_DEFAULT 环境变量。

    如果凭据在气流过程可访问的路径中可用,则可以使用key_path 查询参数的方式。
    否则,将 key_dict 查询参数设置为凭证文件的 URL 编码 JSON 内容。

    AIRFLOW_CONN_BIGQUERY_DEFAULT=google-cloud-platform://?extra__google_cloud_platform__key_path=%2Fkeys%2Fkey.json&extra__google_cloud_platform__scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform&extra__google_cloud_platform__project=airflow&extra__google_cloud_platform__num_retries=5
    

    【讨论】:

    • 谢谢!我通过将 AIRFLOW_CONN_BIGQUERY_DEFAULT 添加到airflow.cfg 来尝试您的解决方案,但我遇到了同样的错误。我在“管理员 - > 连接”下的 UI 中设置了凭据,包括密钥路径。创建 BigQueryHook 对象有效,get_pandas() 仍然存在身份验证问题。
    【解决方案2】:

    补充@Oluwafemi 的响应,现在您拥有 BigQueryHook 的凭据,您可以使用它们来实例化BigQueryPandasConnector。根据文档,此连接器:

    ...允许 Airflow 将 BigQuery 与 Pandas 一起使用,而无需强制使用三足 OAuth 连接...

    这是一个例子:

    def execute(self, context):
    
        bq = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, delegate_to=None,use_legacy_sql=True, location='US')
        pd = BigQueryPandasConnector(bq._get_field('project'), bq.get_service())
        df = pd.read_gbq(self.query)
    

    【讨论】:

      【解决方案3】:

      不知怎的,我无法让BigQueryPandasConnector 工作。我最终得到的是使用 BigQueryHook 的凭据通过 BigQuery 的官方 Python 客户端创建一个普通的bigquery.client.Client

      这是一个例子:

      from google.cloud import bigquery
      
      bq_hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, use_legacy_sql=False)
      bq_client = bigquery.Client(project = bq_hook._get_field("project"), credentials = bq_hook._get_credentials())
      df = bq_client.query(sql).to_dataframe()
      

      【讨论】:

      • 能够突破到正常的bigquery.client.Cloud 似乎是一个明显的需求。该文档有一个hook.get_client(),它应该提供一个经过身份验证的底层 bigquery 客户端。这在我的气流实例中不起作用,没有找到属性错误。这个解决方案挽救了一天。
      猜你喜欢
      • 2018-09-12
      • 2015-06-20
      • 2020-11-12
      • 1970-01-01
      • 2014-06-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多