【问题标题】:Python3: cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1']) when using execute_async futurePython3: cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1']) when using execute_async future
【发布时间】:2021-04-17 13:31:17
【问题描述】:

我正在尝试从特定表中的 Cassandra 获取数据,并在进行一些更改后尝试将其插入到 Cassandra 中的另一个表中。这两个表都位于键空间“test”中。当我试图从第一个表中获取数据时,一切正常并且能够获取数据。但是,在处理第一个查询的输出的未来处理程序中,我试图将数据插入到同一 Cassandra 实例下的另一个表中,但它正在失败。我从应用程序中收到一条错误消息,指出“cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])" 。我不确定我哪里出错了

import threading
from threading import Event
from cassandra.query import SimpleStatement
from cassandra.cluster import Cluster


hosts=['127.0.0.1']
keyspace="test"
thread_local = threading.local()
cluster_ = Cluster(hosts)
def get_session():
    if hasattr(thread_local, "cassandra_session"):
        print("got session from threadlocal")
        return thread_local.cassandra_session
    print(" Connecting to Cassandra Host " + str(hosts))
    session_ = cluster_.connect(keyspace)
    print(" Connecting and creating session to Cassandra KeySpace " + keyspace)
    thread_local.cassandra_session = session_
    return session_


class PagedResultHandler(object):

    def __init__(self, future):
        self.error = None
        self.finished_event = Event()
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        for row in rows:
            process_row(row)

        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        self.error = exc
        self.finished_event.set()

def process_row(row):
    print(row)
    session_ = get_session()
    stmt = session_.prepare(
        "INSERT INTO test.data(customer,snr,rttt, event_time) VALUES (?,?,?,?)")
    results = session_.execute(stmt,
                               [row.customer, row.snr, row.rttt,row.created_time])
    print("Done")

session = get_session()
query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
handler.finished_event.wait()
if handler.error:
    raise handler.error
cluster_.shutdown()

但是,当我尝试执行 python 文件时,应用程序抛出错误“cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])"来自“process_row”方法的 getSession() 调用。显然,第一次调用 Cassandra 成功了,没有任何问题。没有连接问题,Cassandra 实例在本地运行良好。我可以使用 cqlsh 查询数据。如果我在未来处理程序之外调用 process_row 方法一切正常,我不确定需要做什么才能从未来处理程序发生。

Connecting to Cassandra Host ['127.0.0.1']
Connecting and creating session to Cassandra KeySpace test
Row(customer='abcd', snr=100, rttt=121, created_time=datetime.datetime(2020, 8, 8, 2, 26, 51))
 Connecting to Cassandra Host ['127.0.0.1']
Traceback (most recent call last):
  File "test/check.py", , in <module>
    raise handler.error
  File "cassandra/cluster.py", line 4579, in cassandra.cluster.ResponseFuture._set_result
  File "cassandra/cluster.py", line 4777, in cassandra.cluster.ResponseFuture._set_final_result
  File "test/check.py"",  in handle_page
    process_row(row)
  File "test/check.py"",  in process_row
    session_ = get_session()
  File "/test/check.py"", in get_session
    session_ = cluster_.connect(keyspace)
  File "cassandra/cluster.py", line 1715, in cassandra.cluster.Cluster.connect
  File "cassandra/cluster.py", line 1772, in cassandra.cluster.Cluster._new_session
  File "cassandra/cluster.py", line 2553, in cassandra.cluster.Session.__init__
cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])

Process finished with exit code 1

【问题讨论】:

标签: python python-3.x cassandra cassandra-3.0


【解决方案1】:

好的,Cassandra 推荐以下内容:

  • 每个键空间最多使用一个 Session,或使用一个 Session 并在查询中明确指定键空间

https://www.datastax.com/blog/4-simple-rules-when-using-datastax-drivers-cassandra

在您的代码中,您尝试在每次读取查询检索到一些行时创建一个会话。

