【发布时间】:2017-01-31 05:22:33
【问题描述】:
我已经为 MongoDB 和 Cassandra 构建了一个导入器。基本上导入器的所有操作都是相同的,除了最后一部分形成数据以匹配所需的 cassandra 表模式和想要的 mongodb 文档结构。 Cassandra的写入性能和MongoDB相比真的很差,我觉得我做错了什么。
基本上,我的抽象导入器类加载数据,读取所有数据并将其传递给扩展的 MongoDBImporter 或 CassandraImporter 类以将数据发送到数据库。一次以一个数据库为目标——不能同时对 C* 和 MongoDB 进行“双重”插入。导入器在同一台机器上针对相同数量的节点 (6) 运行。
问题:
MongoDB 导入在 57 分钟后完成。我摄取了 10.000.000 个文档,我预计 Cassandra 的行数大致相同。我的 Cassandra 导入器现在已经运行了 2.5 小时,并且只插入了 5.000.000 行。我将等待进口商完成并在此处编辑实际完成时间。
我如何使用 Cassandra 导入:
我在摄取数据之前准备了两个语句一次。这两个语句都是 UPDATE 查询,因为有时我必须将数据附加到现有列表中。在开始导入之前,我的表已完全清除。准备好的语句被一遍又一遍地使用。
PreparedStatement statementA = session.prepare(queryA);
PreparedStatement statementB = session.prepare(queryB);
对于 每一 行,我创建一个 BoundStatement 并将该语句传递给我的“自定义”批处理方法:
BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B
bs = bs.bind();
//add data... with several bs.setXXX(..) calls
cassandraConnection.executeBatch(bs);
使用 MongoDB,我可以一次插入 1000 个文档(这是最大值)而不会出现问题。对于 Cassandra,导入器在某些时候仅在我的 10 条语句中以 com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large 崩溃。我正在使用此代码来构建批次。顺便说一句,我之前从 1000、500、300、200、100、50、20 批量开始,但显然它们也不起作用。然后我将其设置为 10,它再次抛出异常。现在我不知道为什么它会破裂。
private static final int MAX_BATCH_SIZE = 10;
private Session session;
private BatchStatement currentBatch;
...
@Override
public ResultSet executeBatch(Statement statement) {
if (session == null) {
throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
}
if (currentBatch == null) {
currentBatch = new BatchStatement(Type.UNLOGGED);
}
currentBatch.add(statement);
if (currentBatch.size() == MAX_BATCH_SIZE) {
ResultSet result = session.execute(currentBatch);
currentBatch = new BatchStatement(Type.UNLOGGED);
return result;
}
return null;
}
我的 C* 架构如下所示
CREATE TYPE stream.event (
data_dbl frozen<map<text, double>>,
data_str frozen<map<text, text>>,
data_bool frozen<map<text, boolean>>,
);
CREATE TABLE stream.data (
log_creator text,
date text, //date of the timestamp
ts timestamp,
log_id text, //some id
hour int, //just the hour of the timestmap
x double,
y double,
events list<frozen<event>>,
PRIMARY KEY ((log_creator, date, hour), ts, log_id)
) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)
有时我需要向现有行添加更多新事件。这就是为什么我需要一个 UDT 列表。我的 UDT 包含三个映射,因为事件创建者产生不同的数据(字符串/双精度/布尔类型的键/值对)。我知道 UDT 已冻结,我无法触摸已摄取事件的映射。这对我来说很好,我有时只需要添加具有相同时间戳的新事件。我对日志的创建者(一些传感器名称)以及记录的日期(即“22-09-2016”)和时间戳的时间进行分区(以更多地分发数据,同时将相关数据保持在一起一个分区)。
我在 pom.xml 中使用 Cassandra 3.0.8 和 Datastax Java 驱动程序,版本 3.1.0。
根据What is the batch limit in Cassandra?,我不应该通过调整cassandra.yaml 中的batch_size_fail_threshold_in_kb 来增加批量大小。那么......我的导入有什么问题或有什么问题?
更新 所以我调整了我的代码以运行异步查询并将当前运行的插入存储在一个列表中。每当异步插入完成时,它将从列表中删除。当列表大小超过阈值并且之前的插入发生错误时,该方法将等待 500ms 直到插入低于阈值。当没有插入失败时,我的代码现在会自动增加阈值。
但是在流式传输 3.300.000 行之后,有 280.000 个插入正在处理,但没有发生错误。这似乎是当前处理的插入数量看起来太高了。 6 个 cassandra 节点在已有 2 年历史的商用硬件上运行。
这是并发插入的高数量(6 个节点为 280.000)有问题吗?我应该添加像MAX_CONCURRENT_INSERT_LIMIT 这样的变量吗?
private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...
@Override
public void executeBatch(Statement statement) throws InterruptedException {
if (this.runningInsertList == null) {
this.runningInsertList = new ArrayList<>();
}
//Sleep while the currently processing number of inserts is too high
while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
ResultSetFuture future = this.executeAsync(statement);
this.runningInsertList.add(future);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
runningInsertList.remove(future);
}
@Override
public void onFailure(Throwable t) {
concurrentInsertErrorOccured = true;
}
}, MoreExecutors.sameThreadExecutor());
if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
concurrentInsertLimit += 2000;
LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));
}
return;
}
【问题讨论】:
标签: java cassandra datastax-java-driver