【问题标题】:Why isn't my Akka.NET Stream Subscriber receiving messages?为什么我的 Akka.NET 流订阅者收不到消息?
【发布时间】:2018-02-04 07:43:11
【问题描述】:

我正在尝试编写一个简单的 Akka.NET 流。来源是IActorRef。接收器是ISubscriber。我正在使用 TestKit 将其实现为单元测试:

[Fact]
public void AkkaStreams_ActorSourcePublisherSink_Works()
{
    using (var materializer = Sys.Materializer())
    {
        var probe = CreateTestProbe();
        var source = Source.ActorRef<HandlerErrorEvent>(10, OverflowStrategy.DropNew);
        var subscriber = new Mock<ISubscriber<HandlerErrorEvent>>();
        var sink = Sink.FromSubscriber<HandlerErrorEvent>(subscriber.Object);
        var graph = source.ToMaterialized(sink, Keep.Both);
        var (actor, publisher) = graph.Run(materializer);

        subscriber.Verify(s => s.OnSubscribe(It.IsAny<ISubscription>()));

        var evnt = new HandlerErrorEvent("", HandlerResult.NotHandled);
        actor.Tell(evnt, ActorRefs.Nobody);

        base.AwaitCondition(() =>
        {
            try
            {
                subscriber.Verify(s => s.OnNext(It.IsAny<HandlerErrorEvent>()));
                return true;
            }
            catch(MockException)
            {
                return false;
            }
        });
    }
}

OnSubscribe 方法的初始Verify 调用顺利通过,但模拟订阅者从未收到对OnNext 的调用。

我做错了什么?

netcoreapp2.0 运行。参考资料:

"Akka.TestKit.Xunit2" Version="1.3.2"
"Microsoft.NET.Test.Sdk" Version="15.5.0"
"Moq" Version="4.8.0-rc1"
"xunit" Version="2.3.1"
"xunit.runner.visualstudio" Version="2.3.1"
"dotnet-xunit" Version="2.3.1"

【问题讨论】:

    标签: c# .net-core akka.net akka.net-streams


    【解决方案1】:

    您的 ISubscriber&lt;&gt; 模拟不符合 Reactive Streams specification。它指出,为了在订阅后获得任何消息,订阅者必须首先使用ISubscription.Request(long) 方法传达需求。

    一般来说,如果您使用 Akka.Streams 测试套件,则不需要模拟订阅。只需下载Akka.Streams.TestKit 即可获取 Akka.Streams 的扩展方法。这样,您只需在 TestKit 类中调用 this.CreateManualSubscriberProbe&lt;HandlerErrorEvent&gt;(); 就可以建立一个假订阅者。它包含一个可用于断言的dozens of methods

    例子:

    public class ExampleTest : TestKit
    {
        [Fact]
        public void Select_should_map_output()
        {
            using (var materializer = Sys.Materializer())
            {
                // create test probe for subscriptions
                var probe = this.CreateManualSubscriberProbe<int>();
    
                // create flow materialized as publisher
                var publisher = Source.From(new[] { 1, 2, 3 })
                    .Select(i => i + 1)
                    .RunWith(Sink.AsPublisher<int>(fanout: false), materializer);
    
                // subscribe probe and receive subscription
                publisher.Subscribe(probe);
                var subscription = probe.ExpectSubscription();
    
                // request number of elements to receive, here drain source utill the end
                subscription.Request(4); 
    
                // validate assertions
                probe.ExpectNext(2);
                probe.ExpectNext(3);
                probe.ExpectNext(4);
    
                // since source had finite number of 3 elements, expect it to complete
                probe.ExpectComplete();
            }
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2017-11-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-09-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-03-27
      相关资源
      最近更新 更多