【发布时间】:2020-03-04 05:00:57
【问题描述】:
我们最近将基础架构迁移到 GCP,并且热衷于将 DataProc(Spark) 和 DataFlow(Apache Beam) 用于我们的数据管道。 Dataproc 可以直接让它工作,但运行 Dataflow 让我们很头疼:
我们如何从 JupyterNotebook(如 AI 笔记本)运行 Dataflow 作业
示例如下,我确实有一个巨大的数据集,我想通过 grou_by,然后进行过滤和一些计算,然后它应该在特定的存储桶中写入一个对象(现在这段代码,我不知道如何,正在删除存储桶,而不是做一些有用的事情)
import datetime, os
def preprocess(in_test_mode):
import shutil, os, subprocess
job_name = 'hola'
if in_test_mode:
print('Launching local job ... hang on')
OUTPUT_DIR = './preproc'
shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
os.makedirs(OUTPUT_DIR)
else:
print('Launching Dataflow job {} ... hang on'.format(job_name))
OUTPUT_DIR = 'gs://experimentos-con-humanos/'.format(BUCKET)
try:
subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
except:
pass
options = {
'staging_location': os.path.join(OUTPUT_DIR, 'temp'),
'temp_location': os.path.join(OUTPUT_DIR, 'temp'),
'job_name': job_name,
'project': PROJECT,
'region': REGION,
'teardown_policy': 'TEARDOWN_ALWAYS',
'no_save_main_session': True,
'max_num_workers': 6
}
opts = beam.pipeline.PipelineOptions(flags = [], **options)
if in_test_mode:
RUNNER = 'DataflowRunner'
else:
RUNNER = 'DataflowRunner'
p = beam.Pipeline(RUNNER, options = opts)
(p
| 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(table_spec))
| 'hashAsKey' >> beam.Map(lambda r: (r['afi_hash'], r))
| 'Transpose' >> beam.GroupByKey()
| 'Filtro menos de 12' >> beam.Filter(lambda r: len(r[1]) >= 12 )
| 'calculos' >> beam.Map(calculos)
#| 'Group and sum' >> beam.
#| 'Format results' >> beam.
| 'Write results' >> beam.Map(lambda r: print(r))
| '{}_out'.format(1) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(1))))
)
job = p.run()
if in_test_mode:
job.wait_until_finish()
print("Done!")
preprocess(in_test_mode = False)
1) 它不起作用,但它确实可以运行!
2) 如果我将'DataflowRunner' 更改为'DirectRunner',该代码可以工作,这意味着它可以在本地工作
3) 如果我不更改它,该作业将不会出现在 Dataflow 中,而是会删除它工作的 GCP 存储桶
PD:我确实拥有存储、数据流和 BigQuery 的管理员权限 PD2:该表确实存在,并且我有 cuadruple 的 Bucket 检查它是否具有确切的名称 PD3:我想让它在 Jupyter Notebook 上运行,但如果有人想知道,那就没有必要了
【问题讨论】:
-
您好,您可以在笔记本环境之外运行 Dataflow 作业吗?您能否再次检查您是否遵循了cloud.google.com/dataflow/docs/quickstarts/quickstart-python 上的 Python 快速入门指南中的指示?我想确保您启用了正确的 API 并正确设置了服务帐户密钥。
-
@Cubez tks 的响应,我们已经能够使用 .py 来做到这一点。我们已经启用了 APIS,给了 ai notebook 网络用户权限。关于服务密钥:我想如果它是一个人工智能笔记本理论上应该不需要它。当我们在笔记本外运行时,它会显示 ``` 工作流失败。原因:数据流服务帐户无法访问网络默认值或无法访问网络默认值。```
-
看起来您实际上是在删除第一个
if语句中的所有内容。在构建OUTPUT_DIR时,您不是缺少{}吗? -
是的,去掉了 if 语句,让它更适合 Jupyter 之类的工作。现在,我的问题似乎与此更相关:stackoverflow.com/questions/51362560/…。如果你更喜欢把它写成答案,我可以给你打绿勾!
-
感谢您的确认,现在添加答案
标签: google-cloud-platform jupyter-notebook google-cloud-dataflow apache-beam