【问题标题】:Modify MinimalWordCount example to read from BigQuery修改 MinimalWordCount 示例以从 BigQuery 中读取
【发布时间】:2017-10-07 20:38:03
【问题描述】:

我正在尝试修改 Apache Beam 的 MinimalWordCount python 示例以从 BigQuery 表中读取。我已经进行了以下修改,并且我似乎使查询正常工作,但示例。

这里的原始示例:

 with beam.Pipeline(options=pipeline_options) as p:

    # Read the text file[pattern] into a PCollection.
    lines = p | ReadFromText(known_args.input)

    # Count the occurrences of each word.
    counts = (
        lines
        | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                      .with_output_types(unicode))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    # Format the counts into a PCollection of strings.
    output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))

    # Write the output using a "Write" transform that has side effects.
    # pylint: disable=expression-not-assigned
    output | WriteToText(known_args.output)

而不是 ReadFromText 我正在尝试调整它以从 BigQuery 表中的列中读取。为此,我已将 lines = p | ReadFromText(known_args.input) 替换为以下代码:

query = 'SELECT text_column FROM `bigquery.table.goes.here` '
lines = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))

当我重新运行管道时,出现错误:“WARNING:root:A task failed with exception. expected string or buffer [while running 'Split']

我认识到“拆分”操作需要一个字符串,但显然没有得到一个字符串。如何修改“ReadFromBigQuery”以使其传递字符串/缓冲区?我是否需要提供表架构或其他东西来将“ReadFromBigQuery”的结果转换为字符串缓冲区?

【问题讨论】:

    标签: python google-cloud-platform google-cloud-dataflow apache-beam


    【解决方案1】:

    这是因为BigQuerySource 返回字典的PCollection (dict),其中字典中的每个键代表一列。对于您的情况,最简单的做法就是在beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True) 之后应用beam.Map,如下所示:

    lines = (p 
    |"ReadFromBigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True)) 
    | "Extract text column" >>  beam.Map(lambda row: row.get("text_column"))
             )
    

    如果您遇到列名问题,请尝试将其更改为u"text_column"

    或者您可以修改您的 Split 转换以提取那里的列值:

    'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x.get("text_column")))
                          .with_output_types(unicode))
    

    【讨论】:

    • 谢谢!实际上,我刚刚尝试了以下同样有效的方法'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x["text_column"])).with_output_types(unicode)) 这只是语法问题还是它的操作方式有什么真正的区别?
    • 这不是语法,而是这些来源的不同行为 - 一个输出原始文本,另一个输出字典。您可以让来源返回任何内容。
    • 应该以哪种格式编写查询?我尝试了多种格式,但看不到结果:'SELECT * FROM `xxx,TABLE`''SELECT * FROM xxx.TABLE' ?
    猜你喜欢
    • 1970-01-01
    • 2017-08-18
    • 1970-01-01
    • 2015-04-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-21
    • 1970-01-01
    相关资源
    最近更新 更多