【问题标题】:Converting complex RDD to a flatten RDD with PySpark使用 PySpark 将复杂的 RDD 转换为扁平的 RDD
【发布时间】:2017-01-13 10:09:30
【问题描述】:

我有以下 CSV(示例)

 id     timestamp         routeid   creationdate        parameters
 1000  21-11-2016 22:55     14      21-11-2016 22:55    RSRP=-102,
 1002  21-11-2016 22:55     14      21-11-2016 22:55    RA Req. SN=-146,TPC=4,RX Antennas=-8,   
 1003  21-11-2016 22:55     14      21-11-2016 22:55    RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,

基本上我想将参数从一列分成多列,如下所示:

id , timestamp, routeid, creationdate, RSRP ,RA REQ. SN, TPC,RX Antennas,MCS

因此,如果没有任何参数的值,我会将值设置为 NULL,例如:

 1000  21-11-2016 22:55     14      21-11-2016 22:55 -102 NULL NULL NULL NULL

如果值存在则填写行,

这是我尝试过的:

from pyspark import SparkContext
import os
import sys
from pyspark.sql import SQLContext
import itertools
import re

sc = SparkContext("local","Work")
sqlContext = SQLContext(sc)

df1 = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('file:///sample.csv')

def aaa(a):
    aa = a.split(',', 15000)
    filtered = filter(lambda p: not re.match(r'^\s*$', p), aa)           
    listWithNoEmptyLines = [z for z in filtered if z != []]

    for x in listWithNoEmptyLines:
       ab = x.split("=")
        AllList = []
        rsrp = "" 
        ra_req_sn = ""
        tpc = ""
        rx_antenas = ""
        mcs = ""
         if 'RSRP' in ab:
            rsrp = ab[1]
         else:
            rsrp = "NULL"
         if 'RA Req. SN' in ab:
            ra_req_sn = ab[1]
         else:
            ra_req_sn = "NULL"
         if 'TPC' in ab:
            tpc = ab[1]
         else:
            tpc = "NULL"
         if 'RX Antennas' in ab:
             rx_antenas = ab[1]
         else:
             rx_antenas = "NULL"
         if 'MCS' in ab:
             mcs = ab[1]
         else:
            mcs = "NULL"
    return rsrp,ra_req_sn,tpc,rx_antenas
DFtoRDD  = df1.rdd.map(list).map(lambda x: [str(x[1]), str(x[2]), str(x[3]), aaa(str(x[4]))])
print DFtoRDD.collect()

给我以下结果,

[['1000','21-11-2016 22:55', '14', '21-11-2016 22:55', ('-102', 'NULL', 'NULL', 'NULL')], ['1002',21-11-2016 22:55', '14', '21-11-2016 22:55', ('NULL', '-146', 'NULL', 'NULL')], ['1003','21-11-2016 22:55', '14', '21-11-2016 22:55', ('NULL', '134', 'NULL', 'NULL')]]

预期结果:

   id     timestamp         routeid   creationdate        RSRP    RA Req. SN   TPC   RX Antennas MCS
  1000  21-11-2016 22:55     14      21-11-2016 22:55    -102      NULL        NULL    NULL       NULL
  1002  21-11-2016 22:55     14      21-11-2016 22:55    NULL    -146         4        -8        NULL   
  1003  21-11-2016 22:55     14      21-11-2016 22:55    NULL    134       -191       -91       -83

【问题讨论】:

  • 我不明白你的问题。
  • 这更像是一个正则表达式问题而不是 spark。
  • 是的,所以在函数 aaa 我们如何应用正则表达式?还是我们可以创建数据框并对其进行转换? @elisash 我想通过检查其中的属性来分隔参数列并为每个属性及其值创建列

标签: python apache-spark pyspark apache-spark-sql


【解决方案1】:

您需要按如下方式定义一个 udf,然后选择每个字段。我使用了与制表符分隔符相同的数据。

from pyspark.sql.functions import udf
from pyspark.sql.types import *

df1 = spark.read.format('com.databricks.spark.csv').options(header='true',delimiter='\t').load('./sample.txt')
df1.show()
# +----+----------------+-------+----------------+--------------------+
# |  id|       timestamp|routeid|    creationdate|          parameters|
# +----+----------------+-------+----------------+--------------------+
# |1000|21-11-2016 22:55|     14|21-11-2016 22:55|          RSRP=-102,|
# |1002|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=-146,T...|
# |1003|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=134,RX...|
# +----+----------------+-------+----------------+--------------------+

现在让我们像上面提到的那样定义我们的 UDF:

import re
def f_(s):
    pattern = re.compile("([^,=]+)=([0-9\-]+)")
    return dict(pattern.findall(s or "")) 

我们可以直接在“简单”样本上测试函数:

f_("RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,")
# {'RA Req. SN': '134', 'RX Antennas': '-91', 'TPC': '-191', 'MCS': '-83'}

好的,它正在工作。我们现在可以注册在 SQL 中使用:

spark.udf.register("f", f_, MapType(StringType(), StringType()))

spark.sql("SELECT f('RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,')").show()
# +---------------------------------------------------+
# |f(RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,)|
# +---------------------------------------------------+
# |                               Map(RA Req. SN ->...|
# +---------------------------------------------------+

但就您而言,我认为您会对每个字段的实际 udf 感兴趣:

extract = udf(f_,  MapType(StringType(), StringType()))

df1.select(df1['*'], extract(df1['parameters']).getItem('RSRP').alias('RSRP')).show()
# +----+----------------+-------+----------------+--------------------+----+
# |  id|       timestamp|routeid|    creationdate|          parameters|RSRP|
# +----+----------------+-------+----------------+--------------------+----+
# |1000|21-11-2016 22:55|     14|21-11-2016 22:55|          RSRP=-102,|-102|
# |1002|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=-146,T...|null|
# |1003|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=134,RX...|null|
# +----+----------------+-------+----------------+--------------------+----+

【讨论】:

  • 感谢 eliasah,如何在我的代码中添加您的解决方案,我没有得到。你能帮忙吗
  • 非常感谢 eliasah,你真的救了我
  • 好的,先生。我可以在单个语句中应用多个 getItem() 吗??
猜你喜欢
  • 2023-03-13
  • 1970-01-01
  • 1970-01-01
  • 2016-05-29
  • 2017-01-20
  • 2016-01-03
  • 2017-11-02
  • 2020-12-06
  • 2015-12-22
相关资源
最近更新 更多