【问题标题】:Codec not found for requested operation: [varchar <-> java.lang.Double], Inserting to Datastax cassandra from spark structure未找到请求操作的编解码器:[varchar <-> java.lang.Double],从 spark 结构插入 Datastax cassandra
【发布时间】:2019-07-27 14:16:22
【问题描述】:

我使用 Spark 结构流从 Kafka 消费,并使用 Foreach 插入 Datastax Cassandra。当我插入 BigInt 和 String 时,它正在插入,但是当我插入 Double 值时,它抛出“找不到请求操作的编解码器:[varchar java.lang.Double]”。如何编写自定义编解码器以在 SCALA 中接受 Double、Long 值。

val view_a = VW_MS_PLAN_UNIT_LA
      .writeStream
      .option(WriteConf.IgnoreNullsParam.name, "true")
      .queryName("VIEW PLAN UNIT LA")
      .outputMode("Append")
      .foreach(new CassandraSinkForeach)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()


df.show()

Spark 的示例数据框(Printschema) 一个 - 字符串 b- 比格特 C- 双人

Cassandra 表示例;- 创建表 a ( 一个字符串, b大整数, c双 )

 var cassandraDriver: CassandraDriver = null;
  var preparedStatement: PreparedStatement = null;
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"Open connection")
    cassandraDriver = new CassandraDriver();
    preparedStatement = cassandraDriver.connector.withSessionDo(session =>
      session.prepare(s"""
       insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} 
      (a, b, c) values(?, ?, ?)""")
    true
  }

  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(preparedStatement.bind(${record.getAs[String](0)}, 
           ${record.getAs[BigInt](1)}, ${record.getAs[Double](2)}))
    )
  }

com.datastax.driver.core.exceptions.CodecNotFoundException:未找到请求操作的编解码器:[varchar java.lang.Double]。我以前的帖子也将有助于更多地描述它 How to change Datatypes of records inserting into Cassandra using Foreach Spark Structure streaming

【问题讨论】:

  • 难道${record.getAs[String](0) 应该改为Double
  • 谢谢,先生,我是您和您页面的忠实追随者。实际上,我想将 Double 值插入 datastax Cassandra column.getAs[BigInt] 与 java 一起工作得很好。长。但是将 Double 插入 Cassandra 存在编解码器问题。我正在使用 Spark/Scala。@JacekLaskowski
  • 我认为您应该专注于开发一个将 Double 插入 Cassandra 的 Scala 示例代码。我不认为这与 Spark 本身有关,不是吗?
  • 当然,先生,这是一个 Scala 编解码器问题。我正在使用 Scala 对象并获取 ROW 类型记录并根据列将其分开。 getAs[String] 和 getAs{bigInt] 与 Cassandra 中的文本和 Bigint 数据类型完美匹配。只有我无法转换 Double,因为它是 scala 中的一个对象。
  • 如果您使用 DSE Analytics,为什么不尝试使用 writeStream

标签: scala apache-spark spark-structured-streaming spark-cassandra-connector


【解决方案1】:

再次查看消息后 - 您的数据不适合表结构。只需添加显式转换...

此外,要在 DataStax Java 驱动程序中使用 Scala 类型,您可以从 java-driver-scala-extras 存储库中获取编解码器。不幸的是,它没有“官方” jar 构建,因此您要么需要自己编译和部署代码,要么只需将代码片段包含到您的项目中。 DataStax 开发博客上有一个blog post 解释了它是如何实现的。

【讨论】:

    猜你喜欢
    • 2019-01-03
    • 2019-05-27
    • 2018-06-08
    • 2020-10-22
    • 1970-01-01
    • 2017-03-24
    • 2017-02-16
    • 2017-04-07
    • 2018-01-06
    相关资源
    最近更新 更多