【问题标题】:Use Messenger to read queued message not sent with Messenger使用 Messenger 阅读未使用 Messenger 发送的排队消息
【发布时间】:2019-08-29 20:06:29
【问题描述】:

我正在尝试读取不是通过 Symfony Messenger 发送的排队消息(在 RabbitMQ 中)。 Messenger似乎添加了一些标题,例如

headers: 
    type: App\Message\Transaction

但是在读取外部消息的时候,这个header是不存在的。

那么,有没有办法告诉 Messenger 队列 A 中的每条消息都必须被视为消息类型 Transaction

我今天拥有的是:

framework:
    messenger:
        transports:
            # Uncomment the following line to enable a transport named "amqp"
            amqp:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: messages
                        type: direct
                    queue:
                        name: queue_messages

        routing:
            # Route your messages to the transports
             'App\Message\Transaction': amqp

我想补充的是:

        routing:
            # Route your messages to the transports
             amqp: 'App\Message\Transaction'

【问题讨论】:

  • 这不是真正的解决方案,但如果您可以控制消息创建,则可以将标头 type 与 FQCN 作为值。我也希望有一个更好的解决方案,比如在您的帖子中描述。
  • 我不够清楚。问题是,我无法控制消息的创建,我只知道将在这个队列中发送什么。
  • 同意,目前尚不清楚,实际上也不是可行的解决方案。会在symfony slack上询问是否可以实现这样的配置或工厂,我回复你。

标签: symfony rabbitmq symfony-messenger


【解决方案1】:

Ryan Weaver 就 symfony 的 slack 回复了类似的问题

如果消息不是来自 messenger,您将需要一个自定义的 messenger 序列化程序:)

1)您创建一个自定义序列化(从 Messenger 实现 SerializerInterface)并在 Messenger 配置下配置它

2) 在该序列化程序中,您以某种方式获取 JSON 并将其转换为您在代码中拥有的一些“消息”对象。 如何这取决于你 - 你需要能够以某种方式查看你的 JSON 并找出它应该映射到哪个消息类。然后您可以手动创建该对象并填充数据,或者使用 Symfony 的序列化程序。在退回之前将其包裹在信封中

3) 因为您的序列化程序现在正在返回一个“消息”对象(如果有的话),Messenger 使用其正常逻辑来查找该消息的处理程序并执行它们


我根据自己的需要做了一个快速的实现,由你来适应你的业务逻辑

1 - 创建一个 Serializer 来实现 SerializerInterface


   // I keeped the default serializer, and just override his decode method.

   /**
     * {@inheritdoc}
     */
    public function decode(array $encodedEnvelope): Envelope
    {
        if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
            throw new InvalidArgumentException('Encoded envelope should have at least a "body" and some "headers".');
        }

        if (empty($encodedEnvelope['headers']['action'])) {
            throw new InvalidArgumentException('Encoded envelope does not have an "action" header.');
        }

        // Call a factory to return the Message Class associate with the action
        if (!$messageClass = $this->messageFactory->getMessageClass($encodedEnvelope['headers']['action'])) {
            throw new InvalidArgumentException(sprintf('"%s" is not a valid action.', $encodedEnvelope['headers']['action']));
        }

        // ... keep the default Serializer logic

        return new Envelope($message, ...$stamps);
    }

2 - 使用工厂检索正确的Message

class MessageFactory
{
    /**
     * @param string $action
     * @return string|null
     */
    public function getMessageClass(string $action)
    {
        switch($action){
            case ActionConstants::POST_MESSAGE :
                return PostMessage::class ;
            default:
                return null;
        }
    }
}

3) 为 messenger 配置新的自定义序列化程序:

framework:
  messenger:
    serializer: 'app.my_custom_serializer'

我会尝试更进一步,找到一种直接“连接”队列的方法,然后通知您。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-02-10
    • 1970-01-01
    • 1970-01-01
    • 2016-09-06
    • 2017-06-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多