【Question title】: Cassandra Java API, Datastax
【Posted】: 2019-04-03 04:55:12
【Question】:

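Has anyone used AsyncCassandraTemplate for batch operations with a list of custom objects?

I need to use the same approach, but passing an Iterable no longer seems to be supported.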

【Comments】:

    Tags: spring-boot spring-data datastax cassandra-3.0 datastax-java-driver


    【Answer 1】:

    You can do this easily with the Datastax Driver. If you use Maven, add the following to your pom file:

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.3.2</version>
    </dependency>
    
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-mapping</artifactId>
        <version>3.3.2</version>
    </dependency>
    
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-extras</artifactId>
        <version>3.3.2</version>
    </dependency>
    

    Then create an entity class:

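    import com.datastax.driver.mapping.annotations.ClusteringColumn;
    import com.datastax.driver.mapping.annotations.Column;
    import com.datastax.driver.mapping.annotations.PartitionKey;
    import com.datastax.driver.mapping.annotations.Table;

    // Maps this class to the "message" table in the "test" keyspace
    @Table(
        name = "message",
        keyspace = "test")
    public class Message {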
        @PartitionKey
        @Column(name = "message_id")
        private String messageId;
    
        @ClusteringColumn
        private String date;
    
        private String title;
    
        public String getMessageId() {
            return messageId;
        }
    
        public void setMessageId(String messageId) {
            this.messageId = messageId;
        }
    
        public String getDate() {
            return date;
        }
    
        public void setDate(String date) {
            this.date = date;
        }
    
        public String getTitle() {
            return title;
        }
    
        public void setTitle(String title) {
            this.title = title;
        }
    }
    

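    The following code builds a cluster, creates a few objects, builds a save-query statement for each one, and adds the statements to a batch statement that is executed asynchronously: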

    public void executeBatchStatement() {
        Cluster cluster = makeCluster();
        Session session = cluster.connect();
    
        MappingManager mappingManager = new MappingManager(session);
        Mapper<Message> messageMapper = mappingManager.mapper(Message.class);
    
        Message messageObj1 = new Message();
        Message messageObj2 = new Message();
        Message messageObj3 = new Message();
        // populate these objects
    
        Statement messageStatement1 = messageMapper.saveQuery(messageObj1, Mapper.Option.saveNullFields(false)); // now this Statement represents the query to save this object
        Statement messageStatement2 = messageMapper.saveQuery(messageObj2, Mapper.Option.saveNullFields(false));
        Statement messageStatement3 = messageMapper.saveQuery(messageObj3, Mapper.Option.saveNullFields(false));
    
        BatchStatement messageBatchStatement = new BatchStatement();
        messageBatchStatement.add(messageStatement1);
        messageBatchStatement.add(messageStatement2);
        messageBatchStatement.add(messageStatement3);
    
        session.executeAsync(messageBatchStatement); // execute asynchronously
    }
    
    private Cluster makeCluster() {
        return Cluster.builder()
                .addContactPoint("localhost")
                .withPort(9042)
                .build();
    } 
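
    The question is about a list of custom objects rather than three hard-coded ones, so here is a minimal sketch of one way to turn any Iterable into groups of statements. It uses only the JDK so it can be tried without a cluster. The class name `BatchChunker`, the method `toBatches`, and the `maxPerBatch` limit are hypothetical names introduced for illustration, not part of the Datastax driver. Splitting is a precaution I am assuming you want, since Cassandra warns about or rejects batches above its configured size thresholds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper (not a driver class): converts items to statements
// and groups them into bounded-size lists, one list per future batch.
public class BatchChunker {

    /**
     * Applies toStatement to every item and groups the results into lists
     * of at most maxPerBatch elements, preserving the input order.
     */
    public static <T, S> List<List<S>> toBatches(Iterable<T> items,
                                                 Function<? super T, ? extends S> toStatement,
                                                 int maxPerBatch) {
        if (maxPerBatch <= 0) {
            throw new IllegalArgumentException("maxPerBatch must be positive");
        }
        List<List<S>> batches = new ArrayList<>();
        List<S> current = new ArrayList<>();
        for (T item : items) {
            current.add(toStatement.apply(item));
            // Close the current group once it reaches the limit
            if (current.size() == maxPerBatch) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        // Keep the last, partially filled group
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

    With the mapper from the code above, the conversion function would be something like `m -> messageMapper.saveQuery(m, Mapper.Option.saveNullFields(false))`, and each inner list could then be handed to `new BatchStatement().addAll(group)` and executed with `session.executeAsync(...)`. The right value for `maxPerBatch` depends on row size and cluster settings, so treat any number you pick as something to tune.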
    

    If you want to handle the result of the execution, or do something on success or failure, you can attach a callback:

    ResultSetFuture future = session.executeAsync(messageBatchStatement);
    Futures.addCallback(future,
        new FutureCallback<ResultSet>() {
            @Override public void onSuccess(ResultSet result) {
                // handle success
            }
    
            @Override public void onFailure(Throwable t) {
                // handle error
            }
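        },
        // Explicit executor (com.google.common.util.concurrent.MoreExecutors);
        // newer Guava versions no longer offer the two-argument addCallback
        MoreExecutors.directExecutor()
    );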
    
