【问题标题】:Cannot connect to cassandra from Spark无法从 Spark 连接到 cassandra
【发布时间】:2016-04-25 05:45:03
【问题描述】:

我的 cassandra 中有一些测试数据。我正在尝试从 spark 获取此数据,但出现如下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o25.load.

java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042

这是我到目前为止所做的:

  1. 开始./bin/cassandra
  2. 使用cqlkeyspace ="testkeyspace2"table="emp" 以及一些键和对应值创建了测试数据。
  3. 写了 standalone.py
  4. 运行以下pyspark shell 命令。

    sudo ./bin/spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.0.jar \
    --packages TargetHolding:pyspark-cassandra:0.2.4 \
    examples/src/main/python/standalone.py
    
  5. 收到上述错误。


standalone.py:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("Stand Alone Python Script")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
loading=sqlContext.read.format("org.apache.spark.sql.cassandra")\
                        .options(table="emp", keyspace = "testkeyspace2")\
                        .load()\
                        .show()

我也尝试使用 --packages datastax:spark-cassandra-connector:1.5.0-RC1-s_2.11,但我遇到了同样的错误。


调试:

我检查了

netstat -tulpn | grep -i listen | grep <cassandra_pid>

并看到它正在侦听端口 9042。


完整的日志跟踪:

Traceback (most recent call last):
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/standalone.py", line 8, in <module>
    .options(table="emp", keyspace = "testkeyspace2")\
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 139, in load
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o25.load.
: java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:164)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$.getTokenFactory(CassandraRDDPartitioner.scala:176)
    at org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:203)
    at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:57)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.1.1:9042 (com.datastax.driver.core.TransportException: [/127.0.1.1:9042] Cannot connect))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:227)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:82)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1307)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:339)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:157)
    ... 22 more

我做错了吗?

我对这一切真的很陌生,所以我可以提出一些建议。谢谢!

【问题讨论】:

  • 在您的 cassandra 节点上的 cassandra.yaml 中配置的 rpc_address 和 broadcast_rpc_address(如果已设置)是什么?
  • rpc_address: localhost , rpc_port: 9160 和 broadcast_rpc_address 未设置。
  • 啊,我敢打赌,cassandra 正在将 localhost 解析为 127.0.0.1 并专门监听它。我只是在本地进行了测试,至少对我来说似乎是这样。 'cqlsh 127.0.0.1' 有效,但 'cqlsh 127.0.1.1' 无效。您也可以尝试一下并验证是否是这种情况
  • 当我尝试 cqlsh 127.0.1.1 Connection error: ('Unable to connect to any servers', {'127.0.1.1': error(111, "Tried connecting to [('127.0.1.1', 9042)]. Last error: Connection refused")})
  • 当您尝试使用 127.0.0.1 时它是否有效?

标签: apache-spark cassandra pyspark datastax spark-cassandra-connector


【解决方案1】:

根据我们在问题 cmets 中的对话,问题是 'localhost' 在您的 cassandra.yaml 文件中用于 rpc_address。 Cassandra 使用操作系统将 'localhost' 解析为 127.0.0.1 并明确监听该接口。

要解决此问题,您需要在 cassandra.yaml 中将 rpc_address 更新为 127.0.1.1 并重新启动 cassandra,或者将 SparkConf 更新为引用 127.0.0.1,即:

conf = SparkConf().setAppName("Stand Alone Python Script")
                  .set("spark.cassandra.connection.host", "127.0.0.1")

虽然对我来说似乎很奇怪的一件事是 spark.cassandra.connection.host 也默认为“localhost”,所以对我来说很奇怪 spark cassandra 连接器将“localhost”解析为“127.0.1.1”但 cassandra将其解析为“127.0.0.1”。

【讨论】:

  • 我尝试了这两种方法,但我收到了错误 Failed to open native connection to Cassandra at {127.0.1.1}:9042 。所以我面临同样的问题,但只是现在地址不同。
  • 您确定您正确地更改了 rpc_address 并重新启动了 cassandra? 'cqlsh 127.0.1.1' 现在可以工作了吗?你碰巧在 OS X 上吗?如果是这样,您可能需要手动添加该接口,即“sudo ifconfig lo0 alias 127.0.1.1 up”。
  • 是的,我确定我更改了 rpc_address 并重新启动了 casandra。 cqlsh 127.0.1.1 工作正常,这是netstat -tulpn | grep -i listen | grep &lt;cassandra_pid&gt;justpaste.it/qo7j 的输出。当我启动cassandra时,它最后说INFO 17:25:49 Node localhost/127.0.0.1 state jump to NORMAL
  • 我在 Linux Mint 17 上。
  • Node localhost/127.0.0.1 state jump to NORMAL 指的是为listen_address选择的地址,这是集群中其他节点用于与该主机通信的地址,而不是客户端连接使用的地址。
【解决方案2】:

我在/etc/hosts 中检查了我的 linux hosts 文件,内容类似于

127.0.0.1       localhost
127.0.1.1       <my hostname>

我改成:

127.0.0.1       localhost
127.0.0.1       <my hostname>

效果很好。

正如您在自己的日志文件 line number 58 中看到的那样,它提到了 Your hostname, ganguly resolves to a loopback address: 127.0.1.1; using 192.168.1.32 instead (on interface wlan0),我想这也适用于您的情况。

【讨论】:

    【解决方案3】:

    将此添加到您的 --packages 依赖项旁边,它对我来说非常好。 --conf spark.cassandra.connection.host="127.0.0.1"

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-09-04
      • 2023-03-14
      • 1970-01-01
      • 1970-01-01
      • 2021-01-04
      • 2018-08-17
      • 1970-01-01
      • 2017-06-12
      相关资源
      最近更新 更多