【问题标题】:Cassandra Bulk-Write performance with Java Driver is atrocious compared to MongoDB与 MongoDB 相比,使用 Java 驱动程序的 Cassandra Bulk-Write 性能非常糟糕
【发布时间】:2017-01-31 05:22:33
【问题描述】:

我已经为 MongoDB 和 Cassandra 构建了一个导入器。基本上导入器的所有操作都是相同的,除了最后一部分形成数据以匹配所需的 cassandra 表模式和想要的 mongodb 文档结构。 Cassandra的写入性能和MongoDB相比真的很差,我觉得我做错了什么。

基本上,我的抽象导入器类加载数据,读取所有数据并将其传递给扩展的 MongoDBImporter 或 CassandraImporter 类以将数据发送到数据库。一次以一个数据库为目标——不能同时对 C* 和 MongoDB 进行“双重”插入。导入器在同一台机器上针对相同数量的节点 (6) 运行。


问题:

MongoDB 导入在 57 分钟后完成。我摄取了 10.000.000 个文档,我预计 Cassandra 的行数大致相同。我的 Cassandra 导入器现在已经运行了 2.5 小时,并且只插入了 5.000.000 行。我将等待进口商完成并在此处编辑实际完成时间。


我如何使用 Cassandra 导入:

我在摄取数据之前准备了两个语句一次。这两个语句都是 UPDATE 查询,因为有时我必须将数据附加到现有列表中。在开始导入之前,我的表已完全清除。准备好的语句被一遍又一遍地使用。

PreparedStatement statementA = session.prepare(queryA);
PreparedStatement statementB = session.prepare(queryB);

对于 每一 行,我创建一个 BoundStatement 并将该语句传递给我的“自定义”批处理方法:

    BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B
    bs = bs.bind();

    //add data... with several bs.setXXX(..) calls

    cassandraConnection.executeBatch(bs);

使用 MongoDB,我可以一次插入 1000 个文档(这是最大值)而不会出现问题。对于 Cassandra,导入器在某些时候仅在我的 10 条语句中以 com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large 崩溃。我正在使用此代码来构建批次。顺便说一句,我之前从 1000、500、300、200、100、50、20 批量开始,但显然它们也不起作用。然后我将其设置为 10,它再次抛出异常。现在我不知道为什么它会破裂。

private static final int MAX_BATCH_SIZE = 10;

private Session session;
private BatchStatement currentBatch;

...

@Override
public ResultSet executeBatch(Statement statement) {
    if (session == null) {
        throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
    }

    if (currentBatch == null) {
        currentBatch = new BatchStatement(Type.UNLOGGED);
    }

    currentBatch.add(statement);
    if (currentBatch.size() == MAX_BATCH_SIZE) {
        ResultSet result = session.execute(currentBatch);
        currentBatch = new BatchStatement(Type.UNLOGGED);
        return result;
    }

    return null;
}

我的 C* 架构如下所示

CREATE TYPE stream.event (
    data_dbl frozen<map<text, double>>,
    data_str frozen<map<text, text>>,
    data_bool frozen<map<text, boolean>>,
);

CREATE TABLE stream.data (
    log_creator text,
    date text, //date of the timestamp
    ts timestamp,
    log_id text, //some id
    hour int, //just the hour of the timestmap
    x double,
    y double,
    events list<frozen<event>>,
    PRIMARY KEY ((log_creator, date, hour), ts, log_id)
) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)

有时我需要向现有行添加更多新事件。这就是为什么我需要一个 UDT 列表。我的 UDT 包含三个映射,因为事件创建者产生不同的数据(字符串/双精度/布尔类型的键/值对)。我知道 UDT 已冻结,我无法触摸已摄取事件的映射。这对我来说很好,我有时只需要添加具有相同时间戳的新事件。我对日志的创建者(一些传感器名称)以及记录的日期(即“22-09-2016”)和时间戳的时间进行分区(以更多地分发数据,同时将相关数据保持在一起一个分区)。


我在 pom.xml 中使用 Cassandra 3.0.8 和 Datastax Java 驱动程序,版本 3.1.0。 根据What is the batch limit in Cassandra?,我不应该通过调整cassandra.yaml 中的batch_size_fail_threshold_in_kb 来增加批量大小。那么......我的导入有什么问题或有什么问题?


更新 所以我调整了我的代码以运行异步查询并将当前运行的插入存储在一个列表中。每当异步插入完成时,它将从列表中删除。当列表大小超过阈值并且之前的插入发生错误时,该方法将等待 500ms 直到插入低于阈值。当没有插入失败时,我的代码现在会自动增加阈值。

