【问题标题】:Implementing fanout strategy using senecajs使用 senecajs 实现扇出策略
【发布时间】:2017-06-04 23:59:14
【问题描述】:

我正在使用 senecajs 开发一个 node.js 社交网络应用程序,并且需要实现一个生产者可以向多个消费者发送相同消息的场景。我发现一篇文章似乎说明了使用 senecajs 完成此操作的示例代码。问题是我正在尝试将其转换为我的场景,这是本文中的示例(https://github.com/senecajs/seneca-amqp-transport/issues/27):

我有 1 个客户端向 2 个侦听器发布事件。

客户:

.client({
type: 'amqp',
pin: 'incomingMessage:*',
url: process.env.AMQP_URL,
exchange: {
  name: process.env.NODE_ENV + ':events',
  type: 'fanout'
}
});

听众:

.listen({
type: 'amqp',
pin:  'incomingMessage:*', //maybe useless
url:  process.env.AMQP_URL,
name: process.env.NODE_ENV + ':service1',
exchange: {
  name: process.env.NODE_ENV + ':events',
  type: 'fanout'
}
});

.listen({
type: 'amqp',
pin:  'incomingMessage:*', //maybe useless?
url:  process.env.AMQP_URL,
name: process.env.NODE_ENV + ':service2',
exchange: {
  name: process.env.NODE_ENV + ':events',
  type: 'fanout'
}
});

有几项令人困惑:

  1. 对于客户端设置,名称似乎最终会变成“development:events”或“production:events”。我的想法正确吗?

  2. 对于exchange对象外的监听器的name字段,这个字段的作用是什么?

  3. 当我调用 add 方法时,我需要传入一个名称,该名称映射到侦听器收到消息时进行的函数调用,我是否会将“incomingMessage:*”传递给 add 调用?

  4. 这段代码是否真的可以使用 senecajs 有效地提供扇出功能?

【问题讨论】:

    标签: node.js node-amqp seneca


    【解决方案1】:

    我已经测试了样本。 (感谢 GitHub 上的作者)

    1、通常NODE_ENV环境变量指的是开发、生产、登台等。 它可以是任何东西(比如“apple”),但诀窍是这个“name”属性指的是“queue”名称,它对于每个侦听器/订阅者来说必须是唯一的。

    2, 就像我上面提到的,这个“名称”属性(在交换对象之外)用于队列的名称。 每个侦听器必须是唯一的。每个侦听器都有自己的队列,该队列绑定到交换器。

    3, 可以是任何东西。但这也很棘手。

    对于将“发布”到扇出交换的客户端,它需要在选项中具有与被调用操作的模式相同的引脚。例如,如果客户端会调用:

    seneca.act('event:orderReceived', optionalPayload)
    

    那么引脚必须是:

    {
        pin: 'event:orderReceived',
        ...
    }
    

    请参阅 Seneca API docs 了解模式的工作原理。

    对于侦听器,在这种情况下(对于 RPC,这不是真的)他们不必匹配 pin。当您调用 seneca.add 函数时,只有模式必须匹配(有点奇怪)。例如:

    seneca.add('event:orderReceived', function (msg, respond) {
        // some logic
    })
    

    pin 属性可以是:

    {
         pin: 'something:different'
    }
    

    4、最后一个问题的答案是正确的,它可以提供发布/订阅功能。我已经测试过了。

    请注意,如果您在“发布”交换时没有将回调传递给 seneca.act,那么它不会等待任何响应。例如:

    seneca.act('event:orderReceived', { some: "payload" })
    

    还请注意,如果您不需要响应,则无论如何都必须在侦听器服务中使用null 为第一个(错误)参数调用响应回调,否则它将超时。请参阅下面的示例。

    这是我的代码版本:

    出版商

    const seneca = require('seneca')
    const si = seneca()
    
    si
        .use('seneca-amqp-transport')
    
        .client({
            type: 'amqp',
            pin: 'event:orderReceived',
            url: 'amqp://localhost:5672',
            exchange: {
                name: 'order-service',
                type: 'fanout'
            }
        })
        .ready( function() {
            si.log.info('order-service application is running...')
            si.act('event:orderReceived', {})
        })
    
    process.on('SIGTERM', () => {
        si.log.info('Got SIGTERM. Graceful shutdown start')
        si.act('role:seneca,cmd:close')
    })
    

    订阅者 1

    const seneca = require('seneca')
    const si = seneca()
    
    si
        .use('seneca-amqp-transport')
    
        .add('event:orderReceived', function(msg, respond) {
            si.log.info(msg)
            respond(null)
        })
    
        .listen({
            type: 'amqp',
            pin:  'some:pin1',
            url:  'amqp://localhost:5672',
            name: 'delivery-service-queue',
            exchange: {
                name: 'order-service',
                type: 'fanout'
            }
        })
    
        .ready( function() {
            si.log.info('delivery-service application is running...')
        })
    
    process.on('SIGTERM', () => {
        si.log.info('Got SIGTERM. Graceful shutdown start')
        si.act('role:seneca,cmd:close')
    })
    

    订阅者 2

    const seneca = require('seneca')
    const si = seneca()
    
    si
        .use('seneca-amqp-transport')
    
        .add('event:orderReceived', function(msg, respond) {
            si.log.info(msg)
            respond(null)
        })
    
        .listen({
            type: 'amqp',
            pin:  'some:pin2',
            url:  'amqp://localhost:5672',
            name: 'financial-service-queue',
            exchange: {
                name: 'order-service',
                type: 'fanout'
            }
        })
    
        .ready( function() {
            si.log.info('financial-service application is running...')
        })
    
    process.on('SIGTERM', () => {
        si.log.info('Got SIGTERM. Graceful shutdown start')
        si.act('role:seneca,cmd:close')
    })
    

    我不确定这是否正确,但似乎可行。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-04-16
      • 2011-11-23
      • 1970-01-01
      • 1970-01-01
      • 2022-07-12
      • 2021-11-16
      • 2021-07-16
      • 1970-01-01
      相关资源
      最近更新 更多