【问题标题】:how do you turn a single csv file into multiple pcollections for apache beam如何将单个 csv 文件转换为 apache Beam 的多个 pcollections
【发布时间】:2020-03-29 14:50:29
【问题描述】:

我有一个 csv 文件,其中前几行作为 ids 和标签,其余行是实际数据。将前几行与 map 函数共享以将后续行与实际数据进行转换的最佳方法是什么?总的来说,我正在做类似于this question 的事情,但我不只是在顶部有标签,我还有一排额外的 ID。

数据看起来像这样:

-- ,id1 , id1 , id1 , id2 , id2 , id2
-- ,label,label,label,label,label,label
time1,data, data, data, data, data, data
time2,data, data, data, data, data, data

然后对于每个唯一的 id,我想将 id/time/dataobject 记录写入 bigquery。

基本上,我假设我需要有一个中间管道步骤,它将文件转换为多个 pcollections,我可以让下一步根据顶部行的值实际转换所有文件行。如果是这种情况,实现这一目标的最佳方法是什么?如果没有,我还能将前几行的值提供给其他行项的映射函数吗?

【问题讨论】:

    标签: python csv google-cloud-dataflow apache-beam


    【解决方案1】:

    一种可能的解决方案是修改上一个问题的自定义源。否则,您可以对数据进行初始传递,以将标题保存为主要处理步骤的辅助输入:

    input = p | 'Read CSV file' >> ReadFromText("input.csv")
    headers = input | 'Parse headers' >> beam.ParDo(ParseHeadersFn())
    rows = input | 'Parse data rows' >> beam.ParDo(ParseRowsFn(), beam.pvalue.AsList(headers))
    

    其中ParseHeadersFn 检查行是否以-- 开头以限定为标题,如果不需要则丢弃第一个字段:

    class ParseHeadersFn(beam.DoFn):
        """ParDo to output only the headers"""
        def process(self, element):
            if '--' in element.split(',')[0]:
              yield [','.join(element.split(',')[1:])]
    

    然后,在ParseRowsFn 中,我们可以访问headers 侧输入:

    class ParseRowsFn(beam.DoFn):
        """ParDo to process data rows according to header metadata"""
        def process(self, element, headers):
          if 'time1' in element.split(',')[0]:
            for id in headers[0]:
              print 'ids: ' + id
            for label in headers[1]:
              print 'labels: ' + label
    

    请注意,我假设 id 行将出现在标签行之前,但这可能不是真的,因为 Dataflow 是一个分布式系统。最好做一些更强的检查。

    如果我们的input.csv 是:

    --,id1,id1,id1,id2,id2,id2
    --,label1,label2,label3,label1,label2,label3
    time1,data1,data2,data3,data4,data5,data6
    time2,data7,data8,data9,data10,data11,data12
    

    示例输出:

    ids: id1 , id1 , id1 , id2 , id2 , id2
    labels: label1,label2,label3,label1,label2,label3
    

    使用的代码:gist中的script.py

    ParseRowsFn 可以用dict(zip(...)) 修改以获得所需的输出,但我不确定我是否理解输出结构。你需要这样的东西吗?

    id1,time1,data1,data2,data3
    id1,time2,data7,data8,data9
    id2,time1,data4,data5,data6
    id2,time2,data10,data11,data12
    

    如果是这种情况,我们可以使用answer 中的技巧来确定 ID 发生变化的位置并采取相应措施:

    class ParseRowsFn(beam.DoFn):
        """ParDo to process data rows according to header metadata"""
        def process(self, element, headers):
          # changing ids as per https://stackoverflow.com/a/28242076/6121516
          fields = element.split(',')
    
          if '--' not in fields[0]:
            ids = headers[0][0].split(',')
            labels = headers[1][0].split(',')
            id_changes = [i for i in range(1,len(ids)) if ids[i]!=ids[i-1]]
            id_changes.append(len(ids))
    
            for idx, change in enumerate(id_changes):
              row = {'timestamp': fields[0], 'id': ids[change - 1]}
              low = max(idx - 1, 0)
              row.update(dict(zip(labels[low:change], fields[low+1:change+1])))
              print row
              yield [row]
    

    示例输出:

    {'timestamp': u'time1', u'label2': u'data2', u'label3': u'data3', 'id': u'id1', u'label1': u'data1'}
    {'timestamp': u'time1', u'label2': u'data5', u'label3': u'data6', 'id': u'id2', u'label1': u'data4'}
    {'timestamp': u'time2', u'label2': u'data8', u'label3': u'data9', 'id': u'id1', u'label1': u'data7'}
    {'timestamp': u'time2', u'label2': u'data11', u'label3': u'data12', 'id': u'id2', u'label1': u'data10'}
    

    使用的代码:output.py 同gist

    【讨论】:

    • 我可能用 ids 东西把你送到了错误的路径……主要只是想要数据行的 map 函数可用的不同标题行。这似乎可以解决问题。我唯一的问题是,在每个项目上执行 ParDo 是否会有任何重大的性能问题只是为了获得前两个?正要测试它
    • 没问题,这就是为什么我分享了一个使元数据可用的最小示例和显示可能用例的扩展示例。确实,两次读取数据感觉有点多余,但我想到的其他解决方案会有其他问题(例如确保在读取数据之前解析标题)。最有效的方法是修改自定义源以使用接受多行标题的 csvreader
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-05
    • 1970-01-01
    相关资源
    最近更新 更多