【问题标题】:Start Dataflow job from Cloud Function - ModuleNotFoundError: No module named 'google.cloud.functions'从 Cloud Function 启动 Dataflow 作业 - ModuleNotFoundError: No module named 'google.cloud.functions'
【发布时间】:2021-09-08 20:18:24
【问题描述】:

出于学习目的,我正在尝试在 Dataflow 上启动一个简单的批处理 ETL 流程。这是我执行的逻辑:

Cloud Storage > PubSub > Cloud Function > DataFlow > Cloud Storage

每当有新文件上传到存储桶时,PubSub 主题都会发布一条消息。然后,CloudFunction 监听该主题的订阅,并启动 DataFlow 作业读取文件,执行数据处理并将其保存到同一存储桶上的新文件中。

我已经能够执行所有逻辑,但是我正在努力通过 CloudFunction 实例启动 Dataflow 作业。我的函数开始工作没有任何问题,但几分钟后工作人员显示以下错误消息:

Error message from worker: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/
dataflow_worker/batchworker.py", line 773, in run self._load_main_session(self.local_staging_directory) File
"/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 514, in _load_main_session
pickler.load_session(session_file) File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/
pickler.py", line 311, in load_session return dill.load_session(file_path) File "/usr/local/lib/python3.7/
site-packages/dill/_dill.py", line 368, in load_session module = unpickler.load() File "/usr/local/lib/
python3.7/site-packages/dill/_dill.py", line 472, in load obj = StockUnpickler.load(self) File "/usr/local/
lib/python3.7/site-packages/dill/_dill.py", line 827, in _import_module return getattr(__import__(module,
None, None, [obj]), obj) ModuleNotFoundError: No module named 'google.cloud.functions'

错误的重要部分是:

ModuleNotFoundError: No module named 'google.cloud.functions'

我的 CloudFunction 目录如下所示:

/
  requirements.txt
  main.py
  pipeline.py

requirements.txt

# Function dependencies, for example:
# package>=version
apache-beam[gcp]

ma​​in.py

import base64
import json
from pipeline import run

def start_job(event, context):
  message = base64.b64decode(event['data']).decode('utf-8')
  message = json.loads(message)
  bucket = message['bucket']
  filename = message['name']

  if filename.startswith('raw/'):
    run(bucket, filename)
    print('Job sent to Dataflow')
  else:
    print('File uploaded to unknow directory: {}'.format(source_file))

pipeline.py

import apache_beam as beam
from datetime import datetime
import argparse

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "dpto-bigdata"
google_cloud_options.region = "europe-west1"
google_cloud_options.job_name = "pipeline-test"
google_cloud_options.staging_location = "gs://services-files/staging/"
google_cloud_options.temp_location = "gs://services-files/temp/"
#options.view_as(StandardOptions).runner = "DirectRunner"  # use this for debugging
options.view_as(StandardOptions).runner = "DataFlowRunner"
options.view_as(SetupOptions).save_main_session = True

output_suffix = '.csv'
output_header = 'Name,Total,HP,Attack,Defence,Sp_attack,Sp_defence,Speed,Average'

def run(bucket, filename):
  source_file = 'gs://{}/{}'.format(bucket, filename)
  now = datetime.now().strftime('%Y%m%d-%H%M%S')
  output_prefix = 'gs://{}/processed/{}'.format(bucket, now)

  with beam.Pipeline(options=options) as p:
    raw_values = (
      p
      | "Read from Cloud Storage" >> beam.io.ReadFromText(source_file, skip_header_lines=1)
      | "Split columns" >> beam.Map(lambda x: x.split(','))
      | "Cleanup entries" >> beam.ParDo(ElementCleanup())
      | "Calculate average stats" >> beam.Map(calculate_average)
      | "Format output" >> beam.Map(format_output)
      | "Write to Cloud Storage" >> beam.io.WriteToText(file_path_prefix=output_prefix, file_name_suffix=output_suffix, header=output_header)
    )

class ElementCleanup(beam.DoFn):
  def __init__(self):
    self.transforms = self.map_transforms()

  def map_transforms(self):
    return [
      [self.trim, self.to_lowercase],   # Name
      [self.trim, self.to_float],       # Total
      [self.trim, self.to_float],       # HP
      [self.trim, self.to_float],       # Attack
      [self.trim, self.to_float],       # Defence
      [self.trim, self.to_float],       # Sp_attack
      [self.trim, self.to_float],       # Sp_defence
      [self.trim, self.to_float]        # Speed
    ]

  def process(self, row):
    return [self.clean_row(row, self.transforms)]

  def clean_row(self, row, transforms):
    cleaned = []
    for idx, col in enumerate(row):
      for func in transforms[idx]:
        col = func(col)
      cleaned.append(col)
    return cleaned

  def to_lowercase(self, col:str):
    return col.lower()

  def trim(self, col:str):
    return col.strip()

  def to_float(self, col:str):
    return (float(col) if col != None else None)

def calculate_average(row):
    average = round(sum(row[2:]) / len(row[2:]), 2)
    row.append(average)
    return row

def format_output(row):
    row = [str(col) for col in row]
    return ','.join(row)

if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
      "--bucket",
      help="Bucket to read from."
  )
  parser.add_argument(
      "--filename",
      help="File to read from."
  )
  args = parser.parse_args()

  run(args.bucket, args.filename)

我一直在阅读有关此主题的文章。在此错误之前,我有一个类似的错误显示ModuleNotFoundError: No module named 'main'。我能够通过添加管道选项options.view_as(SetupOptions).save_main_session = True 来解决这个问题,但是我还没有找到任何解决我当前面临的错误的方法。

