【问题标题】:Dataflow Streaming using Python SDK: Transform for PubSub Messages to BigQuery Output使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出
【发布时间】:2018-04-01 22:11:00
【问题描述】:

我正在尝试使用数据流来读取 pubsub 消息并将其写入大查询。 Google 团队为我提供了 alpha 访问权限,并且提供的示例可以正常工作,但现在我需要将其应用到我的场景中。

Pubsub 有效负载:

Message {
    data: {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
    attributes: {}
}

大查询架构:

schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP',

我的目标是简单地读取消息有效负载并插入 bigquery。我正在努力了解转换以及如何将键/值映射到大查询模式。

我对此很陌生,因此感谢您提供任何帮助。

当前代码:https://codeshare.io/ayqX8w

谢谢!

【问题讨论】:

    标签: python google-bigquery google-cloud-dataflow apache-beam dataflow


    【解决方案1】:

    我能够通过定义一个将其加载到 json 对象中的函数来成功解析 pubsub 字符串(请参阅 parse_pubsub())。我遇到的一个奇怪问题是我无法在全局范围内导入 json。我收到“NameError:未定义全局名称'json'”错误。我必须在函数中导入 json。

    在下面查看我的工作代码:

    from __future__ import absolute_import
    
    import logging
    import argparse
    import apache_beam as beam
    import apache_beam.transforms.window as window
    
    '''Normalize pubsub string to json object'''
    # Lines look like this:
      # {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
    def parse_pubsub(line):
        import json
        record = json.loads(line)
        return (record['mac']), (record['status']), (record['datetime'])
    
    def run(argv=None):
      """Build and run the pipeline."""
    
      parser = argparse.ArgumentParser()
      parser.add_argument(
          '--input_topic', required=True,
          help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
      parser.add_argument(
          '--output_table', required=True,
          help=
          ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
           'or DATASET.TABLE.'))
      known_args, pipeline_args = parser.parse_known_args(argv)
    
      with beam.Pipeline(argv=pipeline_args) as p:
        # Read the pubsub topic into a PCollection.
        lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
                    | beam.Map(parse_pubsub)
                    | beam.Map(lambda (mac_bq, status_bq, datetime_bq): {'mac': mac_bq, 'status': status_bq, 'datetime': datetime_bq})
                    | beam.io.WriteToBigQuery(
                        known_args.output_table,
                        schema=' mac:STRING, status:INTEGER, datetime:TIMESTAMP',
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
                )
    
    if __name__ == '__main__':
      logging.getLogger().setLevel(logging.INFO)
      run()
    

    【讨论】:

    【解决方案2】:

    写入 Python SDK 的 BigQuery 接收器的数据应采用字典的形式,其中字典的每个键给出 BigQuery 表的一个字段,对应的值给出要写入该字段的值。对于 BigQuery RECORD 类型,值本身应该是具有对应键值对的字典。

    我提交了一个 JIRA 来改进 Beam 中相应 python 模块的文档:https://issues.apache.org/jira/browse/BEAM-3090

    【讨论】:

    • 感谢您的反馈。经过一些更多的实验后,看起来传入的发布/订阅消息是作为字符串传入的(显然)。我必须应用将线条对象转换为字典的转换。我在数据流中遇到的错误消息是:Group: Expected Tuple[TypeVariable[K], TypeVariable[V]], got
    【解决方案3】:

    我有一个类似的用例(从 PubSub 将行作为字符串读取,将它们转换为 dicts 然后处理它们)。

    我正在使用ast.literal_eval(),这似乎对我有用。此命令将评估字符串,但以比eval() 更安全的方式(请参阅here)。它应该返回一个 dict,其键是字符串,并且值被评估为最可能的类型(int、str、float ...)。不过,您可能希望确保这些值采用正确的类型。

    这会给你一个像这样的管道

    import ast
    lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
                | "JSON row to dict" >> beam.Map(
                            lambda s: ast.literal_eval(s))
                | beam.io.WriteToBigQuery( ... )
            )
    

    我还没有使用过 BigQuery,所以我无法在最后一行为您提供帮助,但您所写的内容乍一看似乎是正确的。

    【讨论】:

      猜你喜欢
      • 2018-11-03
      • 1970-01-01
      • 2018-10-13
      • 2018-09-05
      • 1970-01-01
      • 2018-05-29
      • 1970-01-01
      • 2018-12-10
      • 1970-01-01
      相关资源
      最近更新 更多