【问题标题】:Consume JSON messages with Symfony Messenger & Rabbitmq使用 Symfony Messenger 和 Rabbitmq 使用 JSON 消息
【发布时间】:2021-06-18 10:30:35
【问题描述】:

我正在使用 Symfony 5 和 Messenger 通过 Rabbitmq 主题队列使用来自外部应用程序的 JSON 消息。

消费者在序列化步骤中出错。它还没有进入我的处理程序,所以我将在这里专注于序列化程序。

这是使用它找到的第一条消息时的错误:

编码的信封没有“类型”标头。

我尝试将type 属性添加到值为application/json 的消息头,然后是json,但我得到了类似的错误:

无法对“json”类型的对象进行非规范化,找不到支持的规范化器。

理想情况下,我不需要在外部系统上添加标头,但如有必要,我可以。

如何配置 Symfony 或消息本身以克服 json 序列化错误?

这是我的设置:

消息示例:["ABC", 1]

messenger.yaml:

framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                serializer: messenger.transport.symfony_serializer
                retry_strategy:
                    delay: 500
                options:
                    exchange:
                        name: tln_pot
                        type: topic
                        flags: 0
                        default_publish_routing_key: tln_pot
                    queues:
                        tln_pot:
                            binding_keys: [tln_pot]

bin/console config:dump 框架信使

    ...
    serializer:

        # Service id to use as the default serializer for the transports.
        default_serializer:   messenger.transport.native_php_serializer
        symfony_serializer:

            # Serialization format for the messenger.transport.symfony_serializer service (which is not the serializer used by default).
            format:               json

            # Context array for the messenger.transport.symfony_serializer service (which is not the serializer used by default).
            context:

                # Prototype
                name:                 ~

【问题讨论】:

  • 我认为application/json 类型不正确,应该是数组?也许尝试在没有附加消费者的情况下先将数据发送到队列并检查消息。如果您根本不想附加类型标头,您可能需要编写一个类似于AmqpReceiver.php 的自定义接收器

标签: symfony rabbitmq


【解决方案1】:

我们最近做了同样的事情,为了让其他人更好地了解这一点,我添加了一些东西作为答案。 首先,如果您将消息发送到队列,则配置中指定的传输是您的发送者,而当您使用消息时,它也是您的接收者。
实际上,您正在从外部源接收数据,您可以定义自己的序列化器,它将充当您的接收器,在序列化器中,您可以对消息进行解码或编码,并根据信使消息格式对其进行建模。

我们的 Messenger 配置如下所示:

framework:
messenger:
    # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
    # failure_transport: failed

    transports:
        # https://symfony.com/doc/current/messenger.html#transport-configuration
        async: '%env(MESSENGER_TRANSPORT_DSN)%'
        failed: 'doctrine://default?queue_name=failed'
        # sync: 'sync://'
        external_messages:
          dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
          serializer: App\Messenger\ExternalJsonMessageSerializer

    routing:

        # Route your messages to the transports
     
        'App\Message\YourAqmpMessageClass': external_messages

在 ExternalJsonMessageSerializer 类中,您可以接收解码的消息并根据 symfony 消息标准返回消息的信封,

....

class ExternalJsonMessageSerializer implements SerializerInterface
{
   public function decode(array $encodedEnvelope): Envelope
   {
      $body = $encodedEnvelope['body'] ;
      $headers = $encodedEnvelope['headers'];

      $data = unserialize($body);

      if (null === $data) {
        throw new MessageDecodingFailedException('Invalid JSON');
      }

      // in case of redelivery, unserialize any stamps
      $stamps = [];
      if (isset($headers['stamps'])) {
         $stamps = unserialize($headers['stamps']);
      }

      $envelope = new Envelope(new YourAqmpMessageClass(serialize($data)));
      $envelope = $envelope->with(... $stamps);

      return $envelope;
  }
....

这里是快速查看https://symfonycasts.com/screencast/messenger/transport-serializer

我们从发送者应用发送到队列的数据是

$sendDataToQueue = json_encode([

'headers'=>['type'=>'mail'],

'body' => serialize(['data'=>$data])
]);

然后您可以将该数据放入队列中。

如果您通过 messenger 使用消息,那么您需要遵循信封/消息格式。

当您的外部序列化程序将收到消息时,该消息将已被 json 解码。然后,您需要做的就是正确检查消息。

您的消费命令将如下所示:

bin/console messenger:consume external_messages -vv

希望它能为你澄清事情。

进一步阅读:

https://www.slideshare.net/CodeId/mastering-message-queues-tobias-nyholm-codeid

https://tool.lu/en_US/deck/jh/detail

【讨论】:

    猜你喜欢
    • 2020-03-14
    • 1970-01-01
    • 1970-01-01
    • 2019-08-29
    • 1970-01-01
    • 2012-03-18
    • 1970-01-01
    • 2021-07-28
    • 2012-02-17
    相关资源
    最近更新 更多