【发布时间】:2021-12-24 01:46:36
【问题描述】:
我正在尝试摄取一些文件,并且每个文件都被读取为单个列字符串(这是预期的,因为它是一个固定宽度的文件),我必须将该单个值拆分为不同的列。这意味着我必须访问数据帧,但我必须使用 writeStream,因为它是一个流数据帧。这是输入的示例:
"64 Apple 32.32128Orange12.1932 Banana 2.45"
预期的数据框:
64, Apple, 32.32
128, Orange, 12.19
32, Banana, 2.45
注意每一列的字符数量是多少(3,6,5)
我尝试使用 forEach 作为以下示例,但它没有做任何事情:
two_d = []
streamingDF = (
spark.readStream.format("cloudFiles")
.option("encoding", sourceEncoding)
.option("badRecordsPath", badRecordsPath)
.options(**cloudfiles_config)
.load(sourceBasePath)
)
def process_row(string):
rows = round(len(string)/chars_per_row)
for i in range(rows):
current_index = 0
two_d.append([])
for j in range(len(META_SIZES)):
two_d[i].append(string[(i*chars_per_row+current_index) : (i*chars_per_row+current_index+META_SIZES[j])].strip())
current_index += META_SIZES[j]
print(two_d[i])
query = streamingDF.writeStream.foreach(process_row).start()
我可能会做一个 withColumn 来添加它们而不是列表,或者使用该列表并尽可能使其成为流数据帧。
编辑:我添加了一个输入示例并解释了 META_SIZES
【问题讨论】:
标签: python dataframe pyspark spark-streaming fixed-width