【问题标题】:Store Enum into Cassandra as Integer using Spark Cassandra Connector使用 Spark Cassandra 连接器将枚举作为整数存储到 Cassandra
【发布时间】:2016-03-21 16:14:19
【问题描述】:

我正在尝试使用其 Int 表示在 Cassandra 中存储一个 scala 枚举,但我总是得到一个 com.datastax.spark.connector.types.TypeConversionException。我想知道 Enumeration 类是不是特例,还是我做错了什么。

编辑 (2015-12-16)。 让我尝试用代码 sn-p 来扩展我的问题,这样我可能可以更好地传达这个想法。

import org.apache.spark.{SparkConf, SparkContext}

import com.datastax.spark.connector._

object WeekDay {
  sealed abstract class WeekDay(val id: Int)

  case object MON extends WeekDay(0)
  case object TUE extends WeekDay(1)
  case object WED extends WeekDay(2)
  case object THU extends WeekDay(3)
  case object FRI extends WeekDay(4)
  case object SAT extends WeekDay(5)
  case object SUN extends WeekDay(6)

  val values = Map(0 -> MON, 1 -> TUE, 2 -> WED, 3 -> THU, 4 -> FRI, 5 -> SAT, 6 -> SUN)
}
import WeekDay._

object Example {

  case class MyCassandraRow(id: String, weight: Int, day: WeekDay)

  def main (args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("cassandra-connector-example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(
      Seq(
        MyCassandraRow("identifier1", 10, MON),
        MyCassandraRow("identifier2", 20, FRI),
        MyCassandraRow("identifier3", 1, SUN)
      )
    )

    data.saveToCassandra("db", "custom_data")
  }
}

如果我使用 TEXT 为“day”字段创建我的 custom_data 表,此代码可以正常工作,但如果我使用以下堆栈跟踪设置为 INT,则会失败:

com.datastax.spark.connector.types.TypeConversionException: Cannot convert object FRI of type class WeekDay$FRI$ to java.lang.Integer.
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:42)
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:40)
at scala.PartialFunction$AndThen.applyOrElse(PartialFunction.scala:185)

所以,我尝试按照https://github.com/datastax/spark-cassandra-connector/blob/master/doc/6_advanced_mapper.md 中的描述实现 TypeConverter 如下:

implicit object IntToWeekDayConverter extends TypeConverter[WeekDay] {
  def targetTypeTag = typeTag[WeekDay]
  def convertPF = {
    case i: Int => values.getOrElse(i, MON)
  }
}

implicit object WeekDayToIntConverter extends TypeConverter[Int] {
  def targetTypeTag = typeTag[Int]
  def convertPF = {
    case d: WeekDay => d.id
  }
}

但我仍然遇到同样的错误。

我已经在这里发布了整个 scala 文件:https://gist.github.com/davideanastasia/b0bef569b4b7dec66c3f#file-cassandraenum-scala

【问题讨论】:

    标签: apache-spark cassandra spark-cassandra-connector


    【解决方案1】:

    Spark Cassandra 连接器中没有来自 Enum -> Integer 的自动转换器。我只是用.id 映射该列以获取整数表示。

    object WeekDay extends Enumeration {
      type WeekDay = Value
      val Mon, Tue, Wed, Thu, Fri, Sat, Sun = Value
    }
    import WeekDay._
    val meetingDays = Seq(WeekDay.Mon, WeekDay.Wed)
    //meetingDays: Seq[WeekDay.Value] = List(Mon, Wed)
    meetingDays.map(_.id)
    //Seq[Int] = List(0, 2) 
    

    【讨论】:

    • 然后我需要一个临时类来将 Cassandra 的输入和输出转换为我的案例类。我想我可以利用 Spark Cassandra 连接器中的 TypeConverter 为我完成繁重的工作。无论如何感谢您的回答,我已经扩展了我的问题,所以上下文可能更清楚一点。
    • 您是否尝试过转换为 java.lang.Integer 而不是 scala
    • 是的,这行得通!在那之后我不得不改变一些小事情,但这使它起作用了。如果你花时间写一个完整的答案,我会投赞成票。
    猜你喜欢
    • 2023-03-10
    • 2015-05-24
    • 2015-10-28
    • 2015-05-19
    • 2016-09-02
    • 1970-01-01
    • 1970-01-01
    • 2018-08-17
    • 2017-03-04
    相关资源
    最近更新 更多