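【Question title】: Doing pattern matching on typed dataset
【Posted】: 2020-09-27 11:11:00
【Question】:

I am trying to apply different logic depending on the element type of a Spark Dataset. Depending on which case class is passed to doWork (Customer or Worker), I have to apply a different kind of aggregation. How can I do that?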

import org.apache.spark.sql.{Dataset, SparkSession}

object SparkSql extends App {
  import spark.implicits._

  val spark = SparkSession
    .builder()
    .appName("Simple app")
    .config("spark.master", "local")
    .getOrCreate()

  sealed trait Person {
    def name: String
  }

  final case class Customer(override val name: String, email: String)                extends Person
  final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person

  val workers: Dataset[Worker] = Seq(
    Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
    Worker("Sam", id = 1, skills = Array("self-motivation"))
  ).toDS

  def doWork(persons: Dataset[Person]): Unit = {
    persons match {
      case ... // Dataset[Customer] ... do something
      case ... // Dataset[Worker] ... do something else
    }
  }

}

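【Comments】:

  • You can't do this because of type erasure. Is the type of the Dataset known statically or only dynamically? Also, do you want to do something with the Dataset as a whole or with each element?
  • You cannot pattern match directly on generic type arguments, because they do not exist at runtime. There may be a way around it, though; see this answer for details: stackoverflow.com/questions/12218641/…
  • Question: how are you passing a Dataset[Worker] to def doWork(persons: Dataset[Person]): Unit = { in the first place?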
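
The erasure point raised in these comments can be seen without Spark. The following is only a minimal sketch in plain Scala, with List standing in for Dataset (an assumption made so it runs without a Spark dependency); ErasureDemo and describe are hypothetical names.

```scala
object ErasureDemo {
  // The type argument String is erased at runtime, so this pattern only
  // checks "is it a List?". @unchecked silences the compiler warning that
  // would otherwise point this out.
  def describe(xs: Any): String = xs match {
    case _: List[String @unchecked] => "matched List[String]"
    case _                          => "something else"
  }

  def main(args: Array[String]): Unit = {
    println(describe(List(1, 2, 3))) // prints "matched List[String]"
    println(describe("not a list"))  // prints "something else"
  }
}
```

A List[Int] takes the List[String] branch, which is why a match on Dataset[Customer] versus Dataset[Worker] cannot tell the two apart.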

Tags: scala apache-spark dataset pattern-matching


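【Solution 1】:

With case classes you can do pattern matching. Case classes are Scala's way of allowing pattern matching on objects without requiring a large amount of boilerplate. In general, all you need to do is add a single case keyword to each class that you want to be pattern matchable.

Here is an example: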

abstract class Expr
case class Var(name: String) extends Expr
case class Number(num: Double) extends Expr
case class UnOp(operator: String, arg: Expr) extends Expr
case class BinOp(operator: String, left: Expr, right: Expr) extends Expr

def simplifyTop(expr: Expr): Expr = expr match {
  case UnOp("-", UnOp("-", e)) => e // Double negation
  case BinOp("+", e, Number(0)) => e // Adding zero
  case BinOp("*", e, Number(1)) => e // Multiplying by one
  case _ => expr
}

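Applied to your example, I would try this:

  def doWork(persons: Person): Unit = {
    persons match {
      // Use type patterns: a bare `case Customer` would match the companion object, not an instance
      case _: Customer => () // ... do something
      case _: Worker   => () // ... do something else
    }
  }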

dataset.map(doWork)
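
The per-element matching described in this answer can be tried in plain Scala. This is a sketch only: Seq stands in for the Dataset, skills is a Seq rather than an Array, and PerElementMatch and describe are hypothetical names, not part of the original answer.

```scala
object PerElementMatch {
  sealed trait Person { def name: String }
  final case class Customer(name: String, email: String) extends Person
  final case class Worker(name: String, id: Int, skills: Seq[String]) extends Person

  // Constructor patterns and type patterns both look at the runtime class of
  // a single element, which type erasure does not affect.
  def describe(person: Person): String = person match {
    case Customer(name, email) => s"customer $name <$email>"
    case w: Worker             => s"worker ${w.name} with ${w.skills.size} skill(s)"
  }

  def main(args: Array[String]): Unit = {
    val people: Seq[Person] = Seq(
      Customer("Bob", "bob@email"),
      Worker("Sam", 2, Seq("self-motivation"))
    )
    people.map(describe).foreach(println)
  }
}
```

Because Person is sealed, the compiler can also warn when a case is missing. Note that this dispatches on each element, not on the Dataset as a whole.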

【Comments】:

  • That works, but how do I handle Datasets of different types?
  • def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] (Scala-specific) Returns a new Dataset that contains the result of applying func to each element. Annotations @Experimental() @Evolving() Since 1.6.0. You can apply map and pass each element to doWork(persons: Person).
【Solution 2】:

Change your method to accept [T <: parent] and read the element class name from the ClassTag of the Dataset's JavaRDD (toJavaRDD.classTag), as shown below:

import org.apache.spark.sql.Dataset

object InheritDataframe {


  private def matcherDef[T <: parent](dfb: Dataset[T]): Unit = {

    // classTag.toString yields the element class name, which is matched as a string
    dfb.toJavaRDD.classTag.toString() match {
      case "child1" => println("child1")
      case "child2" => println("child2")
      case _        => println("Unknown")
    }

  }

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess // the answerer's own helper (not shown); presumably returns a SparkSession

    import spark.implicits._

    val dfB  = List(child1(1)).toDS()
    val dfC  = List(child2(1)).toDS()

    matcherDef(dfB)
    matcherDef(dfC)

  }


}

case class child1(i: Int) extends parent(i)

case class child2(i: Int) extends parent(i)

class parent(j: Int)
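
Matching on class-name strings is fragile, since a rename or a package change silently falls through to the default case. As a rough alternative, here is a sketch that compares the runtime class carried by a ClassTag instead. It is plain Scala with Seq standing in for Dataset (an assumption so it runs without Spark), and ClassTagDispatch, Parent, Child1, Child2 and describe are hypothetical names.

```scala
import scala.reflect.{ClassTag, classTag}

object ClassTagDispatch {
  class Parent(val j: Int)
  case class Child1(i: Int) extends Parent(i)
  case class Child2(i: Int) extends Parent(i)

  // The ClassTag is supplied by the compiler from the static element type T,
  // so the dispatch works even when the collection is empty.
  def describe[T <: Parent: ClassTag](items: Seq[T]): String =
    classTag[T].runtimeClass match {
      case c if c == classOf[Child1] => "child1"
      case c if c == classOf[Child2] => "child2"
      case _                         => "unknown"
    }

  def main(args: Array[String]): Unit = {
    println(describe(Seq(Child1(1))))         // prints "child1"
    println(describe(Seq.empty[Child2]))      // prints "child2"
    println(describe(Seq[Parent](Child1(1)))) // prints "unknown"
  }
}
```

The last call shows the limit of this approach: the decision follows the static type parameter, not the actual contents of the collection.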


【Comments】:

    【Solution 3】:

    Try this:

    
    sealed trait Person {
      def name: String
    }
    
    final case class Customer(override val name: String, email: String)                extends Person
    final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person
    
    

    Test case:

      @Test
      def test62262873(): Unit = {
    
        val workers: Dataset[Worker] = Seq(
          Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
          Worker("Sam", id = 2, skills = Array("self-motivation"))
        ).toDS
    
        import scala.reflect.runtime.universe._
        def doWork[T : TypeTag](persons: Dataset[T]): Unit = {
          typeOf[T] match {
            case t if t =:= typeOf[Worker] =>
              println("I'm worker")
              persons.as[Worker].filter(_.id == 2).show(false)
            case t if t =:= typeOf[Customer] =>
              println("I'm Customer")
              persons.as[Customer].filter(_.name.contains("B")).show(false)
            case t =>
              // Without a default case, any other element type throws a MatchError
              println(s"Unsupported type: $t")
          }
        }
        doWork(workers)
    
        /**
          * I'm worker
          * +----+---+-----------------+
          * |name|id |skills           |
          * +----+---+-----------------+
          * |Sam |2  |[self-motivation]|
          * +----+---+-----------------+
          */
      }
    

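    【Comments】:

    • This is interesting, but I can't access the type inside each case; see this modification of the code: ``` import scala.reflect.runtime.universe._ def doWork2[T: TypeTag](persons: Dataset[T]): Dataset[T] = { typeOf[T] match { case t if t =:= typeOf[Worker] => persons.filter(_.id == 2) case t if t =:= typeOf[Customer] => persons.filter(_.name.contains("B")) } } ```
    • See the edit; it works fine for me. If it helps, feel free to accept and upvote.
    【Solution 4】:

    I found a solution to my own question, but I want to thank Someshwar Kale for his answer, since it does meet the requirement. In this version I use implicits to create processors (one implicit instance per type), which I can extend as needed.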

    import org.apache.spark.sql.{Dataset, SparkSession}
    
    object TempProject extends App {
      import spark.implicits._
    
      val spark = SparkSession
        .builder()
        .appName("Simple app")
        .config("spark.master", "local")
        .getOrCreate()
    
      sealed trait Person {
        def name: String
      }
      final case class Customer(override val name: String, email: String)                extends Person
      final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person
    
      trait CustomDataProcessor[T] {
        def doSomethingCool(dataset: Dataset[T]): Dataset[T]
      }
    
      implicit object CustomerDataProcessor extends CustomDataProcessor[Customer] {
    
        override def doSomethingCool(dataset: Dataset[Customer]): Dataset[Customer] =
          dataset.filter(_.name.contains("B"))
      }
    
      implicit object WorkerDataProcessor extends CustomDataProcessor[Worker] {
    
        override def doSomethingCool(dataset: Dataset[Worker]): Dataset[Worker] =
          dataset.filter(_.id == 2)
      }
    
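      def doWork[T](person: Dataset[T])(implicit processor: CustomDataProcessor[T]): Unit = {
        // Datasets are lazy, so call an action; otherwise the filtered result is silently discarded
        processor.doSomethingCool(person).show(false)
      }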
    
      val workers: Dataset[Worker] = Seq(
        Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
        Worker("Sam", id = 1, skills = Array("self-motivation"))
      ).toDS
    
      val customers: Dataset[Customer] = Seq(
        Customer("Bob", "bob@email"),
        Customer("Jack", "jack@email")
      ).toDS
    
      doWork(workers)
      doWork(customers)
    }
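
    The implicit-based approach in this answer is the type class pattern, and it also addresses the earlier comment about keeping the element type in the result. Below is a minimal sketch in plain Scala, assuming Seq in place of Dataset so that it runs without Spark; TypeClassDispatch, Processor and process are hypothetical names, not the answer's own.

```scala
object TypeClassDispatch {
  sealed trait Person { def name: String }
  final case class Customer(name: String, email: String) extends Person
  final case class Worker(name: String, id: Int) extends Person

  // Hypothetical type class; Seq[T] stands in for Dataset[T].
  trait Processor[T] {
    def process(items: Seq[T]): Seq[T]
  }

  object Processor {
    // Instances placed in the companion object are found by implicit search
    // without any extra import.
    implicit val customerProcessor: Processor[Customer] = new Processor[Customer] {
      def process(items: Seq[Customer]): Seq[Customer] = items.filter(_.name.contains("B"))
    }
    implicit val workerProcessor: Processor[Worker] = new Processor[Worker] {
      def process(items: Seq[Worker]): Seq[Worker] = items.filter(_.id == 2)
    }
  }

  // The instance is chosen at compile time from the static type T,
  // so the result keeps that type and no cast is needed.
  def doWork[T](items: Seq[T])(implicit processor: Processor[T]): Seq[T] =
    processor.process(items)

  def main(args: Array[String]): Unit = {
    val workers   = Seq(Worker("Bob", 1), Worker("Sam", 2))
    val customers = Seq(Customer("Bob", "bob@email"), Customer("Jack", "jack@email"))
    println(doWork(workers))   // prints "List(Worker(Sam,2))"
    println(doWork(customers)) // prints "List(Customer(Bob,bob@email))"
  }
}
```

    Calling doWork with an element type that has no Processor instance fails at compile time rather than at runtime, which is the main design advantage over the reflection-based matches.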
    

    【Comments】:
