【发布时间】:2020-03-27 17:26:09
【问题描述】:
我有 2 个数据流流式处理管道(pubsub 到 bigquery),代码如下:
class transform_class(beam.DoFn):
def process(self, element, publish_time=beam.DoFn.TimestampParam, *args, **kwargs):
logging.info(element)
yield element
class identify_and_transform_tables(beam.DoFn):
#Adding Publish Timestamp
#Since I'm reading from a topic that consist data from multiple tables,
#function here is to identify the tables and split them apart
def run(pipeline_args=None):
# `save_main_session` is set to true because some DoFn's rely on
# globally imported modules.
pipeline_options = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True)
with beam.Pipeline(options=pipeline_options) as pipeline:
lines = (pipeline
| 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic='topic name',with_attributes = True)
| 'Transforming Messages' >> beam.ParDo(transform_class())
| 'Identify Tables' >> beam.ParDo(identify_and_transform_tables()).with_outputs('table_name'))
table_name = lines.table_name
table_name = (table_name
| 'Write table_name to BQ' >> beam.io.WriteToBigQuery(
table='table_name',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = pipeline.run()
这两个管道都从同一个 pubsub 主题中读取。 在协调时,我发现丢失了一些数据,并且两个管道的丢失数据不同。 例如,
管道 1 中缺少第 56-62 行,但存在于管道 2 中
管道 2 中缺少第 90-95 行,但存在于管道 1 中
因此,这意味着数据存在于 pubsub 主题中。
正如您在代码中看到的,第一个功能是将 pubsub 消息直接记录到 stackdriver 中。
除了 bigquery,我仔细检查了堆栈驱动程序日志中丢失的数据。
我发现的另一件事是这些丢失的数据是在一段时间内发生的。 例子, 第 56-62 行的时间戳为“2019-12-03 05:52:18.754150 UTC”并接近(毫秒)
因此,我唯一的结论是数据流 readfrompubsub 有时会丢失数据?
非常感谢任何帮助。
【问题讨论】:
标签: python google-bigquery google-cloud-dataflow apache-beam google-cloud-pubsub