【发布时间】:2021-07-24 09:08:57
【问题描述】:
我正在尝试绕过 Scrapy 的一个要求:爬虫(spider)必须通过 start_urls 列表或 start_requests 生成器来获取 URL。相反,我想在 RabbitMQ 的 Twisted 协议从特定交换机(exchange)收到消息时,由其触发的回调来启动下载器(以及后续的下载器中间件)。
接下来,我有一个消费者和生产者应用程序的简单示例。生产者应用程序是一个 Python 程序,可将 URL 发送到 RabbitMQ 交换。消费者是一个非阻塞的 Twisted 协议,它只是将它们打印到标准输出。
这是示例消费者 Twisted 协议。它在 quotes 交换机上监听路由键 urls:
import pika
from pika import spec
from pika.adapters import twisted_connection
from pika.exchange_type import ExchangeType
from twisted.internet import reactor, protocol, defer
from twisted.internet.defer import inlineCallbacks
from twisted.python import log
# Value passed to basic_qos(prefetch_count=...) when the channel is opened.
PREFETCH_COUNT = 2
# Exchange the consumer listens on, and the routing key it binds/consumes
# (the routing key is also used as the queue name).
EXCHANGE, ROUTING_KEY = "quotes", "urls"
class PikaProtocol(twisted_connection.TwistedProtocolConnection):
    """AMQP protocol that consumes from and publishes to RabbitMQ exchanges.

    One instance is built per TCP connection by ``PikaFactory``. The factory
    owns ``read_list`` (subscriptions) and ``queued_messages`` (pending
    publishes), so this protocol replays both once its channel is ready.
    """

    connected = False
    name = "AMQP:Protocol"

    def __init__(self, factory, parameters):
        super().__init__(parameters)
        self.factory = factory

    @inlineCallbacks
    def connectionReady(self):
        """Open a channel, replay all subscriptions, then flush queued messages."""
        self._channel = yield self.channel()
        yield self._channel.basic_qos(prefetch_count=PREFETCH_COUNT)
        self.connected = True
        yield self._channel.confirm_delivery()
        for exchange, routing_key, callback in self.factory.read_list:
            yield self.setup_read(exchange, routing_key, callback)
        self.send()

    def connectionLost(self, reason=protocol.connectionDone):
        """Mark this protocol as disconnected, then let pika tear it down.

        Without resetting ``connected``, ``send()`` on a dead protocol would
        pop messages from the factory queue and publish them on a closed
        channel (losing them) instead of leaving them queued for the next
        connection's ``connectionReady``.
        """
        self.connected = False
        super().connectionLost(reason)

    @inlineCallbacks
    def read(self, exchange, routing_key, callback):
        """Add an exchange to the list of exchanges to read from.

        Only acts when already connected; otherwise ``connectionReady``
        picks the subscription up from the factory's ``read_list``.
        """
        if self.connected:
            yield self.setup_read(exchange, routing_key, callback)

    @inlineCallbacks
    def setup_read(self, exchange, routing_key, callback):
        """Declare/bind the queue named ``routing_key`` and start consuming it.

        When ``exchange`` is truthy, a durable topic exchange of that name is
        declared and the queue is bound to it with ``routing_key``. Each
        delivery is handed to ``_read_item`` together with ``callback``.
        """
        if exchange:
            yield self._channel.exchange_declare(
                exchange=exchange,
                exchange_type=ExchangeType.topic,
                durable=True,
                auto_delete=False,
            )
        yield self._channel.queue_declare(queue=routing_key, durable=True)
        if exchange:
            # One explicit binding is enough; binding again without a
            # routing_key would default to the queue name, i.e. the same key.
            yield self._channel.queue_bind(
                queue=routing_key, exchange=exchange, routing_key=routing_key
            )
        queue, _consumer_tag = yield self._channel.basic_consume(
            queue=routing_key, auto_ack=False
        )
        d = queue.get()
        d.addCallback(self._read_item, queue, callback)
        d.addErrback(self._read_item_err)

    def _read_item(self, item, queue, callback):
        """Handle one delivery and re-arm the consumer for the next one.

        ``item`` is the ``(channel, method, properties, body)`` tuple from the
        pika queue. ``callback(item)`` may return a plain value or a
        Deferred; the delivery is acked on success and nacked on failure.
        """
        # Re-arm first so the next delivery can be received while this
        # one's callback is still running.
        next_item = queue.get()
        next_item.addCallback(self._read_item, queue, callback)
        next_item.addErrback(self._read_item_err)

        channel, deliver, _props, msg = item
        log.msg(
            "%s (%s): %s" % (deliver.exchange, deliver.routing_key, repr(msg)),
            system="Pika:<=",
        )

        def _ack(_result):
            channel.basic_ack(deliver.delivery_tag)

        def _nack(failure):
            # Record why the callback failed instead of dropping the failure.
            log.err(
                failure,
                "Callback failed; nacking delivery %s" % deliver.delivery_tag,
                system=self.name,
            )
            # NOTE: pika's basic_nack requeues by default, so a callback
            # that always fails will see this message redelivered.
            channel.basic_nack(deliver.delivery_tag)

        d = defer.maybeDeferred(callback, item)
        d.addCallbacks(_ack, _nack)
        # ack/nack itself can fail (e.g. the channel closed meanwhile); log it
        # rather than leaving an unhandled error in the Deferred.
        d.addErrback(log.err, "Failed to ack/nack delivery", system=self.name)

    @staticmethod
    def _read_item_err(error):
        """Log a failure from ``queue.get()``; that queue's consumer loop stops."""
        log.err(error, "Error while reading from queue")

    def send(self):
        """If connected, publish every message waiting in the factory queue."""
        if not self.connected:
            return
        pending = self.factory.queued_messages
        while pending:
            exchange, r_key, message = pending.pop(0)
            self.send_message(exchange, r_key, message)

    @inlineCallbacks
    def send_message(self, exchange, routing_key, msg):
        """Declare ``exchange`` and publish one persistent message to it.

        Any failure (declare or publish) is logged, not raised, because the
        caller (``send``) does not wait on the returned Deferred.
        """
        log.msg("%s (%s): %s" % (exchange, routing_key, repr(msg)), system="Pika:=>")
        prop = spec.BasicProperties(delivery_mode=2)
        try:
            yield self._channel.exchange_declare(
                exchange=exchange,
                exchange_type=ExchangeType.topic,
                durable=True,
                auto_delete=False,
            )
            yield self._channel.basic_publish(
                exchange=exchange, routing_key=routing_key, body=msg, properties=prop
            )
        except Exception as error:  # pylint: disable=W0703
            log.msg("Error while sending message: %s" % error, system=self.name)
