【问题标题】:effective way to groupby without using pivot in pyspark不使用 pyspark 中枢轴进行分组的有效方法
【发布时间】:2019-08-20 21:23:48
【问题描述】:

我有一个查询,我需要使用 pyspark 计算内存利用率。我已经使用 pivot 使用 python pandas 实现了这一点,但现在我需要在 pyspark 中完成,而旋转将是一个昂贵的功能,所以我想知道 pyspark 中是否有任何替代方案用于此解决方案

time_stamp          Hostname    kpi kpi_subtype value_current
2019/08/17 10:01:05 Server1     memory  Total       100
2019/08/17 10:01:06 Server1     memory  used        35
2019/08/17 10:01:09 Server1     memory  buffer      8
2019/08/17 10:02:04 Server1     memory  cached      10
2019/08/17 10:01:05 Server2     memory  Total       100
2019/08/17 10:01:06 Server2     memory  used        42
2019/08/17 10:01:09 Server2     memory  buffer      7
2019/08/17 10:02:04 Server2     memory  cached      9
2019/08/17 10:07:05 Server1     memory  Total       100
2019/08/17 10:07:06 Server1     memory  used        35
2019/08/17 10:07:09 Server1     memory  buffer      8
2019/08/17 10:07:04 Server1     memory  cached      10
2019/08/17 10:08:05 Server2     memory  Total       100
2019/08/17 10:08:06 Server2     memory  used        35
2019/08/17 10:08:09 Server2     memory  buffer      8
2019/08/17 10:08:04 Server2     memory  cached      10

需要转化为

time_stamp      Hostname    kpi Percentage
2019-08-17 10:05:00 Server1     memory  17
2019-08-17 10:05:00 Server2     memory  26
2019-08-17 10:10:00 Server1     memory  17
2019-08-17 10:10:00 Server2     memory  17

我使用的 Python 代码

