【问题标题】:Creating multiple columns in spark Dataframe dynamically在 spark Dataframe 中动态创建多列
【发布时间】:2017-09-11 11:50:40
【问题描述】:

我有字典,其中包含以下信息,

dict_segs = {'key1' : {'a' : {'col1' : 'value1', 'col2' : 'value2', 'col3': 'value3'}, 
                'b' : {'col2' : 'value2', 'col3' : 'value3'}, 
                'c' : {'col1' : 'value1'}},
        'key2' : {'d' : {'col3' : 'value3', 'col2' : 'value2'},
                'f' : {'col1' : 'value1', 'col4' : 'value4'}}}

待办事项:

keys 基本上是“segments”,其基础字典,即 key1 的 a、b、c 是“subsegments”。对于每个子段,过滤条件在子段的基础字典中可用,即 a、b、c、d、f。另外,子段字典键的过滤条件也是pyspark数据框的列名。

我想在 pyspark 数据框中为每个段一次性创建子段列,当满足过滤条件时,每个子段列的值将为 1,否则为 0,类似于,

for item in dict_segs:
    pyspark_dataframe.withColumn(*dict_segs[item].keys(), when(meeting filter criteria with respect to each key), 1).otherwise(0))

在进行研究时,我能够在 scala 中找到类似的东西,但列过滤条件是静态的,但对于上述逻辑,即动态。请看下面的scala逻辑,

Spark/Scala repeated calls to withColumn() using the same function on multiple columns

需要支持根据上面的伪代码为每个段派生上述逻辑。

谢谢。

【问题讨论】:

    标签: python apache-spark dynamic pyspark multiple-columns


    【解决方案1】:

    您正在寻找select 声明:

    让我们创建一个示例数据框:

    df = spark.createDataFrame(
        sc.parallelize([["value" + str(i) for i in range(1, 5)], ["value" + str(i) for i in range(5, 9)]]), 
        ["col" + str(i) for i in range(1, 5)]
    )
    
    +------+------+------+------+
    |  col1|  col2|  col3|  col4|
    +------+------+------+------+
    |value1|value2|value3|value4|
    |value5|value6|value7|value8|
    +------+------+------+------+
    

    现在对于字典中的所有 keysdict_seg[key] 中的所有 subkeysdict_seg[key][subkey] 中的所有 columns

    import pyspark.sql.functions as psf
    df.select(
        ["*"] +
        [
            eval('&'.join([
                '(df["' + c + '"] == "' + dict_segs[k][sk][c] + '")' for c in dict_segs[k][sk].keys()
            ])).cast("int").alias(sk) 
            for k in dict_segs.keys() for sk in dict_segs[k].keys()
        ]
    ).show()
    
    +------+------+------+------+---+---+---+---+---+
    |  col1|  col2|  col3|  col4|  a|  b|  c|  d|  f|
    +------+------+------+------+---+---+---+---+---+
    |value1|value2|value3|value4|  1|  1|  1|  1|  1|
    |value5|value6|value7|value8|  0|  0|  0|  0|  0|
    +------+------+------+------+---+---+---+---+---+
    
    • "*" 允许您保留所有以前存在的列,可以将其替换为 df.columns
    • alias(sk) 允许您将名称 sk 赋予新列
    • cast("int") 将 boolean 类型更改为 int 类型

    我真的不明白为什么你有一个深度 3 的字典,似乎 key1, key2 并不是真的有用。

    【讨论】:

    • 感谢完美的解决方案。对于字典中的额外级别。我在其中一个过滤条件中使用此信息。但是,经过分析发现您的观点是有效的,并且不是必需的。掉了一级,最后保留了两级。
    • 酷我很高兴能帮上忙。不要忘记将问题标记为已解决
    • 当然,你能帮忙传递列表而不是价值。所以我修改为: eval('&'.join([ '(df["' + c + '"] == "' + dict_segs[k][sk][c] + '")' for c in dict_segs[k][sk].keys() ])) 修改:eval('&'.join([ '(tbl["' + c + '"].isin("' + v + '") )' for c, v in self.dict_tstPlan[sk].iteritems() ])) 我想传递 'v' 值的列表...
    • 您必须使用 '","'.join(v) 将列表转换为字符串,因此在 eval 函数中您最终会得到:eval('&'.join([ '(tbl["' + c + '"].isin(["' + '","'.join(v) + '"]))'
    猜你喜欢
    • 1970-01-01
    • 2021-08-20
    • 2019-07-08
    • 2018-07-11
    • 2015-07-25
    • 2020-09-22
    • 2020-01-31
    • 1970-01-01
    • 2017-12-13
    相关资源
    最近更新 更多