【发布时间】:2019-02-10 22:43:05
【问题描述】:
我已从 Dataprep 导出了 Cloud Dataflow 模板,如下所述:
https://cloud.google.com/dataprep/docs/html/Export-Basics_57344556
在 Dataprep 中,流程通过通配符从 Google Cloud Storage 中提取文本文件,转换数据,并将其附加到现有 BigQuery 表中。一切按预期工作。
但是,当尝试从导出的模板启动 Dataflow 作业时,我似乎无法正确设置启动参数。错误消息并不过分具体,但很明显,一方面,我没有正确获取位置(输入和输出)。
此用例的唯一 Google 提供的模板(位于 https://cloud.google.com/dataflow/docs/guides/templates/provided-templates#cloud-storage-text-to-bigquery)不适用,因为它使用 UDF 并且还在批处理模式下运行,覆盖任何现有的 BigQuery 表而不是追加。
从 Dataprep 检查原始 Dataflow 作业详细信息会显示许多参数(可在元数据文件中找到),但我无法让这些参数在我的代码中工作。以下是此类失败配置的示例:
import time
from google.cloud import storage
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
def dummy(event, context):
pass
def process_data(event, context):
credentials = GoogleCredentials.get_application_default()
service = build('dataflow', 'v1b3', credentials=credentials)
data = event
gsclient = storage.Client()
file_name = data['name']
time_stamp = time.time()
GCSPATH="gs://[path to template]
BODY = {
"jobName": "GCS2BigQuery_{tstamp}".format(tstamp=time_stamp),
"parameters": {
"inputLocations" : '{{\"location1\":\"[my bucket]/{filename}\"}}'.format(filename=file_name),
"outputLocations": '{{\"location1\":\"[project]:[dataset].[table]\", [... other locations]"}}',
"customGcsTempLocation": "gs://[my bucket]/dataflow"
},
"environment": {
"zone": "us-east1-b"
}
}
print(BODY["parameters"])
request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
response = request.execute()
print(response)
以上示例指示无效字段(“location1”,我从已完成的 Dataflow 作业中提取。我知道我需要指定 GCS 位置、模板位置和 BigQuery 表,但在任何地方都没有找到正确的语法. 如上所述,我在作业生成的元数据文件中找到了字段名称和示例值。
我意识到这个特定的用例可能不会敲响任何警钟,但总的来说,如果有人成功确定并为从 Dataprep 导出的 Dataflow 作业使用了正确的启动参数,我将不胜感激了解更多相关信息。谢谢。
【问题讨论】:
-
问题很可能是您如何为参数设置模板(您需要使用 ValueProvider,...)。在创建管道之前显示管理参数的代码。另请查看:cloud.google.com/dataflow/docs/guides/templates/…
-
@JohnHanley 感谢您的回复。在这种情况下,我没有使用 Beam SDK,而是使用 Cloud Function 中的通用 Python(通过 Google API 客户端模块)。看起来这些错误可能就像没有在 BODY json 中转义/引用值一样愚蠢,因为我在其他地方找到了一些示例,这些示例已经让我完成了 80% 的工作。 Dataflow 作业现在正在正确启动(并且看似正在运行),直到它失败的输出步骤...... Stackdriver 中的错误是“无法获取 location1 的值”,这让我认为输出参数是错误的......我更新了我上面的代码以显示最新版本。
-
深入挖掘 Stackdriver 中的错误,我认为这是错误的直接原因:'Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{ ' (code 123)): was expecting double-quote to start field name' 所以很明显,我在别处找到的方法不太奏效。问题仍然是如何正确地将这些字符串放在一起,以便作业启动器对其进行解析......
-
为什么会有这个:
"inputLocations" : '{{\"location1\":\"[my bucket]/{filename}\"}}'.format(filename=file_name),?数据流参数就像命令行参数 (--input=example.csv)。没什么复杂的。显示处理命令行的代码,这样我们就不会猜测了。 -
没有命令行 -- 这是 REST API 模板启动方法使用的语法(尽管目前已损坏):cloud.google.com/dataflow/docs/reference/rest/v1b3/… 调用它的代码在请求和响应位中最后。
标签: google-cloud-dataflow google-cloud-dataprep