【问题标题】:Read CSV with Spark使用 Spark 读取 CSV
【发布时间】:2016-12-03 11:33:01
【问题描述】:

我正在使用以下命令通过 Spark 读取 csv 文件。

rdd=sc.textFile("emails.csv").map(lambda line: line.split(","))

我需要创建一个 Spark DataFrame。

我已使用以下命令将此 rdd 转换为 spark df:

dataframe=rdd.toDF()

但我需要在将 rdd 转换为 df 时指定 df 的架构。我试过这样做:(我只有 2 列文件和消息)

from pyspark import Row

email_schema=Row('file','message')

email_rdd=rdd.map(lambda r: email_schema(*r))

dataframe=sqlContext.createDataFrame(email_rdd)

但是,我收到错误消息: java.lang.IllegalStateException:输入行没有架构所需的预期值数量。需要 2 个字段,但提供 1 个值。

我还尝试使用以下方法读取我的 csv 文件:

rdd=sc.textFile("emails.csv").map(lambda line: line.split(",")).map(lambda line: line(line[0],line[1]))

我收到错误:TypeError: 'list' object is not callable

我尝试使用 pandas 将我的 csv 文件读入 pandas 数据帧,然后将其转换为 spark DataFrame,但我的文件太大了。

我还补充了:

bin/pyspark --packages com.databricks:spark-csv_2.10:1.0.3

并使用以下内容读取我的文件:

df=sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('emails.csv')

我收到错误: java.io.IOException: (startline 1) EOF 在封装令牌完成之前到达

我已经浏览了其他几个相关的线程并按照上面的方法进行了尝试。谁能解释我哪里出错了?

[在 MacOSX 上使用 Python 2.7、Spark 1.6.2]

已编辑:

第 3 行如下。我只需要提取电子邮件的内容。我该怎么办?

1 艾伦-p/_sent_mail/1。 “消息 ID: 日期:2001 年 5 月 14 日星期一 16:39:00 -0700 (PDT) 来自:phillip.allen@enron.com 至:tim.belden@enron.com 主题: 哑剧版:1.0 内容类型:文本/纯文本; charset=us-ascii 内容传输编码:7bit X-来自:菲利普·K·艾伦 X-To:蒂姆·贝尔登 X-cc: X-密件抄送: X 文件夹:\Phillip_Allen_Jan2002_1\Allen, Phillip K.\'Sent Mail X-原点:Allen-P X-FileName: pallen (Non-Privileged).pst

这是我们的预测”

2 艾伦-p/_sent_mail/10。 “消息 ID: 日期:2001 年 5 月 4 日星期五 13:51:00 -0700 (PDT) 来自:phillip.allen@enron.com 至:john.lavorato@enron.com 主题:回复: 哑剧版:1.0 内容类型:文本/纯文本; charset=us-ascii 内容传输编码:7bit X-来自:菲利普·K·艾伦 X-To:约翰·J·拉沃拉托 X-cc: X-密件抄送: X 文件夹:\Phillip_Allen_Jan2002_1\Allen, Phillip K.\'Sent Mail X-原点:Allen-P X-FileName: pallen (Non-Privileged).pst

出差参加商务会议让旅途充满乐趣。特别是如果您必须准备演示文稿。我建议在这里举行商业计划会议,然后在没有任何正式商务会议的情况下进行旅行。我什至会尝试就是否需要或是否需要旅行获得一些诚实的意见。

就商务会议而言,我认为尝试激发不同群体之间关于哪些有效哪些无效的讨论会更有成效。演示者经常说话,而其他人则安静地等待轮到他们。如果以圆桌讨论的形式举行会议可能会更好。

我对去哪里的建议是奥斯汀。打高尔夫球,租一艘滑雪船和摩托艇。飞到某个地方需要太多时间。”

3 艾伦-p/_sent_mail/100。 “消息 ID: 日期:2000 年 10 月 18 日,星期三 03:00:00 -0700 (PDT) 来自:phillip.allen@enron.com 至:leah.arsdall@enron.com 主题:回复:测试 哑剧版:1.0 内容类型:文本/纯文本; charset=us-ascii 内容传输编码:7bit X-来自:菲利普·K·艾伦 X-To:利亚范阿斯达尔 X-cc: X-密件抄送: X-文件夹:\Phillip_Allen_Dec2000\Notes Folders\'sent mail X-原点:Allen-P X-文件名:pallen.nsf

测试成功。一路走好!!!”

【问题讨论】:

  • 您能否打印来自`emails.csv` 的前五行的样本(根据需要对数据进行匿名处理)?
  • line(line[0],line[1]).. line() 的外部使用意味着您正在尝试调用列表对象,因此出现错误

标签: python csv apache-spark dataframe rdd


【解决方案1】:

如果 RDD 适合内存,那么:

rdd.toPandas().to_csv('emails.csv')

如果没有,请使用 spark-csv 作为您的 spark 版本:

rdd.write.format('com.databricks.spark.csv').save('emails.csv')

在你上面的例子中:

rdd=....map(lambda line: line.split(",")).map(lambda line: line(line[0],line[1]))

你不想:

rdd=....map(lambda line: line.split(",")).map(lambda line: (line[0], line[1]))

【讨论】:

  • 如何将 com.databricks.spark.csv 库附加到 spark?我使用了我的帖子中指定的格式。你知道我哪里错了吗?而且,我正在尝试读取文件。
  • 谢谢@Alexander。我修好了它。但是现在在读取文件时我得到 java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema。需要 2 个字段,同时提供 1 个值。我编辑了我的帖子以显示我的第 3 行。第一列是文件,它只是 1, 2 ,3 粗体。第二列是包含所有消息内容的消息。您能否告诉我如何更正此问题以按原样合并 2 列?
【解决方案2】:

如果你有一个巨大的文件,为什么不使用 pandas 数据框而不是一次加载所有文件,比如:

import pandas as pd
df_pd = pd.read_csv('myfilename.csv',chunksize = 10000)

for i,chunk in enumerate(df1):
    if i==0:
        df_spark = sqlContext.createDataFrame(chunk)
    else:
        df_spark = df_spark.unionAll(sqlContext.createDataFrame(chunk))

df_spark 将是您所需的 spark 数据框。这是低效的,但它会工作。对于其他一些实现相同的方法,您可以参考question的答案@

另一种可能的方法是使用 rdd 的 inferSchema 方法,但您需要在 csv 文件中有列名才能使用,请参阅this。 所以你可以这样做:

srdd = inferSchema(rdd)
email_rdd=rdd.map(lambda r: srdd(*r))

dataframe=sqlContext.createDataFrame(email_rdd)

【讨论】:

    猜你喜欢
    • 2015-11-11
    • 2018-03-25
    • 1970-01-01
    • 2016-01-03
    • 2018-04-26
    • 1970-01-01
    • 1970-01-01
    • 2015-12-04
    • 2020-08-05
    相关资源
    最近更新 更多