【发布时间】:2022-01-23 14:59:02
【问题描述】:
我尝试从 Hive 表创建 DataFrame。但我对 Spark API 的工作很糟糕。
我需要帮助优化方法 getLastSession 中的查询,将两个任务合二为一:
val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
val path = new Path(s"$pathTable${if(onlyPartition) s"/name_process=$processName" else ""}").toString
val df = spark.read.parquet(path)
def getLastSession: Dataset[Row] = {
val lastTime = df.select(max(col("time_write"))).collect()(0)(0).toString
val lastSession = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString
val dfByLastSession = df.filter(col("id_session") === lastSession)
dfByLastSession.show()
/*
+----------+----------------+------------------+-------+
|id_session| time_write| key| value|
+----------+----------------+------------------+-------+
|alskdfksjd|1639950466414000|schema2.table2.csv|Failure|
*/
dfByLastSession
}
PS。我的源表(例如):
| name_process | id_session | time_write | key | value |
|---|---|---|---|---|
| OtherClass | jsdfsadfsf | 43434883477 | schema0.table0.csv | Success |
| OtherClass | jksdfkjhka | 23212123323 | schema1.table1.csv | Success |
| OtherClass | alskdfksjd | 23343212234 | schema2.table2.csv | Failure |
| ExternalClass | sdfjkhsdfd | 34455453434 | schema3.table3.csv | Success |
【问题讨论】:
-
@blackbishop,不。我正在尝试删除“lastTime”和“lastSession”变量。这样所有对 DataFrame 的操作都在变量“dfByLastSession”中以一种方法发生
-
我当前的方法工作正常。我只是尝试优化 spark 的查询。
-
您想获取与
id_session对应的所有行,其中包含最近的time_write,对吗? -
@blackbishop,是的,这是真的吗!
标签: dataframe scala apache-spark hadoop apache-spark-sql