【问题标题】:Apache Beam/GCP Dataflow running from AINotebook/Jupyter从 AINotebook/Jupyter 运行的 Apache Beam/GCP 数据流
【发布时间】:2020-03-04 05:00:57
【问题描述】:

我们最近将基础架构迁移到 GCP,并且热衷于将 DataProc(Spark) 和 DataFlow(Apache Beam) 用于我们的数据管道。 Dataproc 可以直接让它工作,但运行 Dataflow 让我们很头疼:

我们如何从 JupyterNotebook(如 AI 笔记本)运行 Dataflow 作业

示例如下,我确实有一个巨大的数据集,我想通过 grou_by,然后进行过滤和一些计算,然后它应该在特定的存储桶中写入一个对象(现在这段代码,我不知道如何,正在删除存储桶,而不是做一些有用的事情)

import datetime, os

def preprocess(in_test_mode):
    import shutil, os, subprocess
    job_name = 'hola'

    if in_test_mode:
        print('Launching local job ... hang on')
        OUTPUT_DIR = './preproc'
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        os.makedirs(OUTPUT_DIR)
    else:
        print('Launching Dataflow job {} ... hang on'.format(job_name))
        OUTPUT_DIR = 'gs://experimentos-con-humanos/'.format(BUCKET)
        try:
            subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
        except:
            pass

    options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'temp'),
      'temp_location': os.path.join(OUTPUT_DIR, 'temp'),
      'job_name': job_name,
      'project': PROJECT,
      'region': REGION,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True,
      'max_num_workers': 6
    }
    opts = beam.pipeline.PipelineOptions(flags = [], **options)

    if in_test_mode:
        RUNNER = 'DataflowRunner'
    else:
        RUNNER = 'DataflowRunner'

    p = beam.Pipeline(RUNNER, options = opts)
    (p 
         | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(table_spec))    
         | 'hashAsKey' >> beam.Map(lambda r: (r['afi_hash'], r))
         | 'Transpose' >> beam.GroupByKey()
         | 'Filtro menos de 12' >> beam.Filter(lambda r: len(r[1]) >= 12 )    
         | 'calculos' >> beam.Map(calculos)
            #| 'Group and sum' >> beam.
            #| 'Format results' >> beam.
         | 'Write results' >> beam.Map(lambda r: print(r))
         | '{}_out'.format(1) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(1))))
        )

    job = p.run()
    if in_test_mode:
        job.wait_until_finish()
        print("Done!")

preprocess(in_test_mode = False)

1) 它不起作用,但它确实可以运行! 2) 如果我将'DataflowRunner' 更改为'DirectRunner',该代码可以工作,这意味着它可以在本地工作 3) 如果我不更改它,该作业将不会出现在 Dataflow 中,而是会删除它工作的 GCP 存储桶

PD:我确实拥有存储、数据流和 BigQuery 的管理员权限 PD2:该表确实存在,并且我有 cuadruple 的 Bucket 检查它是否具有确切的名称 PD3:我想让它在 Jupyter Notebook 上运行,但如果有人想知道,那就没有必要了

【问题讨论】:

  • 您好,您可以在笔记本环境之外运行 Dataflow 作业吗?您能否再次检查您是否遵循了cloud.google.com/dataflow/docs/quickstarts/quickstart-python 上的 Python 快速入门指南中的指示?我想确保您启用了正确的 API 并正确设置了服务帐户密钥。
  • @Cubez tks 的响应,我们已经能够使用 .py 来做到这一点。我们已经启用了 APIS,给了 ai notebook 网络用户权限。关于服务密钥:我想如果它是一个人工智能笔记本理论上应该不需要它。当我们在笔记本外运行时,它会显示 ``` 工作流失败。原因:数据流服务帐户无法访问网络默认值或无法访问网络默认值。```
  • 看起来您实际上是在删除第一个 if 语句中的所有内容。在构建OUTPUT_DIR 时,您不是缺少{} 吗?
  • 是的,去掉了 if 语句,让它更适合 Jupyter 之类的工作。现在,我的问题似乎与此更相关:stackoverflow.com/questions/51362560/…。如果你更喜欢把它写成答案,我可以给你打绿勾!
  • 感谢您的确认,现在添加答案

标签: google-cloud-platform jupyter-notebook google-cloud-dataflow apache-beam


【解决方案1】:

正如 cmets 中所说,问题似乎出在预处理部分。特别是这部分在本地工作或使用DataflowRunner时执行的不同:

if in_test_mode:
    print('Launching local job ... hang on')
    OUTPUT_DIR = './preproc'
    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    os.makedirs(OUTPUT_DIR)
else:
    print('Launching Dataflow job {} ... hang on'.format(job_name))
    OUTPUT_DIR = 'gs://experimentos-con-humanos/'.format(BUCKET)
    try:
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
    except:
        pass

似乎负责删除存储桶内容(用于输出、临时文件等)。另请注意,在示例中,您实际上并未将 BUCKET 添加到 OUTPUT_DIR

【讨论】:

    猜你喜欢
    • 2020-05-01
    • 1970-01-01
    • 2021-05-20
    • 2020-12-22
    • 2020-03-22
    • 1970-01-01
    • 1970-01-01
    • 2020-02-21
    • 1970-01-01
    相关资源
    最近更新 更多