class PikaFactory(protocol.ReconnectingClientFactory):
    """Reconnecting client factory that keeps AMQP state across connections.

    ``queued_messages`` holds ``(exchange, routing_key, message)`` tuples
    waiting to be published and ``read_list`` holds
    ``(exchange, routing_key, callback)`` subscriptions. Both live on the
    factory so a fresh ``PikaProtocol`` built after a reconnect can replay
    them.
    """

    name = "AMQP:Factory"

    def __init__(self, parameters):
        self.parameters = parameters
        # Current PikaProtocol, or None while no connection is established.
        self.client = None
        self.queued_messages = []
        self.read_list = []

    def startedConnecting(self, connector):
        log.msg("Started to connect.", system=self.name)

    def buildProtocol(self, addr):
        # A TCP connection was made: reset the reconnect back-off delay.
        self.resetDelay()
        log.msg("Connected", system=self.name)
        self.client = PikaProtocol(self, self.parameters)
        return self.client

    def clientConnectionLost(self, connector, reason):  # pylint: disable=W0221
        log.msg("Lost connection. Reason: %s" % reason.value, system=self.name)
        # Forget the dead protocol so send_message/read_messages stop talking
        # to it; the next buildProtocol() installs a fresh one.
        self.client = None
        protocol.ReconnectingClientFactory.clientConnectionLost(self, connector, reason)

    def clientConnectionFailed(self, connector, reason):
        log.msg("Connection failed. Reason: %s" % reason.value, system=self.name)
        protocol.ReconnectingClientFactory.clientConnectionFailed(
            self, connector, reason
        )

    def send_message(self, exchange=None, routing_key=None, message=None):
        """Queue a message and flush the queue if a client is available."""
        self.queued_messages.append((exchange, routing_key, message))
        if self.client is not None:
            self.client.send()

    def read_messages(self, exchange, routing_key, callback):
        """Configure an exchange to be read from.

        The subscription is remembered in ``read_list`` (replayed on every
        reconnect) and, if a client exists, set up immediately.
        """
        self.read_list.append((exchange, routing_key, callback))
        if self.client is not None:
            d = self.client.read(exchange, routing_key, callback)
            # Surface setup failures instead of leaving them unhandled.
            d.addErrback(
                log.err,
                "Failed to set up reader for %s" % routing_key,
                system=self.name,
            )
def main():
    """Consume URLs from the EXCHANGE/ROUTING_KEY subscription and print them.

    Connects to a RabbitMQ broker on localhost (vhost "/", guest/guest) and
    runs the Twisted reactor until it is stopped.
    """
    parameters = pika.ConnectionParameters(
        host="localhost",
        virtual_host="/",
        credentials=pika.PlainCredentials("guest", "guest"),
    )

    def callback(item):
        # item is (channel, method, properties, body); body arrives as bytes.
        url = item[-1].decode("utf-8")
        print(f"Received {url}")

    factory = PikaFactory(parameters)
    factory.read_messages(EXCHANGE, ROUTING_KEY, callback)
    # Use the public host/port properties rather than private attributes.
    reactor.connectTCP(parameters.host, parameters.port, factory)
    reactor.run()


if __name__ == "__main__":
    main()
这是您将 URL 提供给它的方式(并看到它们打印在标准输出上):
#!/usr/bin/env python
# Publish one URL to the "quotes" exchange with routing key "urls".
import pika

# The with-block closes the connection even if publishing raises.
with pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) as connection:
    channel = connection.channel()
    # Declare the exchange with the same settings the consumer uses so this
    # script does not fail if the consumer has never run; the declaration is
    # a no-op when it already exists. (Without a bound queue the message is
    # still dropped by the broker.)
    channel.exchange_declare(exchange='quotes', exchange_type='topic', durable=True)
    channel.basic_publish(
        exchange='quotes',
        routing_key='urls',
        body='http://quotes.toscrape.com/page/1/',
        # Persistent delivery, matching the consumer's own send_message.
        properties=pika.BasicProperties(delivery_mode=2),
    )
    print(" [x] Sent url to quotes exchange")
如何改为运行 quotes 爬虫(spider)的下载器(而不是简单地将 URL 打印到标准输出)?
谢谢!
【问题讨论】:
标签: python scrapy rabbitmq twisted