【问题标题】:Consume raw json queue messages in Laravel在 Laravel 中使用原始 json 队列消息
【发布时间】:2015-12-06 22:54:23
【问题描述】:

通常,Laravel 期望它将以后使用的任何消息排队。它创建一个带有job 属性的有效负载,该属性稍后指示如何处理队列消息。当你用 Laravel 排队作业,然后用 Laravel 处理它们时,效果很好!

但是,我有一些非 Laravel 应用程序将 json 消息发布到队列中。我需要 Laravel 来接收这些消息并处理它们。

我可以编写命令总线作业来处理消息,但我无法弄清楚如何告诉 queue:work 将消息发送到我的特定处理程序。

似乎 Laravel 有一个硬性假设,即要求它处理的任何队列消息都将按照预期的方式正确格式化、序列化和结构化。

我怎样才能让 Laravel 提取这些原始 json 有效负载,忽略结构(没有什么可以理解的),然后简单地将有效负载交给我的处理程序?

例如,如果我有类似的队列消息:

{
    "foo" : "bar"
}

同样,Laravel 在这里没有什么可以检查或理解。

但我有一个知道如何处理此问题的作业处理程序:

namespace App\Jobs;

class MyQueueHandler {
    public function handle($payload) {
        Log::info($payload['foo']); // yay!
    }
}

现在如何让queue:workqueue:listen 将任何有效负载简单地交给这个App\Jobs\MyQueueHandler 处理程序,我可以在哪里自己完成剩下的工作?