但是在流式传输 3.300.000 行之后,有 280.000 个插入正在处理,但没有发生错误。这似乎是当前处理的插入数量看起来太高了。 6 个 cassandra 节点在已有 2 年历史的商用硬件上运行。

这是并发插入的高数量(6 个节点为 280.000)有问题吗?我应该添加像MAX_CONCURRENT_INSERT_LIMIT 这样的变量吗?

private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...

@Override
public void executeBatch(Statement statement) throws InterruptedException {
    if (this.runningInsertList == null) {
        this.runningInsertList = new ArrayList<>();
    }

    //Sleep while the currently processing number of inserts is too high
    while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
        Thread.sleep(concurrentInsertSleepTime);
    }

    ResultSetFuture future = this.executeAsync(statement);
    this.runningInsertList.add(future);

    Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
            runningInsertList.remove(future);
        }

        @Override
        public void onFailure(Throwable t) {
            concurrentInsertErrorOccured = true;
        }
    }, MoreExecutors.sameThreadExecutor());

    if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
        concurrentInsertLimit += 2000;
        LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));
    }

    return;
}

【问题讨论】:

    标签: java cassandra datastax-java-driver


    【解决方案1】:

    当您在 Cassandra 中运行批处理时,它会选择一个节点作为协调器。然后,该节点负责确保批量写入找到其适当的节点。因此(例如)通过将 10000 次写入批处理在一起,您现在已经为一个节点分配了协调 10000 次写入的工作,其中大部分将用于不同的节点。这样做很容易使节点翻倒,或者消除整个集群的延迟。因此,限制批量大小的原因。

    问题在于 Cassandra CQL BATCH 用词不当,它并没有像您或其他任何人认为的那样做。它不能用于提高性能。并行、异步写入总是比运行相同数量的 BATCHed 语句快。

    我知道我可以轻松地将 10.000 行批处理在一起,因为它们将进入同一个分区。 ...您还会使用单行插入(异步)而不是批处理吗?

    这取决于写入性能是否是您的真正目标。如果是这样,那么我仍然会坚持并行、异步写入。

    有关这方面的更多信息,请查看 DataStax 的 Ryan Svihla 的这两篇博客文章:

    Cassandra: Batch loading without the Batch keyword

    Cassandra: Batch Loading Without the Batch — The Nuanced Edition

    【讨论】:

    • 我知道我可以轻松地将 10.000 行批处理在一起,因为它们将进入同一个分区。我正在写一次我的数据,之后只会将它们读出来。对现有行的更新将在几周内完成一次,但它们可能会发生。我不担心插入热点,因为读取性能对我来说是关键,并且数据只会被写入一次 - 并且只有在没有人查询现有数据时才会写入。你还会使用单行插入(异步)而不是批处理吗?
    【解决方案2】:

    在使用了一段时间 C* 之后,我确信您真的应该只使用批处理来保持多个表同步。如果您不需要该功能,则根本不要使用批处理,因为您招致性能损失。

    将数据加载到 C* 中的正确方法是使用异步写入,如果您的集群无法跟上摄取速率,则可以使用可选的背压。您应该将“自定义”批处理方法替换为:

    • 执行异步写入
    • 控制您有多少飞行中写入
    • 在写入超时时执行一些重试。

    要执行异步写入,请使用 .executeAsync 方法,该方法将返回一个 ResultSetFuture 对象。

    为了控制飞行中查询的数量,只需收集从列表中的.executeAsync 方法检索到的ResultSetFuture 对象,如果列表获取(此处为大致值)说 1k 个元素,然后等待所有元素完成在发出更多写入之前。或者您可以等待第一个完成后再发出另一个写入,以保持列表完整。

    最后,您可以在等待操作完成时检查写入失败。在这种情况下,您可以:

    1. 使用相同的超时值再次写入
    2. 使用增加的超时值再次写入
    3. 等待一段时间,然后使用相同的超时值再次写入
    4. 等待一段时间,然后使用增加的超时值再次写入

    从 1 到 4 你有一个增加的背压强度。选择最适合您的情况。


    问题更新后编辑

    你的插入逻辑对我来说似乎有点坏了:

    1. 我没有看到任何重试逻辑
    2. 如果失败,您不会删除列表中的项目
    3. 您的while (concurrentInsertErrorOccured &amp;&amp; runningInsertList.size() &gt; concurrentInsertLimit) 是错误的,因为只有在发出的查询数> concurrentInsertLimit 时您才会休眠,并且因为2。您的线程只会停在那里。
    4. 你永远不会设置为 false concurrentInsertErrorOccured

    我通常会保留一个(失败的)查询列表,以便以后重试。这给了我对查询的强大控制权,当失败的查询开始累积时,我会睡一会儿,然后继续重试它们(最多 X 次,然后硬失败......)。

    这个列表应该是非常动态的,例如,当查询失败时您可以在其中添加项目,并在您执行重试时删除项目。现在您可以了解集群的限制,并根据例如最后一秒内失败查询的平均数调整您的concurrentInsertLimit,或者坚持使用更简单的方法“如果我们在重试列表中有项目,则暂停”等等……


    在 cmets 之后编辑 2

    由于您不想要任何重试逻辑,我会这样更改您的代码:

    private List<ResultSetFuture> runningInsertList;
    private static int concurrentInsertLimit = 1000;
    private static int concurrentInsertSleepTime = 500;
    ...
    
    @Override
    public void executeBatch(Statement statement) throws InterruptedException {
        if (this.runningInsertList == null) {
            this.runningInsertList = new ArrayList<>();
        }
    
        ResultSetFuture future = this.executeAsync(statement);
        this.runningInsertList.add(future);
    
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                runningInsertList.remove(future);
            }
    
            @Override
            public void onFailure(Throwable t) {
                runningInsertList.remove(future);
                concurrentInsertErrorOccured = true;
            }
        }, MoreExecutors.sameThreadExecutor());
    
        //Sleep while the currently processing number of inserts is too high
        while (runningInsertList.size() >= concurrentInsertLimit) {
            Thread.sleep(concurrentInsertSleepTime);
        }
    
        if (!concurrentInsertErrorOccured) {
            // Increase your ingestion rate if no query failed so far
            concurrentInsertLimit += 10;
        } else {
            // Decrease your ingestion rate because at least one query failed
            concurrentInsertErrorOccured = false;
            concurrentInsertLimit = Max(1, concurrentInsertLimit - 50);
            while (runningInsertList.size() >= concurrentInsertLimit) {
                Thread.sleep(concurrentInsertSleepTime);
            }
        }
    
        return;
    }
    

    您还可以通过将 List&lt;ResultSetFuture&gt; 替换为计数器来优化该过程。

    希望对您有所帮助。

    【讨论】:

    • 这是个好主意,肯定会实现该功能。当我知道很多行将被插入到同一个分区(因此是同一个节点)时,你有什么建议我做的吗?我正在建立读取性能,因此将相关行放在一起可能对我来说是关键。当我的数据的某个部分当时会导致太大的背压时,我是否应该开始拆分我的数据并继续处理数据的不同部分。类似(伪代码):“如果 resultSetFutureList.size > 1000 然后跳过接下来的 50000 行并继续”?
    • Java 驱动程序是 token 感知的,这意味着它将连接到 right 协调器(即负责 that 分区)执行写入(除非您使用批处理,因为在这种情况下,协调器是根据批处理的第一条语句产生的令牌选择的)。所以你不必担心那件事。继续推送您的数据。如果您从未遇到if resultSetFutureList.size &gt; 1000 的写入超时,则将限制提高到 2k。执行此操作,直到您在此处找到 您的 幻数。在我的情况下,它是...由系统本身自动调整...32k IIRC
    • 所以基本上我设置了一个List&lt;ResultSetFuture&gt; 变量并在我的插入方法中将每个ResultSetFuture 添加到该列表中。我正在使用 Guavas Futures.addCallbackonSuccess 方法从列表中删除相应的 ResultSetFuture。 onFailure 会将布尔值设置为 true,表示发生了插入错误。在我的插入方法中,有一个 while 循环while (errorOccured &amp;&amp; list.size() &gt; insertLimit),线程将在其中休眠 500 毫秒。在流式传输 3.300.000 行之后,处理了 280.000 个插入,但没有发生错误。我应该担心吗?
    • 很难说... 280k 有点高... 你有 NASA 集群吗?您应该使用更新的代码更新您的问题,然后我们会检查是否有问题......
    • 您更新的答案是正确的。我正在逐一构建异步插入的逻辑,这意味着当前没有重试逻辑,这是有意的。我不明白的一件事是:为什么没有查询对我来说失败?在我的代码中,当插入失败并出现异常时,我添加了对记录器的调用。这从未发生过,即使在 300.000 次并发插入时也是如此。您说您的用例的最佳价值是 32k 并发插入。您是否已经看到插入被该标记丢弃?
    猜你喜欢
    • 2015-05-11
    • 1970-01-01
    • 2017-05-27
    • 2011-04-26
    • 2013-08-16
    • 1970-01-01
    • 1970-01-01
    • 2011-12-19
    • 1970-01-01
    相关资源
    最近更新 更多