【问题标题】:Amphp : Run many async loops with same connection (eventstore client)Amphp:使用相同的连接运行许多异步循环(eventstore 客户端)
【发布时间】:2020-07-07 22:41:35
【问题描述】:

我正在使用使用 amphp 的 eventstore 客户端。我需要在我的应用程序中重用多个部分的连接。

所以我创建了一个连接提供者:

public function getConnection(): EventStoreConnection
{
    if ($this->connection) {
        return $this->connection;
    }
    $this->connection = $this->createConnection();
    wait($this->connection->connectAsync());

    return $this->connection;
}

然后我在很多地方都使用了这个连接:

\Amp\Loop::run(function () use ($eventStoreEvents, $streamName) {
    $connection = $this->connectionProvider->getConnection();

    // Creation of an event stream
    yield $connection->appendToStreamAsync($streamName, ExpectedVersion::ANY, $eventStoreEvents);
    // sleep(10); // This sleep does not work, code continue like nothing happend
});

\Amp\Loop::run(function () use ($streamName, $aggregateFqcn, &$aggregateRoot) {

    $start = 0;
    $count = \Prooph\EventStore\Internal\Consts::MAX_READ_SIZE;

    $connection = $this->connectionProvider->getConnection();

    do {
        $events = [];
        /** @var StreamEventsSlice $streamEventsSlice */
        $streamEventsSlice = yield $connection
            ->readStreamEventsForwardAsync(
                $streamName,
                $start,
                $count,
                true
            );

        if (!$streamEventsSlice->status()->equals(SliceReadStatus::success())) {
            dump($streamEventsSlice); // Event stream does not exist
            // Error here: the event stream doesn't exist at this point.
            throw new RuntimeGangxception('Impossible to generate the aggregate');
        }
    } while (! $streamEventsSlice->isEndOfStream());
});

问题:似乎第一个请求还没有结束,但第二个循环已经开始了。未注释的 sleep 没有任何影响!

但是事件流最终是用里面的相关事件创建的,所以第一个请求成功了。

如果我启动一个连接然后关闭然后启动一个新连接,它就可以工作。但由于每次新连接的握手开销,它很慢。

我用 Amphp 的 WebSocket 库尝试了一个类似的例子,它成功了。你看出什么不对了吗?

这是我对 websocket 的测试:

$connection = \Amp\Promise\wait(connect('ws://localhost:8080'));
Amp\Loop::run(function () use ($connection) {
   /** @var Connection $connection */
   yield $connection->send("Hello...");
   sleep(10); // This sleep works!
});

Amp\Loop::run(function () use ($connection) {
   /** @var Connection $connection */
   yield $connection->send("... World !");
});

$connection->close();

【问题讨论】:

  • 你不应该有两个Amp\Loop::run,因为你只需要一个事件循环(见amphp.org/amp/event-loop/api)。我认为您并不完全理解正在使用的概念,因为yield 在第一个上下文中基本上意味着“在我准备好之前不要走得更远”并允许事件循环处理其他事件,所以sleep(10); 在上面的yield 准备好/返回之前不会到达/执行。
  • 睡眠只是一个测试。我注意到使用不同工具的相同情况会导致不同的结果。我使用 yield 来等待异步工作结束(我将其理解为 amphp 循环内的等待)。问题好像..其实还没做完!?

标签: php amphp


【解决方案1】:

您尝试做的事情毫无意义。你应该阅读amphp's documenation

Amp 对事件循环使用全局访问器,因为每个应用程序只有一个事件循环。同时运行两个循环是没有意义的,因为它们只需要以繁忙的等待方式相互调度才能正确运行。

也就是说,实际上没有第二个循环。

【讨论】:

  • 使用 2 个循环绝对不是最好的方法。但这是有道理的。我发现来自 prooph 的 eventstore 客户端库没有按预期工作。 (yield 实际上并不等待建立连接)我建议你在这里阅读首页上的文档amphp.org/amp/#preamble 那说很明显这个库不适合我的用例,因为你是主要维护者,我应该停止使用它,我会的。感谢您的宝贵时间。
【解决方案2】:

Prooph eventstore 库基于 amphp 但并不遵循所有原则:您不能等待连接准备好。如果你尝试大规模使用它会更糟,所以不要试图等待承诺完成。

作为替代方案,您可以为以后设置一个承诺并检查连接是否为空。这就是库在内部处理进一步步骤的实际操作。

在我这边,我决定停止使用这个库。但作为替代方案,您可以使用使用 HTTP 客户端的库,它也来自 prooph 团队。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-19
    • 1970-01-01
    • 1970-01-01
    • 2014-06-29
    相关资源
    最近更新 更多