【发布时间】:2019-07-27 14:16:22
【问题描述】:
我使用 Spark 结构流从 Kafka 消费,并使用 Foreach 插入 Datastax Cassandra。当我插入 BigInt 和 String 时,它正在插入,但是当我插入 Double 值时,它抛出“找不到请求操作的编解码器:[varchar java.lang.Double]”。如何编写自定义编解码器以在 SCALA 中接受 Double、Long 值。
val view_a = VW_MS_PLAN_UNIT_LA
.writeStream
.option(WriteConf.IgnoreNullsParam.name, "true")
.queryName("VIEW PLAN UNIT LA")
.outputMode("Append")
.foreach(new CassandraSinkForeach)
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
df.show()
Spark 的示例数据框(Printschema) 一个 - 字符串 b- 比格特 C- 双人
Cassandra 表示例;- 创建表 a ( 一个字符串, b大整数, c双 )
var cassandraDriver: CassandraDriver = null;
var preparedStatement: PreparedStatement = null;
def open(partitionId: Long, version: Long): Boolean = {
// open connection
println(s"Open connection")
cassandraDriver = new CassandraDriver();
preparedStatement = cassandraDriver.connector.withSessionDo(session =>
session.prepare(s"""
insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink}
(a, b, c) values(?, ?, ?)""")
true
}
def process(record: org.apache.spark.sql.Row) = {
println(s"Process new $record")
cassandraDriver.connector.withSessionDo(session =>
session.execute(preparedStatement.bind(${record.getAs[String](0)},
${record.getAs[BigInt](1)}, ${record.getAs[Double](2)}))
)
}
com.datastax.driver.core.exceptions.CodecNotFoundException:未找到请求操作的编解码器:[varchar java.lang.Double]。我以前的帖子也将有助于更多地描述它 How to change Datatypes of records inserting into Cassandra using Foreach Spark Structure streaming
【问题讨论】:
-
难道
${record.getAs[String](0)应该改为Double? -
谢谢,先生,我是您和您页面的忠实追随者。实际上,我想将 Double 值插入 datastax Cassandra column.getAs[BigInt] 与 java 一起工作得很好。长。但是将 Double 插入 Cassandra 存在编解码器问题。我正在使用 Spark/Scala。@JacekLaskowski
-
我认为您应该专注于开发一个将 Double 插入 Cassandra 的 Scala 示例代码。我不认为这与 Spark 本身有关,不是吗?
-
当然,先生,这是一个 Scala 编解码器问题。我正在使用 Scala 对象并获取 ROW 类型记录并根据列将其分开。 getAs[String] 和 getAs{bigInt] 与 Cassandra 中的文本和 Bigint 数据类型完美匹配。只有我无法转换 Double,因为它是 scala 中的一个对象。
-
如果您使用 DSE Analytics,为什么不尝试使用
writeStream?
标签: scala apache-spark spark-structured-streaming spark-cassandra-connector