【问题标题】:How Cassandra handle blocking execute statement in datastax java driverCassandra如何处理datastax java驱动程序中的阻塞执行语句
【发布时间】:2016-04-29 04:56:37
【问题描述】:

阻止来自 com.datastax.driver.core.Session 的执行方法

public ResultSet execute(Statement statement);

对此方法的评论:

此方法会阻塞,直到至少收到一些结果 数据库。但是,对于 SELECT 查询,它不保证 结果已全部收到。但它确实保证了一些 已从数据库收到响应,特别是 保证如果请求无效,将抛出异常 用这种方法。

来自 com.datastax.driver.core.Session 的非阻塞执行方法

public ResultSetFuture executeAsync(Statement statement);

此方法不会阻塞。查询完成后立即返回 传递到底层网络堆栈。特别是,从 此方法不保证查询有效或什至 已提交到活动节点。与失败有关的任何异常 访问 {@link 时将抛出查询 结果集未来}。

我有 02 个关于它们的问题,如果你能帮助我理解它们,那就太好了。

假设我有 100 万条记录,我希望所有记录都到达数据库(没有任何丢失)。

问题 1: 如果我有 n 个线程,所有线程将具有相同数量的记录,它们需要发送到数据库。他们都继续使用阻塞执行调用向 cassandra 发送多个插入查询。如果我增加 n 的值,是否也有助于加快我需要将所有记录插入到 cassandra 的时间?

这会导致 cassandra 出现性能问题吗? Cassandra 是否必须确保对于每条插入记录,集群中的所有节点都应该立即知道新记录?为了保持数据的一致性。 (我假设 cassandra 节点甚至不会考虑使用本地机器时间来控制记录插入时间)。

问题 2: 使用非阻塞执行,如何确保所有插入都成功?我知道的唯一方法是等待 ResultSetFuture 检查插入查询的执行。我有什么更好的方法吗?非阻塞执行比阻塞执行更容易失败吗?

非常感谢您的帮助。

【问题讨论】:

    标签: cassandra datastax datastax-java-driver


    【解决方案1】:

    如果我有 n 个线程,所有线程将具有相同数量的记录,它们需要发送到数据库。他们都继续使用阻塞执行调用向 cassandra 发送多个插入查询。如果我增加 n 的值,是否也有助于加快我需要将所有记录插入到 cassandra 的时间?

    在某种程度上。让我们稍微分离一下客户端实现细节,并从“并发请求数”的角度来看问题,因为如果您使用 executeAsync,则不需要为每个正在进行的请求设置一个线程。在我的测试中,我发现虽然拥有大量并发请求有很多价值,但存在一个阈值,即收益递减或性能开始下降。我的一般经验法则是(number of Nodes *native_transport_max_threads (default: 128)* 2),但您可能会发现更多或更少的最佳结果。

    这里的想法是,将比 cassandra 一次处理的更多请求排队并没有太大价值。在减少飞行请求数量的同时,您还可以限制驱动程序客户端和 cassandra 之间连接的不必要拥塞。

    问题 2:使用非阻塞执行,如何确保所有插入都成功?我知道的唯一方法是等待 ResultSetFuture 检查插入查询的执行。我有什么更好的方法吗?非阻塞执行是否比阻塞执行更容易失败?

    通过get 等待 ResultSetFuture 是一种方法,但如果您正在开发一个完全异步的应用程序,您希望尽可能避免阻塞。使用番石榴,你最好的两把武器是Futures.addCallbackFutures.transform

    • Futures.addCallback 允许您注册在驱动程序收到响应时执行的FutureCallbackonSuccess 在成功的情况下被执行,onFailure 在其他情况下被执行。

    • Futures.transform 允许您有效地将返回的ResultSetFuture 映射到其他内容。例如,如果您只想要 1 列的值,则可以使用它将 ListenableFuture<ResultSet> 转换为 ListenableFuture<String>,而无需在 ResultSetFuture 上的代码中阻塞,然后获取字符串值。

      李>

    在编写数据加载程序的上下文中,您可以执行以下操作:

    1. 为简单起见,请使用Semaphore 或其他具有固定许可数的构造(这将是您的最大飞行请求数)。每当您使用executeAsync 提交查询时,都需要获得许可。您实际上应该只需要 1 个线程(但可能需要引入一个 # cpu cores size 的池来执行此操作)从 Semaphore 获取许可并执行查询。它只会阻止获取,直到有可用的许可。
    2. 使用Futures.addCallback 表示从executeAsync 返回的未来。在onSuccessonFailure 情况下,回调应该调用Sempahore.release()。通过释放许可,这应该允许您在第 1 步中的线程继续并提交下一个请求。

    为了进一步提高吞吐量,您可能需要考虑使用BatchStatement 并批量提交请求。如果您保持批次较小(50-250 是一个不错的数字)并且批次中的插入都共享相同的分区键,这是一个不错的选择。

    【讨论】:

    • nodes * native_transport_max_threads 位上我不卖。特别是,推理(将比 cassandra 一次处理的更多请求排队并没有太大价值)假设旅行时间是即时的/可以忽略不计。如果我的客户端和 cassandra 节点之间的单程行程时间为 100 毫秒,并且服务器可以在 2 毫秒内处理请求,我希望一次将大约 50 条线路放在网络上。这里的想法是,我现在上网的消息将在约 100 毫秒内到达,在此期间服务器可以处理约 50 条消息,我想让服务器保持忙碌,始终确保它正常工作
    【解决方案2】:

    除了上面的答案,

    看起来像 execute() 调用 executeAsync(statement).getUninterruptibly(),所以无论您是否使用 execute() 管理自己的“n 个线程池”并阻止自己直到执行完成最多 n 个正在运行的线程或使用 executeAsync () 在所有记录上,cassandra 端的性能应该大致相同,具体取决于执行时间/计数 + 超时。

    它们的执行都将运行从池中借用的连接,每次执行在客户端都有一个 streamId,并在此 streamId 的响应返回时通过未来通知您,受客户端每个连接的总请求数和总请求数限制通过在选择执行请求的每个节点上读取线程,任何更高的数字都将缓冲在受连接 maxQueueSize 和 maxRequestsPerConnection 限制的队列(未阻塞)中,任何高于此值的都将失败。这样做的美妙之处在于,executeAsync() 不会在每个请求/执行的新线程上运行。

    因此,必须限制通过execute() 或executeAsync() 可以运行的请求数量,在execute() 中您要避免超出这些限制。

    在性能方面,您将开始看到超出每个节点可以处理的惩罚,因此具有良好大小池的 execute() 对我来说是有意义的。更好的是,使用反应式架构来避免创建这么多除了等待什么都不做的线程,所以大量的线程会导致客户端浪费上下文切换。对于较少数量的请求,executeAsync() 会更好地避免使用线程池。

    DefaultResultSetFuture future = new DefaultResultSetFuture(..., makeRequestMessage(statement, null));
    new RequestHandler(this, future, statement).sendRequest();

    【讨论】:

      猜你喜欢
      • 2015-06-23
      • 2015-05-13
      • 2014-05-04
      • 1970-01-01
      • 2013-05-30
      • 2018-02-15
      • 2016-03-05
      • 2016-11-14
      • 1970-01-01
      相关资源
      最近更新 更多