【问题标题】:Parse Pyspark RDD of Key/Value Pairs to .csv Format将键/值对的 Pyspark RDD 解析为 .csv 格式
【发布时间】:2017-08-02 15:41:58
【问题描述】:

我正在构建一个解析器,它接受“key”="value" 对的原始文本文件并使用 PySpark 写入表格/.csv 结构。

我卡住的地方是,我可以在一个函数中访问它们的键和值来构造每个csv_row,甚至检查键是否等于预期键的列表 (col_list),但正如我所说的那样函数 processCsv 在 lambda 中,我不知道如何将每个 csv_row 附加到用于保存 .csv 行的最终列表的全局列表 l_of_l 中。

如何以键/值格式遍历 RDD 的每条记录并解析为 .csv 格式?如您所见,我的最终列表 (l_of_l) 是空的,但我可以得到循环中的每一行......令人沮丧。

感谢所有建议!

原始文本结构(foo.log):

"A"="foo","B"="bar","C"="baz"
"A"="oof","B"="rab","C"="zab"
"A"="aaa","B"="bbb","C"="zzz"

到目前为止的方法:

from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import Row

sc=SparkContext('local','foobar')
sql = SQLContext(sc)

# Read raw text to RDD
lines=sc.textFile('foo.log')
records=lines.map(lambda x: x.replace('"', '').split(","))

print 'Records pre-transform:\n'
print records.take(100)
print '------------------------------\n'

def processRecord(record, col_list):    
    csv_row=[]
    for idx, val in enumerate(record):
        key, value = val.split('=')        
        if(key==col_list[idx]):
            # print 'Col name match'
            # print value
            csv_row.append(value)
        else:
            csv_row.append(None)
            print 'Key-to-Column Mismatch, dropping value.'
    print csv_row
    global l_of_l
    l_of_l.append(csv_row)

l_of_l=[]
colList=['A', 'B', 'C']
records.foreach(lambda x: processRecord(x, col_list=colList))

print 'Final list of lists:\n'
print l_of_l

输出:

Records pre-transform:
[[u'A=foo', u'B=bar', u'C=baz'], [u'A=oof', u'B=rab', u'C=zab'], [u'A=aaa', u'B=bbb', u'C=zzz']]
------------------------------

[u'foo', u'bar', u'baz']
[u'oof', u'rab', u'zab']
[u'aaa', u'bbb', u'zzz']

Final list of lists:
[]

【问题讨论】:

  • 你能举个例子说明l_of_l 应该是什么,即,确切的期望输出是什么?
  • 当然@desertnaut,感谢您的关注。我希望print l_of_l 产生:[['foo', 'bar', 'baz'], ['oof', 'rab', 'zab'], ['aaa', 'bbb', 'zzz']]
  • 由于对问题的简洁明了的阐述而被赞成 - 现在在 SO 中很少见,尤其是来自新用户......
  • 感谢@desertnaut,我在 java-forum 和类似的地方 xD 上学到了关于 SSCCE 的艰难方法

标签: python parsing apache-spark lambda pyspark


【解决方案1】:

试试这个功能:

def processRecord(record, col_list):    
    csv_row=list()
    for idx, val in enumerate(record):
        key, value = val.split('=')        
        if(key==col_list[idx]):
            # print 'Col name match'
            # print value
            csv_row.append(value)
        else:
            csv_row.append(None)
            # print 'Key-to-Column Mismatch, dropping value.'
    return csv_row

然后

colList=['A', 'B', 'C']
l_of_l = records.map(lambda x: processRecord(x, col_list=colList)).collect()

print 'Final list of lists:\n'
print l_of_l

应该给

Final list of lists: 
[[u'foo', u'bar', u'baz'], [u'oof', u'rab', u'zab'], [u'aaa', u'bbb', u'zzz']]

【讨论】:

  • 高手!非常感谢,我从来没有想过将 l_of_l 设置为 records.map* 显然我还有很多东西要学——PySpark 的整个匿名函数方面对我来说是一个挑战,你能推荐任何好的学习材料/来源? (再次感谢!:))
  • 不客气。不幸的是,除了文档之外,我不知道 PySpark 有任何好的来源......
  • 好吧,我想它就像任何新事物一样,通过猛击它来学习,并在 SO 上的朋友的慷慨帮助下学习 :) 。希望一旦我对 PySpark 有了更好的掌握,我自己就可以在这里做出贡献 - 使用 PySpark 的第一周。
  • 有没有办法在数据框中做到这一点?
猜你喜欢
  • 2016-01-15
  • 2021-03-03
  • 2021-03-11
  • 2020-10-08
  • 1970-01-01
  • 2019-11-02
  • 2019-12-22
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多