【问题标题】:Akka.Net PreRestart not executed when exception from async handler来自异步处理程序的异常时未执行 Akka.Net PreRestart
【发布时间】:2017-12-03 10:37:56
【问题描述】:

我有以下 Actor 我正在尝试重新启动并将失败的消息重新发送回 Actor:

public class BuildActor : ReceivePersistentActor
{
    public override string PersistenceId => "asdad3333";

    private readonly IActorRef _nextActorRef;

    public BuildActor(IActorRef nextActorRef)
    {
        _nextActorRef = nextActorRef;

        Command<Workload>(x => Build(x));

        RecoverAny(workload =>
        {
            Console.WriteLine("Recovering");
        });
    }

    public void Build(Workload Workload)
    {
        var context = Context;
        var self = Self;

        Persist(Workload, async x =>
        {
            //after this line executes
            //application goes into break mode
            //does not execute PreStart or Recover
            var workload = await BuildTask(Workload);

            _nextActorRef.Tell(workload);

            context.Stop(self);
        });
    }

    private Task<Workload> BuildTask(Workload Workload)
    {
        //works as expected if method made synchronous
        return Task.Run(() =>
        {
            //simulate exception
            if (Workload.ShowException)
            {
                throw new Exception();
            }

            return Workload;
        });
    }

    protected override void PreRestart(Exception reason, object message)
    {
        if (message is Workload workload)
        {
            Console.WriteLine("Prestart");

            workload.ShowException = false;

            Self.Tell(message);
        }
    }
}

Persist 的成功处理程序中,我试图模拟抛出的异常,但在出现异常时,应用程序会进入中断模式,并且不会调用PreRestart 挂钩。但是,如果我通过删除 Task.Run 使 BuildTask 方法同步,那么在异常时会调用 PreRestartRecover&lt;T&gt; 方法。

如果有人能指出我应该推荐什么模式以及我哪里出错了,我将不胜感激。

【问题讨论】:

  • 你确定这和持久性有关吗?我似乎记得异步方法中的一个异常导致我的整个演员系统终止并且我没有使用持久性。我最终进行了编码,以便异步调用捕获所有异常并使用一些错误消息执行 Tell。 (不适合“让它失败”的理念,但这是一种权宜之计。)

标签: c# akka task akka.net


【解决方案1】:

很可能,Akka.Persistence 不是解决您的问题的好方法。

Akka.Persistence 使用eventsourcing 原则来存储actor 的状态。在这种情况下有几个重要的关键点:

  • 您发送给actor的是命令。它描述了一项工作,你想完成。执行该命令可能会导致执行一些实际处理,并最终可能导致以事件的形式持久化actor的线性状态更改历史。
  • 在 Akka.NET 中,Persist 方法仅用于存储 事件 - 它们描述了发生了某事的事实:因此,它们不能被拒绝并且它们不会失败(您在 Persist 回调中所做的事情)。
  • 当演员在任何时间点重新启动时,它总是会尝试通过重播所有事件Persisted 直到最后一个已知时间点来重新创建自己的状态。出于这个原因,Recover 方法应该只专注于重放参与者的状态(它可以在同一个事件上多次调用)并且永远不会产生副作用(副作用的例子是发送电子邮件),这一点很重要。那里抛出的任何异常都意味着,actor 状态已不可恢复地损坏,并且该actor 将被杀死。

如果您想将消息重新发送给您的演员,您可以:

  1. 将可靠的消息队列(即 RabbitMQ 或 Azure 服务总线)或日志(Kafka 或事件中心)放在您的 Actor 处理管道前面。这实际上是很多情况下最合理的方案。
  2. 使用来自 Akka.Persistence 的 at-least-once delivery 语义 - 但恕我直言,仅当由于某种原因您不能使用第一种解决方案时。
  3. 最简单和不可靠的选项(因为消息只驻留在内存中并且从不持久化)是dead letter queue。每个未处理的消息都发送到那里。您可以订阅它并过滤传入的数据,以检测应将哪些消息再次发送给其收件人。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-03-10
    • 1970-01-01
    • 1970-01-01
    • 2011-04-07
    • 2011-12-10
    • 1970-01-01
    • 2018-07-19
    相关资源
    最近更新 更多