【问题标题】:How to implement a RabbitMQ consumer using Pyspark Streaming module?如何使用 Pyspark Streaming 模块实现 RabbitMQ 消费者?
【发布时间】:2016-09-28 16:55:54
【问题描述】:

我有一个 Apache Spark 集群和一个 RabbitMQ 代理,我想使用 pyspark.streaming 模块来使用消息并计算一些指标。

问题是我只找到this package,但是用JavaScala实现的。除此之外,我没有在 Python 中找到任何示例或桥接实现。

我有一个使用 Pika 实现的消费者,但我不知道如何将有效负载传递给我的 StreamingContext

【问题讨论】:

  • 好吧,我刚刚发现PysparkRabbitMQ 都使用MQTT 协议。这可能是一个解决方案,但它们是一些权衡和限制
  • 在 RabbitMQ 集群上使用 MQTT 协议意味着更改队列配置。对我来说,这不是一个解决方案。我找到了解决它的方法。完成测试后,我将发布解决方案
  • 嘿,有进展吗?我现在面临同样的问题。就我而言,我什至无法设置 MQTT 概念验证。
  • 是的,这比我想象的要容易。我使用 TCP 连接 从我的 pika 消费者发送我的消息以触发。我将在几个小时内发布正式答案
  • 谢谢!我被困在这里:stackoverflow.com/questions/39331781/…

标签: python rabbitmq pyspark spark-streaming pika


【解决方案1】:

此解决方案使用 Spark Streaming 中的 pika asynchronous consumer examplesocketTextStream 方法

  1. 下载示例并将其保存为.py 文件
  2. 修改文件以使用您自己的 RabbitMQ 凭据和连接参数。就我而言,我必须修改 Consumer
  3. if __name__ == '__main__': 下,我们需要打开一个套接字,其中HOSTPORT 对应于您与Spark Streaming 的TCP 连接。我们必须将方法 sendall 从套接字保存到一个变量中,并将其传递给 Consumer

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
      s.bind((HOST, PORT))
      s.listen(1)
      conn, addr = s.accept()
      dispatcher = conn.sendall #assigning sendall to dispatcher variable
    consumer = Consumer(dispatcher)
    try:
      consumer.run()
    except Exception as e:
      consumer.stop()
      s.close()
    
  4. 修改Consumer中的__init__方法传递dispatcher

    def __init__(self,dispatcher):
      self._connection = None
      self._channel = None
      self._closing = False
      self._consumer_tag = None
      self._url = amqp_url
      #new code
      self._dispatcher = dispatcher
    
  5. 在Consumer内部的方法on_message中我们调用self._dispatcher来发送AMQP消息的body

    def on_message(self, unused_channel, basic_deliver, properties, body):
      self._channel.basic_ack(basic_deliver.delivery_tag)
      try:
        # we need an '\n' at the each row Spark socketTextStream
        self._dispatcher(bytes(body.decode("utf-8")+'\n',"utf-8"))
      except Exception as e:
        raise
    
  6. 在 Spark 中,将 ssc.socketTextStream(HOST, int(PORT))HOSTPORT 对应于我们的 TCP 套接字。 Spark 将管理连接

  7. 先运行消费者,然后运行 ​​Spark 应用程序

最后的评论:

  • 尝试在不同的机器上运行您的消费者,而不是您的 Spark 机器
  • 任何超过 10000 的端口都可以。不要让内核随意打开一些端口
  • 平台:Linux Debian 7 和 8,以及 Ubuntu 14.04 和 16.04
  • 鼠兔版本 0.10.0
  • Python 版本 3.5.2
  • Spark 版本 1.6.1、1.6.2 和 2.0.0

【讨论】:

  • 能否请您提供一些关于如何使用 pyspark 向 rabbitmq 发布消息的指南。具体来说,我正在使用 Azure Databricks,我正在尝试向 rabbitmq 发布消息,但不知道如何实现这一目标
  • 对 EXCHANGE 和 EXCHANGE_KEY 参数有什么要求,或者我可以随意设置吗? (EXCHANGE='', EXCHANGE_KEY='topic' ?)
  • Channel 1 was closed: (Stream connection lost: TypeError('exchange must be a str or unicode str, but got <bound method RabbitConsumer.on_exchange_declareok of <RabbitConsumer object at 0x7f996653c3c8>>',))。我收到了这个错误,我的 Rabbitmq 连接参数如下:EXCHANGE = 'message',EXCHANGE_TYPE = 'topic',QUEUE = 'item',ROUTING_KEY = 'item'。我有一个使用 routing_key 'item' 运行 'item' 的队列。当我尝试您的解决方案时,弹出了上述错误。请帮忙!
猜你喜欢
  • 2012-07-06
  • 2020-11-02
  • 2014-08-13
  • 1970-01-01
  • 2011-04-15
  • 2013-10-10
  • 1970-01-01
  • 2017-02-23
  • 1970-01-01
相关资源
最近更新 更多