为了强制代码最多使用一个会话,我们可以创建一个队列,其中子线程将行发送到主线程,主线程通过执行插入查询进一步处理它。我们在主线程中执行此操作,因为我在子线程中执行查询时遇到了问题。

callback_queue = Queue()
session = cluster_.connect(keyspace)
session.row_factory = dict_factory # because queue doesn't accept a Row instance


class PagedResultHandler(object):

    ...

    def handle_page(self, rows):
        for row in rows:
            callback_queue.put(row) # here we pass the row as a dict to the queue
        ...

def process_rows():
    while True:
        try:
            row = callback_queue.get() # here we retrieve the row as a dict from the child thread
            stmt = session.prepare(
                "INSERT INTO test.data(customer,snr,rttt, event_time) VALUES (?,?,?,?,?)")
            results = session.execute(stmt,
                                       [row['customer'], row['snr'], row['rttt'], row['created_time']])
            print("Done")
        except Empty:
            pass

query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
process_rows() # for now the code will hang here because we have an infinite loop in this function
handler.finished_event.wait()
if handler.error:
    raise handler.error
cluster_.shutdown()

这将使它工作,但我会替换 while True 否则你将进入无限循环。

【讨论】:

  • 我赞成并接受了您的回答。还授予了针对此问题的赏金。但是,我确实有一个相对较大的数据集,并且我确实希望并行完成这些插入以加快该过程。您给我的解决方案并不能帮助我以有效的方式将包含数百万条记录的完整表转储到另一个表中。你可以看看stackoverflow.com/questions/24785299/…。这将使您了解我为什么要为每个任务创建多个会话。
  • 我们如何在 python 中通过在代码中引入并行性来实现这一点?
【解决方案2】:

好的,在这种情况下,我们做两件事,我们可以使用多线程和批量插入。我认为如果我们不需要批量插入并行性,因为这将从客户端加快速度。多线程不会增加太多速度,因为它不是 CPU 密集型任务。

session = cluster_.connect(keyspace)
session.row_factory = dict_factory


class Fetcher:

    def __init__(self, session):
        self.session = session
        query = "select * from test.data_log"
        self.statement = SimpleStatement(query, fetch_size=1000)

    def run(self):
        rows = self.session.execute(self.statement)

        temp_rows = []
        total = 0
        for row in rows:
            temp_rows.append(row)
            if len(temp_rows) == 1000:
                handler = PagedResultHandler(self.session, temp_rows)
                handler.start()
                temp_rows = []

        handler = PagedResultHandler(self.session, temp_rows)
        handler.start()

    def handle_error(self, err=None):
        print(err)


class PagedResultHandler(threading.Thread):

    def __init__(self, session, rows):
        super().__init__()
        self.session = session
        self.error = None
        self.rows = rows
        self.finished_event = Event()

    def run(self):
        batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
        stmt = session.prepare("INSERT INTO test.data(id, customer,snr,rttt, event_time) VALUES (?,?,?,?,?)")
        for row in self.rows:
            batch.add(stmt, [1, row['customer'], row['snr'], row['rttt'], row['created_time']])
        results = session.execute(batch)
        print(results)


Fetcher(session).run()

这确实脚本同时执行批量插入和多线程,但再次多线程似乎是不必要的。

【讨论】:

  • 原来的尝试和这次有很大的不同。在这里,通过使 Fetcher 成为一个单独的线程,您不会获得任何东西。我同意批处理部分,如果您根据分区键对所有记录进行批处理,它会显着提高所提供的权限,盲批处理不会有太大帮助。同样对于行处理,您带来了并行性,它肯定会提高性能。我仍在试图弄清楚为什么我的原始方法不起作用,即使您与 process_row(row): 函数共享相同的会话,它似乎会抛出 "No Host available:
猜你喜欢
  • 2020-11-27
  • 2021-11-23
  • 2015-09-24
  • 2018-04-25
  • 2022-12-28
  • 2022-12-26
  • 2022-12-19
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多