【问题标题】:GCSToBigQueryOperator not working in composer-2.1.0-airflow-2.3.4GCSToBigQueryOperator 在 composer-2.1.0-airflow-2.3.4 中不起作用
【发布时间】:2023-02-02 15:58:10
【问题描述】:

在最近升级到 composer-2.1.0-airflow-2.3.4 之后,GCSToBigQueryOperator 无法再在存储桶中找到要上传到 BigQuery 的数据。

DAG 的所有其他方面仍然有效。

用法如下

    gcs_to_bq = GCSToBigQueryOperator(
        task_id                             = f"transfer_{data_type}_to_bq_task",
        bucket                              = os.environ["GCS_BUCKET"],
        source_objects                      = file_names,
        destination_project_dataset_table   = os.environ["GCP_PROJECT"] + f".creditsafe.{data_type}",
        schema_object                       = f"dags/schema/creditsafe/{data_type}.json",
        source_format                       = "CSV",
        field_delimiter                     = '|',
        quote_character                     = "",
        max_bad_records                     = 0,
        create_disposition                  = "CREATE_IF_NEEDED",
        ignore_unknown_values               = True,
        allow_quoted_newlines               = True,
        allow_jagged_rows                   = True,
        write_disposition                   = "WRITE_TRUNCATE",
        gcp_conn_id                         = 'google_cloud_default',
        skip_leading_rows                   = 1,
        dag                                 = dag
    )

来自 API 的错误是

google.api_core.exceptions.NotFound: 404 GET

{ "error": { "code": 400, "message": "Unknown output format: media:", "errors": [ { "message": "Unknown output format: media:", "domain": "global", "reason": "invalidAltValue", "locationType": "parameter", "location": "alt" } ] } }

Cloud Composer 传递的错误是

google.api_core.exceptions.NotFound: 404 GET https://storage.googleapis.com/download/storage/v1/b/[BUCKET_HIDDEN]/o/data%2Fcreditsafe%2FCD01%2Ftxt%2F%2A.txt?alt=media: No such object: [BUCKET_HIDDEN]/data/creditsafe/CD01/txt/*.txt: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)

我看不出错误的原因。对 GCS 位置的引用没有改变并且看起来是正确的,而 gcp_conn_id 似乎足以完成所有其他任务。我不知所措。

【问题讨论】:

  • 嗨@David Kane,你能澄清一下你是如何定义GCS_BUCKET的吗?
  • 我不。它由 Cloud Composer 自动提供。它是 Cloud Composer 使用的存储桶的名称。这是包含 DAG 和数据文件夹的存储桶。
  • 我做了更多的挖掘,问题是通配符的使用在升级后不再有效。如果我选择一个文件,一切都像以前一样工作。通配符隐藏在代码中,但我可以使“file_names”引用单个文件,或使用通配符运算符。
  • 嗨@David Kane,如果我的回答解决了您的问题,请考虑接受并投票。如果没有,请告诉我,以便我改进我的答案。接受答案也将有助于社区成员的研究。

标签: google-bigquery google-cloud-storage airflow google-cloud-composer


【解决方案1】:

现已修复上述问题

https://github.com/apache/airflow/pull/28444

目前尚不清楚将其集成到 Cloud Composer 库中需要多长时间。

【讨论】:

    【解决方案2】:

    GCSToBigQueryOperator 不支持通配符 *.csv。根据您的要求,您可以尝试以下步骤:

    • 您可以通过运行以下命令附加到 composer 环境中的 pod:
    1. gcloud container clusters get-credentials --region __GCP_REGION__ __GKE_CLUSTER_NAME__

    2. kubectl get pods -n [Namespace]

    3. kubectl exec -it [Worker] -n [Namespace] -- bash

      • 您可以运行以下命令来识别 google 提供程序包,

        pip list | grep -i goo | grep provider

        如果上述命令的输出与 8.3.0 不同,则将版本更改为 apache-airflow-providers-google ==8.3.0。

    【讨论】:

    • 谢谢,我们无法真正实施您的解决方案,因为它涉及 python 包 spaghetti。但是,您的回答是正确的,许多其他用户也遇到了同样的重大变化。我不再愿意使用 Airflow,而是会使用更现代的无服务器工具直接使用 Google Python 库进行编码。
    【解决方案3】:

    无需通过 gcloud/kubectl 连接来更改 apache-airflow-providers-google 版本,您可以通过 PyPi Packages 页面直接在 Composer UI 中更改它。

    我可以确认在最新的(今天)作曲家:composer-1.20.4-airflow-2.4.3 配置 apache-airflow-providers-google ==8.8.0(最新)为我解决了问题:

    configuring custom PyPI packages

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-23
      • 2016-06-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多