【问题标题】:How does Akka.NET persistence handle replaying messages containing IActorRef?Akka.NET 持久性如何处理包含 IActorRef 的重播消息?
【发布时间】:2017-05-27 21:09:13
【问题描述】:

如果我向 Akka.NET Actor 发送一条消息,该消息是一个包含 IActorRef 的对象,然后持久化该消息,写入日志表的 JSON 如下所示:

{"$id":"1","$type":"LearningAkka.Program+BindReference, LearningAkka","Reference":{"$id":"2","$type":"Akka.Actor.ActorRefBase+Surrogate, Akka","Path":"akka://LearningAkka/user/$b#1222898859"}}

如果我理解正确的话,这只是对演员实例的引用;创建它所需的“道具”未存储在此消息中。

奇怪的是,我在重新启动应用程序后 am 看到了一个对象。但是,正如预期的那样,它并没有像重启之前那样构建。这个演员是哪里来的? Akka Persistence 是否找到了一个“足够相似”的演员并使用它来代替?

以下 C# 测试应用程序创建一个对象并将其绑定到其他三个对象之一。在处理完参与者系统后,该对象会从持久性(SQL Server)中重新创建并检查引用。

我的预期行为是以下任何一种(我不确定什么是最合适的):

  • 无法创建 Actor,因为其中一条消息包含无法解析的引用。
  • actor 引用为 null,因为它无法解析。
  • actor 引用指向死信或类似内容。

控制台输出:

[WARNING][27/05/2017 21:02:27][Thread 0001][ActorSystem(LearningAkka)] NewtonSoftJsonSerializer has been detected as a default serializer. It will be obsoleted in Akka.NET starting from version 1.5 in the favor of Hyperion (for more info visit: http://getakka.net/docs/Serialization#how-to-setup-hyperion-as-default-serializer ). If you want to suppress this message set HOCON `akka.suppress-json-serializer-warning` config flag to on.
From the first run B

[WARNING][27/05/2017 21:02:28][Thread 0001][ActorSystem(LearningAkka)] NewtonSoftJsonSerializer has been detected as a default serializer. It will be obsoleted in Akka.NET starting from version 1.5 in the favor of Hyperion (for more info visit: http://getakka.net/docs/Serialization#how-to-setup-hyperion-as-default-serializer ). If you want to suppress this message set HOCON `akka.suppress-json-serializer-warning` config flag to on.
From the second run B

C#:

using Akka.Actor;
using Akka.Event;
using Akka.Persistence;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace LearningAkka
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var actorSystem = ActorSystem.Create("LearningAkka"))
            {
                var referenceA = actorSystem.ActorOf(Props.Create(() => new TestReferencedActor("From the first run A")));
                var referenceB = actorSystem.ActorOf(Props.Create(() => new TestReferencedActor("From the first run B")));
                var referenceC = actorSystem.ActorOf(Props.Create(() => new TestReferencedActor("From the first run C")));
                var actor = actorSystem.ActorOf(Props.Create(() => new TestActor()));
                actor.Tell(new BindReference { Reference = referenceB });
                actor.Tell(new CheckReference());
                Console.ReadLine();
            }

            using (var actorSystem = ActorSystem.Create("LearningAkka"))
            {
                var referenceA = actorSystem.ActorOf(Props.Create(() => new TestReferencedActor("From the second run A")));
                var referenceB = actorSystem.ActorOf(Props.Create(() => new TestReferencedActor("From the second run B")));
                var referenceC = actorSystem.ActorOf(Props.Create(() => new TestReferencedActor("From the second run C")));
                var actor = actorSystem.ActorOf(Props.Create(() => new TestActor()));
                actor.Tell(new CheckReference());
                Console.ReadLine();
            }
        }

        public struct BindReference { public IActorRef Reference; }
        public struct CheckReference { }

        public sealed class TestActor : ReceivePersistentActor
        {
            public override string PersistenceId => "test hardcoded";

            private IActorRef StoredFromMessage;

            public TestActor()
            {
                Command<CheckReference>(m => StoredFromMessage.Tell(m));
                Command<BindReference>(m => Persist(m, m2 => StoredFromMessage = m2.Reference));
                Recover<BindReference>(m => StoredFromMessage = m.Reference);
            }
        }

        public sealed class TestReferencedActor : ReceiveActor
        {
            public TestReferencedActor(string ourLabel)
            {
                Receive<CheckReference>(m => Console.WriteLine(ourLabel));
            }
        }
    }
}

霍康:

      akka {
        persistence {
          journal {
            plugin = "akka.persistence.journal.sql-server"
            sql-server {
              class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
              connection-string = "Data Source=(localdb)\\MSSQLLocalDB;Initial Catalog=LearningAkka;Integrated Security=True;Connect Timeout=30;Encrypt=False;TrustServerCertificate=True;ApplicationIntent=ReadWrite;MultiSubnetFailover=False"
              schema-name = dbo
              table-name = Journal
              auto-initialize = on
            }
          }
          snapshot-store {
            plugin = "akka.persistence.snapshot-store.sql-server"
            sql-server {
              class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
              connection-string = "Data Source=(localdb)\\MSSQLLocalDB;Initial Catalog=LearningAkka;Integrated Security=True;Connect Timeout=30;Encrypt=False;TrustServerCertificate=True;ApplicationIntent=ReadWrite;MultiSubnetFailover=False"
              schema-name = dbo
              table-name = Snapshot
              auto-initialize = on
            }
          }
        }
      }

有人可以评论一下这里的行为吗?谢谢。

【问题讨论】:

  • 如果在重播事件时无法获得演员参考,IActorRef 将指向死信。您可能会看到,如果您将 akka.loglevel 增加到 DEBUG,通过该引用发送的消息将到达那里。
  • 这正是我所期望的,但不是我所看到的;如上所示,IActorRef 似乎指向了自创建新角色系统以来创建的不相关角色。如果我坚持对在第一个参与者系统中创建的第二个参与者的引用,则在恢复时我会获得对在第二个参与者系统中创建的第二个参与者的引用,这依赖于实例化顺序是确定性的。

标签: c# akka.net akka.net-persistence


【解决方案1】:

从序列化数据中可以看出——你的 IActorRef 指向这个地址akka://LearningAkka/user/$b。其中$b 通常用于未命名的演员。所以它永远是你在actor系统根目录中创建的第二个未命名的actor(据我所知)。

所以你是对的 - 系统行为在这里未定义。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-12-06
    • 1970-01-01
    • 1970-01-01
    • 2014-02-05
    • 2018-01-20
    • 1970-01-01
    相关资源
    最近更新 更多