【问题标题】:Beam/Google Cloud Dataflow ReadFromPubsub Missing DataBeam/Google Cloud Dataflow ReadFromPubsub 缺少数据
【发布时间】:2020-03-27 17:26:09
【问题描述】:

我有 2 个数据流流式处理管道(pubsub 到 bigquery),代码如下:

class transform_class(beam.DoFn):

    def process(self, element, publish_time=beam.DoFn.TimestampParam, *args, **kwargs):
        logging.info(element)
        yield element

class identify_and_transform_tables(beam.DoFn):
    #Adding Publish Timestamp
    #Since I'm reading from a topic that consist data from multiple tables, 
    #function here is to identify the tables and split them apart


def run(pipeline_args=None):
    # `save_main_session` is set to true because some DoFn's rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        lines = (pipeline 
                | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic='topic name',with_attributes = True)
                | 'Transforming Messages' >> beam.ParDo(transform_class())
                | 'Identify Tables' >> beam.ParDo(identify_and_transform_tables()).with_outputs('table_name'))

        table_name = lines.table_name
        table_name = (table_name 
                        | 'Write table_name to BQ' >> beam.io.WriteToBigQuery(
                        table='table_name',
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
                        )

    result = pipeline.run()

这两个管道都从同一个 pubsub 主题中读取。 在协调时,我发现丢失了一些数据,并且两个管道的丢失数据不同。 例如,

管道 1 中缺少第 56-62 行,但存在于管道 2 中
管道 2 中缺少第 90-95 行,但存在于管道 1 中

因此,这意味着数据存在于 pubsub 主题中。
正如您在代码中看到的,第一个功能是将 pubsub 消息直接记录到 stackdriver 中。 除了 bigquery,我仔细检查了堆栈驱动程序日志中丢失的数据。

我发现的另一件事是这些丢失的数据是在一段时间内发生的。 例子, 第 56-62 行的时间戳为“2019-12-03 05:52:18.754150 UTC”并接近(毫秒)

因此,我唯一的结论是数据流 readfrompubsub 有时会丢失数据?
非常感谢任何帮助。

【问题讨论】:

    标签: python google-bigquery google-cloud-dataflow apache-beam google-cloud-pubsub


    【解决方案1】:

    我不确定在这种情况下发生了什么,但这是防止数据丢失的重要规则:

    • 不要阅读主题,如beam.io.ReadFromPubSub(topic='topic name')
    • 从订阅中读取,如beam.io.ReadFromPubSub(subscription='subscription name')

    这是因为在重新启动的情况下,将在第一种情况下创建一个新订阅 - 此订阅可能仅包含创建后收到的数据。如果您事先创建订阅,数据将保留在其中,直到它被读取(或过期)。

    【讨论】:

    • 您好,我听从了您的建议,并创建了一个订阅同一主题的订阅。然后,我从所述订阅中创建了第三个管道读数。我最终得到了相同的结果,这次第三个管道(从订阅读取)也丢失了与第一和第二个管道不同的数据。有什么想法吗?
    • 这真的很有趣!因此,在通过第三个管道时,每一行都记录在 Stack Driver 中,但它从未登陆 BigQuery?传输了多少数据? (如果我想复制场景)
    • 不,先生,stackdriver 也缺少行。它从未被阅读过。我的数据量并不大,目前高峰时每秒大约1到2k条记录,慢速时不到20条记录。
    • 如果行在 stackdriver 但不在 bigquery 中,我会得出结论认为我的代码有问题,但目前情况并非如此
    • @FelipeHoffa,好建议,但在某些情况下,在消息到达目的地之前发送 ACK 消息(例如用 BQ 编写)
    猜你喜欢
    • 1970-01-01
    • 2018-01-25
    • 1970-01-01
    • 2019-05-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-22
    • 1970-01-01
    相关资源
    最近更新 更多