【问题标题】:Queue or other methods to handle tick data?队列或其他方法来处理分时数据?
【发布时间】:2020-01-07 06:56:11
【问题描述】:

在我们的电子交易系统中,我们需要根据 100+ 份合约的分时数据进行计算。

一条消息中没有收到合约的报价数据。一条消息仅包含一份合约的报价数据。合同的时间戳略有不同(有时差异很大,但我们忽略这种情况)。

eg: (first column is timestamp. Second is contract name)
below 2 data has 1ms diff
10:34:03.235,10002007,510050C2006A03500  ,0.0546
10:34:03.236,10001909,510050C2003A02750  ,0.3888

below 2 data has 3ms diff
10:34:03.594,10002154,510300C2003M03700  ,0.4985
10:34:03.597,10002118,510300C2001M03700  ,0.4514

只有价格变化的才会有数据。因此,我无法计算合同编号来了解我是否已收到此报价的所有数据。

但另一方面,我们不想等到收到所有的tick数据,因为有时数据可能会延迟很长时间,我们会想要排除它们。

需要低延迟。所以我认为我们将定义一个窗口——比如 50 毫秒——并开始根据我们在过去 50 毫秒内收到的任何数据进行计算。

处理此类用例的最佳方法是什么?

本来我想用redis流来维护一个小队列,每当收到一个合约的数据时,我都会把它推送到redis流中。但我不知道在特定时间(比如 50 毫秒)过去后立即提取数据的最佳方式是什么。

我正在考虑也许我应该使用其他一些技术? 任何建议表示赞赏。

【问题讨论】:

  • 不确定我是否关注。听起来 Redis 流应该可以解决问题。您能否详细说明您在使用 Redis 流时遇到了什么问题?
  • 其实我对redis流还不是很熟悉。我知道它像队列一样工作,但我怎样才能让它像这样工作:检查队列中的消息,如果(最新时间戳 - 最新时间戳)> 阈值,读取所有这些消息? redis流支持吗?

标签: redis time-series queue


【解决方案1】:

使用XRANGE myStream - + COUNT 1 获取第一个条目。

使用XREVRANGE myStream + - COUNT 1 获取最后一个条目。

XINFO STREAM myStream 也带来了第一个和最后一个条目,但the docs 说它是O(log N)

假设您使用时间戳作为 ID 或字段,那么您可以计算时间差。

如果您使用 Redis Streams 自动 ID (XADD myStream * ...),则 ID 的第一部分是以毫秒为单位的 UNIX 时间戳。

假设上述情况,您可以使用 Lua 脚本原子地进行检查:

EVAL "local first = redis.call('XRANGE', KEYS[1], '-', '+', 'COUNT', '1') local firstTime = {} if next(first) == nil then     return redis.error_reply('Stream is empty or key doesn`t exist') end for str in string.gmatch(first[1][1], '([^-]+)') do     table.insert(firstTime, tonumber(str)) end local last = redis.call('XREVRANGE', KEYS[1], '+', '-', 'COUNT', '1') local lastTime = {} for str in string.gmatch(last[1][1], '([^-]+)') do     table.insert(lastTime, tonumber(str)) end local ms = lastTime[1] - firstTime[1] if ms >= tonumber(ARGV[1]) then     return redis.call('XRANGE', KEYS[1], '-', '+') else     return redis.error_reply('Only '..ms..' ms') end" 1 myStream 50

参数是numKeys(1 here) streamKey timeInMs(50 here): 1 myStream 50

这里是 Lua 脚本的友好视图:

local first = redis.call('XRANGE', KEYS[1], '-', '+', 'COUNT', '1')
local firstTime = {}
if next(first) == nil then
    return redis.error_reply('Stream is empty or key doesn`t exist')
end
for str in string.gmatch(first[1][1], '([^-]+)') do
    table.insert(firstTime, tonumber(str))
end
local last = redis.call('XREVRANGE', KEYS[1], '+', '-', 'COUNT', '1')
local lastTime = {}
for str in string.gmatch(last[1][1], '([^-]+)') do
    table.insert(lastTime, tonumber(str))
end
local ms = lastTime[1] - firstTime[1]
if ms >= tonumber(ARGV[1]) then
    return redis.call('XRANGE', KEYS[1], '-', '+')
else
    return redis.error_reply('Only '..ms..' ms')
end

返回:

  • (error) Stream is empty or key doesn`t exist
  • (error) Only 34 ms 如果我们没有经过所需的时间
  • 如果第一条消息和最后一条消息之间的所需时间已过,则为实际条目列表。

请务必查看 Introduction to Redis Streams 以熟悉 Redis Streams,并查看 EVAL command 以了解 Lua 脚本。

【讨论】:

  • 非常感谢。我认为这将能够满足我的要求!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-10-06
  • 2018-04-24
  • 2011-03-03
  • 1970-01-01
相关资源
最近更新 更多