【Question title】: Google Cloud Dataflow unable to open flex template file
【Posted】: 2022-01-27 02:59:56
【Question description】:

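After running the deployment script to launch the Dataflow flex template job, I get:

"Failed to read the job file: gs://dataflow-staging-europe-west2/------/staging/template_launches/{JOBNAME}/job_object with error message: (7ea9e263ad5cddb5): Unable to open template file: gs://dataflow-staging-europe-west2-644733586574/staging/template_launches/{JOBNAME}/job_object.."

The console log shows "Template launch successful", and there are no Python errors in the Cloud Build logs.

Below is the main structure of my Python code. It parses a CSV file from Cloud Storage, performs some transformations/calculations on the raw data, and then creates Datastore entities. The pipeline looks like this:

File structure: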

    ├── pipeline
    │   ├── runner.py
    │   ├── setup.py
    │   ├── ingestion
    │   │   ├── transformer.py
    │   │   ├── custom.py

Screenshot of the files in the editor: 1

Dockerfile

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
 
RUN apt-get update
# Upgrade pip and install the requirements.
RUN pip3 install --no-cache-dir --upgrade pip
RUN pip3 install apache-beam==2.35.0
RUN pip3 install google-cloud-logging
 
WORKDIR /
RUN mkdir -p /dataflow/template
WORKDIR /dataflow/template

COPY ingestion ${WORKDIR}/ingestion
COPY setup.py ${WORKDIR}/setup.py
COPY runner.py ${WORKDIR}/runner.py
 
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/runner.py"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
 
# Since we already downloaded all the dependencies, there's no need to rebuild everything.
ENV PIP_NO_DEPS=True

runner.py

# std libs
import os

import logging
import datetime

# helper modules
from ingestion.all_settings import *
from ingestion.avg_helpers import *
from ingestion.transform import *

# Data-flow modules
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1new.datastoreio import WriteToDatastore


# MAIN function, to run dataflow pipeline module
def dataflow():
    JOB_NAME = f"datastore-upload-{datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S')}"

    # wildcard expression for the storage bucket containing the subject's data
    file_ex = ['gs://bucket-example-csv-file']

    #variable to store pipeline options to be passed into beam function later
    pipeline_options = {
        'runner': 'DirectRunner',
        'project': PROJECT,
        'region': 'europe-west-b',
        'job_name': JOB_NAME,
        'staging_location': TEST_BUCKET + '/staging',
        'temp_location': TEST_BUCKET + '/temp',
        'save_main_session': False,
        'streaming': False,
        'setup_file': '/dataflow/template/setup.py',
    }

    options = PipelineOptions.from_dictionary(pipeline_options)
    with beam.Pipeline(options=options) as p:
        for i,filename in enumerate(file_ex):
            (p 
            | 'Reading input files' >> beam.io.ReadFromText(filename, skip_header_lines = 1)
            | 'Converting from csv to dict' >> beam.ParDo(ProcessCSV(), harvard_medical_headers)
            | 'Create entities for minute averages' >> beam.ParDo(BuildMinuteEntities(),filename)
            | 'Write entities into Datastore' >> WriteToDatastore(PROJECT)
            )
            p.run().wait_until_finish()


if __name__ == '__main__':
    dataflow()
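
The `ProcessCSV` DoFn is not shown in the question. As a rough sketch of what the 'Converting from csv to dict' step might do per line, assuming plain comma-separated input: the function name `csv_line_to_dict` and the header names in `EXAMPLE_HEADERS` are hypothetical stand-ins, since the real `harvard_medical_headers` list is not included above.

```python
import csv

# Hypothetical header names; the question's real list
# (harvard_medical_headers) is not shown.
EXAMPLE_HEADERS = ["timestamp", "heart_rate", "steps"]


def csv_line_to_dict(line, headers):
    """Parse one CSV line and pair each value with its header."""
    # csv.reader handles quoted fields that contain commas.
    values = next(csv.reader([line]))
    if len(values) != len(headers):
        raise ValueError(f"expected {len(headers)} fields, got {len(values)}")
    return dict(zip(headers, values))


row = csv_line_to_dict('2022-01-27 02:59:00,72,"1,204"', EXAMPLE_HEADERS)
print(row)
```

Inside a Beam `DoFn`, the same logic would sit in `process` and yield the resulting dict. Raising on a field-count mismatch is one possible choice; skipping or logging malformed lines would work as well.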

【Question discussion】:

    Tags: python google-cloud-platform apache-beam-io


    【Solution 1】:

    You may need to specify the setup file name in your Beam options:

    ...
        #variable to store pipeline options to be passed into beam function later
        pipeline_options = {
            'runner': 'DirectRunner',
            'project': PROJECT,
            'region': 'europe-west-b',
            'job_name': JOB_NAME,
            'staging_location': TEST_BUCKET + '/staging',
            'temp_location': TEST_BUCKET + '/temp',
            'save_main_session': False,
            'streaming': False,
            'setup_file': '/dataflow/template/setup.py',
        }
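
To rule out a wrong path before launching, the options dictionary can be checked first. This is a minimal sketch: `check_setup_file` is a hypothetical helper, not part of Apache Beam, and it only confirms that the `setup_file` entry is present and points at an existing file inside the container.

```python
import os


def check_setup_file(pipeline_options):
    """Return the setup_file path if it is set and exists, else raise."""
    path = pipeline_options.get("setup_file")
    if not path:
        raise KeyError("'setup_file' is missing from the pipeline options")
    if not os.path.isfile(path):
        raise FileNotFoundError(f"setup_file does not exist: {path}")
    return path
```

It could be called as `check_setup_file(pipeline_options)` just before `PipelineOptions.from_dictionary(pipeline_options)`, so that a bad path fails early with a clear message.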
    

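    【Discussion】:

    • Ah, thanks, I'll give it a try!
    • I tried the above, but it didn't solve the problem :( Maybe something else in my setup is wrong. I've added some details about my file structure and Dockerfile above, in case that makes it clearer what I'm doing.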