df3 = pd.read_csv('/home/yasin/Documents/IMI/Data/memorry sample.csv')
df3['time_stamp'] = pd.to_datetime(df3['time_stamp'])
ns5min=5*60*1000000000 
df3['time_stamp'] = pd.to_datetime(((df3['time_stamp'].astype(np.int64) // ns5min + 1 ) * ns5min))
df4 = df3.pivot_table('value_current' , ['time_stamp' , 'Hostname ' , 'kpi' ], 'kpi_subtype')
df4 = df4.reset_index()
df4['Percentage'] = ((df4['Total'] - (df4['Total'] - df4['used'] + df4['buffer'] + df4['cached'])) / df4['Total']) * 100

寻找一种在 pyspark 中复制它并在 python 中作为枢轴的更有效方式是一项昂贵的操作,我需要在一个非常大的数据集上每 5 分钟执行一次

【问题讨论】:

  • 每次您将拥有服务器 1 和 2 的多个记录集?
  • 是的,每 5 分钟我们将获得相同的数据集
  • 第一组server1&server2的时间范围是多少?是5分钟吗?它会延长超过 5 分钟还是会保持在该时间范围内
  • 每 5 分钟将是同一组服务器。实际上将有大约 20000 台服务器

标签: python apache-spark pyspark apache-spark-sql pyspark-sql


【解决方案1】:

当转换为列的值列表未知时,数据透视的开销很大。 Spark 有一个重载的 pivot 方法,将它们作为参数。

def pivot(pivotColumn: String, values: Seq[Any])

如果不知道它们,Spark 必须对数据集中的不同值进行排序和收集。否则,逻辑非常简单,描述为here

该实现添加了一个新的逻辑运算符 (o.a.s.sql.catalyst.plans.logical.Pivot)。该逻辑运算符由新的分析器规则 (o.a.s.sql.catalyst.analysis.Analyzer.ResolvePivot) 翻译,该规则目前将其翻译为包含大量 if 语句的聚合,每个主元值一个表达式。

例如,df.groupBy("A", "B").pivot("C", Seq("small", "large")).sum("D") 将被翻译成等价于df.groupBy("A", "B").agg(expr("sum(if(C = 'small', D, null))"), expr("sum(if(C = 'large', D , 无效的))”))。您可以自己完成此操作,但它会变得很长并且可能很快容易出错。

如果不旋转,我会做这样的事情:

val in = spark.read.csv("input.csv")
      //cast to the unix timestamp
      .withColumn("timestamp", unix_timestamp($"time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType))
      .drop($"time_stamp")

现在我们可以使用主机名按时间窗口对数据集进行分组,并将 KPI 指标收集到地图中。
有一个出色的 answer 描述了这一点。

val joinMap = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }

val grouped = in.groupBy(window($"timestamp", "5 minutes"), $"Hostname")
  .agg(joinMap(collect_list(map($"kpi_subtype", $"value_current".cast(DoubleType)))).as("metrics"))

输出

+------------------------------------------+--------+-------------------------------------------------------------+
|window                                    |Hostname|metrics                                                      |
+------------------------------------------+--------+-------------------------------------------------------------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |[Total -> 100.0, used -> 42.0, buffer -> 7.0, cached -> 9.0] |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
+------------------------------------------+--------+-------------------------------------------------------------+

现在我们定义一些别名和一个简单的选择语句:

val total = col("metrics")("Total")
val used = col("metrics")("used")
val buffer = col("metrics")("buffer")
val cached = col("metrics")("cached")

val result = grouped.select($"window", $"Hostname",
          (total - ((total - used + buffer + cached) / total) * 100).as("percentage"))

我们开始吧:

+------------------------------------------+--------+----------+
|window                                    |Hostname|percentage|
+------------------------------------------+--------+----------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |17.0      |
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |26.0      |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |17.0      |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |17.0      |
+------------------------------------------+--------+----------+

【讨论】:

  • 嗨...我相信这段代码在 scala 中。由于我的其余代码在 pyspark 中,因此需要将其与我现有的代码集成。
  • 用到的函数都是Spark的,python里没有吗?
  • joinMap = udf ({ values: Seq[Map[String, Double]] => values.flatten.toMap} )。我将其修改为 python,但出现符号 '=>' 的语法错误
  • 改用Python UDF 语法:docs.databricks.com/spark/latest/spark-sql/udf-python.html。此外,您的类型不同,这里有一个 python 示例。 stackoverflow.com/questions/41288622/…
  • 谢谢。我想出了替代的 pyspark 代码
【解决方案2】:

第一个是在 spark 中使用 pivot,第二个是使用 map。

第一种解决方案

df = sql.read.csv("/home/yasin/Documents/IMI/Data/memorry sample.csv", header = "True").withColumn("timestamp", unix_timestamp("time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType())).drop("time_stamp")
df = df.withColumn("unixtime",unix_timestamp(df["timestamp"],"yyyy/MM/dd HH:mm:ss"))
df = df.withColumn("unixtime2",(round(df["unixtime"]/300)*300).cast("timestamp"))
df = df.groupBy("unixtime2" , "Hostname" , "kpi").pivot("kpi_subtype").agg(mean(df["value_current"]))
df = df.withColumn("Percentage", (df["Total"] - (df["Total"] - df["Used"] + df["buffer"] + df["cached"])) /df["Total"] * 100)

第二个解决方案

df = sql.read.csv("/home/yasin/Documents/IMI/Data/memorry sample.csv", header = "True").withColumn("timestamp", unix_timestamp("time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType())).drop("time_stamp")
df = df.withColumn("unixtime",unix_timestamp(df["timestamp"],"yyyy/MM/dd HH:mm:ss"))
df = df.withColumn("unixtime2",(round(df["unixtime"]/300)*300).cast("timestamp"))
df = df.withColumn("value_current2",df["value_current"].cast("Float"))
df = df.groupBy("unixtime2" , "Hostname" , "kpi").agg(collect_list(create_map("kpi_subtype","value_current2")).alias("mapped"))
nn=df.withColumn("formula" ,  ( df["mapped"][0]["Total"].cast("Float") - (( df["mapped"][0]["Total"].cast("Float") - df["mapped"][1]["used"].cast("Float")  + df["mapped"][2]["buffer"].cast("Float") + df["mapped"][3]["cached"].cast("Float") ) / df["mapped"][0]["Total"].cast("Float") ) * 100).cast("Float"))

【讨论】:

  • 不错。在 Hive 中寻找一些方法来做到这一点
  • 我没用过hive所以不知道。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-04-09
  • 1970-01-01
  • 1970-01-01
  • 2018-03-08
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多