【问题标题】:Add a new key/value pair to a Spark MapType column向 Spark MapType 列添加新的键/值对
【发布时间】:2018-06-20 03:24:03
【问题描述】:

我有一个带有 MapType 字段的数据框。

>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> fields = StructType([
...         StructField('timestamp',      TimestampType(), True),
...         StructField('other_field',    StringType(), True),
...         StructField('payload',        MapType(
...                                         keyType=StringType(),
...                                         valueType=StringType()),
...                                                     True),   ])
>>> import datetime
>>> rdd = sc.parallelize([[datetime.datetime.now(), 'this should be in', {'akey': 'aValue'}]])
>>> df = rdd.toDF(fields)
>>> df.show()
+--------------------+-----------------+-------------------+
|           timestamp|      other_field|            payload|
+--------------------+-----------------+-------------------+
|2018-01-10 12:56:...|this should be in|Map(akey -> aValue)|
+--------------------+-----------------+-------------------+

我想将other_field 添加为payload 字段中的键。

我知道我可以使用 udf:

>>> def _add_to_map(name, value, map_field):
...     map_field[name] = value
...     return map_field
...
>>> add_to_map = udf(_add_to_map, MapType(StringType(),StringType()))
>>> df.select(add_to_map(lit('other_field'), 'other_field', 'payload')).show(1, False)
+------------------------------------------------------+
|PythonUDF#_add_to_map(other_field,other_field,payload)|
+------------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue) |
+------------------------------------------------------+

有没有办法做到这一点没有udf

【问题讨论】:

  • 我担心按行 python 操作会影响性能。

标签: python pyspark apache-spark-sql spark-dataframe


【解决方案1】:

如果您提前知道密钥,这是一种无需udf 的方法。使用create_map 函数。至于这是否更高效,我不知道。

from pyspark.sql.functions import col, lit, create_map

df.select(
    create_map(
        lit('other_field'),
        col('other_field'),
        lit('akey'),
        col('payload')['akey']
    )
).show(n=1, truncate=False)

输出:

+-----------------------------------------------------+
|map(other_field, other_field, akey, payload['akey']) |
+-----------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue)|
+-----------------------------------------------------+

更新

这是一种无需对字典键进行硬编码的方法。不幸的是,它涉及一个collect() 操作。

模拟一些数据

首先,让我们修改您的原始数据框,在 MapType() 字段中包含一个键值对。

from pyspark.sql.functions import col, lit, create_map
import datetime
rdd = sc.parallelize(
    [
        [
            datetime.datetime.now(),
            'this should be in',
            {'akey': 'aValue', 'bkey': 'bValue'}
        ]
    ]
)
df = rdd.toDF(fields)
df.show(n=1, truncate=False)

创建以下内容:

+--------------------------+-----------------+-----------------------------------+
|timestamp                 |other_field      |payload                            |
+--------------------------+-----------------+-----------------------------------+
|2018-01-10 17:37:58.859603|this should be in|Map(bkey -> bValue, akey -> aValue)|
+--------------------------+-----------------+-----------------------------------+

获取地图的键

使用explode()collect(),您可以获得密钥:

from pyspark.sql.functions import explode

keys = [
    x['key'] for x in (df.select(explode("payload"))
                        .select("key")
                        .distinct()
                        .collect())
]

创建包含所有字段的新地图

现在像上面一样使用create_map,但使用来自keys 的信息来动态创建键值对。我使用了reduce(add, ...),因为create_map 期望输入按顺序是键值对——我想不出另一种方法来展平列表。

from operator import add
df.select(
    create_map
    (
        *([lit('other_field'), col('other_field')] + reduce(add, [[lit(k), col('payload').getItem(k)] for k in keys]))
    )
).show(n=1, truncate=False)

最终结果:

+---------------------------------------------------------------------------+
|map(other_field, other_field, akey, payload['akey'], bkey, payload['bkey'])|
+---------------------------------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue, bkey -> bValue)      |
+---------------------------------------------------------------------------+

参考文献

  1. pyspark: Create MapType Column from existing columns

  2. PySpark converting a column of type 'map' to multiple columns in a dataframe

【讨论】:

  • 在我的情况下,密钥不会经常更改,因此预先计算密钥可能会给我带来性能提升,我会尝试一下。谢谢@pault
  • 乐于助人。请分享性能结果。我很想知道各种方法是如何相互叠加的。
  • 没有明显的性能差异。但是,由于 union all,这对于 spark stackoverflow.com/questions/38650568/…
  • 但是这些方法比udf快吗?还是您的意思是这些方法与udf 相比没有性能差异?
  • 我必须进行更严格的测试才能看到,我会回复你:)
猜你喜欢
  • 2021-12-09
  • 1970-01-01
  • 2019-11-01
  • 2015-07-09
  • 2016-08-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多