【问题标题】:Optimization query for DataFrame SparkDataFrame Spark 的优化查询
【发布时间】:2022-01-23 14:59:02
【问题描述】:

我尝试从 Hive 表创建 DataFrame。但我对 Spark API 的工作很糟糕。

我需要帮助优化方法 getLastSession 中的查询,将两个任务合二为一:

val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
val path      = new Path(s"$pathTable${if(onlyPartition) s"/name_process=$processName" else ""}").toString
val df        = spark.read.parquet(path)


def getLastSession: Dataset[Row] = {
  val lastTime        = df.select(max(col("time_write"))).collect()(0)(0).toString
  val lastSession     = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString
  val dfByLastSession = df.filter(col("id_session") === lastSession)

  dfByLastSession.show()
  /*
  +----------+----------------+------------------+-------+
  |id_session|      time_write|               key|  value|
  +----------+----------------+------------------+-------+
  |alskdfksjd|1639950466414000|schema2.table2.csv|Failure|

  */
  dfByLastSession
}

PS。我的源表(例如):

name_process id_session time_write key value
OtherClass jsdfsadfsf 43434883477 schema0.table0.csv Success
OtherClass jksdfkjhka 23212123323 schema1.table1.csv Success
OtherClass alskdfksjd 23343212234 schema2.table2.csv Failure
ExternalClass sdfjkhsdfd 34455453434 schema3.table3.csv Success

【问题讨论】:

  • @blackbishop,不。我正在尝试删除“lastTime”和“lastSession”变量。这样所有对 DataFrame 的操作都在变量“dfByLastSession”中以一种方法发生
  • 我当前的方法工作正常。我只是尝试优化 spark 的查询。
  • 您想获取与id_session 对应的所有行,其中包含最近的time_write,对吗?
  • @blackbishop,是的,这是真的吗!

标签: dataframe scala apache-spark hadoop apache-spark-sql


【解决方案1】:

您可以像这样在窗口中使用row_number

import org.apache.spark.sql.expressions.Window

val dfByLastSession = df.withColumn(
  "rn", 
  row_number().over(Window.orderBy(desc("time_write")))
).filter("rn=1").drop("rn")
    
dfByLastSession.show()

但是,由于您没有按任何字段进行分区,因此可能会降低性能。

您可以在代码中更改的另一件事是使用结构排序来通过一个查询获取与最近的time_write 关联的id_session

val lastSession = df.select(max(struct(col("time_write"), col("id_session")))("id_session")).first.getString(0)

val dfByLastSession = df.filter(col("id_session") === lastSession)

【讨论】:

  • 看起来很不错。感谢你们对我的帮助。我一定会熟悉 spark 中的 windows。
猜你喜欢
  • 1970-01-01
  • 2018-02-03
  • 1970-01-01
  • 2020-11-29
  • 2020-04-12
  • 2019-10-05
  • 2019-02-10
  • 2019-09-06
  • 2023-03-11
相关资源
最近更新 更多