【问题讨论】:

    标签: php laravel


    【解决方案1】:

    如果您使用的是 Laravel 5.6+,请查看this package

    【讨论】:

      【解决方案2】:

      由于 Laravel 试图执行 Gearman 有效负载,因此您要求的内容是不可能的(请参阅 \Illuminate\Bus\Dispatcher)。

      我遇到了同样的情况,只是在 Laravel 作业类周围创建了一个包装器 command。这不是最好的解决方案,因为它将重新排队事件,进入 json 队列,但您不必触及现有的作业类。也许有更多经验的人知道如何在不通过网络再次发送工作的情况下分派工作。

      假设我们有一个名为GenerateIdentApplicationPdfJob 的常规 Laravel 工作者类。

      class GenerateIdentApplicationPdfJob extends Job implements SelfHandling, ShouldQueue
      {
          use InteractsWithQueue, SerializesModels;
      
          /** @var User */
          protected $user;
      
          protected $requestId;
      
          /**
           * Create a new job instance.
           *
           * QUEUE_NAME = 'ident-pdf';
           *
           * @param User $user
           * @param      $requestId
           */
          public function __construct(User $user, $requestId)
          {
              $this->user      = $user;
              $this->requestId = $requestId;
          }
      
          /**
           * Execute the job.
           *
           * @return void
           */
          public function handle(Client $client)
          {
              // ...
          }
      }
      

      为了能够处理这个类,我们需要提供我们自己的构造函数参数。这些是我们 json 队列中所需的数据。

      下面是一个 Laravel commandGearmanPdfWorker,它完成了 Gearman 连接和 json_decode 的所有样板,以便能够处理原始作业类。

      类 GearmanPdfWorker 扩展命令 {

          /**
           * The console command name.
           *
           * @var string
           */
          protected $name = 'pdf:worker';
      
          /**
           * The console command description.
           *
           * @var string
           */
          protected $description = 'listen to the queue for pdf generation jobs';
      
          /**
           * @var \GearmanClient
           */
          private $client;
      
          /**
           * @var \GearmanWorker
           */
          private $worker;
      
          public function __construct(\GearmanClient $client, \GearmanWorker $worker) {
              parent::__construct();
              $this->client = $client;
              $this->worker = $worker;
          }
      
          /**
           * Wrapper listener for gearman jobs with plain json payload
           *
           * @return mixed
           */
          public function handle()
          {
              $gearmanHost = env('CB_GEARMAN_HOST');
              $gearmanPort = env('CB_GEARMAN_PORT');
      
              if (!$this->worker->addServer($gearmanHost, $gearmanPort)) {
                  $this->error('Error adding gearman server: ' . $gearmanHost . ':' . $gearmanPort);
                  return 1;
              } else {
                  $this->info("added server $gearmanHost:$gearmanPort");
              }
      
              // use a different queue name than the original laravel command, since the payload is incompatible
              $queueName = 'JSON.' . GenerateIdentApplicationPdfJob::QUEUE_NAME;
              $this->info('using queue: ' . $queueName);
      
              if (!$this->worker->addFunction($queueName,
                  function(\GearmanJob $job, $args) {
                      $queueName = $args[0];
                      $decoded = json_decode($job->workload());
                      $this->info("[$queueName] payload: " . print_r($decoded, 1));
      
                      $job = new GenerateIdentApplicationPdfJob(User::whereUsrid($decoded->usrid)->first(), $decoded->rid);
                      $job->onQueue(GenerateIdentApplicationPdfJob::QUEUE_NAME);
                      $this->info("[$queueName] dispatch: " . print_r(dispatch($job)));
                  },
                  [$queueName])) {
                  $msg = "Error registering gearman handler to: $queueName";
                  $this->error($msg);
                  return 1;
              }
      
              while (1) {
                  $this->info("Waiting for job on `$queueName` ...");
                  $ret = $this->worker->work();
                  if ($this->worker->returnCode() != GEARMAN_SUCCESS) {
                      $this->error("something went wrong on `$queueName`: $ret");
                      break;
                  }
                  $this->info("... done `$queueName`");
              }
          }
      }
      

      GearmanPdfWorker 类需要像这样在你的\Bundle\Console\Kernel 中注册:

      class Kernel extends ConsoleKernel
      {
          protected $commands = [
              // ...
              \Bundle\Console\Commands\GearmanPdfWorker::class
          ];
      
          // ...
      

      一切就绪后,您可以调用 php artisan pdf:worker 来运行 worker 并通过命令行将一项作业放入 Gearman:gearman -v -f JSON.ident-pdf '{"usrid":9955,"rid":"ABC4711"}'

      然后就可以看到操作成功了

      added server localhost:4730
      using queue: JSON.ident-pdf
      Waiting for job on `JSON.ident-pdf` ...
      [JSON.ident-pdf] payload: stdClass Object
      (
          [usrid] => 9955
          [rid] => ABC4711
      )
      
      0[JSON.ident-pdf] dispatch: 1
      ... done `JSON.ident-pdf`
      Waiting for job on `JSON.ident-pdf` ...
      

      【讨论】:

      • 我应该回来更新的。我最终找到了这个包:github.com/kristianedlund/laravel-external-queue。边缘有点粗糙,但它起作用了。允许我从 SQS 获取原始 json 有效负载并将它们交给处理程序。我也很容易为 IronMQ 扩展它。
      • 我正在努力实现该库(使用rabbitmq),您在旅途中遇到过任何代码示例吗?谢谢
      【解决方案3】:

      您没有指定 Laravel 的哪个版本,所以我猜测是 5.1(在 L4.2 和 L5.x 中处理方式的巨大差异)。

      如果您已经设置了App\Jobs\MyQueueHandler,并希望使用您想要的任何数据从控制器排队作业,您可以这样做:

      use App\Jobs\MyQueueHandler;
      
      class MyController 
      {
          public function myFunction() 
          {
              $this->dispatch(new MyQueueHandler(['foo' => 'bar']));
          }
      }
      

      在您的MyQueueHandler-class 中,有效负载实际上进入了您的构造函数。但是,当您的队列被处理时,仍然会触发句柄方法。但是,如果您依赖依赖注入,则可以在 handle-method 上使用参数(read more here,就在“When Things Go Wrong”上方)所以应该这样做:

      namespace App\Jobs;
      
      class MyQueueHandler 
      {
      
          protected $payload;
      
          public function __construct($payload)
          {
              $this->payload = $payload;
          }
      
          public function handle() {
              Log::info($this->payload['foo']); // yay!
          }
      }
      

      注意:如果您想从主控制器(继承自标准 App\Http\Controller-class,请使用 DispatchesJobs 特征)外部调度作业;

      MyClass 
      {
          use DispatchesJobs;
      
          public function myFunction()
          {
              $this->dispatch(new MyQueueHandler(['foo' => 'bar']));
          }
      }
      

      (使用 Laravel 5.1.19 和 beanstalkd 队列适配器测试的代码)。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-11-27
        • 2017-02-07
        • 2019-02-14
        • 2011-02-18
        • 2016-07-29
        • 2019-12-22
        相关资源
        最近更新 更多