【发布时间】:2019-04-03 04:55:12
【问题描述】:
是否有人使用 AsyncCassandraTemplate 进行具有自定义对象列表的批处理操作?
我需要使用相同的方法,但似乎不再支持传递可迭代对象。
【问题讨论】:
标签: spring-boot spring-data datastax cassandra-3.0 datastax-java-driver
是否有人使用 AsyncCassandraTemplate 进行具有自定义对象列表的批处理操作?
我需要使用相同的方法,但似乎不再支持传递可迭代对象。
【问题讨论】:
标签: spring-boot spring-data datastax cassandra-3.0 datastax-java-driver
您可以使用Datastax Driver 轻松实现这一目标。如果您使用 Maven,只需将其添加到您的 pom 文件中:
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.3.2</version>
</dependency>
然后创建一个实体类:
@Table(
name = "message",
keyspace = "test")
public class Message {
@PartitionKey
@Column(name = "message_id")
private String messageId;
@ClusteringColumn
private String date;
private String title;
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
}
然后使用以下代码,您可以构建一个集群,然后启动一些对象,然后为其保存查询创建语句,然后将它们添加到要异步执行的批处理语句中
public void executeBatchStatement() {
Cluster cluster = makeCluster();
Session session = cluster.connect();
MappingManager mappingManager = new MappingManager(session);
Mapper<Message> messageMapper = mappingManager.mapper(Message.class);
Message messageObj1 = new Message();
Message messageObj2 = new Message();
Message messageObj3 = new Message();
// populate these objects
Statement messageStatement1 = messageMapper.saveQuery(messageObj1, Mapper.Option.saveNullFields(false)); // now this Statement represents the query to save this object
Statement messageStatement2 = messageMapper.saveQuery(messageObj2, Mapper.Option.saveNullFields(false));
Statement messageStatement3 = messageMapper.saveQuery(messageObj3, Mapper.Option.saveNullFields(false));
BatchStatement messageBatchStatement = new BatchStatement();
messageBatchStatement.add(messageStatement1);
messageBatchStatement.add(messageStatement2);
messageBatchStatement.add(messageStatement3);
session.executeAsync(messageBatchStatement); // execute asynchronously
}
private Cluster makeCluster() {
return Cluster.builder()
.addContactPoint("localhost")
.withPort(9042)
.build();
}
如果您想处理执行结果或在成功或失败时做某事,您也可以这样做
ResultSetFuture future = session.executeAsync(messageBatchStatement);
Futures.addCallback(future,
new FutureCallback<ResultSet>() {
@Override public void onSuccess(ResultSet result) {
// handle success
}
@Override public void onFailure(Throwable t) {
// handle error
}
}
);
【讨论】: