【发布时间】:2021-07-30 15:43:34
【问题描述】:
我正在使用来自 Kucoin(加密交换)的流 API,它发送 order_book 更新,我正在尝试使用消息处理程序方法处理入站 order_book websocket 消息,然后仅在处理完所有消息后返回,但我认为我设计的模式错误或误解了异步和 websockets。
我想要的是这样的 - 下面的代码不起作用,但出于说明目的,这是我想要实现的,所以请耐心等待我的蜥蜴大脑。
async for message in websocket:
message_handled = handler_method(message) # this updates self.order_book
if websocket.index(message) + 1 == len(websocket): # check if this is final message
return self.order_book
这样做的正确方法是什么,以便所有messages 在返回之前由handler_method() 处理?
编辑:
请查看 websocket 的示例调试日志。
client - event = data_received(<1016 bytes>)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741271,"change":"2346.05,buy,120","timestamp":1627660760709},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741272,"change":"2345.1,buy,33","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741273,"change":"2346.1,buy,3360","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741274,"change":"2346.15,sell,0","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741275,"change":"2346.85,sell,0","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741276,"change":"2346.05,buy,540","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client - event = data_received(<849 bytes>)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741277,"change":"2344.8,buy,2145","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741278,"change":"2344.8,buy,1667","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741279,"change":"2346.1,buy,3780","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741280,"change":"2344.8,buy,1189","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741281,"change":"2344.8,buy,711","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client - event = data_received(<1690 bytes>)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741282,"change":"2346.05,buy,120","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741283,"change":"2338.1,buy,407","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741284,"change":"2360.25,sell,9681","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741285,"change":"2337.8,buy,20","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741286,"change":"2338.1,buy,6","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741287,"change":"2334.1,buy,0","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741288,"change":"2346.1,buy,3360","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741289,"change":"2346.2,buy,120","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741290,"change":"2346.1,buy,3780","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741291,"change":"2354.3,sell,0","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)```
【问题讨论】:
-
流没有“长度”或“结束”。 “处理完所有消息后”是什么意思?流中的最后一条消息到底是什么?如果确实有一些标记,那么这变得相当简单:将所有消息加载到标记,处理它们,返回。 Websockets 本身有这样的标记,
FIN消息。但这是假设您实际使用它。 -
啊 - 对不起。每次接收到数据时,它都会作为一堆单独的帧进入(不确定这是否是正确的词),我想 1)使用
handler_method处理所有这些帧,然后 2)return某事,然后3) 将控制权交还给事件循环。使用日志查看更新的问题,这可能更好地说明数据是如何到达的。 -
那么你知道这些单独的框架是如何形成一个完整的数据集的吗?您知道如何检测到这一点(因为从您向我们展示的内容对我来说并不明显)?那么似乎是什么问题? “回归”是什么意思?使用该设置,您将等待来自给定数据集的所有消息,并在处理完所有消息后发送响应。那是你要的吗?我无法理解您的“return self.order_book”行。 websockets没有“返回”,只有接收和发送,它是一个流。您是否正在使用一些需要“返回”的框架?如果是这样,也许你应该改用别的东西。
-
感谢您的耐心等待 - 所以在上面的示例中,我希望我的
async for循环在处理所有 21 个示例帧的过程中返回 3 次,每次处理完后返回一次。即,处理前 6 帧,然后返回,然后处理下一批 5 帧,然后返回,然后处理接下来的 10 帧,然后返回。如果我只是在async for循环中放置一个 return 语句,它会在接收和处理每个帧后返回,而不是将它们批处理在一起,处理批处理中的每个帧,然后仅在处理完批处理中的所有帧后返回。 -
return,我的意思是正常意义上的 Python 函数,在函数完成时返回结果。
标签: python websocket python-asyncio