【发布时间】:2016-10-25 02:55:17
【问题描述】:
我正在寻找一种使用 PySpark 将函数应用于 RDD 并将结果放入新列的方法。使用 DataFrames,看起来很简单: 给定:
rdd = sc.parallelize([(u'1751940903', u'2014-06-19', '2016-10-19'), (u'_guid_VubEgxvPPSIb7W5caP-lXg==', u'2014-09-10', '2016-10-19')])
我的代码可能如下所示:
df= rdd.toDF(['gigya', 'inscription','d_date'])
df.show()
+--------------------+-------------------------+----------+
| gigya| inscription| d_date|
+--------------------+-------------------------+----------+
| 1751940903| 2014-06-19|2016-10-19|
|_guid_VubEgxvPPSI...| 2014-09-10|2016-10-19|
+--------------------+-------------------------+----------+
然后:
from pyspark.sql.functions import split, udf, col
get_period_day = udf(lambda item : datetime.strptime(item, "%Y-%m-%d").timetuple().tm_yday)
df.select('d_date', 'gigya', 'inscription', get_period_day(col('d_date')).alias('period_day')).show()
+----------+--------------------+-------------------------+----------+
| d_date| gigya|inscription_service_6Play|period_day|
+----------+--------------------+-------------------------+----------+
|2016-10-19| 1751940903| 2014-06-19| 293|
|2016-10-19|_guid_VubEgxvPPSI...| 2014-09-10| 293|
+----------+--------------------+-------------------------+----------+
有没有办法在不需要将我的 RDD 转换为 DataFrame 的情况下做同样的事情?以地图为例..
这段代码只能给我一部分预期的结果:
rdd.map(lambda x: datetime.strptime(x[1], '%Y-%m-%d').timetuple().tm_yday).cache().collect()
帮助?
【问题讨论】:
标签: apache-spark pyspark spark-dataframe rdd pyspark-sql