【发布时间】:2014-09-19 07:22:17
【问题描述】:
我的数据模型基于时间序列(在 cassandra CF 中插入来自各种来源的提要。)任何人都可以建议如何在多线程中进行插入。?是否使用类似于多线程的 executeAsync 方法执行查询?我需要设置 cassandra.yaml 的任何属性以实现多线程吗?或任何其他先决条件。
【问题讨论】:
标签: java multithreading datastax datastax-java-driver
我的数据模型基于时间序列(在 cassandra CF 中插入来自各种来源的提要。)任何人都可以建议如何在多线程中进行插入。?是否使用类似于多线程的 executeAsync 方法执行查询?我需要设置 cassandra.yaml 的任何属性以实现多线程吗?或任何其他先决条件。
【问题讨论】:
标签: java multithreading datastax datastax-java-driver
executeAsync() 为语句的执行创建一个单独的线程并立即将控制权返回给调用者——Future<ResultSet> 将得到你的结果。使用这种方法时,直到您进入 Future 之后,您才会知道是否发生了任何异常。
在 Cassandra 中,您无需进行任何设置。只需控制应用程序中的线程数,并正确初始化 Java 驱动程序,以提供符合您需求的 PoolingOptions 对象。
HTH,卡罗
【讨论】:
驱动程序对于多线程使用是安全的。您通常会做的是构建您的Cluster 并在应用程序启动期间获取Session 实例,然后在所有线程之间共享Session。
如何处理多线程取决于您的代码。我也不知道 SQS,但我想你要么有多个消费者从队列中轮询并自己处理消息,要么可能将消息分派到一个工作池。
关于executeAsync,返回的ResultSetFuture实现了Guava的ListenableFuture,所以可以用addListener注册成功回调。但是您必须提供一个Executor 来运行该回调(我不推荐Javadoc 中提到的MoreExecutors#sameThreadExecutor,因为您的回调最终会在驱动程序的一个I/O 线程上运行)。
正如 Carlo 所提到的,一种简单的方法是使用同步 execute,并让您的工作人员阻塞,直到它从 Cassandra 获得响应,然后确认该消息。
【讨论】:
如果您在多线程环境中执行查询,请确保等待 executeAsync(statement) 完成, session.executeAsync(statement) 会立即返回,不保证查询是否有效或提交成功。因此,如果您使用的是线程池,那么请始终使用
ResultSetFuture future = session.executeAsync(statement);
future.getUninterruptibly();
这会等待查询提交,不会消耗内存。
【讨论】: