【问题标题】:Scrapy with non-blocking RabbitMQ Twisted protocol(Scrapy 使用非阻塞 RabbitMQ Twisted 协议)
【发布时间】:2021-07-24 09:08:57
【问题描述】:

我正在尝试绕开 Scrapy 的一个要求:必须通过 start_urls 列表或 start_requests 生成器中的 URL 为蜘蛛提供输入。我希望改为:当 RabbitMQ 的 Twisted 协议从某个特定交换机(exchange)收到消息时,在由此触发的回调中启动下载器(以及后续的下载器中间件)。

下面是一个简单的消费者/生产者示例。生产者是一个 Python 程序,负责把 URL 发送到 RabbitMQ 交换机;消费者是一个非阻塞的 Twisted 协议,只是把收到的 URL 打印到标准输出。

这是示例消费者 Twisted 协议。它在 quotes 交换机上监听路由键 urls:

import pika
from pika import spec
from pika.adapters import twisted_connection
from pika.exchange_type import ExchangeType
from twisted.internet import reactor, protocol, defer
from twisted.internet.defer import inlineCallbacks
from twisted.python import log

# Passed to basic_qos as prefetch_count: the broker may have at most this many
# unacknowledged deliveries outstanding on the consumer channel.
PREFETCH_COUNT = 2
# Name of the topic exchange that URLs are published to / consumed from.
EXCHANGE = "quotes"
# Used both as the binding routing key and as the name of the consumer queue.
ROUTING_KEY = "urls"


