【问题标题】:Why is my Kafka consumer much slower than my kafka producer?为什么我的 Kafka 消费者比我的 Kafka 生产者慢得多?
【发布时间】:2016-05-25 03:24:05
【问题描述】:

我得到了一个可以完全抓取的数据流。数据全部放入 Kafka,然后发送到 Cassandra。现在卡夫卡消费者非常慢,比生产者慢得多。我希望它们完全一样。我该怎么做才能达到这个结果或我的代码有什么问题?

这是我在 python 中的 Kafka 消费者代码:

import logging
from cassandra.cluster import Cluster
from kafka.consumer.kafka import KafkaConsumer
from kafka.consumer.multiprocess import MultiProcessConsumer
from kafka.client import KafkaClient
from kafka.producer.simple import SimpleProducer
import json
from datetime import datetime, timedelta  
from cassandra import ConsistencyLevel
from dateutil.parser import parse
logging.basicConfig(filename='consumer.log', format='[%(asctime)-15s] %(name)s %(levelname)s %(message)s', level=logging.DEBUG)
class Whitelist(logging.Filter):
    def __init__(self, *whitelist):
        self.whitelist = [logging.Filter(name) for name in whitelist]
    def filter(self, record):
        return any(f.filter(record) for f in self.whitelist)
for handler in logging.root.handlers:
    handler.addFilter(Whitelist('consumer'))
