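【Posted】: 2022-01-22 13:51:59
【Question】:
I want to work with a Parquet table that has the following typed fields:
name_process: String, id_session: String, time_write: String, key: String, value: String
"id_session" is the id of the SparkSession.
The table is partitioned by the "name_process" column.
For example: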
| name_process | id_session | time_write | key | value |
|---|---|---|---|---|
| OtherClass | sess000001 | 1639950466114000 | schema0.table0.csv | Success |
| OtherClass | sess000002 | 1639950466214000 | schema1.table1.csv | Success |
| OtherClass | sess000003 | 1639950466309000 | schema0.table0.csv | Success |
| OtherClass | sess000003 | 1639950466310000 | schema1.table1.csv | Failure |
| OtherClass | sess000003 | 1639950466311000 | schema2.table2.csv | Success |
| OtherClass | sess000003 | 1639950466312000 | schema3.table3.csv | Success |
| ExternalClass | sess000004 | 1639950466413000 | schema0.table0.csv | Success |
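The values of the "key" column are unique only within a single Spark session (the "id_session" column). This happens because I use the same files (csv) every time I start a Spark session. I plan to send these files to a server. The send time and the server response will be recorded in the "time_write" and "value" columns. In other words, I want to see the latest send status of every csv file.
This is a journal of entries that I will be interacting with. To interact with this journal, I want to implement several methods:
All getter methods will return a filtered DataFrame that contains all columns, so the result still has 5 columns. I am still struggling with the Spark API, and it will take me some time to learn how to write elegant operations on DataFrames. This is what I have so far: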
abstract class ProcessResultBook(processName: String, onlyPartition: Boolean = true)(implicit spark: SparkSession) {
  val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
  val path = new Path(s"$pathTable${if(onlyPartition) s"/name_process=$processName" else ""}").toString
  val df = spark.read.parquet(path)

  def getLastSession: Dataset[Row] = {
    val lastTime = df.select(max(col("time_write"))).collect()(0)(0).toString
    val lastSession = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString
    val dfByLastSession = df.filter(col("id_session") === lastSession)
    dfByLastSession.show()
    /*
    +----------+----------------+------------------+-------+
    |id_session|      time_write|               key|  value|
    +----------+----------------+------------------+-------+
    |alskdfksjd|1639950466414000|schema2.table2.csv|Failure|
    */
    dfByLastSession
  }

  def add(df: DataFrame) = ???
  def add(processName: String, idSession: String, timeWrite: String, key: String, value: String) = ???
  def getSessionsByProcess(processName: String) = ???
  def getBySessionAndProcess(processName: String, idSession: String) = ???
  def getUnique(processName: String) = ???
  def delByTime(time: String) = ???
  def delByIdSession(idSession: String) = ???
  def getCurrentTime: SQLTimestamp = DateTimeUtils.fromMillis(TimeStamp.getCurrentTime.getTime)
  def convertTime(time: Long): String = TimeStamp.getNtpTime(time).getDate.toString
}
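The show() output in the comment above has only four columns. The likely reason is that the class reads the partition directory (.../name_process=...) directly, in which case Spark does not treat name_process as a partition column. A minimal sketch of one way to keep all five columns is given here, assuming Spark's "basePath" read option; the object name JournalReader and the method readJournal are hypothetical and not part of the original code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper, not from the original post.
object JournalReader {
  // Reads either the whole table or a single partition directory.
  // Setting "basePath" to the table root lets partition discovery
  // keep name_process as a column even when only one partition is read.
  def readJournal(pathTable: String, processName: Option[String])
                 (implicit spark: SparkSession): DataFrame = {
    val path = processName.fold(pathTable)(p => s"$pathTable/name_process=$p")
    spark.read.option("basePath", pathTable).parquet(path)
  }
}
```

With this approach the partition column is appended after the data columns, so the column order may differ from the order in the table definition.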
I have a case class:
case class RowProcessResult(
  nameProcess: String,
  idSession: String,
  timeWrite: String,
  key: String,
  value: String
)
Please help me implement 2 methods:
- def add(data: List[RowProcessResult]): Unit
- def getUnique(nameProcess: String): DataFrame or List[RowProcessResult]
The add(..) method appends a collection of records to the Hive table.
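One possible shape for add(..) is sketched here; it is an illustration, not a confirmed solution. It assumes the table was created with saveAsTable and partitioned by name_process, as in the test helper in this post. The object name JournalWriter and the table parameter are hypothetical, and the case class is repeated only to keep the block self-contained.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Same fields as the case class in the question.
case class RowProcessResult(
  nameProcess: String,
  idSession: String,
  timeWrite: String,
  key: String,
  value: String
)

// Hypothetical wrapper object, not from the original post.
object JournalWriter {
  // Appends the given records to the partitioned journal table.
  def add(data: List[RowProcessResult], table: String = "test_db.test_table")
         (implicit spark: SparkSession): Unit = {
    import spark.implicits._
    data
      // Rename the camelCase fields to the table's column names.
      .toDF("name_process", "id_session", "time_write", "key", "value")
      .write
      .mode(SaveMode.Append)
      .partitionBy("name_process")
      .format("parquet")
      .saveAsTable(table)
  }
}
```

In append mode saveAsTable matches columns by name, so the DataFrame's column order does not have to match the order in the existing table.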
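The getUnique(nameProcess: String): DataFrame method returns a DataFrame with all columns and one row per unique value of the "key" column. For each unique "key" value, the row with the most recent "time_write" is selected.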
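The "most recent row per key" selection described above could be sketched with a window function. This is a minimal sketch under assumptions: the journal DataFrame is passed in explicitly, it contains the name_process column, and time_write holds numeric strings that fit into a long. The object name LatestPerKey and the helper column rn are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical wrapper object, not from the original post.
object LatestPerKey {
  // For one process, keeps the newest row of every distinct "key".
  def getUnique(journal: DataFrame, nameProcess: String): DataFrame = {
    // time_write is a String column, so cast it to compare numerically.
    val newestFirst = Window
      .partitionBy(col("key"))
      .orderBy(col("time_write").cast("long").desc)

    journal
      .filter(col("name_process") === nameProcess)
      .withColumn("rn", row_number().over(newestFirst)) // 1 = newest row per key
      .filter(col("rn") === 1)
      .drop("rn")
  }
}
```

Applied to the example table with nameProcess = "OtherClass", this would keep four rows, all from sess000003, including the Failure row for schema1.table1.csv. Calling .as[RowProcessResult] is not directly possible because the column names differ from the case class field names; the columns would have to be renamed first.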
P.S.: my test helper that creates the Hive table:
def createHiveTable(implicit spark: SparkSession): Unit = {
  val schema = "test_schema"
  val table = "test_table"
  val partitionName = "name_process"
  val columnNames = "name_process" :: "id_session" :: "time_write" :: "key" :: "value" :: Nil
  spark.sql(s"CREATE DATABASE IF NOT EXISTS test_db")
  //val createTableSql = s"CREATE TABLE IF NOT EXISTS $schema.$table ($columnNames) PARTITIONED BY $partitionName STORED AS parquet"
  val path = new File(".").getAbsolutePath ++ "/src/test/data-lineage/test_data_journal.csv"
  val df = spark.read.option("delimiter", ",")
    .option("header", true)
    .csv(path)
  df.show()
  df.write.mode(SaveMode.Append).partitionBy(partitionName).format("parquet").saveAsTable(s"test_db.$table")
}
【Discussion】:
Tags: dataframe scala apache-spark hadoop hive