[Posted]: 2016-01-13 17:48:44
[Question]:
I want to check whether the current data ID already exists in the Cassandra database. This is my query:
row = session.execute("SELECT * FROM articles where id = %s", [id])
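I parse messages from Kafka and then determine whether each message already exists in the Cassandra database. If it does not exist, it should be inserted; if it does exist, it should not be inserted again.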
# (runs inside the consumer's polling loop)
messages = consumer.get_messages(count=25)
if len(messages) == 0:
    print 'IDLE'
    sleep(1)
    continue
for message in messages:
    try:
        message = json.loads(message.message.value)
        data = message['data']
        if data:
            for article in data:
                source = article['source']
                id = article['id']
                title = article['title']
                thumbnail = article['thumbnail']
                #url = article['url']
                text = article['text']
                print article['created_at'], type(article['created_at'])
                created_at = parse(article['created_at'])
                last_crawled = article['last_crawled']
                channel = article['channel']  # userid
                category = article['category']
                #scheduled_for = created_at.replace(minute=created_at.minute + 5, second=0, microsecond=0)
                scheduled_for = (datetime.utcnow() + timedelta(minutes=5)).replace(second=0, microsecond=0)
                row = session.execute("SELECT * FROM articles where id = %s", [id])
                if len(list(row)) == 0:
                    # id parse base62
                    ids = [id[0:2], id[2:9], id[9:16]]
                    idstr = ''
                    for argv in ids:
                        num = int(argv)
                        idstr = idstr + encode(num)
                    url = 'http://weibo.com/%s/%s?type=comment' % (channel, idstr)
                    session.execute("INSERT INTO articles(source, id, title,thumbnail, url, text, created_at, last_crawled,channel,category) VALUES (%s,%s, %s, %s, %s, %s, %s, %s, %s, %s)", (source, id, title, thumbnail, url, text, created_at, scheduled_for, channel, category))
                    session.execute("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (%s, %s, %s,%s) USING TTL 86400", (source, 'article', scheduled_for, id))
                    log.info('%s %s %s %s %s %s %s %s %s %s' % (source, id, title, thumbnail, url, text, created_at, scheduled_for, channel, category))
    except Exception, e:
        log.exception(e)
        #log.info('error %s %s' % (message['url'],body))
        print e
        continue
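The ID-to-URL step in the loop above relies on an `encode` helper that the question does not show. The following is only a minimal sketch of what such a helper might look like, assuming a base62 alphabet ordered as digits, lowercase, uppercase; that ordering, and the names `ALPHABET`, `split_id`, and `id_to_base62`, are assumptions made for illustration and are not taken from the original code.

```python
import string

# Assumed alphabet order: 0-9, a-z, A-Z. The real helper may differ.
ALPHABET = string.digits + string.ascii_lowercase + string.ascii_uppercase


def encode(num):
    """Encode a non-negative integer as a base62 string."""
    if num == 0:
        return ALPHABET[0]
    chars = []
    while num > 0:
        num, rem = divmod(num, 62)
        chars.append(ALPHABET[rem])
    return ''.join(reversed(chars))


def split_id(article_id):
    """Split a 16-digit ID into groups of 2, 7 and 7 digits, as the loop does."""
    return [article_id[0:2], article_id[2:9], article_id[9:16]]


def id_to_base62(article_id):
    """Encode each digit group separately and concatenate the results."""
    return ''.join(encode(int(part)) for part in split_id(article_id))


if __name__ == '__main__':
    print(split_id('3930462206848285'))
    print(id_to_base62('3930462206848285'))
```

Like the loop in the question, this sketch does not pad the encoded groups to a fixed width.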
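Each ID should correspond to exactly one row in the table; that is the behavior I want. As soon as I add a different scheduled_for time for the same unique ID, my system crashes. Adding the check if len(list(row))==0: is the right idea, but since I added it my system has become very slow.
This is my table description: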
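To make the check-then-insert logic easier to reason about, it can be pulled out into a small function. This is a sketch only: `insert_if_absent` and `FakeSession` are hypothetical names, the column list is shortened for brevity, and the only driver behavior assumed is that `session.execute(query, params)` returns an iterable of rows, as it is used in the question. Selecting just `id` with `LIMIT 1` instead of `*` is a guess at reducing work per lookup, not a measured fix for the slowness.

```python
def insert_if_absent(session, article):
    """Insert the article only when no row with the same id is found.

    `session` is any object with an execute(query, params) method.
    Returns True if an INSERT was issued, False otherwise.
    """
    rows = session.execute(
        "SELECT id FROM articles WHERE id = %s LIMIT 1", [article['id']])
    if list(rows):
        return False
    session.execute(
        "INSERT INTO articles (source, created_at, id, title) "
        "VALUES (%s, %s, %s, %s)",
        (article['source'], article['created_at'],
         article['id'], article['title']))
    return True


class FakeSession(object):
    """Hypothetical in-memory stand-in, only for trying the function out."""

    def __init__(self):
        self.rows = {}

    def execute(self, query, params):
        if query.startswith("SELECT"):
            article_id = params[0]
            return [self.rows[article_id]] if article_id in self.rows else []
        # INSERT: params are (source, created_at, id, title)
        self.rows[params[2]] = params
        return []


session = FakeSession()
article = {'source': 'Hao', 'created_at': '2016-01-12 02:04:00',
           'id': '3930462206848285', 'title': 'example'}
first = insert_if_absent(session, article)   # nothing stored yet
second = insert_if_absent(session, article)  # same id seen again
```

Note that a separate SELECT followed by an INSERT is not atomic, so two consumers handling the same ID at the same moment could both pass the check.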
DROP TABLE IF EXISTS schedules;
CREATE TABLE schedules (
    source text,
    type text,
    scheduled_for timestamp,
    id text,
    PRIMARY KEY (source, type, scheduled_for, id)
);
The scheduled_for value varies. Here is a concrete example:
Hao article 2016-01-12 02:09:00+0800 3930462206848285
Hao article 2016-01-12 03:09:00+0801 3930462206848285
Hao article 2016-01-12 04:09:00+0802 3930462206848285
Hao article 2016-01-12 05:09:00+0803 3930462206848285
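The rows above differ only in scheduled_for, which is part of the primary key of schedules, so each new value produces a new row for the same ID. The sketch below models that with plain tuples. The helper names `next_slot` and `schedule_key` and the sample timestamps are hypothetical, and time zones are ignored; only the rounding expression is taken from the loop in the question.

```python
from datetime import datetime, timedelta


def next_slot(now):
    """Five minutes from `now`, truncated to the minute (as in the loop)."""
    return (now + timedelta(minutes=5)).replace(second=0, microsecond=0)


def schedule_key(source, kind, now, article_id):
    """Tuple mirroring PRIMARY KEY (source, type, scheduled_for, id)."""
    return (source, kind, next_slot(now), article_id)


article_id = '3930462206848285'
first = schedule_key('Hao', 'article', datetime(2016, 1, 12, 2, 4, 30), article_id)
same_minute = schedule_key('Hao', 'article', datetime(2016, 1, 12, 2, 4, 59), article_id)
hour_later = schedule_key('Hao', 'article', datetime(2016, 1, 12, 3, 4, 30), article_id)

# Equal keys would overwrite one row; different keys create separate rows.
distinct_rows = len({first, same_minute, hour_later})
```

Two messages processed within the same minute map to the same key, while the same ID processed an hour later maps to a different key and therefore a second row.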
This is my CQL schema for articles:
CREATE TABLE crawler.articles (
    source text,
    created_at timestamp,
    id text,
    category text,
    channel text,
    last_crawled timestamp,
    text text,
    thumbnail text,
    title text,
    url text,
    PRIMARY KEY (source, created_at, id)
) WITH CLUSTERING ORDER BY (created_at DESC, id ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"ALL"}'
AND comment = ''
AND compaction = {'sstable_size_in_mb': '160', 'enabled': 'true', 'unchecked_tombstone_compaction': 'false', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 604800
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';
CREATE INDEX articles_id_idx ON crawler.articles (id);
CREATE INDEX articles_url_idx ON crawler.articles (url);
[Comments]:
-
Could you provide the table schema for articles? That way we can see the primary key and all the details.
-
I have added the articles schema to the post above. Thanks for your reply!
Tags: python cassandra scrapy apache-kafka