【问题标题】:Error reading Cassandra TTL and WRITETIME with Spark 3.0使用 Spark 3.0 读取 Cassandra TTL 和 WRITETIME 时出错
【发布时间】:2021-12-24 05:40:47
【问题描述】:

虽然来自 DataStax states 的最新 spark-cassandra-connector 支持读/写 TTL 和 WRITETIME,但我仍然收到 SQL 未定义函数错误。

在 9.1 LTS ML(包括 Apache Spark 3.1.2、Scala 2.12)集群上使用 Databricks 和库 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.1.0 和 CassandraSparkExtensions 的 Spark 配置。 CQL 版本 3.4.5。

spark.sql.extensions com.datastax.spark.connector.CassandraSparkExtensions

用笔记本代码确认配置:

spark.conf.get("spark.sql.extensions")

输出[7]:'com.datastax.spark.connector.CassandraSparkExtensions'

# Cassandra connection configs using Data Source API V2
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.host", "10.1.4.4")
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.port", "9042")
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.auth.username", dbutils.secrets.get(scope = "myScope", key = "CassUsername"))
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.auth.password", dbutils.secrets.get(scope = "myScope", key = "CassPassword")) 
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.ssl.enabled", True)
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.ssl.trustStore.path", "/dbfs/user/client-truststore.jks")
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.ssl.trustStore.password", dbutils.secrets.get("key-vault-secrets", "cassTrustPassword"))
spark.conf.set("spark.sql.catalog.cassandrauat.spark.dse.continuous_paging_enabled", False) 

# catalog name will be "cassandrauat" for Cassandra
spark.conf.set("spark.sql.catalog.cassandrauat", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set("spark.sql.catalog.cassandrauat.prop", "key")
spark.conf.set("spark.sql.defaultCatalog", "cassandrauat") # will override Spark to use Cassandra for all databases
%sql 
select id, did, ts, val, ttl(val) 
from cassandrauat.myKeyspace.myTable

SQL 语句错误:AnalysisException:未定义函数:'ttl'。该函数既不是注册的临时函数,也不是在数据库“默认”中注册的永久函数。第 1 行 25 号位

当直接在 Cassandra 集群上运行相同的 CQL 查询时,它会产生一个结果。

任何有关 CassandraSparkExtensions 未加载原因的帮助。

为预加载库后发生的 NoSuchMethodError 添加完整的堆栈跟踪

com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(Lscala/PartialFunction;)Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.replaceMetadata(CassandraMetadataFunctions.scala:152)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply$1.$anonfun$applyOrElse$2(CassandraMetadataFunctions.scala:187)
    at scala.collection.immutable.Stream.foldLeft(Stream.scala:549)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply$1.applyOrElse(CassandraMetadataFunctions.scala:186)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply$1.applyOrElse(CassandraMetadataFunctions.scala:183)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:484)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:484)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:460)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:428)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.apply(CassandraMetadataFunctions.scala:183)
    at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.apply(CassandraMetadataFunctions.scala:90)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$3(RuleExecutor.scala:221)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:221)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:89)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:218)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:210)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:210)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:271)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:264)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:191)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:188)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:109)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:188)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:246)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:347)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:245)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:96)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:180)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:180)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:97)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:86)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:101)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:689)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:684)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.$anonfun$executeSql$1(SQLDriverLocal.scala:91)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.executeSql(SQLDriverLocal.scala:37)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.repl(SQLDriverLocal.scala:144)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$13(DriverLocal.scala:541)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:50)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:50)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:518)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:689)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
    at java.lang.Thread.run(Thread.java:748)

    at com.databricks.backend.daemon.driver.SQLDriverLocal.executeSql(SQLDriverLocal.scala:129)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.repl(SQLDriverLocal.scala:144)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$13(DriverLocal.scala:541)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:50)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:50)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:518)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:689)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
    at java.lang.Thread.run(Thread.java:748)

【问题讨论】:

    标签: apache-spark cassandra databricks azure-databricks spark-cassandra-connector


    【解决方案1】:

    如果您刚刚通过集群 UI 添加了 Spark Cassandra 连接器,那么它将无法工作 - 原因是在 Spark 启动后将库安装到集群中,因此找不到在 spark.sql.extensions 中指定的类。

    要解决此问题,您需要在 Spark 启动之前将 Jar 文件放入集群节点 - 您可以使用 cluster init script 执行此操作,这将使用类似这样的方式直接下载 jar(但它会下载多个副本 - 对于每个节点) :

    #!/bin/bash
    
    wget -q -O /databricks/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar \
      https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector-assembly_2.12/3.1.0/spark-cassandra-connector-assembly_2.12-3.1.0.jar
    

    或者最好下载汇编jar,放到DBFS上,然后从DBFS复制到目标目录(比如上传到/FileStore/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar):

    #!/bin/bash
    
    cp /dbfs/FileStore/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar \
      /databricks/jars/
    

    更新 (13.11.2021):SCC 3.1.0 与 Spark 3.2.0 不完全兼容(部分内容已在 DBR 9.1 中)。详情请见SPARKC-670

    【讨论】:

    • 现在我收到了 NoSuchMethodError。这可能是因为我使用的是带有 3.1.0 jar 的 Spark 3.1.2 集群吗?我看到使用 Azure Databricks 获得匹配的唯一选择是一直回到 Databricks 7.3LTS,这对我来说并不是一个真正的选择。驱动程序适用于不包括 ttl 或 writetime 的选择。
    • 你能发布堆栈跟踪吗?
    • 请将堆栈跟踪放入您的问题中,而不是答案中
    • 查看答案更新
    • 你可以试试这个版本:dropbox.com/s/jx8i5i7aw42bv5k/…
    猜你喜欢
    • 2020-05-25
    • 1970-01-01
    • 1970-01-01
    • 2019-04-10
    • 2015-05-10
    • 2015-08-16
    • 2014-01-31
    • 2016-04-29
    • 2017-01-18
    相关资源
    最近更新 更多