【问题标题】:How to create data frame from a particular portion of text log file using pyspark如何使用 pyspark 从文本日志文件的特定部分创建数据框
【发布时间】:2018-05-09 06:03:20
【问题描述】:

我是 pyspark 的新手... 我有一个大日志文件,其中包含如下数据:

sfdfd
fsdfsdffdhfgjgfjkyklhljk,erygrt,tegtyryu,.
sgsgggggfsdf

==========================================  
Roll Name   class  
==========================================  
1     avb    wer21g2
------------------------------------------  

===========================================  
empcode   Emnname   Dept   Address   
===========================================  
12d      sf        sdf22    dghsjf  
asf2    asdfw2     df21df   fsfsfg  
dsf21   sdf2       df2      sdgfsgf  
------------------------------------------- 

现在我想使用 Spark 和 python (Pyspark) 将此文件拆分为多个 RDD/Dataframe。我可以使用 APIHadoopFile 在 Scala 中执行此操作,现在我想在 Pyspark 中执行此操作。谁可以帮我这个事。

加速输出是:

Roll Name clas  
1   avb   wer21g2  


empcode   Emnname   Dept   Address  
12d      sf        sdf22    dghsjf  
asf2    asdfw2     df21df   fsfsfg  
dsf21   sdf2       df2      sdgfsgf  

这是我尝试过的代码:

with open(path) as f:
    out = []
    for line in f:
        if line.rstrip() == findStr:
            tmp = []
            tmp.append(line)
            for line in f:
               # print(line)
                if line.rstrip() == EndStr:
                    out.append(tmp)
                    break
                tmp.append(line)
f.close()

SMN_df = spark.createDataFrame(tmp, StringType()).show(truncate=False)

我能够创建数据框但没有得到预期的输出。任何人都可以帮助我。

更多详情请看附件截图。

数据集

【问题讨论】:

    标签: python apache-spark pyspark spark-dataframe


    【解决方案1】:
    from pyspark.sql import SparkSession
    import re
    
    
    spark=SparkSession.Builder.config("spark.sql.warehouse.dir","file://C:/temp")
    .appName("SparkSQL").getOrCreate()
    
    path="C:/Users/Rudrashis/Desktop/test2.txt"
    Txtpath="L:/SparkScala/test.csv"
    EndStr="---------------------------------"
    FilterStr="================================="
    def prepareDataset(Findstr):
    with open(path) as f:
        out=[]
        for line in f:
            if line.rstrip()==Findstr:
                tmp=[]
                tmp.append(re.sub("\s+",",",line.strip()))
                for line in f:
                    if line.rstrip()==EndStr:
                        out.append(tmp)
                        break
    
                    tmp.append(re.sub("\s+",",",line.strip()))
                return (tmp)
    f.close()
    
    def Makesv(Lstcommon):
    with open("test.csv","w")as outfile:
        for entries in map(str.strip(),Lstcommon):
            outfile.write(entries)
    outfile.close()
    
    ###For 1st block################
    LstStudent=[]
    LstStudent=prepareDataset("Roll  Name  Class")
    LstStudent.list(filter(lambda a: a!=FilterStr,LstStudent))
    createStudent=Makesv(LstStudent)
    
    Student_DF=spark.read.format('com.databricks.spark.csv')
    .options(header="true",inferschema="true").load(Txtpath)
    Student_DF.show(truncate=False)
    ######### end 1st block####
    
    #####2nd block start####
    LstEmp=[]
    LstEmp=prepareDataset("empcode   Emnname   Dept   Address")
    LstEmp.list(filter(lambda a: a!=FilterStr,LstEmp))
    CreateEmp=Makesv(LstEmp)
    
     Emp_DF=spark.read.format('com.databricks.spark.csv')
    .options(header="true",inferschema="true").load(Txtpath)
    Emp_DF.show(truncate=False)
    
    ##### end of 2nd block#####
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-12-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-01-17
      相关资源
      最近更新 更多