【发布时间】:2021-04-17 13:31:17
【问题描述】:
我正在尝试从特定表中的 Cassandra 获取数据,并在进行一些更改后尝试将其插入到 Cassandra 中的另一个表中。这两个表都位于键空间“test”中。当我试图从第一个表中获取数据时,一切正常并且能够获取数据。但是,在处理第一个查询的输出的未来处理程序中,我试图将数据插入到同一 Cassandra 实例下的另一个表中,但它正在失败。我从应用程序中收到一条错误消息,指出“cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])" 。我不确定我哪里出错了
import threading
from threading import Event
from cassandra.query import SimpleStatement
from cassandra.cluster import Cluster
hosts=['127.0.0.1']
keyspace="test"
thread_local = threading.local()
cluster_ = Cluster(hosts)
def get_session():
if hasattr(thread_local, "cassandra_session"):
print("got session from threadlocal")
return thread_local.cassandra_session
print(" Connecting to Cassandra Host " + str(hosts))
session_ = cluster_.connect(keyspace)
print(" Connecting and creating session to Cassandra KeySpace " + keyspace)
thread_local.cassandra_session = session_
return session_
class PagedResultHandler(object):
def __init__(self, future):
self.error = None
self.finished_event = Event()
self.future = future
self.future.add_callbacks(
callback=self.handle_page,
errback=self.handle_error)
def handle_page(self, rows):
for row in rows:
process_row(row)
if self.future.has_more_pages:
self.future.start_fetching_next_page()
else:
self.finished_event.set()
def handle_error(self, exc):
self.error = exc
self.finished_event.set()
def process_row(row):
print(row)
session_ = get_session()
stmt = session_.prepare(
"INSERT INTO test.data(customer,snr,rttt, event_time) VALUES (?,?,?,?)")
results = session_.execute(stmt,
[row.customer, row.snr, row.rttt,row.created_time])
print("Done")
session = get_session()
query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
handler.finished_event.wait()
if handler.error:
raise handler.error
cluster_.shutdown()
但是,当我尝试执行 python 文件时,应用程序抛出错误“cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])"来自“process_row”方法的 getSession() 调用。显然,第一次调用 Cassandra 成功了,没有任何问题。没有连接问题,Cassandra 实例在本地运行良好。我可以使用 cqlsh 查询数据。如果我在未来处理程序之外调用 process_row 方法一切正常,我不确定需要做什么才能从未来处理程序发生。
Connecting to Cassandra Host ['127.0.0.1']
Connecting and creating session to Cassandra KeySpace test
Row(customer='abcd', snr=100, rttt=121, created_time=datetime.datetime(2020, 8, 8, 2, 26, 51))
Connecting to Cassandra Host ['127.0.0.1']
Traceback (most recent call last):
File "test/check.py", , in <module>
raise handler.error
File "cassandra/cluster.py", line 4579, in cassandra.cluster.ResponseFuture._set_result
File "cassandra/cluster.py", line 4777, in cassandra.cluster.ResponseFuture._set_final_result
File "test/check.py"", in handle_page
process_row(row)
File "test/check.py"", in process_row
session_ = get_session()
File "/test/check.py"", in get_session
session_ = cluster_.connect(keyspace)
File "cassandra/cluster.py", line 1715, in cassandra.cluster.Cluster.connect
File "cassandra/cluster.py", line 1772, in cassandra.cluster.Cluster._new_session
File "cassandra/cluster.py", line 2553, in cassandra.cluster.Session.__init__
cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])
Process finished with exit code 1
【问题讨论】:
-
如果我可以问,您使用的是哪个 Python 版本?
-
我正在使用 python 3.7
标签: python python-3.x cassandra cassandra-3.0