【问题标题】:Async not working when using pykafka and asyncio使用 pykafka 和 asyncio 时异步不起作用
【发布时间】:2018-07-04 10:10:14
【问题描述】:

我尝试使用异步调用多个 pykafka 消费者函数。但是,第一个 pykafka 消费者函数会阻止另一个函数工作。

QueueConsumer 库:

import json
from pykafka import KafkaClient
import configparser

import asyncio


class QueueConsumer(object):

    def __init__(self):
        config = configparser.ConfigParser()
        config.read('config.ini')
        self.config = config

    async def test(self):
        defaultTopic = 'test'
        client = KafkaClient(hosts=self.config['kafka']['host'])
        topic = client.topics[defaultTopic.encode('utf-8')]
        consumer = topic.get_simple_consumer()
        # msg = next(consumer)
        for message in consumer:
            print(defaultTopic+' '+message.value.decode("utf-8"))

    async def coba(self):
        defaultTopic = 'coba'
        client = KafkaClient(hosts=self.config['kafka']['host'])
        topic = client.topics[defaultTopic.encode('utf-8')]
        consumer = topic.get_simple_consumer()
        # msg = next(consumer)
        for message in consumer:
            print(defaultTopic+' '+message.value.decode("utf-8"))

然后我使用以下方法调用这些函数:

import asyncio
queueConsumer = QueueConsumer()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    queueConsumer.test(),
    queueConsumer.coba(),
))
loop.close()

结果只会返回来自主题'test'的队列消息。

编辑: 我尝试添加另一个功能

async def factorial(self, name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

然后像这样调用:

    queueConsumer.test(),
queueConsumer.coba(),
queueConsumer.factorial('a',3),
queueConsumer.factorial('b',5),
queueConsumer.factorial('c',7),

执行阶乘函数的一些打印。但是,当调用 test 或 coba 的 print 时,它只会停止其他的。

【问题讨论】:

  • 代码不会通过将async 放在def 之前简单地实现异步。
  • @KlausD。我尝试添加阶乘函数。有用。但是当调用使用 pykafka 的函数时。然后它停止
  • 目前我正在放弃 python 并改用 golang。它运作良好。但是,我仍在等待答案。 :D

标签: python asynchronous apache-kafka pykafka


【解决方案1】:

SimpleConsumer.consume 是一个阻塞调用,因此您需要调整代码以定期轮询新消息,同时放弃轮询之间的控制以让其他异步操作接管。实现这一点的一种方法是使用 KafkaClient 上的 use_greenlets=True kwarg,依靠 gevent 来处理多个异步操作之间的控制流。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-04-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多