【发布时间】:2016-08-05 17:44:54
【问题描述】:
我尝试使用 spark 将一些数据保存到 casandra,但是当我尝试保存日期时,我得到了
线程“主”org.apache.spark.sql.AnalysisException 中的异常: 无法从 field2#5 中提取值;在 org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:73) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:475) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:467) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:339) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:339) 在 org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:338) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$8.apply(TreeNode.scala:298) 在 scala.collection.MapLike$MappedValues$$anonfun$iterator$3.apply(MapLike.scala:246) 在 scala.collection.MapLike$MappedValues$$anonfun$iterator$3.apply(MapLike.scala:246) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 在 scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 在 scala.collection.IterableLike$$anon$1.foreach(IterableLike.scala:294) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:24) 在 scala.collection.TraversableViewLike$class.force(TraversableViewLike.scala:87) 在 scala.collection.IterableLike$$anon$1.force(IterableLike.scala:294) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$8.apply(TreeNode.scala:298) 在 scala.collection.MapLike$MappedValues$$anonfun$iterator$3.apply(MapLike.scala:246) 在 scala.collection.MapLike$MappedValues$$anonfun$iterator$3.apply(MapLike.scala:246) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 在 scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 在 scala.collection.IterableLike$$anon$1.foreach(IterableLike.scala:294) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:24) 在 scala.collection.TraversableViewLike$class.force(TraversableViewLike.scala:87) 在 scala.collection.IterableLike$$anon$1.force(IterableLike.scala:294) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332) 在 org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:108) 在 org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:118) 在 org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:122) 在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 在 scala.collection.immutable.List.foreach(List.scala:318) 在 scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 在 scala.collection.AbstractTraversable.map(Traversable.scala:105) 在 org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:122) 在 org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:467) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:347) 在 org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 在 org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 在 org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69) 在 org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:347) 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:328) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80) 在 scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 在 scala.collection.immutable.List.foldLeft(List.scala:84) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72) 在 scala.collection.immutable.List.foreach(List.scala:318) 在 org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72) 在 org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:294) 在 org.apache.spark.sql.Dataset.(Dataset.scala:79) 在 org.apache.spark.sql.Dataset.(Dataset.scala:90) 在 org.apache.spark.sql.DataFrame.as(DataFrame.scala:209) 在 casandra.casandra.App.readFromSqlServer(App.java:55) 在 casandra.casandra.App.main(App.java:76)
错误出现在:
Dataset<Table1> tData = dataFrame.as(Encoders.bean(Table1.class));
List<Table1> tList = tData.collectAsList();
我的表类
public class Table1 {
private String field1;
private Date field2;
}
我的卡桑德拉表:
CREATE TABLE "a"."table1" (
field1 text,
field2 timestamp,
PRIMARY KEY (( field1 )));
有人知道怎么解决吗?
编辑:
SparkConf conf = new SparkConf();
conf.setAppName("Casandra Test");
conf.setMaster("local[*]");
conf.set("spark.cassandra.connection.host", adress);
App app = new App(conf);
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
Map<String, String> options = new HashMap<String, String>();
options.put("url", sqlServerAddress);
options.put("dbtable", "(SELECT field1, field2 FROM table1");
options.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver");
DataFrame dataFrame = sqlContext.read().format("jdbc").options(options).load();
Dataset<table1> ceData = dataFrame.as(Encoders.bean(Table1.class));
List<table1> ceList = ceData.collectAsList();
JavaRDD<table1> ceRDD = sc.parallelize(app.readFromSqlServer(sqlContext));
javaFunctions(ceRDD).writerBuilder("a", "table1", mapToRow(Table1.class)).saveToCassandra();
sc.stop();
【问题讨论】:
-
你能给出完整的spark代码吗?尤其是当你调用
saveToCassandra()方法时 -
@doanduyhai:我添加了这部分代码,但我的应用程序在它之前崩溃了。
-
请显示“dataFrame.as(Encoders.bean(Table1.class));”中的完整代码直到“saveToCassandra”。现在根本无法理解……
-
@doanduyhai 我提交了完整的代码。
-
我能看到一些奇怪的东西,不应该是 Table1.class 而不是 table1.class (大写 T 信)?
标签: java apache-spark cassandra