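[Posted]: 2016-05-02 12:54:11
[Question]:
I'm trying to migrate a large amount of data (about 41 million rows) from a SQL database to Cassandra. I previously did a trial run with half of the dataset, and everything worked as expected.
The problem is that now that I'm attempting the full migration, Cassandra keeps throwing timeout errors. For example: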
[INFO] [talledLocalContainer] com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:10112 (com.datastax.driver.core.exceptions.DriverException: Timed out waiting for server response))
[INFO] [talledLocalContainer] at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
[INFO] [talledLocalContainer] at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
[INFO] [talledLocalContainer] at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
[INFO] [talledLocalContainer] at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
[INFO] [talledLocalContainer] at com.mycompany.tasks.CassandraMigrationTask.execute(CassandraMigrationTask.java:164)
[INFO] [talledLocalContainer] at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
[INFO] [talledLocalContainer] at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
[INFO] [talledLocalContainer] Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:10112 (com.datastax.driver.core.exceptions.DriverException: Timed out waiting for server response))
[INFO] [talledLocalContainer] at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:108)
[INFO] [talledLocalContainer] at com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:179)
[INFO] [talledLocalContainer] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[INFO] [talledLocalContainer] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[INFO] [talledLocalContainer] at java.lang.Thread.run(Thread.java:745)
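I tried increasing the timeout values in cassandra.yaml. That increased how long the migration could run before timing out (roughly in proportion to the increase in the timeouts).
Before I changed the timeout settings, my stack trace looked more like this: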
[INFO] [talledLocalContainer] com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)
[INFO] [talledLocalContainer] at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:54)
[INFO] [talledLocalContainer] at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
[INFO] [talledLocalContainer] at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
[INFO] [talledLocalContainer] at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
[INFO] [talledLocalContainer] at com.mycompany.tasks.CassandraMigrationTask.execute(CassandraMigrationTask.java:164)
[INFO] [talledLocalContainer] at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
[INFO] [talledLocalContainer] at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
[INFO] [talledLocalContainer] Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)
[INFO] [talledLocalContainer] at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:54)
[INFO] [talledLocalContainer] at com.datastax.driver.core.Responses$Error.asException(Responses.java:99)
[INFO] [talledLocalContainer] at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140)
[INFO] [talledLocalContainer] at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:249)
[INFO] [talledLocalContainer] at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:433)
[INFO] [talledLocalContainer] at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:697)
[INFO] [talledLocalContainer] at com.datastax.shaded.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
[INFO] [talledLocalContainer] at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[INFO] [talledLocalContainer] at com.datastax.shaded.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
[INFO] [talledLocalContainer] at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:296)
[INFO] [talledLocalContainer] at com.datastax.shaded.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
My timeout settings are currently:
# How long the coordinator should wait for read operations to complete
read_request_timeout_in_ms: 30000
# How long the coordinator should wait for seq or index scans to complete
range_request_timeout_in_ms: 30000
# How long the coordinator should wait for writes to complete
write_request_timeout_in_ms: 30000
# How long the coordinator should wait for counter writes to complete
counter_write_request_timeout_in_ms: 30000
# How long a coordinator should continue to retry a CAS operation
# that contends with other proposals for the same row
cas_contention_timeout_in_ms: 1000
# How long the coordinator should wait for truncates to complete
# (This can be much longer, because unless auto_snapshot is disabled
# we need to flush first so we can snapshot before removing the data.)
truncate_request_timeout_in_ms: 60000
# The default timeout for other, miscellaneous operations
request_timeout_in_ms: 20000
...which lets me insert about 1.5 million rows before a timeout occurs. The original timeout settings were:
# How long the coordinator should wait for read operations to complete
read_request_timeout_in_ms: 5000
# How long the coordinator should wait for seq or index scans to complete
range_request_timeout_in_ms: 10000
# How long the coordinator should wait for writes to complete
write_request_timeout_in_ms: 2000
# How long the coordinator should wait for counter writes to complete
counter_write_request_timeout_in_ms: 5000
# How long a coordinator should continue to retry a CAS operation
# that contends with other proposals for the same row
cas_contention_timeout_in_ms: 1000
# How long the coordinator should wait for truncates to complete
# (This can be much longer, because unless auto_snapshot is disabled
# we need to flush first so we can snapshot before removing the data.)
truncate_request_timeout_in_ms: 60000
# The default timeout for other, miscellaneous operations
request_timeout_in_ms: 10000
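...which caused a timeout roughly every 300,000 rows.
The only significant change between my successful run and now is that I added a second node to the Cassandra deployment. So intuitively I'd think the problem is related to propagating data from the first node to the second (for example, <some process> that scales linearly with the amount of data inserted and that doesn't happen when there is only one node). But I don't see any obvious options that would help configure or mitigate this.
In case it's relevant, I'm using batch statements during the migration, typically with at most 100 to 200 statements/rows per batch.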
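Cassandra batches are primarily an atomicity tool rather than a bulk-loading optimization, and a batch whose statements span many partitions makes the coordinator do extra work (for logged batches this includes a write to the batchlog, the table being flushed in the second node's log further down). A commonly recommended alternative is to keep each batch to a single partition. A minimal sketch of that grouping step, assuming each migrated row can report its partition key; `Row`, `partitionKey`, and `toBatches` are hypothetical names, and turning each inner list into a driver `BatchStatement` is left out:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionBatcher {

    /** Hypothetical row type: a partition key plus an opaque payload. */
    static final class Row {
        final String partitionKey;
        final String payload;

        Row(String partitionKey, String payload) {
            this.partitionKey = partitionKey;
            this.payload = payload;
        }
    }

    /**
     * Groups rows by partition key (in first-seen order), then splits each group
     * into chunks of at most maxBatchSize rows. Every inner list touches exactly
     * one partition, so it could become one single-partition batch.
     */
    static List<List<Row>> toBatches(List<Row> rows, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        Map<String, List<Row>> byPartition = new LinkedHashMap<>();
        for (Row row : rows) {
            byPartition.computeIfAbsent(row.partitionKey, k -> new ArrayList<>()).add(row);
        }
        List<List<Row>> batches = new ArrayList<>();
        for (List<Row> group : byPartition.values()) {
            for (int start = 0; start < group.size(); start += maxBatchSize) {
                int end = Math.min(start + maxBatchSize, group.size());
                batches.add(new ArrayList<>(group.subList(start, end)));
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            rows.add(new Row("asset-" + (i % 2), "value-" + i));
        }
        // 125 rows per partition -> chunks of 100 + 25 for each of the 2 partitions
        System.out.println(toBatches(rows, 100).size()); // prints 4
    }
}
```

Grouping by partition keeps the per-batch row limit the question already uses (100 to 200), while ensuring no single batch fans out across the cluster.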
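My keyspace was originally created WITH REPLICATION =
{ 'class' : 'SimpleStrategy', 'replication_factor' : 2 }, but I changed it to WITH REPLICATION =
{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 } to see whether that would make a difference. It didn't.
I also tried explicitly setting ConsistencyLevel.ANY on all of the insert statements (and on the enclosing batch statements). That didn't make a difference either.
Cassandra's logs on both nodes don't seem to show anything interesting, although the first node definitely shows many more "ops" than the second:
First node - 454317 ops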
INFO [SlabPoolCleaner] 2016-01-25 19:46:08,806 ColumnFamilyStore.java:905 - Enqueuing flush of assetproperties_flat: 148265302 (14%) on-heap, 0 (0%) off-heap
INFO [MemtableFlushWriter:15] 2016-01-25 19:46:08,807 Memtable.java:347 - Writing Memtable-assetproperties_flat@350387072(20.557MiB serialized bytes, 454317 ops, 14%/0% of on/off-heap limit)
INFO [MemtableFlushWriter:15] 2016-01-25 19:46:09,393 Memtable.java:382 - Completed flushing /var/cassandra/data/itb/assetproperties_flat-e83359a0c34411e593abdda945619e28/itb-assetproperties_flat-tmp-ka-32-Data.db (5.249MiB) for commitlog position ReplayPosition(segmentId=1453767930194, position=15188257)
Second node - 2020 ops
INFO [BatchlogTasks:1] 2016-01-25 19:46:33,961 ColumnFamilyStore.java:905 - Enqueuing flush of batchlog: 4923957 (0%) on-heap, 0 (0%) off-heap
INFO [MemtableFlushWriter:22] 2016-01-25 19:46:33,962 Memtable.java:347 - Writing Memtable-batchlog@796821497(4.453MiB serialized bytes, 2020 ops, 0%/0% of on/off-heap limit)
INFO [MemtableFlushWriter:22] 2016-01-25 19:46:33,963 Memtable.java:393 - Completed flushing /var/cassandra/data/system/batchlog-0290003c977e397cac3efdfdc01d626b/system-batchlog-tmp-ka-11-Data.db; nothing needed to be retained. Commitlog position was ReplayPosition(segmentId=1453767955411, position=18567563)
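Has anyone run into a similar problem, and if so, what was the fix?
Would it be advisable to take the second node offline, run the migration against the first node only, and then run nodetool repair to bring the second node back in sync?
EDIT
Answers to the questions from the comments: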
-
I'm using the DataStax Java driver, and I have a server-side task (a Quartz job) that uses an ORM layer (Hibernate) to look up the next chunk of data to migrate, writes it to Cassandra, and then purges it from the SQL database. I'm using the following code to connect to Cassandra:

    public static Session getCassandraSession(String keyspace) {
        Session session = clusterSessions.get(keyspace);
        if (session != null && ! session.isClosed()) {
            //can use the cached session
            return session;
        }

        //create a new session for the specified keyspace
        Cluster cassandraCluster = getCluster();
        session = cassandraCluster.connect(keyspace);

        //cache and return the session
        clusterSessions.put(keyspace, session);
        return session;
    }

    private static Cluster getCluster() {
        if (cluster != null && ! cluster.isClosed()) {
            //can use the cached cluster
            return cluster;
        }

        //configure socket options
        SocketOptions options = new SocketOptions();
        options.setConnectTimeoutMillis(30000);
        options.setReadTimeoutMillis(300000);
        options.setTcpNoDelay(true);

        //spin up a fresh connection
        cluster = Cluster.builder().addContactPoint(Configuration.getCassandraHost()).withPort(Configuration.getCassandraPort())
                .withCredentials(Configuration.getCassandraUser(), Configuration.getCassandraPass()).withSocketOptions(options).build();

        //log the cluster details for confirmation
        Metadata metadata = cluster.getMetadata();
        LOG.debug("Connected to Cassandra cluster: " + metadata.getClusterName());
        for ( Host host : metadata.getAllHosts() ) {
            LOG.debug("Datacenter: " + host.getDatacenter() + "; Host: " + host.getAddress() + "; Rack: " + host.getRack());
        }

        return cluster;
    }

The SocketOptions part was added recently, because the latest timeout error sounded like it was coming from Java/the client rather than from Cassandra itself.
-
No more than 200 records are inserted per batch. The typical value is closer to 100.
-
Both nodes have identical specs:
- Intel(R) Xeon(R) CPU E3-1230 V2 @ 3.30GHz
- 32GB RAM
- 256GB SSD (primary), 2TB HDD (backup), both in RAID-1 configuration
-
First node:

Pool Name                    Active   Pending      Completed   Blocked  All time blocked
CounterMutationStage              0         0              0         0                 0
ReadStage                         0         0          58155         0                 0
RequestResponseStage              0         0         655104         0                 0
MutationStage                     0         0         259151         0                 0
ReadRepairStage                   0         0              0         0                 0
GossipStage                       0         0          58041         0                 0
CacheCleanupExecutor              0         0              0         0                 0
AntiEntropyStage                  0         0              0         0                 0
MigrationStage                    0         0              0         0                 0
Sampler                           0         0              0         0                 0
ValidationExecutor                0         0              0         0                 0
CommitLogArchiver                 0         0              0         0                 0
MiscStage                         0         0              0         0                 0
MemtableFlushWriter               0         0             80         0                 0
MemtableReclaimMemory             0         0             80         0                 0
PendingRangeCalculator            0         0              3         0                 0
MemtablePostFlush                 0         0            418         0                 0
CompactionExecutor                0         0           8979         0                 0
InternalResponseStage             0         0              0         0                 0
HintedHandoff                     0         0              2         0                 0
Native-Transport-Requests         1         0        1175338         0                 0

Message type           Dropped
RANGE_SLICE                  0
READ_REPAIR                  0
PAGED_RANGE                  0
BINARY                       0
READ                         0
MUTATION                     0
_TRACE                       0
REQUEST_RESPONSE             0
COUNTER_MUTATION             0

Second node:

Pool Name                    Active   Pending      Completed   Blocked  All time blocked
CounterMutationStage              0         0              0         0                 0
ReadStage                         0         0          55803         0                 0
RequestResponseStage              0         0              1         0                 0
MutationStage                     0         0         733828         0                 0
ReadRepairStage                   0         0              0         0                 0
GossipStage                       0         0          56623         0                 0
CacheCleanupExecutor              0         0              0         0                 0
AntiEntropyStage                  0         0              0         0                 0
MigrationStage                    0         0              0         0                 0
Sampler                           0         0              0         0                 0
ValidationExecutor                0         0              0         0                 0
CommitLogArchiver                 0         0              0         0                 0
MiscStage                         0         0              0         0                 0
MemtableFlushWriter               0         0            394         0                 0
MemtableReclaimMemory             0         0            394         0                 0
PendingRangeCalculator            0         0              2         0                 0
MemtablePostFlush                 0         0            428         0                 0
CompactionExecutor                0         0           8883         0                 0
InternalResponseStage             0         0              0         0                 0
HintedHandoff                     0         0              1         0                 0
Native-Transport-Requests         0         0             70         0                 0

Message type           Dropped
RANGE_SLICE                  0
READ_REPAIR                  0
PAGED_RANGE                  0
BINARY                       0
READ                         0
MUTATION                     0
_TRACE                       0
REQUEST_RESPONSE             0
COUNTER_MUTATION             0
-
The output of nodetool ring is very long. Here is nodetool status:

Datacenter: DC1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load       Tokens  Owns  Host ID                               Rack
UN  204.11.xxx.1  754.66 MB  1024    ?     8cf373d8-0b3e-4fd3-9e63-fdcdd8ce8cd4  RAC1
UN  208.66.xxx.2  767.78 MB  1024    ?     42e1f336-84cb-4260-84df-92566961a220  RAC2
-
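I increased all of Cassandra's timeout values by a factor of 10 and set the Java driver's read timeout to match, and now I can get up to 8m/29.4m inserts without any problems. In theory, if the problem scales linearly with the timeout values, I should be fine until roughly 15m inserts (which is at least good enough that I don't have to constantly babysit the migration process waiting for each new error).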
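In the getCassandraSession helper shown in the first answer, the cache lookup and the put are separate steps (getCluster has the same shape). If clusterSessions is a plain HashMap (its type isn't shown) and more than one Quartz worker thread runs the job at once, two threads could each open a session for the same keyspace. A minimal, driver-agnostic sketch of an atomic per-keyspace cache; `SessionCache` and its constructor parameters are hypothetical, and with the DataStax driver the factory would presumably be `cluster::connect` and the closed-check `Session::isClosed`:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical per-keyspace cache; S stands in for the driver's Session type. */
public class SessionCache<S> {
    private final ConcurrentMap<String, S> sessions = new ConcurrentHashMap<>();
    private final Function<String, S> connect;
    private final Predicate<S> isClosed;

    public SessionCache(Function<String, S> connect, Predicate<S> isClosed) {
        this.connect = connect;
        this.isClosed = isClosed;
    }

    /** Returns the cached session, opening a new one if none exists or the cached one is closed. */
    public S get(String keyspace) {
        // compute() is atomic per key, so concurrent callers for the same keyspace
        // cannot both open a session. The connect call runs while the map holds a
        // lock on that entry, which is acceptable for a handful of keyspaces.
        return sessions.compute(keyspace, (ks, existing) ->
                (existing != null && !isClosed.test(existing)) ? existing : connect.apply(ks));
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger opened = new AtomicInteger();
        SessionCache<Object> cache = new SessionCache<>(ks -> {
            opened.incrementAndGet();
            return new Object();
        }, s -> false);

        // Eight threads ask for the same keyspace at roughly the same time.
        Thread[] workers = new Thread[8];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> cache.get("itb"));
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        System.out.println(opened.get()); // prints 1
    }
}
```

Whether this race actually occurs depends on how the Quartz job is scheduled; it is unlikely to explain the timeouts on its own, but it removes one source of extra connections.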
[Discussion]:
-
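A few questions that could help point us in the right direction - 1. How are you migrating the data? Are you using a JDBC connection? Loading files? 2. You mentioned "(and the enclosing batch statements)" - how big is each batch insert (how many records)? 3. What kind of hardware are you running this on? What type of disks are you using? 4. Finally, the output of nodetool tpstats would also help.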
-
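Sorry, one more useful thing would be nodetool ring, to understand the token allocation, since your first node is doing a lot of operations. Either node 1 owns most of the token range, or you have some very large partitions on that node, which would make this a data model problem.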
-
@MarcintheCloud - OK, I've updated the question with the requested information.
-
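Could you show us the source code your client uses to insert the data? Are you using prepared statements? Are you using synchronous or asynchronous calls? Etc...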
-
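Hi Aroth, thanks for answering all the questions. The difficult part is that you seem to be doing everything right, apart from one thing I noticed that could be causing the problem: it says you're using 1024 tokens. Did you set num_tokens: 1024 in cassandra.yaml? That is very high. At most you'd want 256, and even that is considered somewhat high. Increasing timeouts is usually not the fix; it hides the problem, usually only temporarily.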
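One of the comments asks whether the inserts use synchronous or asynchronous calls, and the last one notes that raising timeouts tends to hide the problem. If the writes were switched to asynchronous execution, a common way to avoid flooding a coordinator is to cap the number of outstanding requests so the client waits instead of queueing unbounded work. A minimal, driver-agnostic sketch, assuming each write is wrapped as a `CompletableFuture` (the DataStax driver's `executeAsync` returns a Guava `ListenableFuture`, so an adapter would be needed); `ThrottledWriter` and `maxInFlight` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical helper that caps the number of outstanding asynchronous writes. */
public class ThrottledWriter<T> {
    private final Semaphore permits;
    private final Function<T, CompletableFuture<Void>> write;

    public ThrottledWriter(int maxInFlight, Function<T, CompletableFuture<Void>> write) {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be at least 1");
        }
        this.permits = new Semaphore(maxInFlight);
        this.write = write;
    }

    /**
     * Submits every item, blocking whenever maxInFlight writes are still pending,
     * then waits for all of them. Throws CompletionException if any write failed.
     */
    public void writeAll(List<T> items) throws InterruptedException {
        CompletableFuture<?>[] pending = new CompletableFuture<?>[items.size()];
        for (int i = 0; i < items.size(); i++) {
            permits.acquire(); // back-pressure: wait for a free slot
            CompletableFuture<Void> f;
            try {
                f = write.apply(items.get(i));
            } catch (RuntimeException e) {
                permits.release(); // the write was never started
                throw e;
            }
            // Free the slot when the write finishes, whether it succeeded or failed.
            pending[i] = f.whenComplete((ok, err) -> permits.release());
        }
        CompletableFuture.allOf(pending).join();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(16);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();

        // Simulated write: records concurrency, sleeps briefly, then completes.
        ThrottledWriter<Integer> writer = new ThrottledWriter<>(4, row ->
                CompletableFuture.runAsync(() -> {
                    maxSeen.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    inFlight.decrementAndGet();
                }, pool));

        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            rows.add(i);
        }
        writer.writeAll(rows);
        pool.shutdown();
        System.out.println("max concurrent writes: " + maxSeen.get()); // never more than 4
    }
}
```

With a real cluster, a suitable maxInFlight would have to be found empirically; the design choice is that a slow node slows the client down rather than accumulating requests until they time out.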