【问题标题】:Why is my data insertion in my Cassandra database sometimes stable and sometimes slow?为什么我在 Cassandra 数据库中的数据插入有时稳定有时缓慢?
【发布时间】:2016-01-13 17:48:44
【问题描述】:

如果当前数据 ID 在 Cassandra 数据库中存在或不存在,这是我的查询:

row = session.execute("SELECT * FROM articles where id = %s", [id]) 

在Kafka中解析消息,然后判断这条消息在Cassandra数据库中是否存在,如果不存在,则应该进行插入操作,如果存在则不应该插入到数据中。

messages = consumer.get_messages(count=25)

if len(messages) == 0:
    print 'IDLE'
    sleep(1)
    continue

for message in messages:
    try:
        message = json.loads(message.message.value)
        data = message['data']
        if data:
            for article in data:
                source = article['source']
                id = article['id']
                title = article['title']
                thumbnail = article['thumbnail']
                #url = article['url']
                text = article['text']
                print article['created_at'],type(article['created_at'])
                created_at = parse(article['created_at'])
                last_crawled = article['last_crawled']
                channel = article['channel']#userid
                category = article['category']
                #scheduled_for = created_at.replace(minute=created_at.minute + 5, second=0, microsecond=0)
                scheduled_for=(datetime.utcnow() + timedelta(minutes=5)).replace(second=0, microsecond=0)
                row = session.execute("SELECT * FROM articles where id = %s", [id])
                if len(list(row))==0:
                #id parse base62
                    ids = [id[0:2],id[2:9],id[9:16]]
                    idstr=''
                    for argv in ids:
                        num = int(argv)
                        idstr=idstr+encode(num)
                    url='http://weibo.com/%s/%s?type=comment' % (channel,idstr)
                    session.execute("INSERT INTO articles(source, id, title,thumbnail, url, text, created_at, last_crawled,channel,category) VALUES (%s,%s, %s, %s, %s, %s, %s, %s, %s, %s)", (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category))
                    session.execute("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (%s, %s, %s,%s) USING TTL 86400", (source,'article', scheduled_for, id))
                    log.info('%s %s %s %s %s %s %s %s %s %s' % (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category))

    except Exception, e:
        log.exception(e)
        #log.info('error %s %s' % (message['url'],body))
        print e
        continue

我有一个 ID,它只有一个唯一的表行,我想成为这样。一旦我为唯一 ID 添加了不同的 schedule_for 时间,我的系统就会崩溃。添加这个if len(list(row))==0: 是正确的想法,但之后我的系统非常慢。

这是我的表格描述:

DROP TABLE IF EXISTS schedules;

CREATE TABLE schedules (
 source text,
 type text,
 scheduled_for timestamp,
 id text,
 PRIMARY KEY (source, type, scheduled_for, id)
);

这个 schedule_for 是可变的。这里也是一个具体的例子:

Hao article 2016-01-12 02:09:00+0800 3930462206848285
Hao article 2016-01-12 03:09:00+0801 3930462206848285
Hao article 2016-01-12 04:09:00+0802 3930462206848285
Hao article 2016-01-12 05:09:00+0803 3930462206848285

这是我的文章 CQL 架构:

