【问题标题】:Receiving BrokeredMessage from ServiceBus from a nodejs server从 nodejs 服务器接收来自 ServiceBus 的 BrokeredMessage
【发布时间】:2020-08-18 16:30:05
【问题描述】:

我正在从新的 nodejs 服务器读取来自现有 Azure ServiceBus 的消息。

这是消息从 .NET 服务器发送到 ServiceBus 的方式:

var topicClient = TopicClient.CreateFromConnectionString(serviceBusConnectionString, routingKey);    
var brokeredMessage = new BrokeredMessage(message.ToJson());
topicClient.Send(brokeredMessage);

其中 topicClient 是 Microsoft.ServiceBus.Messaging.TopicClient

我正在使用以下方法使用 azure-sb 包读取 nodejs 服务器上的消息:

sbClient = ServiceBusClient.createFromConnectionString(connectionString)  
subscriptionClient = this.sbClient.createSubscriptionClient(topicName, subscriptionName);
receiver = this.subscriptionClient.createReceiver(ReceiveMode.receiveAndDelete);
messages = await this.receiver.receiveMessages(10,10);
console.log(messages.map(message => { message.body }));

message.body 是一个缓冲区,当我执行 message.body.toString('utf-8') 时,我得到类似:

@string3http://schemas.microsoft.com/2003/10/Serialization/��{VALID JSON}

我当然对介于两者之间的有效 JSON 感兴趣。 在 .net 服务器中,我们只需执行 brokeredMessage.GetBody() 并获取对象,那么在 nodejs 上是否有一种简单的方法来做同样的事情?

【问题讨论】:

  • 您能告诉我您是如何发送消息的吗?
  • @JimXu - 我已经用相关信息更新了问题,谢谢

标签: node.js azure azureservicebus


【解决方案1】:

根据我的测试,如果我们在.Net应用中使用标准库Microsoft.Azure.ServiceBus发送消息,在节点应用中会直接JSON解析消息

例如

这是我发送消息的 C# 代码:

class Program
{
    static void Main(string[] args)
    {
        string connectionString = "Endpoint=sb://...";
        var client = new TopicClient(connectionString, "");
        var payload = JsonConvert.SerializeObject(new DemoMessage() { Title = $"hello!!! {DateTime.Now}" });
        var serviceBusMessage = new Message(Encoding.UTF8.GetBytes(payload));
        serviceBusMessage.SessionId = Guid.NewGuid().ToString("D");

        client.SendAsync(serviceBusMessage).Wait();

    }

    private class DemoMessage
    {
        public DemoMessage()
        {
        }

        public string Title { get; set; }
    }

这是我接收消息的 Node.js 代码:

const { ServiceBusClient, ReceiveMode } = require("@azure/service-bus");

// Define connection string and related Service Bus entity names here
const connectionString =
  "Endpoint=sb://";
const topicName = "***";
const subscriptionName = "***";

async function main() {
  const sbClient = ServiceBusClient.createFromConnectionString(
    connectionString,
  );
  const subscriptionClient = sbClient.createSubscriptionClient(
    topicName,
    subscriptionName,
  );
  const receiver = subscriptionClient.createReceiver(ReceiveMode.receiveAndDelete);

  try {
    const messages = await receiver.receiveMessages(1);
    console.log("Received messages:");
    console.log(messages.map((message) => message.body));
    await subscriptionClient.close();
  } finally {
    await sbClient.close();
  }
}

main().catch((err) => {
  console.log("Error occurred: ", err);
});

另外,如果你仍然使用库WindowsAzure.ServiceBus,我们需要使用BrokeredMessage(Stream messageBodyStream, bool ownsStream)来初始化一个对象。

因为我们使用BrokeredMessage(platload) 进行初始化,所以它将使用带有二进制 XmlDictionaryWriter 的 DataContractSerializer 来初始化一个对象。因此,负载正在使用带有二进制 XmlDictionaryWriter 的 DataContractSerializer 进行序列化,这就是为什么消息正文在其开头具有类型指示 @string3http://schemas.microsoft.com/2003/10/Serialization/ 的原因。

例如 这是我发送消息的 C# 代码:

var client =TopicClient.CreateFromConnectionString(connectionString, "test");
            var payload = JsonConvert.SerializeObject(new DemoMessage() { Title = $"hello BrokeredMessage!!! {DateTime.Now}" });
            using (Stream stream = new MemoryStream(Encoding.UTF8.GetBytes(payload))) {
                var serviceBusMessage = new BrokeredMessage(stream,true);
                await client.SendAsync(serviceBusMessage);

            }

我使用相同的代码来接收

更多详情请参考here

【讨论】:

  • 感谢您的详尽回答!所以看起来我们的旧实现不能互操作?更改当前实现的选项可能会破坏现有的读者,所以看起来唯一可行的选择是删除消息......
  • @Mihir 我想是的。您还有其他顾虑吗?如果您没有其他顾虑,您可以接受它作为答案吗?
  • 我只想检查是否有内置功能或包已经在执行此操作...因为现在我正在寻找一个开始的 '{' 和一个结束的 '}' 并且它看起来真的很不稳定......
  • @Mithir 你想知道如何从@string3http://schemas.microsoft.com/2003/10/Serialization/��{VALID JSON}读取JSON内容吗?
  • 是的……也许最简单的方法就是我做的那样……
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-08-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-06
  • 1970-01-01
相关资源
最近更新 更多