【问题标题】:How to use writeStream from a pyspark streaming dataframe to chop the values into different columns?如何使用 pyspark 流数据帧中的 writeStream 将值分成不同的列?
【发布时间】:2021-12-24 01:46:36
【问题描述】:

我正在尝试摄取一些文件,并且每个文件都被读取为单个列字符串(这是预期的,因为它是一个固定宽度的文件),我必须将该单个值拆分为不同的列。这意味着我必须访问数据帧,但我必须使用 writeStream,因为它是一个流数据帧。这是输入的示例:

"64 Apple 32.32128Orange12.1932 Banana 2.45"

预期的数据框:

64, Apple, 32.32
128, Orange, 12.19
32, Banana, 2.45

注意每一列的字符数量是多少(3,6,5)

我尝试使用 forEach 作为以下示例,但它没有做任何事情:

two_d = []
streamingDF = (
  spark.readStream.format("cloudFiles")
    .option("encoding", sourceEncoding)
    .option("badRecordsPath", badRecordsPath)
    .options(**cloudfiles_config)
    .load(sourceBasePath)
    )

def process_row(string):

      rows = round(len(string)/chars_per_row)
      for i in range(rows):
        current_index = 0
        two_d.append([])
        for j in range(len(META_SIZES)):
          two_d[i].append(string[(i*chars_per_row+current_index) : (i*chars_per_row+current_index+META_SIZES[j])].strip())
          current_index += META_SIZES[j]
    
        print(two_d[i])
    
query = streamingDF.writeStream.foreach(process_row).start()

我可能会做一个 withColumn 来添加它们而不是列表,或者使用该列表并尽可能使其成为流数据帧。

编辑:我添加了一个输入示例并解释了 META_SIZES

【问题讨论】:

    标签: python dataframe pyspark spark-streaming fixed-width


    【解决方案1】:

    假设输入如下所示。

    ...
    "64 Apple 32.32"
    "128 Orange 12.19"
    "32 Banana 2.45"
    ...
    

    你可以这样做。

    streamingDF = (
      spark.readStream.format("cloudFiles")
        .option("encoding", sourceEncoding)
        .option("badRecordsPath", badRecordsPath)
        .options(**cloudfiles_config)
        .load(sourceBasePath)
        )
    
    #remove this line if strings are already utf-8
    lines = stream_lines.select(stream_lines['value'].cast('string'))
    
    
    lengths = (lines.withColumn('Count', functions.split(lines['value'], ' ').getItem(0))
                    .withColumn('Fruit', functions.split(lines['value'], ' ').getItem(1)
                    .withColumn('Price', functions.split(lines['value'], ' ').getItem(1))
    
    

    请注意,使用 readStream 读取字符串时,“value”被设置为默认列名。如果 cloud_config 包含更改输入列名的任何内容,则您必须更改上述代码中的列名。

    【讨论】:

    • 不幸的是没有分隔符。输入更像是:..."64 Apple 32.32128Orange12.1932 Banana 2.45"... 请注意缺少换行符,并且每个“列”都由相同数量的字符组成,与行相同,因为它是每列的总和
    猜你喜欢
    • 2021-03-21
    • 2020-05-11
    • 1970-01-01
    • 1970-01-01
    • 2020-10-09
    • 1970-01-01
    • 1970-01-01
    • 2018-10-30
    • 1970-01-01
    相关资源
    最近更新 更多