【发布时间】:2019-09-14 23:48:57
【问题描述】:
我希望有人可以帮助我,因为我现在处于死胡同????我正在学习 Apache Beam 并正在研究从 PubSub 源读取的流式管道(Python 3.7、apache-beam v 2.15、DirectRunner)。
对同一个 PCollection 应用多个转换过程以产生多个输出,最终将输出下沉到 2 个不同的 BQ 表中。
但是,当我运行管道时,似乎这些转换中的每一个都在相互影响(通过改变元素),从而在执行 bigquery.WriteToBigQuery 转换时导致架构不匹配
每个 PubSub 消息以及消息属性的格式如下:
{'data': json.dumps({ "key1": "val1", "key2": "val2"}).encode(),
'attributes':{'evt_time': '2019-09-14T22:12:43.323546Z'}}
我想阅读这些消息并转储到 2 个 BQ 表:
table1
raw_table架构:evt_time,date(存储原始数据以及从消息属性中提取的 event_time){'evt_time': '2019-09-14T22:12:43.323546Z', 'payload': '{"key1": "val1", "key2": "val2"}'}table2
parsed_table架构:evt_time,key1,key2(将数据中的每个 KV 对解析并存储为平面表以及 event_time 提取的表单消息属性){'evt_time': '2019-09-14T22:12:43.323546Z', 'key1': 'val1', 'key2': 'val2'}
到目前为止这是我的伪代码
1)
消息被读入管道,然后应用映射变换,它基本上将data 提取到payload 字段和evt_time 到evt_time 字段
row = p
| "read_sub" >> beam.Create([ {'data': json.dumps({ "key1": "val1", "key2": "val2"}).encode(),
'attributes':{'evt_time': '2019-09-14T22:12:43.323546Z'}}])
| "add_timestamps" >> beam.Map(add_timestamps)`
2)
然后我将row 存储到table1 raw_table
row | "raw_stream_to_bq" >>bigquery.WriteToBigQuery(project = PROJECT,
dataset = BQ_DATASET, table ="test_raw", schema=SCHEMA_RAW_TABLE)
3) 在将每个消息元素强到 table2 parsed_table 之前,对 row 提取 KV 对进行进一步解析
row | "parse" >> beam.Map(parse_payload) \
| "parsed_stream_to_bq" >> bigquery.WriteToBigQuery(project=PROJECT,
dataset=BQ_DATASET, table="test_parsed", schema=SCHEMA_PARSED_TABLE)
以下映射函数add_timestamps和parse_payload定义如下:
def add_timestamps(e,timestamp=beam.DoFn.TimestampParam, *args,**kwargs):
payload = e['data'].decode()
evt_time = e['attributes']['evt_time']
row = {'evt_time' : evt_time, 'payload' : payload}
return row
def parse_payload(e,timestamp=beam.DoFn.TimestampParam, *args,**kwargs):
payload = json.loads(e.pop('payload'))
e.update(payload)
return e
但是,当我运行管道时,它会给出如下 BQ 流插入错误,并且由于下游步骤 #3,我感觉到,当应用步骤 #2 BQ 写入转换时,每个消息行元素已经被解析/变异,并且因此,由于模式不匹配,第 2 步中的 BQ 流插入失败。
错误是 [
错误:[
调试信息:''
位置:'key1'
消息:'没有这样的字段。'
原因:'无效'>]
索引:0>]
更新:
1) 当我使用 WriteToText 转换而不是 WriteToBigQuery 时,它可以正常工作,正如预期的那样
第 1 步输出:
{'evt_time': '2019-09-14T22:12:43.323546Z', 'payload': '{"key1": "val1", "key2": "val2"}'}第 3 步地图变换输出:
{'evt_time': '2019-09-14T22:12:43.323546Z', 'key1': 'val1', 'key2': 'val2'}
2) 但是,当使用WriteToBigQuery 时,上述两个步骤都会输出相同的已解析 KV 对 {'evt_time': '2019-09-14T22:12:43.323546Z', 'key1': 'val1', 'key2': 'val2'}
更新 2 - 已解决
我在下面的@Guillem 建议之后做了两个更改(现在它也适用于 DirectRunner)
3) 避免就地修改元素
4) 从bigquery.WriteToBigQuery 中删除schema= 参数并在启动管道之前在首次写入时注意到以下错误后预构建BQ 表
文件“/PATH_TO_PY/python3.7/site-packages/apitools/base/py/base_api.py”,第 604 行,在 __ProcessHttpResponse 中
http_response、method_config=method_config、request=request) apitools.base.py.exceptions.HttpConflictError: HttpError 访问https://www.googleapis.com/bigquery/v2/projects/arctic-rite-234823/datasets/pipeline/tables?alt=json: response: ,内容
“错误”:{
“代码”:409,
"message": "已经存在:表 PROJECT_ID:DATASET_ID.TABLE_ID",
“错误”:[
{ "message": "Already Exists: Table PROJECT_ID:DATASET_ID.TABLE_ID", "domain": "global", "reason": "duplicate" }],
“状态”:“ALREADY_EXISTS”
}
【问题讨论】:
标签: python-3.x google-bigquery apache-beam