【问题标题】:Multi Threading with datastax java driver 2.0使用 datastax java 驱动程序 2.0 的多线程
【发布时间】:2014-09-19 07:22:17
【问题描述】:

我的数据模型基于时间序列(在 cassandra CF 中插入来自各种来源的提要。)任何人都可以建议如何在多线程中进行插入。?是否使用类似于多线程的 executeAsync 方法执行查询?我需要设置 cassandra.yaml 的任何属性以实现多线程吗?或任何其他先决条件。

【问题讨论】:

    标签: java multithreading datastax datastax-java-driver


    【解决方案1】:

    executeAsync() 为语句的执行创建一个单独的线程并立即将控制权返回给调用者——Future<ResultSet> 将得到你的结果。使用这种方法时,直到您进入 Future 之后,您才会知道是否发生了任何异常。

    在 Cassandra 中,您无需进行任何设置。只需控制应用程序中的线程数,并正确初始化 Java 驱动程序,以提供符合您需求的 PoolingOptions 对象。

    HTH,卡罗

    【讨论】:

    • 感谢您的回复,在我的过程中,我收到了来自 AWS SQS 的多条消息并准备插入语句,然后执行它,然后从 SQS 中删除消息。由于 SQS 有 messageReceiptHandler 可以用来识别消息,但现在在 Future 的情况下,我们如何识别确认?
    • 不知道 SQS,我不确定我是否理解所有内容,但解决方案可能是避免 executeAsync() 并实现您自己的 Callable (或任何您需要的,消息处理程序等.),在 call() 方法中执行查询,然后返回消息,以便执行所需的操作(或者实现一个 Runnable 并在 run() 中执行所有操作)
    【解决方案2】:

    驱动程序对于多线程使用是安全的。您通常会做的是构建您的Cluster 并在应用程序启动期间获取Session 实例,然后在所有线程之间共享Session

    如何处理多线程取决于您的代码。我也不知道 SQS,但我想你要么有多个消费者从队列中轮询并自己处理消息,要么可能将消息分派到一个工作池。

    关于executeAsync,返回的ResultSetFuture实现了Guava的ListenableFuture,所以可以用addListener注册成功回调。但是您必须提供一个Executor 来运行该回调(我不推荐Javadoc 中提到的MoreExecutors#sameThreadExecutor,因为您的回调最终会在驱动程序的一个I/O 线程上运行)。

    正如 Carlo 所提到的,一种简单的方法是使用同步 execute,并让您的工作人员阻塞,直到它从 Cassandra 获得响应,然后确认该消息。

    【讨论】:

      【解决方案3】:

      如果您在多线程环境中执行查询,请确保等待 executeAsync(statement) 完成, session.executeAsync(statement) 会立即返回,不保证查询是否有效或提交成功。因此,如果您使用的是线程池,那么请始终使用

      ResultSetFuture future = session.executeAsync(statement);
      future.getUninterruptibly();
      

      这会等待查询提交,不会消耗内存。

      【讨论】:

        猜你喜欢
        • 2016-11-29
        • 2016-11-29
        • 2016-05-30
        • 1970-01-01
        • 2018-02-15
        • 2013-10-19
        • 2015-05-13
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多