【发布时间】:2015-03-11 20:21:57
【问题描述】:
我想filter 输出字段“状态”不等于“确定”的 RDD 元素。我从 HDFS 上的一组 CSV 文件创建我的 RDD,然后在尝试 filter 之前使用map 获得我想要的结构:
import csv, StringIO
files = "/hdfs_path/*.csv"
fields = ["time", "status"]
dial = "excel"
default = {'status': 'OK', 'time': '2014-01-01 00:00:00'}
def loadRecord(line, fieldnames, dialect):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames = fieldnames, dialect = dialect)
try:
line = reader.next()
if line is None:
return default
else:
return line
except:
return default
harmonics = sc.textFile(files) \
.map(lambda x: loadRecord(x, fields, dial)) \
.filter(lambda x: "OK" not in x['status'])
我可以对这个 RDD 做其他事情——例如另一个map 到get 仅某些字段等。但是,当我使用filter 运行我的代码时,其中一个任务总是失败,并在我的filter lambda 函数中出现异常:
'NoneType object is not iterable'
我认为这意味着filter lambda 正在接收None,因此我将代码添加到loadRecord 以避免返回None。但是,我仍然遇到同样的错误。它确实适用于小样本数据集,但我的实际数据足够大,我不确定如何检测它的哪些部分可能导致问题。
任何意见表示赞赏!
【问题讨论】:
标签: apache-spark pyspark