CREATE TABLE crawler.articles (
    source text,
    created_at timestamp,
    id text,
    category text,
    channel text,
    last_crawled timestamp,
    text text,
    thumbnail text,
    title text,
    url text,
    PRIMARY KEY (source, created_at, id)
) WITH CLUSTERING ORDER BY (created_at DESC, id ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"ALL"}'
AND comment = ''
AND compaction = {'sstable_size_in_mb': '160', 'enabled': 'true', 'unchecked_tombstone_compaction': 'false', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 604800
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';

CREATE INDEX articles_id_idx ON crawler.articles (id);
CREATE INDEX articles_url_idx ON crawler.articles (url);

【问题讨论】:

  • 您能提供文章的表架构吗?这样我们就可以看到主键和所有细节。
  • 我在上面的帖子中添加了文章方案。感谢您的回复!

标签: python cassandra scrapy apache-kafka


【解决方案1】:

查看您的 SCHEMA 以及您使用它的方式,我可以假设 ID 字段上的二级索引会产生问题并减慢查询速度。您可以查看更多详细信息,为什么二级索引在许多地方都不好,只需用谷歌搜索它(source 是一个好的开始,DataStax documentation page 也是一个好的开始)。基本上,当您在 5 节点集群中使用二级索引时,您必须点击每个节点来查找您要查找的项目,并且当使用主键时,每个节点都知道哪个节点保存数据。

如果您使用具有高基数的数据(添加更多项目时性能下降)并且您使用每篇文章不同的 ID,则二级索引尤其糟糕。当您使用低基数时它们是可以的.

我建议再创建一个表,article_by_id,这将是您文章表的反向索引。您可以使用Lightweight Transaction 并首先对该表执行INSERT ... IF NOT EXISTS,如果操作返回true(意味着插入已通过,因此以前不存在记录)您可以对您的articles 表执行常规插入,如果它返回@ 987654329@(表示数据没有插入,因为它已经存在)你可以跳过 INSERT 到articles 表。

这是表格(我建议使用 UUID 而不是文本作为 ID,但我根据您的文章表格创建了表格):

CREATE TABLE article_by_id (
    id text,
    source text,
    created_at timestamp,
    PRIMARY KEY (id)
) WITH comment = 'Article by id.';

这样,您始终可以仅根据 ID 找到密钥的所有部分。如果 ID 是您从该表中选择的输入参数,将为您提供 source 和 created_at。

这里是插入查询,它将返回真或假:

INSERT INTO article_by_id(id, source, created_at) VALUES (%s,%s, %s) IF NOT EXISTS; 

还有更多提示,如果您可以根据实体中的一些不可更改的数据找到密钥,那么您不需要第二个表。例如,如果 source 和 created_at 在您的系统中唯一标识文章并且永远不会更改,您可以删除 id 并使用您的原始表。

【讨论】:

  • 很好的答案,谢谢。您认为文章的最佳主键是什么?
  • 您可以将其保留在文章表中(PK 源,然后通过 created_at 和最后的 id 进行聚类以确保唯一性)。该表非常适合范围查询,您可以将来自多个源的数据分布在多个分区中,以防止长行和热数据。我只是建议使用带有 ID 的附加表以更容易找到它。否则,如果每篇文章的 URL 是唯一的,您可能会完全省略 ID,并使用 URL 而不是 ID。然后将所有位置的 ID 更改为 URL。
  • 另外一个问题也许你可以帮忙:如果我们有下面的表结构,我如何通过“source = 'abc' and created_at >= '2016-01-01 00:00:00'查询“?现在这是一个问题,因为 cassandra 不允许查询非索引字段。 CREATE TABLE 文章(id 文本、源文本、created_at 时间戳、类别文本、频道文本、last_crawled 时间戳、文本文本、缩略图文本、标题文本、url 文本、PRIMARY KEY (id))
  • 使用两张表,保持原样并添加另一张表,该表仅用于检查您的文章是否存在。 Cassandra 中的数据重复是常见的事情,也是正确的做法。
  • 这是一个单一用途的表,您只能通过 id 获取文章,对于其他查询,您可以使用现有的表,您可以在其中按源查询、为范围安排的类型等。检查免费课程可能是个好主意关于数据建模和事情会很清楚(academy.datastax.com/courses/ds220-data-modeling)或将其移至另一个仅与建模相关的问题,因为我们在 cmets 中的讨论已经太长了
猜你喜欢
  • 2018-02-26
  • 2019-05-03
  • 1970-01-01
  • 1970-01-01
  • 2013-06-22
  • 1970-01-01
  • 1970-01-01
  • 2017-09-19
  • 2019-11-23
相关资源
最近更新 更多