【问题标题】:How to use Asynchronous/Batch writes feature with Datastax Java driver如何在 Datastax Java 驱动程序中使用异步/批量写入功能
【发布时间】:2013-10-12 17:25:49
【问题描述】:

我计划使用 Datastax Java 驱动程序写入 Cassandra。我主要对 Datastax Java 驱动程序的 Batch WritesAsycnhronous 功能感兴趣,但我无法获得任何可以解释我如何合并的教程这些功能在我下面的代码中使用 Datastax Java 驱动程序..

/**
 * Performs an upsert of the specified attributes for the specified id.
 */
public void upsertAttributes(final String userId, final Map<String, String> attributes, final String columnFamily) {

    try {

        // make a sql here using the above input parameters.

        String sql = sqlPart1.toString()+sqlPart2.toString();

        DatastaxConnection.getInstance();
        PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(sql);
        prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);        

        BoundStatement query = prepStatement.bind(userId, attributes.values().toArray(new Object[attributes.size()]));

        DatastaxConnection.getSession().execute(query);

    } catch (InvalidQueryException e) {
        LOG.error("Invalid Query Exception in DatastaxClient::upsertAttributes "+e);
    } catch (Exception e) {
        LOG.error("Exception in DatastaxClient::upsertAttributes "+e);
    }
}

在下面的代码中,我正在使用 Datastax Java 驱动程序创建与 Cassandra 节点的连接。

/**
 * Creating Cassandra connection using Datastax Java driver
 *
 */
private DatastaxConnection() {

    try{
        builder = Cluster.builder();
        builder.addContactPoint("some_nodes");

        builder.poolingOptions().setCoreConnectionsPerHost(
                HostDistance.LOCAL,
                builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL));

        cluster = builder
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();

        StringBuilder s = new StringBuilder();
        Set<Host> allHosts = cluster.getMetadata().getAllHosts();
        for (Host h : allHosts) {
            s.append("[");
            s.append(h.getDatacenter());
            s.append(h.getRack());
            s.append(h.getAddress());
            s.append("]");
        }
        System.out.println("Cassandra Cluster: " + s.toString());

        session = cluster.connect("testdatastaxks");

    } catch (NoHostAvailableException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    } catch (Exception e) {

    }
}

任何人都可以帮助我如何将批量写入或异步功能添加到我的上述代码中。谢谢您的帮助..

我正在运行 Cassandra 1.2.9

【问题讨论】:

    标签: java cassandra datastax-java-driver


    【解决方案1】:

    对于异步,它就像使用 executeAsync 函数一样简单:

    ...
    DatastaxConnection.getSession().executeAsync(query);
    

    对于批处理,您需要构建查询(我使用字符串,因为编译器非常清楚如何优化字符串连接):

    String cql =  "BEGIN BATCH "
           cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
           cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
           cql += "APPLY BATCH; "
    
    DatastaxConnection.getInstance();
    PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(cql);
    prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);        
    
    // this is where you need to be careful
    // bind expects a comma separated list of values for all the params (?) above
    // so for the above batch we need to supply 4 params:                     
    BoundStatement query = prepStatement.bind(userId, "col1_val", userId_2, "col1_val_2");
    
    DatastaxConnection.getSession().execute(query);
    

    附带说明一下,我认为您的语句绑定可能看起来像这样,假设您将属性更改为映射列表,其中每个映射表示批处理中的更新/插入:

    BoundStatement query = prepStatement.bind(userId,
                                              attributesList.get(0).values().toArray(new Object[attributes.size()]), 
                                              userId_2,
                                              attributesList.get(1).values().toArray(new Object[attributes.size()])); 
    

    【讨论】:

    • 有没有办法用命名参数做到这一点?
    • @Highstead 什么编程语言?以上是java so (sort of no)
    • 我专注于 python,但我认为如果有一种方法可以做到这一点,那么另一种方法也能做到。旧的 cql 驱动程序支持它,但已被弃用。所以我正在寻找替换功能。
    • @Highstead Python = 是命名参数,example here 使用较新的 python DataStax 驱动程序。
    • 是服务器端还是客户端?我倾向于用 %(p_name)s 语法猜测客户端。
    【解决方案2】:

    对于 Lyuben 的答案中提供的示例,使用字符串设置批处理的某些属性(如Type.COUNTER(如果您需要更新计数器)将不起作用。相反,您可以像这样批量安排准备好的语句:

    final String insertQuery = "INSERT INTO test.prepared (id, col_1) VALUES (?,?);";
    final PreparedStatement prepared = session.prepare(insertQuery);
    
    final BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
    batch.add(prepared.bind(userId1, "something"));
    batch.add(prepared.bind(userId2, "another"));
    batch.add(prepared.bind(userId3, "thing"));
    
    session.executeAsync(batch);
    

    【讨论】:

    • 我比接受的答案更喜欢这个。此处批处理的内容可以是动态的(相对于已接受答案中的固定 CQL 和参数数量)
    • 我认为这是错误的代码(截至 2019 年)。 BatchStatement 是不可变的。你需要批处理 = batch.add(...
    猜你喜欢
    • 2014-12-03
    • 2017-04-23
    • 2020-11-14
    • 1970-01-01
    • 2016-01-22
    • 1970-01-01
    • 2015-01-04
    • 1970-01-01
    • 2013-10-19
    相关资源
    最近更新 更多