【问题标题】:Pyspark, when trying to join two RDD, I got a UnicodeEncode errorPyspark,在尝试加入两个 RDD 时,出现 UnicodeEncode 错误
【发布时间】:2018-09-24 18:06:27
【问题描述】:
UnicodeEncodeError: 'ascii' codec can't encode character u'\xa9' in position 261: ordinal not in range(128)

以上是我制作了一张新桌子后得到的。实际上我在下面使用了这个命令; table3 = u' '.join((table1, table2)).encode('utf-8').strip() 但它不起作用,我将把我的代码和真正的输出用于每个 RDD。

创建第一个 RDD 的代码

table1=sc.textFile('inventory').map(lambda line:next(csv.reader([line]))).map(lambda fields:((fields[0],fields[8],fields[10]),1))

第一个 RDD 实际输出

[(('BibNum', 'ItemCollection', 'ItemLocation'), 1),
(('3011076', 'ncrdr', 'qna'), 1),
 (('2248846', 'nycomic', 'lcy'), 1)]

创建第二个 RDD 的代码

table2=sc.textFile('checkouts').map(lambda line:next(csv.reader([line]))).map(lambda fields:((fields[0],fields[3],fields[5]),1))

第二个RDD实际输出

[(('BibNum', 'ItemCollection', 'CheckoutDateTime'), 1),
(('1842225', 'namys', '05/23/2005 03:20:00 PM'), 1), 
(('1928264', 'ncpic', '12/14/2005 05:56:00 PM'), 1),
(('1982511', 'ncvidnf', '08/11/2005 01:52:00 PM'), 1),
(('2026467', 'nacd', '10/19/2005 07:47:00 PM'), 1)]

最后,我尝试使用以下代码 table3 = u' '.join((table1, table2)).encode('utf-8').strip() 来连接 table1 和 table2。但它没有用。如果您对此错误有任何想法,请赐教。

【问题讨论】:

  • 您的文本文件中似乎存在编码问题...
  • 即使我能够从每个 RDD 中获取值,但它对于文本文件可能有问题吗?有什么办法可以解决吗?我想加入每张桌子。
  • @user8371915 我使用了普通的连接语法,但它发生了同样的错误,所以这就是我尝试上述语法的原因。
  • u' '.join((table1, table2)).encode('utf-8').strip() - 不是join 语法。它是与 Spark joins 无关的随机代码,除了名称。如果您的代码无论如何都失败了,那么显然错误不在这里。最好的猜测是在csv.reader([line]) 中,因为csv 阅读器不支持unicode。最后,如果您尝试按原样加入RDD,那么您没有阅读对 RDD 加入的期望。如果您希望有人能够回答这个问题,请以 minimal reproducible example 开头。

标签: python apache-spark unicode pyspark encode


【解决方案1】:

让我们看看我是否了解您的需求。
你有两个 rdds,你想根据两个值加入它们。
首先,您可以清理 rdd 的标题(第一行)。
然后定义连接的键,然后进行连接。
我将使用一种效果稍差但易于理解的清洁方法
(您可以在这里找到更有效的方法:Remove first element in RDD without using filter function

rdd1 = sc.parallelize([(('BibNum', 'ItemCollection', 'ItemLocation'), 1),
(('3011076', 'ncrdr', 'qna'), 1),
 (('2248846', 'nycomic', 'lcy'), 1),
                       (('1928264', 'ncpic', '12/14/2005 05:56:00 PM'), 1)])

rdd2 = sc.parallelize([(('BibNum', 'ItemCollection', 'CheckoutDateTime'), 1),
(('1842225', 'namys', '05/23/2005 03:20:00 PM'), 1), 
(('1928264', 'ncpic', '12/14/2005 05:56:00 PM'), 1),
(('1982511', 'ncvidnf', '08/11/2005 01:52:00 PM'), 1),
(('2026467', 'nacd', '10/19/2005 07:47:00 PM'), 1)])

rdd1_for_join = rdd1.zipWithIndex().filter(lambda x: x[1] != 0)\
.map(lambda x: ( (x[0][0][0], x[0][0][1]), x[0] ))

rdd2_for_join = rdd2.zipWithIndex().filter(lambda x: x[1] != 0)\
.map(lambda x: ( (x[0][0][0], x[0][0][1]), x[0] ))

print rdd1_for_join.join(rdd2_for_join).collect()

[(('1928264', 'ncpic'), ((('1928264', 'ncpic', '12/14/2005 05:56:00 PM'), 1), (('1928264', 'ncpic', '12/14/2005 05:56:00 PM'), 1)))]

【讨论】:

  • 先生,上面的真实输出是输出的一部分..所以我需要知道可以应用于整个csv文件的一般方式..但感谢您的回复。
  • “一般方式”是什么意思?我向您展示了如何使用两个键连接两个 rdd。无论 rdds 的大小如何,这都将起作用。您可以对更多或更少的键使用相同的方法。你还需要什么?
  • 很抱歉让您感到困惑,我的意思是我的 csv 文件中有很多行,但是当您使用 sc.parallerize 时,您会亲自输入值,所以只输入一些数据可以吗在里面??
  • 我使用 sc.parallelize 从您的示例数据中创建了一个 rdd,您不需要使用它,因为您已经加载了 rdds 对吗?我以为你的问题出在加入..
  • 是的,没错,我已经做了两个rdd,所以我的问题只是加入每个RDD,但它不起作用。
猜你喜欢
  • 2018-01-18
  • 2022-11-21
  • 1970-01-01
  • 1970-01-01
  • 2016-04-01
  • 1970-01-01
  • 2013-11-15
  • 1970-01-01
  • 2022-01-15
相关资源
最近更新 更多