【发布时间】:2017-05-09 12:01:38
【问题描述】:
我正在尝试使用 RabbitMQ 和 Servicestack(使用 .NET Core 的 v1.0.41)发送流。
我的Request实现了ServiceStack.Web.IRequiresRequestStream,并且在客户端设置了stream属性,但是当它到达服务器时,stream是NULL。
完成回购
服务器代码:
using System;
using System.IO;
using System.Threading.Tasks;
using Funq;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.DependencyInjection;
using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Web;
namespace Server
{
class Program
{
public static void Main(string[] args)
{
IWebHost host = new WebHostBuilder()
.UseServer(new RabbitServer())
.UseStartup<Startup>()
.Build();
host.Run();
}
}
public class RabbitServer : IServer
{
public void Dispose(){}
public void Start<TContext>(IHttpApplication<TContext> application){}
public IFeatureCollection Features { get; } = new FeatureCollection();
}
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddLogging();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.UseServiceStack((AppHostBase)Activator.CreateInstance<AppHost>());
app.Run((RequestDelegate)(context => (Task)Task.FromResult<int>(0)));
}
}
public class AppHost : AppHostBase
{
public AppHost()
: base("My Test Service", typeof(MyService).GetAssembly())
{
}
public override void Configure(Container container)
{
var mqServer = new RabbitMqServer("127.0.0.1");
container.Register<IMessageService>(mqServer);
mqServer.RegisterHandler<HelloRequest>(ExecuteMessage);
mqServer.Start();
}
}
public class MyService : Service
{
public HelloResponse Any(HelloRequest request)
{
Console.WriteLine($"Stream is null: {request.RequestStream == null}");
return new HelloResponse { Counter = request.Counter };
}
}
public class HelloRequest : IReturn<HelloResponse>, IRequiresRequestStream
{
public int Counter { get; set; }
public Stream RequestStream { get; set; }
}
public class HelloResponse
{
public int Counter { get; set; }
}
}
客户代码:
using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Web;
using System;
using System.IO;
using System.Text;
namespace Client
{
class Program
{
static void Main(string[] args)
{
RabbitMqServer messageService = new RabbitMqServer("127.0.0.1");
RabbitMqQueueClient mqClient = messageService.MessageFactory.CreateMessageQueueClient() as RabbitMqQueueClient;
var responseQueueName = mqClient.GetTempQueueName();
MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes("Hello World!")) { Position = 0 };
HelloRequest request = new HelloRequest { Counter = 100, RequestStream = ms }; //Counter is just some arbitary extra data
Guid messageId = Guid.NewGuid();
mqClient.Publish(QueueNames<HelloRequest>.In, new Message<HelloRequest>(request) { ReplyTo = responseQueueName, Id = messageId });
}
}
public class HelloRequest : IReturn<HelloResponse>, IRequiresRequestStream
{
public int Counter { get; set; }
public Stream RequestStream { get; set; }
}
public class HelloResponse
{
public int Counter { get; set; }
}
}
注意:我意识到我可以在我的请求对象中只使用一个 byte[],但我很想使用提供的IRequiresRequestStream 接口,以便将来我可以切换回使用 HTTP 而不是 AMQP。
我还应该说,我可能不会使用 servicestack 提供的 RabbitMQ 客户端,因为我正在编写自定义逻辑以从 HTTP 转换为 AMQP,所以我将手动构建 rabbitMQ 请求 - 上面的代码只是以最简单的方式展示我遇到的问题。
我将假设这不仅仅适用于 AMQP(就像它使用 HTTP 一样) - 所以我认为我需要做一些事情,比如将流序列化为 byte[] 和将它包含在 RabbitMQ 消息中,然后填充 ServiceStack 在服务器上神奇地重新水化的 dto。
所以真的有两个问题...
1. 我在正确的轨道上吗?
2. 如果是这样,我如何挂钩到服务器上的反序列化代码,以便我可以访问原始 RabbitMQ 消息,以便将我的 byte[] 转换回流并在我的 dto 上设置流?
【问题讨论】:
-
添加了关于我的用例的额外信息
-
但从概念上讲,您不能通过rabbit发送流,因为“发送”只是将消息放入队列中,服务器从中使用这些消息。
-
@Evk 是的,我知道 - 但是 ServiceStack 为您处理了很多此类问题,在服务通信和传输之间添加了一层抽象......话虽如此 - 在在这种情况下它没有,这就是为什么我要问是否在队列消息中发送一个 byte[] 是处理该场景的方式,或者是否有其他 ServiceStack 可以为我处理它。跨度>
标签: c# rabbitmq streaming servicestack amqp