【Question title】: Extract tags from a DataFrame column
【Posted】: 2019-10-27 16:32:38
【Question description】:

I have a DataFrame containing Azure Consumption data in a Databricks Python notebook. I am showing only a subset of the columns/rows here.

[Row(ResourceRate='0.029995920244854', PreTaxCost='0.719902085876484',  
ResourceType='Microsoft.Compute/virtualMachines',  Tags=None, ),
 Row(ResourceRate='1.10999258782982',  PreTaxCost='26.6398221079157',  
ResourceType='Microsoft.Compute/virtualMachines',  
Tags='"{  ""project"": ""70023"",  ""service"": ""10043""}"')
 ]

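I need to extract the tags from the Tags column and expose them as (table) columns.
By the way, I am not sure where the doubled double quotes come from. Possibly from the source table being a .csv file, but that can probably be sorted out easily.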
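
On the doubled quotes: one likely explanation, assuming the export went through a standard CSV writer, is ordinary CSV escaping. A field that contains commas or quotes gets wrapped in quotes, and each inner quote is doubled. The sketch below uses only Python's standard library; the sample values are taken from the rows above, and the variable names are illustrative.

```python
import csv
import io

tags_json = '{"project": "70023", "service": "10043"}'

# Write a single row. csv.writer quotes the second field because it contains
# commas and quote characters, and escapes each inner quote by doubling it.
buffer = io.StringIO()
csv.writer(buffer).writerow(["1.10999258782982", tags_json])
raw_line = buffer.getvalue().strip()
print(raw_line)

# A CSV-aware reader reverses the escaping and returns the original JSON text.
parsed_row = next(csv.reader(io.StringIO(raw_line)))
print(parsed_row[1])
```

If the file is read with a parser that does not undo this escaping, the doubled quotes stay in the column value, which would match the Tags strings shown above.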

I am using PySpark. I tried something along the lines of "Split Spark Dataframe string column into multiple columns":

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import split, posexplode, concat, expr, lit, col, first
df2 = df.withColumn("num", monotonically_increasing_id())
df3 = df2.select(
        "num",
        split("Tags", ", ").alias("Tags"),
        posexplode(split("Tags", ",")).alias("pos", "val")
    )
#display(df3)
df4 = df3.drop("val")\
    .select(
        "num",
        concat(lit("Tag"),col("pos").cast("string")).alias("name"),
        expr("Tags[pos]").alias("val")
    )
# display(df4)
df5 = df4.groupBy("num").pivot("name").agg(first("val"))
display(df5)

This is not what I want.

num     Tag0
964     
1677    """project"": ""70023"", """service"": ""10024""
2040    """project"": ""70025"", """service"": ""10034""
2214    
...

I would prefer to have the tags as columns:

num     project        service       ResourceRate       PreTaxCost
964                                  0.029995920244854  0.719902085876484
1677    70023          10024         1.10999258782982   26.6398221079157
2040    70025          10034         0.029995920244854  0.719902085876484
2214                                 0.029995920244854  0.719902085876484
...

【Comments】:

  • [Row(ResourceRate='0.029995920244854', PreTaxCost='0.719902085876484', ResourceType='Microsoft.Compute/virtualMachines', Tags=None, ), Row(ResourceRate='1.10999258782982', PreTaxCost=' 26.6398221079157', ResourceType='Microsoft.Compute/virtualMachines', Tags='"{ ""project"": ""70023"", ""service"": ""10043""}"') ]
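  • Can you show the structure of the DataFrame?
  • The whole structure is very long and flat, just a series of strings. The important one is Tags. It can contain other (interesting) values.
  • I can split out just one field and leave the rest to you.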

Tags: python azure apache-spark pyspark databricks


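【Solution 1】:

IIUC, you can convert Tags into a column of JSON strings (trim the leading and trailing " characters, then use regexp_replace to turn each doubled "" into a single ") and then use json_tuple() to retrieve the desired fields. See the code below: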

from pyspark.sql.functions import expr, json_tuple

df.withColumn('Tags', expr("""regexp_replace(trim(BOTH '"' FROM Tags), '""', '"')""")) \
  .select('*', json_tuple('Tags', 'project', 'service').alias('project','service'))\
  .show()                                                  
#+-----------------+-----------------+--------------------+--------------------+-------+-------+
#|       PreTaxCost|     ResourceRate|        ResourceType|                Tags|project|service|
#+-----------------+-----------------+--------------------+--------------------+-------+-------+
#|0.719902085876484|0.029995920244854|Microsoft.Compute...|                null|   null|   null|
#| 26.6398221079157| 1.10999258782982|Microsoft.Compute...|{ "project": "700...|  70023|  10043|
#+-----------------+-----------------+--------------------+--------------------+-------+-------+
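
The same cleanup can be tried on a single value outside Spark, which is handy for checking the string handling before running it on the cluster. This is a minimal sketch using the standard library only; `parse_tags` is a hypothetical helper name, not part of PySpark, and it assumes the value is either null/empty or a JSON object wrapped in CSV-style quoting as in the question.

```python
import json


def parse_tags(raw):
    """Hypothetical helper: turn a CSV-escaped Tags value into a dict."""
    # Rows without tags (None or blank) yield an empty dict.
    if raw is None or not raw.strip():
        return {}
    # Mirror the SQL expression: strip the outer quotes, then collapse "" to ".
    cleaned = raw.strip().strip('"').replace('""', '"')
    return json.loads(cleaned)


sample = '"{  ""project"": ""70023"",  ""service"": ""10043""}"'
tags = parse_tags(sample)
print(tags["project"], tags["service"])
```

Parsing with a JSON parser rather than splitting on commas and colons keeps values intact even if a tag value itself contains one of those characters.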

【Comments】:

    【Solution 2】:

    Here is sample code that tries to split the tags into multiple columns:

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as f
    
    
    def columnList(r):
      val = str(r[0].tags)
      i = int(val.index("{") + 1)
      j = int(val.index("}"))
      val = val[i:j]
      vals = val.split(",")
      collist = []
      collist.append('id')
      for val in vals:
        keyval = val.split(":")
        key = keyval[0]
        collist.append(key.replace('"',"").strip())
      return collist
    
    def valueList(r):
      val = r[1]
      i = int(val.index("{")+1)
      j = int(val.index("}"))
      val = val[i:j]
      vals = val.split(",")
      valList = []
      valList.append(r[0])
      for val in vals:
          keyval = val.split(":")
          value = keyval[1]
          valList.append(value.replace('"',"").strip())
      return valList
    
    sc = SparkSession.builder.appName("example").\
    config("spark.driver.memory","1g").\
    config("spark.executor.cores",2).\
    config("spark.max.cores",4).getOrCreate()
    
    df = sc.read.format("csv").option("header","true").option("delimiter","|").load("columns.csv")

    tagsdf = df.select("id","tags")
    
    
    colList = columnList(tagsdf.rdd.take(1))
    tagsdfrdd = tagsdf.rdd.map(lambda r : valueList(r))
    
    dfwithnewcolumns = tagsdfrdd.toDF(colList)
    
    newdf = df.drop("tags").join(dfwithnewcolumns,on=["id"])
    
    newdf.show()
    

    Sample test file:

    id|ResourceRate|PreTaxCost|ResourceType|tags
    1|'1.10999258782982'|'26.6398221079157'|'Microsoft.Compute/virtualMachines'|'"{ ""project"": ""70023"", ""service"": ""10043""}"'

    If you don't have an id column, you may want to merge the RDDs instead.

    【Comments】:
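
Note that the code above takes the column names from the first row only (`tagsdf.rdd.take(1)`), so it assumes every row carries the same tags in the same order. Where that does not hold, one option is to collect the union of keys first and fill the gaps with None. The following is a plain-Python sketch of that idea, not Spark code; `tags_to_rows` is a hypothetical name, and the input is assumed to be a list of already parsed tag dicts.

```python
def tags_to_rows(tag_dicts):
    """Hypothetical helper: align dicts with differing keys into table rows."""
    # Union of all keys seen in any row, sorted for a stable column order.
    columns = sorted({key for tags in tag_dicts for key in tags})
    # One list per input dict; a missing tag becomes None.
    rows = [[tags.get(col) for col in columns] for tags in tag_dicts]
    return columns, rows


parsed = [{}, {"project": "70023", "service": "10024"}, {"project": "70025"}]
columns, rows = tags_to_rows(parsed)
print(columns)
for row in rows:
    print(row)
```

Rows without any tags come out as all None, which corresponds to the empty cells in the desired output shown in the question.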
