【问题标题】:How to send data from redis subscriber to express route如何将数据从 redis 订阅者发送到快速路由
【发布时间】:2018-12-20 13:40:12
【问题描述】:

我有一个 redis pubsub 客户端,其中发布者在一个文件中,而订阅者在另一个文件中,工作正常

我有 2 个控制器,一个处理“/”路由的主控制器和一个处理“/data”路由的数据控制器

在我的 redis 订阅者中,我想更新我不断从发布者那里获得的变量的状态

如何在两个控制器发出请求时将此状态发送给他们

我在做

app.get('/', (req, res) => {
    c = redis.createClient()
    c.on("message", (channel, message) => {
    // Send data here
    })
})

这看起来不是一个好主意,它正在为对“/”端点的每个请求创建一个新的客户端

我希望能够做到

// home controller file
app.get('/', (req, res) => {
    res.json(state)
})
// data controller file
app.get('/data', (req, res) => {
    res.json(state)
})

如何实现这个状态

【问题讨论】:

  • 你有没有想过这个问题?

标签: express routes publish-subscribe node-redis


【解决方案1】:

经过一番研究,我决定使用 Node 原生的events module 来解决这个问题。本例使用ioredis而不是node_redis,但原理是一样的。

首先,我实例化了三个 redis 客户端。一个用于常规数据库工作,一个发布者和一个订阅者

/* redis.js */

const Redis = require('ioredis');

const redis = new Redis();
const publisher = new Redis();
const subscriber = new Redis();

// redis is the defaut export
// publisher and subscriber are "named" exports

const client = (module.exports = redis);
client.publisher = publisher;
client.subscriber = subscriber;

接下来,我们在节点中创建一个EventEmitter,每当订阅者从 redis 中的通道接收到消息时,它将发出一个事件。

/* emitter.js */

const EventEmitter = require('events');
const { subscriber } = require('./redis');

const eventEmitter = new EventEmitter();

subscriber.subscribe('my-channel', err => {
  if (err) { return console.log('Unable to subscribe to my-event channel') };
  console.log('Subscription to my-event channel successful');
});

subscriber.on('message', (channel, message) => {
  eventEmitter.emit('my-event', message);
});

module.exports = eventEmitter;

这里我们有两条路线。第一个处理一个 PUT 请求,该请求在 redis 中设置一个字段,然后使用更新的哈希键将消息发布到通道。第二条路由处理保持打开状态的 GET 请求(例如,作为 SSE 连接的 EventSource)。它监听来自发射器的事件,然后从 redis 发送更新后的 key 的数据

/* route.js*/

const express = require('express');
const redis = require('./redis');
const { publisher } = require('./redis');
const { eventEmitter } = require('./emitter');

const router = express.Router();

router.put('/content', async (req, res) => {
  const { key, field, content } = req.body;
  try {
    await redis.hset(key, field, content);
    res.sendStatus(200);
    return publisher.publish('my-channel', key);

  } catch(err) {
     res.status(500).send(err.message);
  }  
});

router.get('/content-stream', (req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    Connection: 'keep-alive'
  });

  res.write('\n');

  const handleEvent = async key => {
     try {
      const query = await redis.hgetall(key);
      res.write(`data: ${JSON.stringify(query)}\n\n`);
    } catch(err) {
      console.log('Something went wrong');
    }
  }

  eventEmitter.addListener('my-event', handleEvent);
  req.on('close', eventEmitter.removeListener('my-event', handleEvent));

module.exports = router;

这将有效地让您避免在每次连接时都实例化新的 redis 客户端。可能有更好的方法可以做到这一点,但这对我有用。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多