class PikaProtocol(twisted_connection.TwistedProtocolConnection):
    """AMQP client protocol that consumes from and publishes to topic exchanges.

    Subscriptions and outgoing messages are held by the owning factory
    (``factory.read_list`` and ``factory.queued_messages``) and are applied
    each time this protocol's AMQP channel becomes ready.
    """

    # NOTE: Twisted's Protocol.makeConnection sets ``connected = 1`` as soon as
    # the TCP socket is up -- before the AMQP handshake has finished and before
    # ``_channel`` exists -- so it cannot tell whether the channel is usable.
    # ``_channel_ready`` is the flag that gates channel use; ``connected`` is
    # kept only for backward compatibility.
    connected = False
    _channel_ready = False
    name = "AMQP:Protocol"

    def __init__(self, factory, parameters):
        """
        :param factory: the ``PikaFactory`` that created this protocol.
        :param parameters: ``pika.ConnectionParameters`` for the connection.
        """
        super().__init__(parameters)
        self.factory = factory
        self._channel = None

    @inlineCallbacks
    def connectionReady(self):
        """Open and configure a channel, start consumers, flush pending sends."""
        self._channel = yield self.channel()
        yield self._channel.basic_qos(prefetch_count=PREFETCH_COUNT)
        self.connected = True
        yield self._channel.confirm_delivery()
        self._channel_ready = True
        # Iterate a snapshot: a read_messages() call made while this loop is
        # yielding is already handled by read() and must not be set up twice.
        for exchange, routing_key, callback in list(self.factory.read_list):
            yield self.setup_read(exchange, routing_key, callback)

        self.send()

    def connectionLost(self, reason=protocol.connectionDone):
        """Mark the channel unusable, then let pika handle the lost connection."""
        self._channel_ready = False
        super().connectionLost(reason)

    @inlineCallbacks
    def read(self, exchange, routing_key, callback):
        """Start consuming from *exchange*/*routing_key* if the channel is ready.

        Otherwise nothing happens here; the subscription is expected to be in
        ``factory.read_list`` and is set up by ``connectionReady``.
        """
        if self._channel_ready:
            yield self.setup_read(exchange, routing_key, callback)

    @inlineCallbacks
    def setup_read(self, exchange, routing_key, callback):
        """Declare/bind the queue named *routing_key* and start consuming it.

        *routing_key* doubles as the queue name. When *exchange* is falsy the
        queue is declared and consumed without being bound to any exchange.
        Each delivery is passed to *callback* (see ``_read_item``).
        """
        if exchange:
            yield self._channel.exchange_declare(
                exchange=exchange,
                exchange_type=ExchangeType.topic,
                durable=True,
                auto_delete=False,
            )

        yield self._channel.queue_declare(queue=routing_key, durable=True)
        if exchange:
            # One bind is enough: queue name and routing key are the same
            # string, and a bind without routing_key (which pika defaults to
            # the queue name) would just duplicate this one.
            yield self._channel.queue_bind(
                queue=routing_key, exchange=exchange, routing_key=routing_key
            )

        queue, _consumer_tag = yield self._channel.basic_consume(
            queue=routing_key, auto_ack=False
        )
        self._wait_for_item(queue, callback)

    def _wait_for_item(self, queue, callback):
        """Arm one ``queue.get()`` whose result is handled by ``_read_item``."""
        d = queue.get()
        d.addCallback(self._read_item, queue, callback)
        d.addErrback(self._read_item_err)

    def _read_item(self, item, queue, callback):
        """Handle one delivered message and immediately wait for the next one.

        *item* is the ``(channel, deliver, properties, body)`` tuple from the
        consumer queue; it is passed unchanged to *callback*. The message is
        acked when *callback* succeeds (directly or via a Deferred) and
        nacked otherwise.
        """
        # Re-arm first so further deliveries (up to the prefetch count) are
        # handled while this one's callback is still running.
        self._wait_for_item(queue, callback)
        channel, deliver, _props, msg = item

        log.msg(
            "%s (%s): %s" % (deliver.exchange, deliver.routing_key, repr(msg)),
            system="Pika:<=",
        )
        d = defer.maybeDeferred(callback, item)
        d.addCallbacks(
            lambda _: channel.basic_ack(deliver.delivery_tag),
            lambda failure: self._on_callback_failure(failure, channel, deliver),
        )

    def _on_callback_failure(self, failure, channel, deliver):
        """Log why a message callback failed, then nack the message."""
        log.err(
            failure,
            "Callback failed for delivery %s" % deliver.delivery_tag,
            system=self.name,
        )
        return channel.basic_nack(deliver.delivery_tag)

    @staticmethod
    def _read_item_err(error):
        """Log a failure from ``queue.get()`` (e.g. the consumer was closed)."""
        log.err(error, "Error while reading from queue", system="Pika:<=")

    def send(self):
        """If the channel is ready, publish all messages queued on the factory."""
        if self._channel_ready:
            while self.factory.queued_messages:
                exchange, r_key, message = self.factory.queued_messages.pop(0)
                self.send_message(exchange, r_key, message)

    @inlineCallbacks
    def send_message(self, exchange, routing_key, msg):
        """Declare *exchange* (durable topic) and publish *msg* as persistent."""
        log.msg("%s (%s): %s" % (exchange, routing_key, repr(msg)), system="Pika:=>")
        yield self._channel.exchange_declare(
            exchange=exchange,
            exchange_type=ExchangeType.topic,
            durable=True,
            auto_delete=False,
        )
        # delivery_mode=2 marks the message persistent.
        prop = spec.BasicProperties(delivery_mode=2)
        try:
            yield self._channel.basic_publish(
                exchange=exchange, routing_key=routing_key, body=msg, properties=prop
            )
        except Exception as error:  # pylint: disable=W0703
            log.msg("Error while sending message: %s" % error, system=self.name)


