【问题标题】:cassandra-spark-connector error with repartitionByCassandraReplica functionrepartitionByCassandraReplica 函数的 cassandra-spark-connector 错误
【发布时间】:2015-06-24 09:35:08
【问题描述】:

我正在尝试使用 1.2 版本中的新加入功能,但 repl 中的 repartitionByCassandraReplica 函数出现错误。

我尝试复制网站的示例并创建了一个包含几个元素的 cassandra 表 (shopping_history): https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.mde

import com.datastax.spark.connector.rdd._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector._
import com.datastax.driver.core._

case class CustomerID(cust_id: Int)
val idsOfInterest = sc.parallelize(1 to 1000).map(CustomerID(_))
val repartitioned =  idsOfInterest.repartitionByCassandraReplica("cim_dev", "shopping_history", 10)
repartitioned.first()

我收到此错误:

15/04/13 18:35:43 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, dev2-cim.aid.fr): java.lang.ClassNotFoundException: $line31.$read$$iwC$$iwC$CustomerID
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:344)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:1098)
    at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:1098)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

我使用带有连接器 1.2.0 RC 3 的 spark 1.2.0。 idsOfInterest 上使用的 joinWithCassandraTable 函数有效。

我也很好奇两者之间的差异:joinWithCassandraTable / cassandraTable with a In 子句 / foreachPartition(withSessionDo) 语法。

它们是否都向充当协调器的本地节点请求数据? joinWithCassandraTable 与 repartitionByCassandraReplica 结合起来是否像异步查询一样高效,只向本地节点请求数据?如果不应用 repartitionByCassandraReplica 会发生什么?

我已经在 cassandra 连接器的谷歌论坛上问过这个问题: https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/b615ANGSySc

谢谢

【问题讨论】:

  • 在不知道您是如何运行此代码的情况下,我不确定您的类加载器问题,您能给我们您的提交命令或启动命令吗?
  • @RussS,我的启动命令是 spark-shell :) 在 spark-default.conf 中设置了 spark.executor.extraClassPath / spark.driver.extraClassPath 到 cassandra 连接器 jar。奇怪的是,在shell中创建的class not found...
  • 您使用的是完整程序集吗?也可以试试 --jars 在某些版本的 spark 上有时会出现一些类加载器的怪异现象。

标签: cassandra apache-spark connector


【解决方案1】:

我会在这里回答你的第二个问题,如果我能根据更多信息弄清楚的话,我会跟进第一部分。

我也很好奇两者之间的差异: joinWithCassandraTable / 带有 In 子句的 cassandraTable / foreachPartition(withSessionDo) 语法。

带有 in 子句的 cassandraTable 将创建单个 spark 分区。所以对于非常小的in子句可能是可以的,但是子句必须从驱动程序序列化到spark应用程序。这对于大型 in 子句可能非常糟糕,而且通常我们不希望在不需要时将数据从 spark 驱动程序来回发送到执行程序。

joinWithCassandraTableforeachPartition(withSessionDo) 非常相似。主要区别在于 joinWithCassandraTable 调用使用了连接器转换和读取代码,这将使从 Cassandra Rows 中获取 Scala 对象变得更加容易。在这两种情况下,您的数据都保持 RDD 形式,不会被序列化回驱动程序。他们还将使用前一个 RDD 中的分区器(或公开首选位置方法的最后一个 RDD),因此他们将能够使用 repartitionByCassandraTable 进行工作。

如果未应用repartitionByCassandraTable,则将在节点上请求数据,该节点可能是也可能不是您请求的信息的协调者。这将在您的查询中添加一个额外的网络跃点,但这可能不会造成很大的性能损失。在加入之前要重新分区的时间实际上取决于数据总量和重新分区操作中 spark shuffle 的成本。

【讨论】:

  • 感谢您的回答 RussS。我仍然对“可能或可能不是协调员”部分感到好奇。到目前为止,我一直在使用带有 IN 子句的 foreachPartition(withSessionDo) 语法来查询有关批量 customerID(Part.Key)的一些时间序列信息,并且我遇到了一些非常大的查询的问题。我想知道是否每个执行程序都在查询本地 cassandra 作为协调器,从而产生大量网络流量和 CPU 负载。这就是我对 joinWithCassandraTable 感兴趣的原因。那么,executor 什么时候需要 coordinator 呢? 1.1 和 1.2 的行为是否不同?
  • in 子句还执行连接不会执行的服务器端 multiget。至于协调器,如果只有一个 multiget 查询,则每输入只能获得一个协调器。
猜你喜欢
  • 2018-09-17
  • 2021-08-24
  • 2020-10-17
  • 2019-04-10
  • 1970-01-01
  • 2017-08-06
  • 2019-10-28
  • 2016-08-20
  • 2019-10-06
相关资源
最近更新 更多