【Question title】: Transformation of a Spark DataFrame
【Posted】: 2022-01-22 13:51:59
【Question description】:

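I want to work with a Parquet table that has the following typed fields:

name_process: String
id_session: String
time_write: String
key: String
value: String

"id_session" is the id of the SparkSession.

The table is partitioned by the "name_process" column.

For example: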

name_process id_session time_write key value
OtherClass sess000001 1639950466114000 schema0.table0.csv Success
OtherClass sess000002 1639950466214000 schema1.table1.csv Success
OtherClass sess000003 1639950466309000 schema0.table0.csv Success
OtherClass sess000003 1639950466310000 schema1.table1.csv Failure
OtherClass sess000003 1639950466311000 schema2.table2.csv Success
OtherClass sess000003 1639950466312000 schema3.table3.csv Success
ExternalClass sess000004 1639950466413000 schema0.table0.csv Success

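The values in the "key" column are unique only within a single Spark session (the "id_session" column). This is because I use the same csv files every time I start a Spark session. I plan to send these files to a server. The send time and the server response will be recorded in the "time_write" and "value" columns. In other words, I want to see the latest send status of every csv file.

This is a log of the entries I will be working with. To interact with this log, I want to implement several methods:

All getter methods will return a filtered DataFrame containing all columns, so the result still has 5 columns. I am still struggling with the Spark API, and it will take me some time to learn how to perform elegant operations on DataFrames. Here is what I have so far: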

abstract class ProcessResultBook(processName: String, onlyPartition: Boolean = true)(implicit spark: SparkSession) {

  val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
  val path      = new Path(s"$pathTable${if(onlyPartition) s"/name_process=$processName" else ""}").toString
  val df        = spark.read.parquet(path)


  def getLastSession: Dataset[Row] = {
    val lastTime        = df.select(max(col("time_write"))).collect()(0)(0).toString
    val lastSession     = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString
    val dfByLastSession = df.filter(col("id_session") === lastSession)

    dfByLastSession.show()
/*
+----------+----------------+------------------+-------+
|id_session|      time_write|               key|  value|
+----------+----------------+------------------+-------+
|alskdfksjd|1639950466414000|schema2.table2.csv|Failure|

*/
    dfByLastSession
  }

  def add(df: DataFrame) = ???
  def add(processName: String, idSession: String, timeWrite: String, key: String, value: String) = ???
  def getSessionsByProcess(processName: String) = ???
  def getBySessionAndProcess(processName: String, idSession: String) = ???
  def getUnique(processName: String) = ???
  def delByTime(time: String) = ???
  def delByIdSession(idSession: String) = ???

  def getCurrentTime: SQLTimestamp    = DateTimeUtils.fromMillis(TimeStamp.getCurrentTime.getTime)
  def convertTime(time: Long): String = TimeStamp.getNtpTime(time).getDate.toString
}

I have a case class:

case class RowProcessResult(
                              nameProcess: String,
                              idSession: String,
                              timeWrite: String,
                              key: String,
                              value: String
                           )

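Please help me implement two methods:

  • def add(data: List[RowProcessResult]): Unit
  • def getUnique(nameProcess: String): DataFrame or List[RowProcessResult]

The method add(..) should append a collection of records to the Hive table.

The method getUnique(nameProcess: String): DataFrame should return a DataFrame with all columns, containing one row per unique value of the "key" column. For each unique "key" value, the row with the most recent "time_write" is selected.

P.S.: My test class that creates the Hive table: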
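The selection rule described above (one row per key, keeping the newest time_write) can be sketched with plain Scala collections, independent of Spark. This is only an illustration of the intended semantics: the names `LatestPerKey` and `sample` are hypothetical, and it assumes that time_write always parses as a Long. The sample rows are copied from the example table.

```scala
// Plain Scala collections only; no Spark dependency.
final case class RowProcessResult(
  nameProcess: String,
  idSession: String,
  timeWrite: String,
  key: String,
  value: String
)

object LatestPerKey {

  /** For the given process, keeps one row per distinct key: the one with the greatest time_write. */
  def getUnique(rows: Seq[RowProcessResult], nameProcess: String): Seq[RowProcessResult] =
    rows
      .filter(_.nameProcess == nameProcess)
      .groupBy(_.key)
      .values
      // Assumption: time_write is a numeric timestamp stored as text, so compare it as Long.
      .map(_.maxBy(_.timeWrite.toLong))
      .toSeq
      .sortBy(_.key)

  // Sample rows copied from the example table in the question.
  val sample: Seq[RowProcessResult] = Seq(
    RowProcessResult("OtherClass", "sess000001", "1639950466114000", "schema0.table0.csv", "Success"),
    RowProcessResult("OtherClass", "sess000002", "1639950466214000", "schema1.table1.csv", "Success"),
    RowProcessResult("OtherClass", "sess000003", "1639950466309000", "schema0.table0.csv", "Success"),
    RowProcessResult("OtherClass", "sess000003", "1639950466310000", "schema1.table1.csv", "Failure"),
    RowProcessResult("OtherClass", "sess000003", "1639950466311000", "schema2.table2.csv", "Success"),
    RowProcessResult("OtherClass", "sess000003", "1639950466312000", "schema3.table3.csv", "Success"),
    RowProcessResult("ExternalClass", "sess000004", "1639950466413000", "schema0.table0.csv", "Success")
  )
}
```

For "OtherClass" this keeps four rows, all from sess000003, including the "Failure" row for schema1.table1.csv, because those are the newest entries for each key.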

def createHiveTable(implicit spark: SparkSession) {

  val schema = "test_schema"
  val table = "test_table"
  val partitionName = "name_process"
  val columnNames = "name_process" :: "id_session" :: "time_write" :: "key" :: "value" :: Nil

  spark.sql(s"CREATE DATABASE IF NOT EXISTS test_db")
  //val createTableSql = s"CREATE TABLE IF NOT EXISTS $schema.$table ($columnNames) PARTITIONED BY $partitionName STORED AS parquet"

  val path = new File(".").getAbsolutePath ++ "/src/test/data-lineage/test_data_journal.csv"

  val df = spark.read.option("delimiter", ",")
    .option("header", true)
    .csv(path)

  df.show()

  df.write.mode(SaveMode.Append).partitionBy(partitionName).format("parquet").saveAsTable(s"test_db.$table")

}

【Question discussion】:

    Tags: dataframe scala apache-spark hadoop hive


    【Solution 1】:

    It has been a while. I am leaving my solution here.

    import spark.implicits._
    
      val schema = "test_db"
      val table  = "test_table"
      val df     = spark.read.table(s"$schema.$table").filter(col("name_process") === processName).persist
    
    
      def getLastSession: Dataset[Row] = {
    
        val lastSessionId   = df.select(max(struct(col("time_write"), col("id_session")))("id_session"))
                                .first.getString(0)
        val dfByLastSession = df.filter(col("id_session") === lastSessionId)
        dfByLastSession.show()
        dfByLastSession
      }
    
      def add(listRows: Seq[RowProcessResult]) = {
    
        val df = listRows.toDF().withColumn("name_process", lit(processName))
        df.show()
        addDfToTable(df)
    
      }
    
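      def add(nameProcess: String, idSession: String, timeWrite: String, key: String, value: String) = {
        // Assumes RowProcessResult holds four fields here (no nameProcess), as in add(listRows) above.
        // The partition column is appended last because insertInto resolves columns by position.
        val df = Seq(RowProcessResult(idSession, timeWrite, key, value)).toDF()
          .withColumn("name_process", lit(nameProcess))
        addDfToTable(df)
      }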
    
      def getSessionsByProcess(externalProcessName: String) = {
        spark.read.table(s"$schema.$table").filter(col("name_process") === externalProcessName)
      }
    
      def getSession(idSession: String, processName: String = this.processName) = {
        if (processName.equals(this.processName))
          df.filter(col("id_session") === idSession)
        else
          getSessionsByProcess(processName).filter(col("id_session") === idSession)
      }
    
      def getUnique = df.sort(col("time_write").desc).dropDuplicates("key")
    
      def addDfToTable(df: DataFrame) =
        df.write.mode(SaveMode.Append).insertInto(s"$schema.$table")
    
      def getFullDf = df
      def getCurrentTime = TimeStamp.getCurrentTime
      def convertTime(time: Long): String = TimeStamp.getNtpTime(time).getDate.toString
    }
    

    I ended up with a tolerable solution. It is not bad. Thank you, and happy New Year!! =)
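One caveat about getUnique: as far as I know, Spark does not guarantee which of the duplicate rows dropDuplicates keeps, so sorting first may not always leave the newest row per key. A possible alternative is a window function with row_number. The following is a minimal sketch under stated assumptions, not the author's method: the names `UniqueByKey` and `latestPerKey` are hypothetical, the local SparkSession is only for demonstration, and time_write is assumed to be castable to long.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object UniqueByKey {

  /** For each key, keeps only the row with the greatest time_write. */
  def latestPerKey(df: DataFrame): DataFrame = {
    // Assumption: time_write holds a numeric timestamp stored as a string.
    val newestFirst = Window
      .partitionBy(col("key"))
      .orderBy(col("time_write").cast("long").desc)

    df.withColumn("rn", row_number().over(newestFirst)) // rank rows inside each key
      .filter(col("rn") === 1)                          // keep the newest one
      .drop("rn")
  }

  def main(args: Array[String]): Unit = {
    // Local session for demonstration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("unique-by-key-sketch")
      .getOrCreate()
    import spark.implicits._

    // Rows of the "OtherClass" partition from the question's example table.
    val df = Seq(
      ("sess000001", "1639950466114000", "schema0.table0.csv", "Success"),
      ("sess000002", "1639950466214000", "schema1.table1.csv", "Success"),
      ("sess000003", "1639950466309000", "schema0.table0.csv", "Success"),
      ("sess000003", "1639950466310000", "schema1.table1.csv", "Failure"),
      ("sess000003", "1639950466311000", "schema2.table2.csv", "Success"),
      ("sess000003", "1639950466312000", "schema3.table3.csv", "Success")
    ).toDF("id_session", "time_write", "key", "value")

    latestPerKey(df).orderBy("key").show(false)
    spark.stop()
  }
}
```

If two rows for the same key share an identical time_write, row_number still picks exactly one of them, but which one is arbitrary unless a tie-breaking column is added to the orderBy.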
