【Posted】: 2021-09-08 20:18:24
【Question】:
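For learning purposes, I am trying to launch a simple batch ETL process on Dataflow. This is the flow I am running:
Cloud Storage > Pub/Sub > Cloud Function > Dataflow > Cloud Storage
Whenever a new file is uploaded to the bucket, a message is published to a Pub/Sub topic. The Cloud Function listens on a subscription to that topic and launches a Dataflow job that reads the file, processes the data, and saves the result to a new file in the same bucket.
I have been able to get all of this logic working, but I am struggling to launch the Dataflow job from the Cloud Function. The function submits the job without any problem, but after a few minutes the worker shows the following error message: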
Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 773, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 514, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 311, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 827, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ModuleNotFoundError: No module named 'google.cloud.functions'
The important part of the error is:
ModuleNotFoundError: No module named 'google.cloud.functions'
My Cloud Function directory looks like this:
/
requirements.txt
main.py
pipeline.py
requirements.txt
# Function dependencies, for example:
# package>=version
apache-beam[gcp]
main.py
import base64
import json

from pipeline import run


def start_job(event, context):
    # Decode the Pub/Sub message that carries the Cloud Storage notification.
    message = base64.b64decode(event['data']).decode('utf-8')
    message = json.loads(message)
    bucket = message['bucket']
    filename = message['name']
    # Only files uploaded under raw/ trigger a Dataflow job.
    if filename.startswith('raw/'):
        run(bucket, filename)
        print('Job sent to Dataflow')
    else:
        print('File uploaded to unknown directory: {}'.format(filename))
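The decoding step in start_job can be exercised locally without deploying the function. The sketch below assumes the event carries a base64-encoded JSON payload with bucket and name fields, as main.py expects. The helper decode_storage_event, the bucket name, and the file name are hypothetical and exist only for illustration.

```python
import base64
import json


def decode_storage_event(event):
    # Same decoding steps as start_job: base64 -> UTF-8 text -> JSON.
    message = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    return message["bucket"], message["name"]


# Hypothetical payload standing in for a Cloud Storage notification.
fake_payload = {"bucket": "example-bucket", "name": "raw/pokemon.csv"}
fake_event = {
    "data": base64.b64encode(json.dumps(fake_payload).encode("utf-8")).decode("ascii")
}

bucket, filename = decode_storage_event(fake_event)
print(bucket, filename, filename.startswith("raw/"))
```

Keeping the decoding in a small function like this makes it possible to test the routing on the raw/ prefix separately from the call that submits the pipeline.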
pipeline.py
import apache_beam as beam
from datetime import datetime
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "dpto-bigdata"
google_cloud_options.region = "europe-west1"
google_cloud_options.job_name = "pipeline-test"
google_cloud_options.staging_location = "gs://services-files/staging/"
google_cloud_options.temp_location = "gs://services-files/temp/"
#options.view_as(StandardOptions).runner = "DirectRunner" # use this for debugging
options.view_as(StandardOptions).runner = "DataFlowRunner"
options.view_as(SetupOptions).save_main_session = True
output_suffix = '.csv'
output_header = 'Name,Total,HP,Attack,Defence,Sp_attack,Sp_defence,Speed,Average'
def run(bucket, filename):
    source_file = 'gs://{}/{}'.format(bucket, filename)
    now = datetime.now().strftime('%Y%m%d-%H%M%S')
    output_prefix = 'gs://{}/processed/{}'.format(bucket, now)
    with beam.Pipeline(options=options) as p:
        raw_values = (
            p
            | "Read from Cloud Storage" >> beam.io.ReadFromText(source_file, skip_header_lines=1)
            | "Split columns" >> beam.Map(lambda x: x.split(','))
            | "Cleanup entries" >> beam.ParDo(ElementCleanup())
            | "Calculate average stats" >> beam.Map(calculate_average)
            | "Format output" >> beam.Map(format_output)
            | "Write to Cloud Storage" >> beam.io.WriteToText(file_path_prefix=output_prefix, file_name_suffix=output_suffix, header=output_header)
        )
class ElementCleanup(beam.DoFn):
    def __init__(self):
        self.transforms = self.map_transforms()

    def map_transforms(self):
        # One list of cleanup functions per input column, in column order.
        return [
            [self.trim, self.to_lowercase],  # Name
            [self.trim, self.to_float],  # Total
            [self.trim, self.to_float],  # HP
            [self.trim, self.to_float],  # Attack
            [self.trim, self.to_float],  # Defence
            [self.trim, self.to_float],  # Sp_attack
            [self.trim, self.to_float],  # Sp_defence
            [self.trim, self.to_float]  # Speed
        ]

    def process(self, row):
        return [self.clean_row(row, self.transforms)]

    def clean_row(self, row, transforms):
        cleaned = []
        for idx, col in enumerate(row):
            for func in transforms[idx]:
                col = func(col)
            cleaned.append(col)
        return cleaned

    def to_lowercase(self, col: str):
        return col.lower()

    def trim(self, col: str):
        return col.strip()

    def to_float(self, col: str):
        return (float(col) if col is not None else None)
def calculate_average(row):
    # Average of every stat after Total (HP through Speed), rounded to 2 decimals.
    average = round(sum(row[2:]) / len(row[2:]), 2)
    row.append(average)
    return row

def format_output(row):
    row = [str(col) for col in row]
    return ','.join(row)
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--bucket",
        help="Bucket to read from."
    )
    parser.add_argument(
        "--filename",
        help="File to read from."
    )
    args = parser.parse_args()
    run(args.bucket, args.filename)
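The per-row logic in pipeline.py can be checked in plain Python, without Beam or Dataflow. The sketch below is an assumption-laden stand-in: clean_row condenses what ElementCleanup does into one hypothetical helper, and the sample line is illustrative data laid out in the column order of output_header (minus Average).

```python
def clean_row(row):
    # Mirrors ElementCleanup: trim every column, lowercase the name,
    # and convert the remaining columns to float.
    name = row[0].strip().lower()
    stats = [float(col.strip()) for col in row[1:]]
    return [name] + stats


def calculate_average(row):
    # Same logic as pipeline.py: average of row[2:], i.e. every stat after Total.
    average = round(sum(row[2:]) / len(row[2:]), 2)
    row.append(average)
    return row


def format_output(row):
    return ','.join(str(col) for col in row)


# Hypothetical input line: Name,Total,HP,Attack,Defence,Sp_attack,Sp_defence,Speed
sample_line = ' Bulbasaur ,318,45,49,49,65,65,45'
result = format_output(calculate_average(clean_row(sample_line.split(','))))
print(result)
```

Running the transforms this way separates data-handling mistakes from launch and packaging problems such as the one described here.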
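I have been reading about this topic. Before this error, I had a similar one: ModuleNotFoundError: No module named 'main'. I was able to resolve that by adding the pipeline option options.view_as(SetupOptions).save_main_session = True, but I have not found any solution for the error I am currently facing.
Once the pipeline job has started, I would expect the Dataflow workers not to depend on the Cloud Function, but it seems they are still trying to communicate with it in some way.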
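One general mechanism that can produce a ModuleNotFoundError on the loading side is that Python pickling stores module-level functions by reference (module name plus attribute name) rather than by value, so the process that unpickles must be able to import the same module. The sketch below uses the standard library's pickle, not dill or Beam's pickler, and the names fake_launcher_module and pickle_by_reference_demo are hypothetical. It illustrates the mechanism only and is not a diagnosis of the Dataflow job.

```python
import pickle
import sys
import types


def pickle_by_reference_demo(module_name="fake_launcher_module"):
    # Build a throwaway module that exists only in the "launcher" process.
    module = types.ModuleType(module_name)
    exec("def transform(x):\n    return x * 2\n", module.__dict__)
    sys.modules[module_name] = module

    # The payload records the module and function names, not the code itself.
    payload = pickle.dumps(module.transform)

    # Simulate a "worker" where that module is not installed.
    del sys.modules[module_name]
    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return str(exc)
    return None


print(pickle_by_reference_demo())
```

In this sketch the failure appears only at load time, which matches the pattern in the question: submission succeeds and the error surfaces later, when the session is loaded somewhere the referenced module is unavailable.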
【Discussion】:
Tags: python google-cloud-platform google-cloud-functions apache-beam dataflow