class PikaFactory(protocol.ReconnectingClientFactory):
    """Reconnecting factory that remembers subscriptions and unsent messages.

    ``read_list`` and ``queued_messages`` live on the factory rather than on
    the protocol so that they outlive individual connections; each new
    ``PikaProtocol`` applies them once its AMQP channel is ready.
    """

    name = "AMQP:Factory"

    def __init__(self, parameters):
        """:param parameters: ``pika.ConnectionParameters`` for every connection."""
        self.parameters = parameters
        # Current protocol instance, or None while disconnected.
        self.client = None
        # (exchange, routing_key, message) tuples waiting to be published.
        self.queued_messages = []
        # (exchange, routing_key, callback) subscriptions to (re)establish.
        self.read_list = []

    def startedConnecting(self, connector):
        log.msg("Started to connect.", system=self.name)

    def buildProtocol(self, addr):
        """Create the protocol for a new connection and reset the retry delay."""
        self.resetDelay()
        log.msg("Connected", system=self.name)
        self.client = PikaProtocol(self, self.parameters)
        return self.client

    def clientConnectionLost(self, connector, reason):  # pylint: disable=W0221
        log.msg("Lost connection.  Reason: %s" % reason.value, system=self.name)
        # Forget the dead protocol: until the next connection is built,
        # send_message/read_messages must only queue work instead of pushing
        # it onto a closed channel (where queued messages would be lost).
        self.client = None
        protocol.ReconnectingClientFactory.clientConnectionLost(self, connector, reason)

    def clientConnectionFailed(self, connector, reason):
        log.msg("Connection failed. Reason: %s" % reason.value, system=self.name)
        protocol.ReconnectingClientFactory.clientConnectionFailed(
            self, connector, reason
        )

    def send_message(self, exchange=None, routing_key=None, message=None):
        """Queue a message and publish it now if a client is connected."""
        self.queued_messages.append((exchange, routing_key, message))
        if self.client is not None:
            self.client.send()

    def read_messages(self, exchange, routing_key, callback):
        """Configure an exchange to be read from (now and after reconnects)."""
        self.read_list.append((exchange, routing_key, callback))
        if self.client is not None:
            self.client.read(exchange, routing_key, callback)



def main():
    """Consume URLs from the EXCHANGE/ROUTING_KEY queue and print each one."""
    parameters = pika.ConnectionParameters(
        host="localhost",
        virtual_host="/",
        credentials=pika.PlainCredentials("guest", "guest"),
    )

    def callback(tup):
        # tup is the (channel, deliver, properties, body) delivery tuple;
        # body holds the raw URL bytes.
        url = tup[-1].decode('utf-8')
        print(f'Received {url}')

    f = PikaFactory(parameters)
    f.read_messages(EXCHANGE, ROUTING_KEY, callback)

    # Use pika's public host/port properties rather than the private
    # _host/_port attributes.
    reactor.connectTCP(parameters.host, parameters.port, f)

    reactor.run()


if __name__ == "__main__":
    main()

下面是向消费者发送 URL 的方式(发送后可以在消费者的标准输出上看到这些 URL):

#!/usr/bin/env python
"""Publish one URL to the 'quotes' exchange with routing key 'urls'."""
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
try:
    channel = connection.channel()
    # Declare the exchange and queue exactly as the consumer does (durable
    # topic exchange, durable queue named after the routing key). Publishing
    # then works even if the consumer has never run: without the exchange the
    # broker answers 404, and without a bound queue the message is dropped.
    # Re-declaring with identical settings is idempotent.
    channel.exchange_declare(
        exchange='quotes', exchange_type='topic', durable=True, auto_delete=False)
    channel.queue_declare(queue='urls', durable=True)
    channel.queue_bind(queue='urls', exchange='quotes', routing_key='urls')

    channel.basic_publish(
        exchange='quotes',
        routing_key='urls',
        body='http://quotes.toscrape.com/page/1/',
        # delivery_mode=2 makes the message persistent, matching the
        # durable queue.
        properties=pika.BasicProperties(delivery_mode=2),
    )
    print(" [x] Sent url to quotes exchange")
finally:
    connection.close()

如何让 quotes 蜘蛛的下载器去下载这些 URL(而不是简单地把它们打印到标准输出)?

谢谢!

