【问题标题】:Spark Streaming Dataset Cassandra Connection UnsupportedOperationCheckerSpark Streaming 数据集 Cassandra 连接 UnsupportedOperationChecker
【发布时间】:2020-12-07 05:10:18
【问题描述】:

我正在尝试将我的流式数据集写入 Cassandra。

我有以下类的流式数据集;

case class UserSession(var id: Int,
                       var visited: List[String]
                      )

我在 Cassandra 中也有以下键空间/表。 (博客=KeySpace,会话=表

CREATE KEYSPACE blog WITH REPLICATION = { 'class' : 'SimpleStrategy',    'replication_factor' : 1 };


CREATE TABLE blog.session(id int PRIMARY KEY, visited list<text>);

我选择list&lt;text&gt;作为访问,因为我访问的类型是List&lt;String&gt;

我的foreach作者如下

class SessionCassandraForeachWriter extends ForeachWriter[UserSession] {

/*
  - on every batch, on every partition `partitionId`
    - on every "epoch" = chunk of data
      - call the open method; if false, skip this chunk
      - for each entry in this chunk, call the process method
      - call the close method either at the end of the chunk or with an error if it was thrown
 */

val keyspace = "blog"
val table = "session"
val connector = CassandraConnector(sparkSession.sparkContext.getConf)

override def open(partitionId: Long, epochId: Long): Boolean = {
  println("Open connection")
  true
}

override def process(sess: UserSession): Unit = {
  connector.withSessionDo { session =>
    session.execute(
      s"""
         |insert into $keyspace.$table("id")
         |values (${sess.id},${sess.visited})
       """.stripMargin)
  }
}

override def close(errorOrNull: Throwable): Unit = println("Closing connection")

 }

查看我的流程函数可能会有所帮助,因为这可能会引发错误。我的主要是以下。

finishedUserSessionsStream: DataSet[UserSession]

def main(args: Array[String]): Unit = {
/// make finishedUserSessionStreams.....

finishedUserSessionsStream.writeStream
      .option("checkpointLocation", "checkpoint")
      .foreach(new SessionCassandraForeachWriter)
      .start()
      .awaitTermination()

}

这给了我以下错误

org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431)

【问题讨论】:

  • 使用什么版本的 Spark Cassandra 连接器?和 Spark 版本
  • spark cassandra 连接器是“3.0.0-alpha2”,我使用的是 spark 版本 3.0.0

标签: apache-spark cassandra spark-structured-streaming spark-cassandra-connector


【解决方案1】:

对于 Spark 3.0 和 Spark Cassandra 连接器 3.0.0,您不应该使用 foreach - 这是 SCC Starting with SCC 2.5.0,你可以直接写数据到Cassandra,像这样(这里是full example):

     val query = streamingCountsDF.writeStream
      .outputMode(OutputMode.Update)
      .format("org.apache.spark.sql.cassandra")
      .option("checkpointLocation", "checkpoint")
      .option("keyspace", "ks")
      .option("table", "table")
      .start()

您还需要改用包含大量修复的 SCC 3.0.0-beta。

【讨论】:

  • 错误消息不支持OutputMode.Update。我将其更改为附加。然后回到原来的问题 org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError
  • 您是否在流数据集上调用.cache?如果是,请参阅issues.apache.org/jira/browse/SPARK-20927
  • 没有缓存它。我把它改回更新,错误是这个 java.lang.IllegalArgumentException: 要求失败:会话不支持更新模式。 session 是 cassandra 中的一张表(我在本地拥有的一张)。你认为我应该改变创建会话表的方式吗?
  • 它应该适用于append 模式,但可能取决于您在编写之前所做的转换
  • 啊,提供一些进一步的背景信息。我事先正在做 MapGroupWithState 。但它应该以 Dataset[UserSession] 对象结束。当我将它写入控制台时它可以完美运行
猜你喜欢
  • 2017-01-13
  • 2016-08-14
  • 2021-11-21
  • 2016-01-29
  • 2019-01-07
  • 1970-01-01
  • 2015-02-03
  • 1970-01-01
  • 2017-08-19
相关资源
最近更新 更多