【问题标题】:Using ServiceStack and RabbitMQ to send a stream使用 ServiceStack 和 RabbitMQ 发送流
【发布时间】:2017-05-09 12:01:38
【问题描述】:

我正在尝试使用 RabbitMQ 和 Servicestack(使用 .NET Core 的 v1.0.41)发送流。

我的Request实现了ServiceStack.Web.IRequiresRequestStream,并且在客户端设置了stream属性,但是当它到达服务器时,stream是NULL

完成回购
服务器代码:

using System;
using System.IO;
using System.Threading.Tasks;
using Funq;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.DependencyInjection;
using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Web;

namespace Server
{
    class Program
    {

        public static void Main(string[] args)
        {
            IWebHost host = new WebHostBuilder()
                .UseServer(new RabbitServer())
                .UseStartup<Startup>()
                .Build();

            host.Run();
        }
    }

    public class RabbitServer : IServer
    {
        public void Dispose(){}

        public void Start<TContext>(IHttpApplication<TContext> application){}

        public IFeatureCollection Features { get; } = new FeatureCollection();
    }

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging();
        }

        public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            app.UseServiceStack((AppHostBase)Activator.CreateInstance<AppHost>());
            app.Run((RequestDelegate)(context => (Task)Task.FromResult<int>(0)));
        }

    }

    public class AppHost : AppHostBase
    {
        public AppHost()
            : base("My Test Service", typeof(MyService).GetAssembly())
        {
        }

        public override void Configure(Container container)
        {
            var mqServer = new RabbitMqServer("127.0.0.1");
            container.Register<IMessageService>(mqServer);
            mqServer.RegisterHandler<HelloRequest>(ExecuteMessage);
            mqServer.Start();
        }
    }

    public class MyService : Service
    {
        public HelloResponse Any(HelloRequest request)
        {
            Console.WriteLine($"Stream is null: {request.RequestStream == null}");
            return new HelloResponse { Counter = request.Counter };

        }
    }

    public class HelloRequest : IReturn<HelloResponse>, IRequiresRequestStream
    {
        public int Counter { get; set; }

        public Stream RequestStream { get; set; }
    }

    public class HelloResponse
    {
        public int Counter { get; set; }
    }
}

客户代码:

using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Web;
using System;
using System.IO;
using System.Text;

namespace Client
{
    class Program
    {

        static void Main(string[] args)
        {
            RabbitMqServer messageService = new RabbitMqServer("127.0.0.1");
            RabbitMqQueueClient mqClient = messageService.MessageFactory.CreateMessageQueueClient() as RabbitMqQueueClient;
            var responseQueueName = mqClient.GetTempQueueName();
            MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes("Hello World!")) { Position = 0 };
            HelloRequest request = new HelloRequest { Counter = 100, RequestStream = ms };  //Counter is just some arbitary extra data
            Guid messageId = Guid.NewGuid();

            mqClient.Publish(QueueNames<HelloRequest>.In, new Message<HelloRequest>(request) { ReplyTo = responseQueueName, Id = messageId });
        }
    }

    public class HelloRequest : IReturn<HelloResponse>, IRequiresRequestStream
    {
        public int Counter { get; set; }
        public Stream RequestStream { get; set; }
    }

    public class HelloResponse
    {
        public int Counter { get; set; }
    }
}

注意:我意识到我可以在我的请求对象中只使用一个 byte[],但我很想使用提供的IRequiresRequestStream 接口,以便将来我可以切换回使用 HTTP 而不是 AMQP。

我还应该说,我可能不会使用 servicestack 提供的 RabbitMQ 客户端,因为我正在编写自定义逻辑以从 HTTP 转换为 AMQP,所以我将手动构建 rabbitMQ 请求 - 上面的代码只是以最简单的方式展示我遇到的问题。

我将假设这不仅仅适用于 AMQP(就像它使用 HTTP 一样) - 所以我认为我需要做一些事情,比如将流序列化为 byte[] 和将它包含在 RabbitMQ 消息中,然后填充 ServiceStack 在服务器上神奇地重新水化的 dto。

所以真的有两个问题...
1. 我在正确的轨道上吗?
2. 如果是这样,我如何挂钩到服务器上的反序列化代码,以便我可以访问原始 RabbitMQ 消息,以便将我的 byte[] 转换回流并在我的 dto 上设置流?

【问题讨论】:

  • 添加了关于我的用例的额外信息
  • 但从概念上讲,您不能通过rabbit发送流,因为“发送”只是将消息放入队列中,服务器从中使用这些消息。
  • @Evk 是的,我知道 - 但是 ServiceStack 为您处理了很多此类问题,在服务通信和传输之间添加了一层抽象......话虽如此 - 在在这种情况下它没有,这就是为什么我要问是否在队列消息中发送一个 byte[] 是处理该场景的方式,或者是否有其他 ServiceStack 可以为我处理它。跨度>

标签: c# rabbitmq streaming servicestack amqp


【解决方案1】:

您不能将 IRequiresRequestStream 请求 DTO 发送到 MQ,因为它不是普通的序列化请求 DTO,而是指示 ServiceStack 跳过反序列化请求 DTO,而是注入 HTTP 请求流,以便服务可以执行自己的相反,反序列化与普通的 Request DTO 不同,后者是序列化的,可以作为 MQ 消息的主体发送。

如果您想在IRequiresRequestStream Service 和可由 MQ 调用的 Service 之间共享实现,一个选择是仅委托给接受字节的公共 Service,例如:

//Called from HTTP
public object Any(HelloStream request) => 
    Any(new HelloBytes { Bytes = request.RequestStream.ReadFully() });

//Called from HTTP or MQ
public object Any(HelloBytes request) 
{
    //= request.Bytes
}

【讨论】:

  • 好吧,我就是这么想的。 ...this is different to a normal Request DTO which is serialized and can be sent as the body in an MQ Message. - 这是否意味着 ServiceStack 具有某种功能可以将流作为 MQ 消息的主体发送来代替 DTO,还是我需要自己编写代码?昨晚我正在修改这个,现在我有一个 dto 基类,它既实现了IRequiresRequestStream,又为byte[] 提供了一个属性,方法是GetBytes(),它返回一个字节数组... [继续]
  • ... 所以我的 http 请求(我转换为 AMQP)仍然具有来自接口的标准 RequestStream 属性,而我进行转换的代码(从 HTTP 到 AMQP)将以字节为单位读取从流中并填充 dto 上的 byte[] 属性。我的服务方法现在具有实现这个基类的请求,并且服务代码总是调用GetBytes(),所以现在我想如果它不适合我可以在将来转储 AMQP。除非 SS 这样做,否则我想不出更好的方法来使用 SS 和 AMQP 通过网络发送流式内容。
  • @Jay 所有 ServiceStack 对 IRequiresRequestStream 所做的事情都没有尝试反序列化它,而是使用 HTTP 请求正文输入流填充 ReqeustStream 属性。 MQ 中没有流式消息,整个消息需要在 MQ 主体中发送。如果您想共享相同的 impl,我已经用一个潜在的解决方案更新了我的答案,否则如果您希望 HTTP impl 做一些不同的事情,则只需使用 2 个服务。
  • 好的,这很酷(我明白你不能在 MQ 中进行流式传输!)所以现在我知道这是我需要使用上面描述的代码为自己解决的问题,或您建议的其他服务方法,将流内容作为 MQ 消息的正文移动。感谢您的宝贵时间!
猜你喜欢
  • 1970-01-01
  • 2013-03-09
  • 2023-03-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-07-23
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多