log = logging.getLogger('consumer')
try:
    cluster = Cluster(['localhost']); session = cluster.connect(keyspace)
    kafka = KafkaClient('localhost')
    consumer = MultiProcessConsumer(kafka, b'default',kafkatopic,num_procs=16, max_buffer_size=None)
    article_lookup_stmt = session.prepare("SELECT * FROM articles WHERE id in ?")
    article_lookup_stmt.consistency_level = ConsistencyLevel.QUORUM
    article_insert_stmt = session.prepare("INSERT INTO articles(id, thumbnail, title, url, created_at, scheduled_for, source, category, channel,genre) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
    article_by_created_at_insert_stmt = session.prepare("INSERT INTO article_by_created_at(source, created_at, article) VALUES (?, ?, ?)")
    article_by_url_insert_stmt = session.prepare("INSERT INTO article_by_url(url, article) VALUES (?, ?)")
    schedules_insert_stmt = session.prepare("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (?,?,?,?)")
    axes_insert_stmt = session.prepare("INSERT INTO axes(article,at,comments,likes,reads,shares) VALUES (?, ?, ?, ?, ?, ?)")
    while True:
        messages = consumer.get_messages(count=16)
        if len(messages) == 0:
            print 'IDLE'
            continue
        for message in messages:
            try:
                response = json.loads(message.value)
                data = json.loads(response['body'])
                print response['body']
                articles = data['articles']
                idlist = [r['id'] for r in articles]
                if len(idlist)>0:
                    article_rows = session.execute(article_lookup_stmt,[idlist])
                    rows = [r.id for r in article_rows]
                    for article in articles:
                        try:
                            if not article['id'] in rows:
                                article['created_at'] = parse(article['created_at'])
                                scheduled_for=(article['created_at'] + timedelta(minutes=60)).replace(second=0, microsecond=0)
                                session.execute(article_insert_stmt, (article['id'], article['thumbnail'], article['title'], article['url'], article['created_at'], scheduled_for, article['source'], article['category'], article['channel'],article['genre']))
                                session.execute(article_by_created_at_insert_stmt, (article['source'], article['created_at'], article['id']))
                                session.execute(article_by_url_insert_stmt, (article['url'], article['id']))
                                session.execute(schedules_insert_stmt,(article['source'],'article',scheduled_for,article['id']))
                                log.debug('%s %s' % (article['id'],article['created_at']))
                            session.execute(axes_insert_stmt,(article['id'],datetime.utcnow(),article['axes']['comments'],article['axes']['likes'],0,article['axes']['shares']))
                        except Exception as e:
                            print 'error==============:',e
                            continue
            except Exception as e:
                print 'error is:',e
                log.exception(e.message)
except Exception as e:
    log.exception(e.message)

编辑:

我还添加了我的个人资料结果,代码行似乎很慢

    article_rows = session.execute(article_lookup_stmt,[idlist])

Sun Feb 14 16:01:01 2016    consumer.out

         395793 function calls (394232 primitive calls) in 23.074 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      141   10.695    0.076   10.695    0.076 {select.select}
     7564   10.144    0.001   10.144    0.001 {method 'acquire' of 'thread.lock' objects}
        1    0.542    0.542   23.097   23.097 consumer.py:5(<module>)
     1510    0.281    0.000    0.281    0.000 {method 'recv' of '_socket.socket' objects}
       38    0.195    0.005    0.195    0.005 /usr/local/lib/python2.7/json/decoder.py:371(raw_decode)
       13    0.078    0.006    0.078    0.006 {time.sleep}
     2423    0.073    0.000    0.137    0.000 /usr/local/lib/python2.7/logging/__init__.py:242(__init__)
    22112    0.063    0.000    0.095    0.000 /usr/local/lib/python2.7/site-packages/kafka/util.py:73(relative_unpack)
        3    0.052    0.017    0.162    0.054 /usr/local/lib/python2.7/site-packages/kafka/protocol.py:386(decode_metadata_response)
2006/2005    0.047    0.000    0.055    0.000 /usr/local/lib/python2.7/site-packages/cassandra/policies.py:350(make_query_plan)
     1270    0.032    0.000    0.034    0.000 /usr/local/lib/python2.7/threading.py:259(__init__)
        3    0.024    0.008    0.226    0.075 /usr/local/lib/python2.7/site-packages/kafka/client.py:456(load_metadata_for_topics)
       33    0.024    0.001    0.031    0.001 /usr/local/lib/python2.7/collections.py:288(namedtuple)
    15374    0.024    0.000    0.024    0.000 {built-in method new of type object at 0x788ee0}
      141    0.023    0.000   11.394    0.081 /usr/local/lib/python2.7/site-packages/kafka/client.py:153(_send_broker_aware_request)
      288    0.020    0.000    0.522    0.002 /usr/local/lib/python2.7/site-packages/kafka/conn.py:84(_read_bytes)
     2423    0.018    0.000    0.029    0.000 /usr/local/lib/python2.7/logging/__init__.py:1216(findCaller)
      115    0.018    0.000   11.372    0.099 /usr/local/lib/python2.7/site-packages/kafka/consumer/kafka.py:303(fetch_messages)
     2423    0.018    0.000    0.059    0.000 /usr/local/lib/python2.7/logging/__init__.py:1303(callHandlers)
    24548    0.017    0.000    0.017    0.000 {_struct.unpack}
44228/43959    0.016    0.000    0.016    0.000 {len}

感谢您的回复。

【问题讨论】:

  • 正如目前所说,您的问题缺少正确答案所需的详细信息。使用分析器找出脚本的哪些部分较慢,然后尝试重写这些部分以使其更快。有关详细信息,请参阅docs.python.org/2/library/profile.html
  • 我的脚本中慢的部分是在消息中的消息之后。
  • 您的消费者发出 5 个 cassandra 查询 - 没有迹象表明您的消费者做了什么,但似乎 5 个同步 CQL 查询可能比普通的生产者花费更长的时间。
  • @JeffJirsa 所以你建议做异步?我的消费者将数据写入 cassandra。谢谢
  • 异步更快,当然。我们可以看到你的消费者写信给 cassandra,你的生产者是做什么的?

标签: python apache-kafka kafka-consumer-api kafka-producer-api kafka-python


【解决方案1】:

您可以尝试在不保存到 C* 的情况下运行消费者,这样您就可以观察它有多大的不同。
如果事实证明保存到 C* 是一个瓶颈(我假设是这样),那么您可以拥有一个线程池(大于 16 个线程您的使用者产生),其唯一职责是写入 C*。

这样,您将卸载代码的缓慢部分,这将只在消费者代码中留下琐碎的部分。
您可以使用from multiprocessing import Pool
更多here

【讨论】:

  • 谢谢!如果我已经在这样做但它仍然很慢怎么办?你是对的,写信给 Cassandra 是瓶颈。但是不知道为什么在数据增加的时候会这么慢。
  • 如果您正在使用您发布的代码,那么您还没有这样做 :) 但是如果您这样做,并且您注意到无论线程池中的线程数如何,您仍然保持大致相同的吞吐量,那么您必须调整 C*。由于我们知道 Cassandra 最显着的特性是始终能够接受几乎无限量的数据(随着 horz.scale 的增加),这应该是可行的,您应该将此问题发布到 C* 标签。
  • 非常感谢。很抱歉有很多后续问题,我还有一个 :):如果我已经使用多进程消费者,为什么还需要使用多线程?谢谢
  • 这样你就可以从你的 Kafka 消费者独立地扩展对 C* 的写入,因为从 Kafka 读取时,你可以运行最多数量的消费者线程,因为你有分区(至少 Java 消费者客户端是这样的)作品)。
猜你喜欢
  • 2019-05-09
  • 2015-07-13
  • 2015-03-25
  • 2018-12-18
  • 1970-01-01
  • 2022-01-05
  • 2015-09-06
  • 2017-11-03
  • 2019-01-15
相关资源
最近更新 更多