一旦我开始管道作业,我希望 Dataflow 工作人员不要依赖 CloudFunction,但似乎他们仍在尝试以某种方式与之通信。

【问题讨论】:

    标签: python google-cloud-platform google-cloud-functions apache-beam dataflow


    【解决方案1】:

    我认为这里最好的方法是使用templates,因为您更改的不是代码而是路径。获得模板后,您只需调用 API 即可启动它们。设置起来肯定会更轻松,而且可能更有弹性,因为您不会那么依赖 Cloud Functions。

    我认为还有另一种方法会更好,它不需要 Cloud Functions。您可以使用Java 中的MatchAll.continuously 之类的东西。如果您需要/想要 Python,目前还没有对应的 Python,但我冒昧地创建了一个执行相同操作的版本并发送 Pull Request 以获取新的 Ptransform。

    这个想法是每 X 秒检查一次新文件并根据您的管道处理它们。

    如果您不希望合并拉取请求(如果是的话),您可以复制 DoFn:

    class MatchContinuously(beam.PTransform):
      def __init__(
          self,
          file_pattern,
          interval=360.0,
          has_deduplication=True,
          start_timestamp=Timestamp.now(),
          stop_timestamp=MAX_TIMESTAMP):
    
        self.file_pattern = file_pattern
        self.interval = interval
        self.has_deduplication = has_deduplication
        self.start_ts = start_timestamp
        self.stop_ts = stop_timestamp
    
      def expand(self, pcol):
        impulse = pcol | PeriodicImpulse(
            start_timestamp=self.start_ts,
            stop_timestamp=self.stop_ts,
            fire_interval=self.interval)
    
        match_files = (
            impulse
            | beam.Map(lambda x: self.file_pattern)
            | MatchAll())
    
        if self.has_deduplication:
          match_files = (
              match_files
              # Making a Key Value so each file has its own state.
              | "To KV" >> beam.Map(lambda x: (x.path, x))
              | "Remove Already Read" >> beam.ParDo(_RemoveDuplicates()))
    
        return match_files
    
    class _RemoveDuplicates(beam.DoFn):
    
      FILES_STATE = BagStateSpec('files', StrUtf8Coder())
    
      def process(self, element, file_state=beam.DoFn.StateParam(FILES_STATE)):
        path = element[0]
        file_metadata = element[1]
        bag_content = [x for x in file_state.read()]
    
        if not bag_content:
          file_state.add(path)
          _LOGGER.info("Generated entry for file %s", path)
          yield file_metadata
        else:
          _LOGGER.info("File %s was already read", path)
    

    一个示例管道:

    (p | MatchContinuously("gs://apache-beam-samples/shakespeare/*", 180)
       | Map(lambda x: x.path)
       | ReadAllFromText()
       | Map(lambda x: logging.info(x))
    )
    

    第三种方法是继续使用 GCS 通知并使用 PubSub + MatchAll。管道看起来像:

    (p | ReadFromPubSub(topic)
       | MatchAll())
    )
    

    根据新文件的频率以及是否要使用通知,您可以在三种方法之间做出选择。

    【讨论】:

    • 感谢您的回答。通过使用模板和 API 调用,我能够正确启动数据流作业。还有一件事,我的客户每天或每周都会上传 .CSV 文件。更好的是,每次上传文件时启动批处理作业,或者启动监听 PubSub 订阅的流式作业,该订阅每天/每周只发布一次消息?谢谢
    • 很高兴我能帮上忙。如果您打算每天阅读一次,我肯定会使用批处理管道。顺便说一句,我编辑了答案以添加另一个选项,即在 GCS 中具有流式传输 + PubSub + 通知,但我忘了添加它。无论哪种方式,如果您打算每天只发送几个文件,那么批处理是可行的方法。
    【解决方案2】:

    不要那样做。

    理由是:

    • 云功能(无限扩展)- 适合处理不常发生的事件。
    • 云运行 - 类似于云功能,但您可以同时处理多个事件 - 适用于突发事件(在这种情况下,它比云功能便宜)
    • 从 PubSub 读取数据流 -> 处理频繁/稳定的数据批次 (fils) 或流 (PubSub/Kafka)。

    为每个文件触发数据流作业确实在时间和成本(分钟和美元)方面效率低下。

    如果您需要持续使用 Dataflow 响应文件通知(完成、删除等),您应该将存储通知发送到 pubsub 主题并从数据流订阅中读取它们。请注意,这仅适用于流式传输。

    如果那是您使用ReadFromPubsub 来读取存储通知:

        with beam.Pipeline(options=pipeline_options) as pipeline:
    
            pubsub_msgs = pipeline | (
                'Read PubSub Messages' >> beam.io.gcp.pubsub.ReadFromPubSub(subscription=global_vars.input_subscription) ) 
    

    【讨论】:

    • 我考虑过以这种方式实现它,因为我的客户每天甚至每周都会上传文件。考虑到这一点,每次文件到达我的存储桶时部署流式 Dataflow 作业而不是批处理作业是否仍然更好(在效率和费用方面)?
    • 如果数据源源不断,流式传输非常棒。如果没有,那就太贵了。如果作业很小,您甚至可以每天从计算实例将其作为流式作业运行,然后在 1-2 小时后将其关闭(这不是最好的解决方案,但它很便宜并且您保留代码库)
    猜你喜欢
    • 1970-01-01
    • 2022-12-27
    • 2022-12-27
    • 2022-12-19
    • 2022-12-27
    • 2020-02-10
    • 1970-01-01
    • 1970-01-01
    • 2022-08-24
    相关资源
    最近更新 更多