【Question title】:Convert a dataframe containing a list of dictionaries to several rows in pyspark
【Posted】:2021-08-26 16:34:06
【Question description】:

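I have the following problem: I have a dataframe whose columns contain lists of dictionaries. The schema I created for my data structure is as follows: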

        from pyspark.sql.types import ArrayType, StringType, StructField, StructType

        tick_by_tick_schema = StructType([
            StructField('localSymbol', StringType()),
            StructField('tickByTicks', ArrayType(StructType([
                StructField('price', StringType()),
                StructField('size', StringType()),
                StructField('specialConditions', StringType()),
            ]))),
            StructField('domBids', ArrayType(StructType([
                StructField('price_bid', StringType()),
                StructField('size_bid', StringType()),
                StructField('marketMaker_bid', StringType()),
            ])))
        ])

My dataframe looks like this:

+-----------+----------------+----------------------------------------------------------------------------------------+
|localSymbol|tickByTicks     |domBids                                                                                 |
+-----------+----------------+----------------------------------------------------------------------------------------+
|ALKT       |[{32.99, 100, }]|[{32.8, 1, CHX}, {32.8, 1, MEMX}, {32.8, 1, NYSENAT}, {32.79, 1, NSDQ}, {32.69, 1, BYX}]|
+-----------+----------------+----------------------------------------------------------------------------------------+

What I want to get is this:

+-----------+----------------+----------------------------------------------------------------------------------------+---------+---------------+-----+
|localSymbol|tickByTicks     |domBids                                                                                 |price_bid|marketMaker_bid|price|
+-----------+----------------+----------------------------------------------------------------------------------------+---------+---------------+-----+
|ALKT       |[{32.99, 100, }]|[{32.8, 1, CHX}, {32.8, 1, MEMX}, {32.8, 1, NYSENAT}, {32.79, 1, NSDQ}, {32.69, 1, BYX}]|32.8     |CHX            |32.99|
|ALKT       |[{32.99, 100, }]|[{32.8, 1, CHX}, {32.8, 1, MEMX}, {32.8, 1, NYSENAT}, {32.79, 1, NSDQ}, {32.69, 1, BYX}]|32.8     |MEMX           |32.99|
|ALKT       |[{32.99, 100, }]|[{32.8, 1, CHX}, {32.8, 1, MEMX}, {32.8, 1, NYSENAT}, {32.79, 1, NSDQ}, {32.69, 1, BYX}]|32.8     |NYSENAT        |32.99|
|ALKT       |[{32.99, 100, }]|[{32.8, 1, CHX}, {32.8, 1, MEMX}, {32.8, 1, NYSENAT}, {32.79, 1, NSDQ}, {32.69, 1, BYX}]|32.79    |NSDQ           |32.99|
|ALKT       |[{32.99, 100, }]|[{32.8, 1, CHX}, {32.8, 1, MEMX}, {32.8, 1, NYSENAT}, {32.79, 1, NSDQ}, {32.69, 1, BYX}]|32.69    |BYX            |32.99|
+-----------+----------------+----------------------------------------------------------------------------------------+---------+---------------+-----+

I tried this, but obviously it does not work xD

df = self.tick_by_tick_data_processed.select(f.col('localSymbol'),f.col('tickByTicks'),f.col('domBids'))\
    .withColumn('price_bid', f.explode(f.col('tickByTicks.price'))) \
    .withColumn('marketMaker_bid', f.explode(f.col('domBids.marketMaker_bid'))) \
    .withColumn('price_bid', f.explode(f.col('domBids.price_bid')))

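【Comments】:

  • I think you need to use explode to split domBids into separate rows, and then split those into separate columns.
  • I don't think so, because that would be a Cartesian product, or at least doing it the way you describe does not work for me. Could you write out the code for me?
  • I don't know what the right way to do this in PySpark is. To me it looks like the kind of problem that, in Pandas, can be solved by using explode to get individual values instead of list cells. I thought that might point you in the right direction.

Tags: python apache-spark pyspark apache-spark-sql spark-structured-streaming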


【Solution 1】:

This might work:

import pyspark.sql.functions as f

# Option 1: explode both arrays, then select every field of both structs
df = self.tick_by_tick_data_processed \
    .withColumn('tick', f.explode(f.col('tickByTicks'))) \
    .withColumn('dom', f.explode(f.col('domBids'))) \
    .select('localSymbol', 'tick.*', 'dom.*')

# OR

# Option 2: explode both arrays, then select only the desired columns
df = self.tick_by_tick_data_processed \
    .withColumn('tick', f.explode(f.col('tickByTicks'))) \
    .withColumn('dom', f.explode(f.col('domBids'))) \
    .select('localSymbol',
            f.col('tick.price').alias('tick_price'),
            f.col('dom.marketMaker_bid').alias('marketMaker_bid'),
            f.col('dom.price_bid').alias('price_bid'))
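
The comments raise the concern that exploding both arrays amounts to a Cartesian product. The following is a minimal plain-Python sketch of that behaviour, not Spark code: the `explode` helper and the `Row` alias are hypothetical names used only for illustration, and the input row is copied from the example in the question. It suggests that exploding a one-element `tickByTicks` and a five-element `domBids` yields 1 × 5 = 5 rows, which matches the desired output.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]  # hypothetical stand-in for a dataframe row


def explode(rows: List[Row], array_col: str, out_col: str) -> List[Row]:
    """Plain-Python model of withColumn(out_col, explode(array_col))."""
    result = []
    for row in rows:
        # Like Spark's explode, a null or empty array yields no output rows.
        for element in row[array_col] or []:
            result.append({**row, out_col: element})
    return result


# One input row shaped like the example in the question.
rows = [{
    "localSymbol": "ALKT",
    "tickByTicks": [{"price": "32.99", "size": "100", "specialConditions": ""}],
    "domBids": [
        {"price_bid": "32.8", "size_bid": "1", "marketMaker_bid": "CHX"},
        {"price_bid": "32.8", "size_bid": "1", "marketMaker_bid": "MEMX"},
        {"price_bid": "32.8", "size_bid": "1", "marketMaker_bid": "NYSENAT"},
        {"price_bid": "32.79", "size_bid": "1", "marketMaker_bid": "NSDQ"},
        {"price_bid": "32.69", "size_bid": "1", "marketMaker_bid": "BYX"},
    ],
}]

# Two successive explodes: every tick is paired with every bid.
exploded = explode(explode(rows, "tickByTicks", "tick"), "domBids", "dom")

# Equivalent of the final select with aliases.
flat = [
    {
        "localSymbol": r["localSymbol"],
        "tick_price": r["tick"]["price"],
        "marketMaker_bid": r["dom"]["marketMaker_bid"],
        "price_bid": r["dom"]["price_bid"],
    }
    for r in exploded
]

for r in flat:
    print(r)
```

With more than one element in `tickByTicks`, the row count would grow to (number of ticks) × (number of bids) per input row, so the cross-product concern becomes real once both arrays hold several entries. Note also that the desired output in the question keeps the original `tickByTicks` and `domBids` columns; to retain them in Spark, they would need to be listed in the final `select` as well.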

【Comments】:

  • Sorry, I just got home. It works; I had a misconfiguration in Spark. Thanks!!!! :)