【问题标题】:Cannot save java date to Cassandra无法将 java 日期保存到 Cassandra
【发布时间】:2016-08-05 17:44:54
【问题描述】:

我尝试使用 spark 将一些数据保存到 casandra,但是当我尝试保存日期时,我得到了

线程“主”org.apache.spark.sql.AnalysisException 中的异常: 无法从 field2#5 中提取值;在 org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:73) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:475) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:467) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:339) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:339) 在 org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:338) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:27​​3) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$8.apply(TreeNode.scala:298) 在 scala.collection.MapLike$MappedValues$$anonfun$iterator$3.apply(MapLike.scala:246) 在 scala.collection.MapLike$MappedValues$$anonfun$iterator$3.apply(MapLike.scala:246) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157) 在 scala.collection.IterableLike$class.foreach(Ite​​rableLike.scala:72) 在 scala.collection.IterableLike$$anon$1.foreach(Ite​​rableLike.scala:294) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:24) 在 scala.collection.TraversableViewLike$class.force(TraversableViewLike.scala:87) 在 scala.collection.IterableLike$$anon$1.force(IterableLike.scala:294) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:27​​3) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:27​​3) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$8.apply(TreeNode.scala:298) 在 scala.collection.MapLike$MappedValues$$anonfun$iterator$3.apply(MapLike.scala:246) 在 scala.collection.MapLike$MappedValues$$anonfun$iterator$3.apply(MapLike.scala:246) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157) 在 scala.collection.IterableLike$class.foreach(Ite​​rableLike.scala:72) 在 scala.collection.IterableLike$$anon$1.foreach(Ite​​rableLike.scala:294) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:24) 在 scala.collection.TraversableViewLike$class.force(TraversableViewLike.scala:87) 在 scala.collection.IterableLike$$anon$1.force(IterableLike.scala:294) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:27​​3) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:27​​3) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:108) 在 org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:118) 在 org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:122) 在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 在 scala.collection.immutable.List.foreach(List.scala:318) 在 scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 在 scala.collection.AbstractTraversable.map(Traversable.scala:105) 在 org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:122) 在 org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:27​​3) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:467) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:347) 在 org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 在 org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 在 org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69) 在 org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:347) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:328) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80) 在 scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 在 scala.collection.immutable.List.foldLeft(List.scala:84) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72) 在 scala.collection.immutable.List.foreach(List.scala:318) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72) 在 org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:294) 在 org.apache.spark.sql.Dataset.(Dataset.scala:79) 在 org.apache.spark.sql.Dataset.(Dataset.scala:90) 在 org.apache.spark.sql.DataFrame.as(DataFrame.scala:209) 在 casandra.casandra.App.readFromSqlServer(App.java:55) 在 casandra.casandra.App.main(App.java:76)

错误出现在:

 Dataset<Table1> tData = dataFrame.as(Encoders.bean(Table1.class));
 List<Table1> tList = tData.collectAsList();

我的表类

public class Table1 {
    private String field1;
    private Date field2;
}

我的卡桑德拉表:

CREATE TABLE "a"."table1" (
field1 text,
field2 timestamp,
PRIMARY KEY (( field1 )));

有人知道怎么解决吗?

编辑:

SparkConf conf = new SparkConf();
conf.setAppName("Casandra Test");
conf.setMaster("local[*]");
conf.set("spark.cassandra.connection.host", adress);
App app = new App(conf);
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
 Map<String, String> options = new HashMap<String, String>();
options.put("url", sqlServerAddress);
options.put("dbtable", "(SELECT field1, field2 FROM table1");
options.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver");
DataFrame dataFrame = sqlContext.read().format("jdbc").options(options).load();        
Dataset<table1> ceData = dataFrame.as(Encoders.bean(Table1.class));
List<table1> ceList = ceData.collectAsList();           
JavaRDD<table1> ceRDD = sc.parallelize(app.readFromSqlServer(sqlContext));
javaFunctions(ceRDD).writerBuilder("a", "table1", mapToRow(Table1.class)).saveToCassandra();
sc.stop();

【问题讨论】:

  • 你能给出完整的spark代码吗?尤其是当你调用saveToCassandra() 方法时
  • @doanduyhai:我添加了这部分代码,但我的应用程序在它之前崩溃了。
  • 请显示“dataFrame.as(Encoders.bean(Table1.class));”中的完整代码直到“saveToCassandra”。现在根本无法理解……
  • @doanduyhai 我提交了完整的代码。
  • 我能看到一些奇怪的东西,不应该是 Table1.class 而不是 table1.class (大写 T 信)?

标签: java apache-spark cassandra


【解决方案1】:

好的,我找到了你的问题。

如果您查看方法 Encoders.bean() 的官方文档,它会说:

Creates an encoder for Java Bean of type T.

T must be publicly accessible.

supported types for java bean field:

- primitive types: boolean, int, double, etc.
- boxed types: Boolean, Integer, Double, etc.
- String
- java.math.BigDecimal
- time related: java.sql.Date, java.sql.Timestamp
- collection types: only array and java.util.List currently, map support is in progress
nested java bean.

java.util.Date 类型未提及,因此 Encoders

不支持

您应该更新您的 Table1 类以使用其他类型而不是 java.util.Date

【讨论】:

  • 好的,现在我将类型更改为 sql.date,它得到了我 org.codehaus.commons.compiler.CompileException:文件 'generated.java',第 82 行,第 85 列:没有适用的构造函数/方法找到实际参数“long”;候选人是:“public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)”
  • 正常,你的类型是java.sql.Date,Cassandra列类型是timestamp,不匹配。我建议在 Cassandra 中的 Table1bigint 类型中使用 Long 类型并在应用程序中自己执行转换。很遗憾,Spark Encoders 不能支持java.util.Date
  • 谢谢你,它有效。但是很遗憾不支持date
猜你喜欢
  • 2018-10-19
  • 1970-01-01
  • 2021-01-24
  • 2012-08-20
  • 1970-01-01
  • 1970-01-01
  • 2015-11-14
  • 2021-01-18
  • 1970-01-01
相关资源
最近更新 更多