【问题讨论】:

    标签: python scrapy rabbitmq twisted


    【解决方案1】:

    这可能对你有帮助:经过一番努力,我终于让我的蜘蛛通过 pika 的 Twisted 适配器(TwistedAdapter)消费 RabbitMQ 消息。

    import pika
    import scrapy
    from pika.adapters import twisted_connection
    from scrapy import signals
    from scrapy.exceptions import DontCloseSpider
    from twisted.internet import defer, protocol, reactor, task


    class TwistedConsumer(scrapy.Spider):
        """Scrapy spider fed by a RabbitMQ queue through pika's Twisted adapter.

        Call :meth:`start_consuming` once the spider is bound to a crawler
        (e.g. from ``from_crawler``) and override :meth:`next_request`, which
        receives the body of every message read from the queue.

        Connection details come from the crawler settings ``RABBITMQ_USER``,
        ``RABBITMQ_PASSWORD``, ``RABBITMQ_HOST``, ``RABBITMQ_PORT``,
        ``RABBITMQ_VIRTUAL_HOST``, ``RABBITMQ_QUEUE_NAME``,
        ``RABBITMQ_EXCHANGE`` and ``RABBITMQ_ROUTING_KEY``.
        """

        start_urls = []

        # Extra arguments for queue_declare, e.g. {"x-max-length": 1000}.
        # Override in subclasses; None means no extra arguments.
        queue_arguments = None

        # Interval (seconds) for the LoopingCall that polls the queue.
        read_interval = 0.01

        def start_consuming(self):
            """Connect to RabbitMQ and start turning messages into requests.

            Requires ``self.settings`` and ``self.crawler``, i.e. the spider
            must have been created through ``from_crawler``.
            """
            self.channel = None
            self.queue_object = None
            self.consumer_tag = None

            # The crawler's settings include the project settings plus
            # custom_settings / -s overrides, and are parsed only once
            # (unlike repeated get_project_settings() calls). Defaults mirror
            # pika's own connection defaults.
            settings = self.settings
            self.user = settings.get('RABBITMQ_USER', 'guest')
            self.password = settings.get('RABBITMQ_PASSWORD', 'guest')
            self.host = settings.get('RABBITMQ_HOST', 'localhost')
            # Values from the environment/command line are strings; pika and
            # connectTCP need an int.
            self.port = settings.getint('RABBITMQ_PORT', 5672)
            self.virtual_host = settings.get('RABBITMQ_VIRTUAL_HOST', '/')
            self.queue = settings.get('RABBITMQ_QUEUE_NAME')
            self.exchange = settings.get('RABBITMQ_EXCHANGE')
            self.routing_key = settings.get('RABBITMQ_ROUTING_KEY')

            credentials = pika.PlainCredentials(self.user, self.password)
            parameters = pika.ConnectionParameters(
                host=self.host,
                port=self.port,
                virtual_host=self.virtual_host,
                credentials=credentials,
                heartbeat=10,
            )

            # ClientCreator yields a Deferred firing with the connected
            # TwistedProtocolConnection; its `ready` Deferred then fires with
            # the connection once it can open channels.
            creator = protocol.ClientCreator(
                reactor, twisted_connection.TwistedProtocolConnection, parameters
            )
            self.deferedConnection = creator.connectTCP(self.host, self.port)
            self.deferedConnection.addCallback(lambda conn: conn.ready)
            self.deferedConnection.addCallback(self.run)
            # Surface connection/setup failures in the spider log instead of
            # an unhandled-error-in-Deferred message.
            self.deferedConnection.addErrback(self._on_consumer_error)

            self.crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
            self.crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)

        def populate_start_urls(self, _result=None):
            """Deprecated no-op kept for backward compatibility.

            Queue messages are delivered through :meth:`next_request`;
            ``start_urls`` is not populated from the queue.
            """
            return _result

        def spider_idle(self):
            """Keep the spider alive while waiting for new queue messages."""
            raise DontCloseSpider

        def item_scraped(self, *args, **kwargs):
            self.logger.info("Item scraped")

        @defer.inlineCallbacks
        def run(self, connection):
            """Declare and bind the exchange/queue, then start polling the queue.

            :param connection: the ready ``TwistedProtocolConnection``.
            """
            self.channel = yield connection.channel()

            yield self.channel.exchange_declare(exchange=self.exchange,
                                                auto_delete=True,
                                                exchange_type="direct",
                                                durable=True)

            yield self.channel.queue_declare(queue=self.queue,
                                             durable=True,
                                             arguments=self.queue_arguments)

            # Wait for the binding before consuming.
            yield self.channel.queue_bind(exchange=self.exchange,
                                          routing_key=self.routing_key,
                                          queue=self.queue)

            # NOTE(review): RabbitMQ ignores prefetch for auto_ack consumers;
            # kept to preserve the original channel configuration.
            yield self.channel.basic_qos(prefetch_count=10)

            self.queue_object, self.consumer_tag = yield self.channel.basic_consume(
                queue=self.queue, auto_ack=True)

            # LoopingCall waits for the Deferred returned by read() before
            # scheduling the next call, so only one get() is pending at a time.
            self._reader = task.LoopingCall(self.read, self.queue_object)
            reading = self._reader.start(interval=self.read_interval)
            reading.addErrback(self._on_consumer_error)

        def next_request(self, body):
            """Turn a raw message *body* into a scheduled request.

            Subclasses must override this.
            """
            raise NotImplementedError

        @defer.inlineCallbacks
        def read(self, queue_object):
            """Wait for one message and hand its body to :meth:`next_request`.

            Messages are consumed with auto_ack=True, so a body that
            next_request cannot handle is logged and dropped instead of
            stopping the polling loop.
            """
            # queue_object.get() returns a Deferred (ClosableDeferredQueue).
            _channel, _method, _properties, body = yield queue_object.get()

            if body:
                try:
                    self.next_request(body)
                except Exception:  # boundary: one bad message must not stop consumption
                    self.logger.exception("Could not schedule a request for %r", body)

        def _on_consumer_error(self, failure):
            """Log a failure of the RabbitMQ connection, setup or read loop."""
            self.logger.error("RabbitMQ consumer failed: %s", failure.getTraceback())
    

    然后实现一个继承它的蜘蛛:

    import scrapy


    class MySpider(TwistedConsumer):
        """Example spider that crawls every URL received from RabbitMQ.

        TwistedConsumer is the base class from the previous snippet.
        """

        name = "my_spider"

        start_urls = []

        @classmethod
        def from_crawler(cls, crawler, *args, **kwargs):
            """Create the spider and start consuming from RabbitMQ.

            Consumption starts here rather than in __init__ because it needs
            the crawler (settings and signals), which from_crawler binds.
            Spider arguments (e.g. ``-a key=value``) are forwarded unchanged.
            """
            spider = super().from_crawler(crawler, *args, **kwargs)
            spider.start_consuming()
            return spider

        def next_request(self, body):
            """Schedule a request for the URL carried in a message *body*."""
            # The producer publishes the bare URL as the message body (bytes).
            url = body.decode("utf-8").strip()
            if not url:
                self.logger.warning("Ignoring message with an empty URL body")
                return

            request = scrapy.Request(url, callback=self.parse)
            # Since Scrapy 2.6, ExecutionEngine.crawl() takes only the request;
            # passing the spider as well is deprecated.
            self.crawler.engine.crawl(request)

        def parse(self, response):
            """Parse a downloaded page; replace with real extraction logic."""
            yield {"url": response.url}
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-01-02
      • 1970-01-01
      • 2023-03-28
      • 1970-01-01
      • 2010-12-15
      • 2014-03-19
      • 1970-01-01
      相关资源
      最近更新 更多