【Question title】: Failed to write statements
【Posted】: 2015-07-07 14:31:25
【Question description】:

I am using Spark with Cassandra, and I want to write data into my Cassandra table:

CREATE TABLE IF NOT EXISTS MyTable(
 user TEXT,
 date TIMESTAMP,
 event TEXT,
 PRIMARY KEY ((user), date, event)
);

But I get this error:

java.io.IOException: Failed to write statements to KeySpace.MyTable.
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/04/28 17:57:47 WARN TaskSetManager: Lost task 13.2 in stage 1.0 (TID 43, dev2-cim.aid.fr): TaskKilled (killed intentionally)

This warning appears in my Cassandra log file:

WARN  [SharedPool-Worker-2] 2015-04-28 16:45:21,219 BatchStatement.java:243 - Batch of prepared statements for [*********] is of size 8158, exceeding specified threshold of 5120 by 3038

After some searching online, I found this post, whose author explains how he solved the same problem: http://progexc.blogspot.fr/2015/03/write-batch-size-error-spark-cassandra.html

So I modified my Spark job to add:

conf.set("spark.cassandra.output.batch.grouping.key", "None")
conf.set("spark.cassandra.output.batch.size.rows", "10")
conf.set("spark.cassandra.output.batch.size.bytes", "2048")

These values removed the warning message from the Cassandra log, but I still get the same error: Failed to write statements

In the Spark logs for the failed job, I found this error:

Failed to execute: 
    com.datastax.spark.connector.writer.RichBatchStatement@67827d57
    com.datastax.driver.core.exceptions.InvalidQueryException: Key may not be empty
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:103)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)

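【Comments】:

  • In the stack trace left by the failure, do you see a cause? The warning may be unrelated to it.
  • I just edited my post to show the stack trace, but I don't see a cause in it.
  • I found the cause in Spark's log file: InvalidQueryException: Key may not be empty
  • Oh. That is more specific and more explicit: some rows in the batch contain invalid data. Could you show the code?
  • Yes, thank you. It was my own mistake: I had 6 rows with a null/blank user. Thanks for your help.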
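
The root cause found in this thread, rows whose partition key `user` is null or blank, can be checked before the write. The following is a minimal sketch in plain Python, not connector code: the row shape (dicts with `user`, `date`, `event` keys), the sample data, and the helper names `is_blank` and `split_by_key_validity` are all assumptions made for illustration. Whitespace-only values are flagged as well, which is a choice of this sketch rather than something the error message requires.

```python
from typing import Iterable, List, Optional, Tuple

Row = dict  # hypothetical row shape: {"user": ..., "date": ..., "event": ...}


def is_blank(value: Optional[str]) -> bool:
    """Return True for None, the empty string, or whitespace-only text."""
    return value is None or value.strip() == ""


def split_by_key_validity(
    rows: Iterable[Row], key: str = "user"
) -> Tuple[List[Row], List[Row]]:
    """Separate rows with a usable partition key from rows without one."""
    valid: List[Row] = []
    invalid: List[Row] = []
    for row in rows:
        (invalid if is_blank(row.get(key)) else valid).append(row)
    return valid, invalid


# Made-up sample data: one good row, one empty key, one missing key.
rows = [
    {"user": "alice", "date": "2015-04-28T16:45:21", "event": "login"},
    {"user": "", "date": "2015-04-28T16:45:22", "event": "click"},
    {"user": None, "date": "2015-04-28T16:45:23", "event": "logout"},
]
valid, invalid = split_by_key_validity(rows)
print(len(valid), "rows can be written;", len(invalid), "have an empty key")
```

In a Spark job, a predicate like `is_blank` could be applied in a `filter` step before `saveToCassandra`, and the rejected rows logged so they can be traced back to their source.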

Tags: apache-spark cassandra-2.0 datastax


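【Solution 1】:

I faced the same problem and solved it by restarting the cluster and its nodes. Here is what I tried before that.

I tried all the options mentioned in the blog post, without success. My data is 174 GB in total, and my cluster has 3 nodes, each with 16 cores and 48 GB of RAM.

First I tried to load all 174 GB in a single pass and hit the same problem. Then I split the 174 GB into 109 files of 1.6 GB each and tried loading those; this time the problem reappeared after 100 files had been loaded.

I suspected that the data in file 101 was at fault, so I tried loading the first file again, loading the first file into a new table, and loading new data into a new table. Every one of these attempts failed.

At that point I concluded that the problem was in the Cassandra cluster itself, so I restarted the cluster and the nodes. After that the problem went away.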


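【Solution 2】:

I had the same problem and found the solution in the comments above (provided by Amine CHERIFI and maasg).

The column corresponding to the primary key was not always populated with a proper value (in my case, it was an empty string "").

This triggered the error:

ERROR QueryExecutor: Failed to execute: \
com.datastax.spark.connector.writer.RichBatchStatement@26ad2668 \
com.datastax.driver.core.exceptions.InvalidQueryException: Key may not be empty

The solution was to provide a default non-empty string.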
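
One way to apply a default is sketched below in plain Python. This is an illustration under stated assumptions, not the answer author's code: the placeholder value `"unknown"`, the helper name `with_default_key`, and the dict-shaped sample rows are hypothetical.

```python
DEFAULT_USER = "unknown"  # hypothetical placeholder value


def with_default_key(row: dict, key: str = "user", default: str = DEFAULT_USER) -> dict:
    """Return the row unchanged, or a copy with the key column filled in
    when the original value is None or the empty string."""
    value = row.get(key)
    if value is None or value == "":
        return {**row, key: default}
    return row


# Made-up sample data: the second row would trigger "Key may not be empty".
rows = [
    {"user": "alice", "date": "2015-04-28T16:45:21", "event": "login"},
    {"user": "", "date": "2015-04-28T16:45:22", "event": "click"},
]
cleaned = [with_default_key(row) for row in rows]
print([row["user"] for row in cleaned])
```

Every row that receives the default shares one partition key, so all such rows land in the same Cassandra partition; if many rows are affected, dropping or repairing them at the source may be the better choice.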


【Solution 3】:

Set a breakpoint at "com/datastax/spark/connector/writer/AsyncExecutor.scala:45" to see the real exception.

In my case, the replication_factor of my keyspace was 2, but only one node was alive.


【Solution 4】:

If you are running in yarn-cluster mode, don't forget to check the full YARN logs with yarn logs -applicationId <appId> --appOwner <appOwner>. This told me more about the cause of the failure than the logs shown in the YARN web UI:

Caused by: com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)
    at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:50)
    at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
    at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266)
    at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
    ... 11 more

The solution is to set spark.cassandra.output.consistency.level=ANY in your spark-defaults.conf
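
The "2 required but only 1 alive" figure follows from the quorum rule: a quorum is floor(replication_factor / 2) + 1 replicas, so a replication factor of 2 needs both replicas. The sketch below works through that arithmetic in plain Python. The function names are hypothetical, and only a subset of consistency levels is modelled; ANY is left out because a write at ANY can also be accepted through a hinted handoff, so it does not reduce to a count of live replicas.

```python
def quorum(replication_factor: int) -> int:
    """Replicas that must respond for QUORUM or LOCAL_QUORUM."""
    return replication_factor // 2 + 1


def required_replicas(level: str, replication_factor: int) -> int:
    """Replica count needed for the consistency levels modelled here.

    For LOCAL_QUORUM, replication_factor means the factor of the
    local datacenter.
    """
    required = {
        "ONE": 1,
        "QUORUM": quorum(replication_factor),
        "LOCAL_QUORUM": quorum(replication_factor),
        "ALL": replication_factor,
    }
    return required[level]


def can_write(level: str, replication_factor: int, alive: int) -> bool:
    """True if enough replicas are alive for a write at this level."""
    return alive >= required_replicas(level, replication_factor)


# The situation from the stack trace: replication factor 2, one replica alive.
print(can_write("LOCAL_QUORUM", 2, 1))  # False: quorum(2) == 2
print(can_write("ONE", 2, 1))           # True: one live replica is enough
```

With a replication factor of 3, `quorum(3)` is still 2, so a single node could go down without failing LOCAL_QUORUM writes.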

【Comments】:

  • What is the ideal value for spark.cassandra.output.batch.grouping.buffer.size? How should it be calculated?