【发布时间】:2016-05-25 03:24:05
【问题描述】:
我得到了一个可以完全抓取的数据流。数据全部放入 Kafka,然后发送到 Cassandra。现在卡夫卡消费者非常慢,比生产者慢得多。我希望它们完全一样。我该怎么做才能达到这个结果或我的代码有什么问题?
这是我在 python 中的 Kafka 消费者代码:
import logging
from cassandra.cluster import Cluster
from kafka.consumer.kafka import KafkaConsumer
from kafka.consumer.multiprocess import MultiProcessConsumer
from kafka.client import KafkaClient
from kafka.producer.simple import SimpleProducer
import json
from datetime import datetime, timedelta
from cassandra import ConsistencyLevel
from dateutil.parser import parse
logging.basicConfig(filename='consumer.log', format='[%(asctime)-15s] %(name)s %(levelname)s %(message)s', level=logging.DEBUG)
class Whitelist(logging.Filter):
def __init__(self, *whitelist):
self.whitelist = [logging.Filter(name) for name in whitelist]
def filter(self, record):
return any(f.filter(record) for f in self.whitelist)
for handler in logging.root.handlers:
handler.addFilter(Whitelist('consumer'))
log = logging.getLogger('consumer')
try:
cluster = Cluster(['localhost']); session = cluster.connect(keyspace)
kafka = KafkaClient('localhost')
consumer = MultiProcessConsumer(kafka, b'default',kafkatopic,num_procs=16, max_buffer_size=None)
article_lookup_stmt = session.prepare("SELECT * FROM articles WHERE id in ?")
article_lookup_stmt.consistency_level = ConsistencyLevel.QUORUM
article_insert_stmt = session.prepare("INSERT INTO articles(id, thumbnail, title, url, created_at, scheduled_for, source, category, channel,genre) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
article_by_created_at_insert_stmt = session.prepare("INSERT INTO article_by_created_at(source, created_at, article) VALUES (?, ?, ?)")
article_by_url_insert_stmt = session.prepare("INSERT INTO article_by_url(url, article) VALUES (?, ?)")
schedules_insert_stmt = session.prepare("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (?,?,?,?)")
axes_insert_stmt = session.prepare("INSERT INTO axes(article,at,comments,likes,reads,shares) VALUES (?, ?, ?, ?, ?, ?)")
while True:
messages = consumer.get_messages(count=16)
if len(messages) == 0:
print 'IDLE'
continue
for message in messages:
try:
response = json.loads(message.value)
data = json.loads(response['body'])
print response['body']
articles = data['articles']
idlist = [r['id'] for r in articles]
if len(idlist)>0:
article_rows = session.execute(article_lookup_stmt,[idlist])
rows = [r.id for r in article_rows]
for article in articles:
try:
if not article['id'] in rows:
article['created_at'] = parse(article['created_at'])
scheduled_for=(article['created_at'] + timedelta(minutes=60)).replace(second=0, microsecond=0)
session.execute(article_insert_stmt, (article['id'], article['thumbnail'], article['title'], article['url'], article['created_at'], scheduled_for, article['source'], article['category'], article['channel'],article['genre']))
session.execute(article_by_created_at_insert_stmt, (article['source'], article['created_at'], article['id']))
session.execute(article_by_url_insert_stmt, (article['url'], article['id']))
session.execute(schedules_insert_stmt,(article['source'],'article',scheduled_for,article['id']))
log.debug('%s %s' % (article['id'],article['created_at']))
session.execute(axes_insert_stmt,(article['id'],datetime.utcnow(),article['axes']['comments'],article['axes']['likes'],0,article['axes']['shares']))
except Exception as e:
print 'error==============:',e
continue
except Exception as e:
print 'error is:',e
log.exception(e.message)
except Exception as e:
log.exception(e.message)
编辑:
我还添加了我的个人资料结果,代码行似乎很慢
article_rows = session.execute(article_lookup_stmt,[idlist])
Sun Feb 14 16:01:01 2016 consumer.out
395793 function calls (394232 primitive calls) in 23.074 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
141 10.695 0.076 10.695 0.076 {select.select}
7564 10.144 0.001 10.144 0.001 {method 'acquire' of 'thread.lock' objects}
1 0.542 0.542 23.097 23.097 consumer.py:5(<module>)
1510 0.281 0.000 0.281 0.000 {method 'recv' of '_socket.socket' objects}
38 0.195 0.005 0.195 0.005 /usr/local/lib/python2.7/json/decoder.py:371(raw_decode)
13 0.078 0.006 0.078 0.006 {time.sleep}
2423 0.073 0.000 0.137 0.000 /usr/local/lib/python2.7/logging/__init__.py:242(__init__)
22112 0.063 0.000 0.095 0.000 /usr/local/lib/python2.7/site-packages/kafka/util.py:73(relative_unpack)
3 0.052 0.017 0.162 0.054 /usr/local/lib/python2.7/site-packages/kafka/protocol.py:386(decode_metadata_response)
2006/2005 0.047 0.000 0.055 0.000 /usr/local/lib/python2.7/site-packages/cassandra/policies.py:350(make_query_plan)
1270 0.032 0.000 0.034 0.000 /usr/local/lib/python2.7/threading.py:259(__init__)
3 0.024 0.008 0.226 0.075 /usr/local/lib/python2.7/site-packages/kafka/client.py:456(load_metadata_for_topics)
33 0.024 0.001 0.031 0.001 /usr/local/lib/python2.7/collections.py:288(namedtuple)
15374 0.024 0.000 0.024 0.000 {built-in method new of type object at 0x788ee0}
141 0.023 0.000 11.394 0.081 /usr/local/lib/python2.7/site-packages/kafka/client.py:153(_send_broker_aware_request)
288 0.020 0.000 0.522 0.002 /usr/local/lib/python2.7/site-packages/kafka/conn.py:84(_read_bytes)
2423 0.018 0.000 0.029 0.000 /usr/local/lib/python2.7/logging/__init__.py:1216(findCaller)
115 0.018 0.000 11.372 0.099 /usr/local/lib/python2.7/site-packages/kafka/consumer/kafka.py:303(fetch_messages)
2423 0.018 0.000 0.059 0.000 /usr/local/lib/python2.7/logging/__init__.py:1303(callHandlers)
24548 0.017 0.000 0.017 0.000 {_struct.unpack}
44228/43959 0.016 0.000 0.016 0.000 {len}
感谢您的回复。
【问题讨论】:
-
正如目前所说,您的问题缺少正确答案所需的详细信息。使用分析器找出脚本的哪些部分较慢,然后尝试重写这些部分以使其更快。有关详细信息,请参阅docs.python.org/2/library/profile.html。
-
我的脚本中慢的部分是在消息中的消息之后。
-
您的消费者发出 5 个 cassandra 查询 - 没有迹象表明您的消费者做了什么,但似乎 5 个同步 CQL 查询可能比普通的生产者花费更长的时间。
-
@JeffJirsa 所以你建议做异步?我的消费者将数据写入 cassandra。谢谢
-
异步更快,当然。我们可以看到你的消费者写信给 cassandra,你的生产者是做什么的?
标签: python apache-kafka kafka-consumer-api kafka-producer-api kafka-python