【问题标题】:Applying multiple transforms to the PCollection :: Error WriteToBigQuery将多个转换应用于 PCollection :: Error WriteToBigQuery
【发布时间】:2019-09-14 23:48:57
【问题描述】:

我希望有人可以帮助我,因为我现在处于死胡同????我正在学习 Apache Beam 并正在研究从 PubSub 源读取的流式管道(Python 3.7、apache-beam v 2.15、DirectRunner)。

对同一个 PCollection 应用多个转换过程以产生多个输出,最终将输出下沉到 2 个不同的 BQ 表中。

但是,当我运行管道时,似乎这些转换中的每一个都在相互影响(通过改变元素),从而在执行 bigquery.WriteToBigQuery 转换时导致架构不匹配

每个 PubSub 消息以及消息属性的格式如下:

{'data': json.dumps({ "key1": "val1", "key2": "val2"}).encode(), 'attributes':{'evt_time': '2019-09-14T22:12:43.323546Z'}}

我想阅读这些消息并转储到 2 个 BQ 表:

  • table1 raw_table 架构:evt_time,date(存储原始数据以及从消息属性中提取的 event_time){'evt_time': '2019-09-14T22:12:43.323546Z', 'payload': '{"key1": "val1", "key2": "val2"}'}

  • table2 parsed_table 架构:evt_time,key1,key2(将数据中的每个 KV 对解析并存储为平面表以及 event_time 提取的表单消息属性){'evt_time': '2019-09-14T22:12:43.323546Z', 'key1': 'val1', 'key2': 'val2'}

到目前为止这是我的伪代码

1) 消息被读入管道,然后应用映射变换,它基本上将data 提取到payload 字段和evt_timeevt_time 字段

row = p 
| "read_sub" >> beam.Create([ {'data': json.dumps({ "key1": "val1", "key2": "val2"}).encode(),
'attributes':{'evt_time': '2019-09-14T22:12:43.323546Z'}}])
| "add_timestamps" >> beam.Map(add_timestamps)`

2) 然后我将row 存储到table1 raw_table

row | "raw_stream_to_bq" >>bigquery.WriteToBigQuery(project = PROJECT,
 dataset = BQ_DATASET, table ="test_raw", schema=SCHEMA_RAW_TABLE)

3) 在将每个消息元素强到 table2 parsed_table 之前,对 row 提取 KV 对进行进一步解析

row | "parse" >> beam.Map(parse_payload) \
    | "parsed_stream_to_bq" >> bigquery.WriteToBigQuery(project=PROJECT, 
dataset=BQ_DATASET, table="test_parsed", schema=SCHEMA_PARSED_TABLE)

以下映射函数add_timestampsparse_payload定义如下:

def add_timestamps(e,timestamp=beam.DoFn.TimestampParam, *args,**kwargs):
        payload = e['data'].decode()
        evt_time = e['attributes']['evt_time']
        row = {'evt_time' : evt_time, 'payload' : payload}
        return row
def parse_payload(e,timestamp=beam.DoFn.TimestampParam, *args,**kwargs):
        payload = json.loads(e.pop('payload'))
        e.update(payload)
        return e

但是,当我运行管道时,它会给出如下 BQ 流插入错误,并且由于下游步骤 #3,我感觉到,当应用步骤 #2 BQ 写入转换时,每个消息行元素已经被解析/变异,并且因此,由于模式不匹配,第 2 步中的 BQ 流插入失败。

错误是 [

错误:[

调试信息:''

位置:'key1'

消息:'没有这样的字段。'

原因:'无效'>]

索引:0>]

更新

1) 当我使用 WriteToText 转换而不是 WriteToBigQuery 时,它可以正常工作,正如预期的那样

  • 第 1 步输出:{'evt_time': '2019-09-14T22:12:43.323546Z', 'payload': '{"key1": "val1", "key2": "val2"}'}

  • 第 3 步地图变换输出:{'evt_time': '2019-09-14T22:12:43.323546Z', 'key1': 'val1', 'key2': 'val2'}

2) 但是,当使用WriteToBigQuery 时,上述两个步骤都会输出相同的已解析 KV 对 {'evt_time': '2019-09-14T22:12:43.323546Z', 'key1': 'val1', 'key2': 'val2'}

更新 2 - 已解决

我在下面的@Guillem 建议之后做了两个更改(现在它也适用于 DirectRunner)

3) 避免就地修改元素

4) 从bigquery.WriteToBigQuery 中删除schema= 参数并在启动管道之前在首次写入时注意到以下错误后预构建BQ 表

文件“/PATH_TO_PY/python3.7/site-packages/apitools/base/py/base_api.py”,第 604 行,在 __ProcessHttpResponse 中

http_response、method_config=method_config、request=request) apitools.base.py.exceptions.HttpConflictError: HttpError 访问https://www.googleapis.com/bigquery/v2/projects/arctic-rite-234823/datasets/pipeline/tables?alt=json: response: ,内容

“错误”:{

“代码”:409,

"message": "已经存在:表 PROJECT_ID:DATASET_ID.TABLE_ID",

“错误”:[

 {

   "message": "Already Exists: Table PROJECT_ID:DATASET_ID.TABLE_ID",

   "domain": "global",

   "reason": "duplicate"

 }

],

“状态”:“ALREADY_EXISTS”

}

【问题讨论】:

    标签: python-3.x google-bigquery apache-beam


    【解决方案1】:

    注释掉第二次写入可以让第一次成功,所以这似乎是parse_payloadDirectRunner 的问题。

    要修复它,您可以通过在 Map 函数内复制来避免就地修改元素:

    def parse_payload(element,timestamp=beam.DoFn.TimestampParam, *args,**kwargs):
            e = element.copy()
            payload = json.loads(e.pop('payload'))
            e.update(payload)
            return e
    

    使用DataflowRunner 无需更改即可正常工作:

    由于图表按预期构建(两个单独的分支):

    完整复制code.

    【讨论】:

    • 感谢您的建议。已解决,请参阅上面的 更新 2,工作 code。它有效,但我仍然没有得到 #3) 我认为每个变换都是不可变的,因此下游的任何更改都应该影响上游步骤中的元素 #4) 似乎多个工作人员同时尝试创建一个新表,如果它不存在并因此出现错误...
    猜你喜欢
    • 2021-05-15
    • 2010-11-09
    • 1970-01-01
    • 2022-12-31
    • 2022-08-16
    • 1970-01-01
    • 1970-01-01
    • 2021-12-14
    • 2023-02-03
    相关资源
    最近更新 更多