【Question title】: Writing a DataFrame to Cassandra fails with BusyPoolException
【Posted】: 2019-08-14 05:32:59
【Question】:

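I am trying to write a DataFrame to Cassandra with the code below. One day it wrote to the table without problems, but then it suddenly started failing with an error.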

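import org.apache.spark.sql.SaveMode

// Append the rows of alertdf to the Cassandra table dummy.dummytable
alertdf.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "dummy", "table" -> "dummytable"))
  .mode(SaveMode.Append)
  .save()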

I get the following error and cannot work out what is wrong:

  ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@7dba59e2
        com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: **.**.**.**/**.**.**.**:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [**.**.**.**/**.**.**.**] Pool is busy (no available connection and the queue has reached its max size 256)))
            at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:211)
            at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:46)
            at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:275)
            at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.onFailure(RequestHandler.java:338)
            at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$6.run(Futures.java:1310)
            at shade.com.datastax.spark.connector.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:457)
            at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ImmediateFuture.addListener(Futures.java:106)
            at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures.addCallback(Futures.java:1322)
            at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures.addCallback(Futures.java:1258)
            at com.datastax.driver.core.RequestHandler$SpeculativeExecution.query(RequestHandler.java:297)
            at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:272)
            at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:115)
            at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:95)
            at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
            at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
            at com.sun.proxy.$Proxy14.executeAsync(Unknown Source)
            at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
            at com.sun.proxy.$Proxy15.executeAsync(Unknown Source)
            at com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$lessinit$greater$1.apply(QueryExecutor.scala:11)
            at com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$lessinit$greater$1.apply(QueryExecutor.scala:11)
            at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)
            at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$2.apply(TableWriter.scala:199)
            at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$2.apply(TableWriter.scala:198)
            at scala.collection.Iterator$class.foreach(Iterator.scala:893)
            at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
            at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:198)
            at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
            at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
            at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
            at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
            at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
            at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
            at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
            at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
            at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
            at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
            at org.apache.spark.scheduler.Task.run(Task.scala:86)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)

Can anyone help me solve this?

【Comments】:

    Tags: cassandra datastax cassandra-3.0 spark-cassandra-connector


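    【Solution 1】:

    Your servers appear to be overloaded and cannot process your requests in time. I recommend tuning the write-related configuration parameters, for example output.concurrent.writes, output.throughput_mb_per_sec, and so on, but I would start with those two.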
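
As a rough illustration of that advice, the two settings can be supplied when the SparkSession is built, using their full `spark.cassandra.` prefixed names from the 2.x connector. This is only a sketch: the application name and the value 2 for concurrent writes are assumptions chosen for illustration, the value 50 is the starting point suggested in the discussion, and suitable values depend on your hardware and Cassandra tuning.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Sketch only: the app name and the concurrent-writes value are illustrative assumptions.
val spark = SparkSession.builder()
  .appName("cassandra-write-throttled") // hypothetical name
  // Limit how many batches each task sends to Cassandra in parallel (assumed value).
  .config("spark.cassandra.output.concurrent.writes", "2")
  // Cap write throughput in MB/s per core; 50 is the starting value from the discussion.
  .config("spark.cassandra.output.throughput_mb_per_sec", "50")
  .getOrCreate()

// The write itself is unchanged from the question; alertdf is the asker's DataFrame.
alertdf.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "dummy", "table" -> "dummytable"))
  .mode(SaveMode.Append)
  .save()
```

The same keys can also be passed with `--conf` on `spark-submit`, which makes it easier to try different values between runs without rebuilding the job.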

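    【Discussion】:

    • Thanks for the reply. How should I determine the best values for these two parameters?
    • That is really hard to say without knowing what hardware you are using, how Cassandra is tuned, and so on. Start by setting the output throughput to 50 and see whether it works. If the write survives, try increasing the value; if not, decrease it...
    • What information should I provide so that you can help me with this?
    • Sometimes increasing the throughput and similar settings does more harm than good... It really depends on your hardware, plus the Spark code itself.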