【问题标题】:Pyspark writing dataframe to avro maintaining the sequence of key valuesPyspark 将数据帧写入 avro 维护键值序列
【发布时间】:2020-06-11 08:04:50
【问题描述】:

我正在尝试使用 pyspark 读取 avro 文件并根据某些键对其中一列进行排序。我的 avro 文件中的一列包含一个 MapType 数据,我需要根据键对其进行排序。测试 avro 仅包含一行实体列具有 MapType 数据。我的意图是将输出写回 avro 文件,但要按键排序。不幸的是,我无法做到这一点,不确定这在 avro 中是否可行?它以与输入出现相同的方式回写。 这是我的代码(我创建了一个笔记本来测试它):

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lit, to_json, create_map, from_json
from pyspark.sql import Row
from pyspark import StorageLevel
import json
from pyspark.sql.types import StringType
import shutil
from pyspark.sql.types import MapType, ArrayType, StringType, StructType, StructField

spark = SparkSession     .builder     .appName("AvroTest")     .config("spark.jars.packages", "org.apache.spark:spark-avro_2.11:2.4.0")     .getOrCreate()

df = spark.read.format("avro").load("part-r-00000.avro")
schema = df.select('entities').schema
sch = schema.fields[0].dataType
print(df.schema)

@udf
def udf_func(line):
    for entkey,subdict in line.items():
        subdictnew = subdict.asDict(True)
        sorteddict = dict(sorted(subdictnew['entities'].items(), key=lambda a: int(a[0])))
        subdictnew['entities'] = sorteddict
        line[entkey] = subdictnew
    return str(line)

dfnew = df.withColumn('entities', from_json(udf_func(df['entities']), sch)).persist(StorageLevel.MEMORY_ONLY_SER)
#dfnew.show()
d = dfnew.dtypes
newschema = dfnew.schema

try:
    shutil.rmtree('testavro/sortedData')
except:
    print('folder already removed')
dfnew.write.format('avro').save('ctipavro/sortedData')
dfnew.show(1, False)

上面的代码以未排序的方式写回了 avro。最后一行以排序方式打印“实体”的数据框列记录。

|37321431529|37321431529|1561020714|[trade -> [trade, [59489777 -> [TRADE_ASSOC_TO_DB_DT -> 2011-09-30, FCBA_IN -> N, ACCT_BALANCE_AM -> 0, CII_BKRPT_CD ->   , CREDIT_AM_EXCP_CD -> 6, FRAUD_IN -> N, ACCT_REPORTED_DT -> 2019-04-01, DATA_USAGE_EXCL_IN -> N, CII_REAFF_CD ->   , DEDUP_RANK_CD -> 0, NY_DISPLAY_RULE_IN -> N, ACCT_HIGH_BALANCE_AM_EXCP_CD -> 6, ACCT_PAYMENT_AM -> 13, EXCLUSION_CD -> 0, KOB_CD -> BB, PAYMENT_GRID_2 -> 0000000-0-0000-00-00000..............

请注意,我在这里打印已经排序的数据帧输出。但是当我尝试将保存的 avro 文件读回新的数据帧并执行show() 时,键再次未排序。请注意trade -> [trade 的第一个键,它应该是59489777,而它是别的东西 - 51237292611。顺便说一句,当我第一次读取输入 avro 时出现了这个键,不知道为什么在排序和写回之后,它首先打印相同的键:

dffresh = spark.read.format("avro").load("testavro/sortedData")
schema = dffresh.schema
print(schema)
dffresh.show(1, False)

输出:

|37321431529|37321431529|1561020714|[trade -> [trade, [51237292611 -> [TRADE_ASSOC_TO_DB_DT -> 2014-09-20, FCBA_IN -> N, ACCT_BALANCE_AM -> 0, CII_BKRPT_CD ->   , CREDIT_AM_EXCP_CD -> 6, FRAUD_IN -> N, ACCT_REPORTED_DT -> 2019-05-01, DATA_USAGE_EXCL_IN -> N, CII_REAFF_CD ->   , DEDUP_RANK_CD -> 0, NY_DISPLAY_RULE_IN -> N, ACCT_HIGH_BALANCE_AM_EXCP_CD -> 6, ACCT_PAYMENT_AM -> 0, EXCLUSION_CD -> 0, KOB_CD -> BC, PAYMENT_GRID_2 -> 000000C0000000..................................

我会请求任何人帮助我。我尝试了多种方法并搜索了多个 SO 问题,但找不到有关如何实现它的线索。

【问题讨论】:

  • 有人能帮忙吗?
  • 你能附上与此相关的示例数据和 avro 架构吗?特别是这个文件part-r-00000.avro 和它的架构

标签: python pyspark avro spark-avro


【解决方案1】:

如果您的源数据是 avro 格式,最好也将处理后的输出写入 Parquet 文件格式。您可以获得谓词下推的好处,并且始终可以处理选择性数量的列。

但如果再次写入 avro 格式是您过程的一部分,则列的顺序并不总是得到保证,因为正在使用的数据结构是 Map。您可以通过使用select 函数并按照您选择的顺序读取列来缓解这种情况。

【讨论】:

  • 谢谢@Yayati。我的问题是其中一列包含 json 数据,其中有多个属性需要排序。我的问题是,如果我使用select 函数,列数据中的属性会出现排序吗?
  • 是的,它们会出现排序
  • 不,不是。我试过了。问题是当新创建的 avro 被保存然后读取时,数据没有出现排序。此代码保存新的 avro 并打印数据框中的数据,它们显示为已排序 - dfnew.write.format('avro').save('ctipavro/sortedData') dfnew.show(1, False)。虽然这似乎没有排序:dffresh = spark.read.format("avro").load("ctipavro/sortedData") dffresh.select('entities').show(1, False)。这两个输出看起来就像我在我的问题中提供的一样。也许在写操作期间,数据再次未排序?
  • 不,我的意思是您必须在架构中显式传递列名。由于实体是 Map/Struct,因此您需要显式分解列值并选择它们。例如 explode(col(entities)) as e 然后 df.select($"e.column_name")
  • 还是不行,能否请您提供一个工作代码sn-p,只是为了以排序的方式读取嵌套的Map类型列。
猜你喜欢
  • 2020-10-06
  • 2018-06-24
  • 2020-11-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-04-11
  • 2020-07-10
相关资源
最近更新 更多