【问题标题】:Run code block in parallel while streaming data在流式传输数据时并行运行代码块
【发布时间】:2021-05-12 19:40:45
【问题描述】:

我有一个用于流式传输在线数据的代码块,但与此同时,我希望不时运行另一个代码块以进行分析。

class steaming_price():
def on_open(ws):
    print('opened connection')


def on_close(ws):
    print('closed connection')


def on_message(ws, message):
    global closes, in_position, current_time
    print('received message')
    json_message = json.loads(message)
    #pprint.pprint(json_message)

    candle = json_message['k'] #all kline data
    is_candle_closed = candle['x'] #if its closed
    close = candle['c'] #the Close Price

    #Print the Close Price
    if is_candle_closed:
        print("candle closed at {}".format(close))
        closes.append(float(close))
        current_time.append(datetime.datetime.now())
        print(f"Time:{datetime.datetime.now()} Close:{close}")


ws = websocket.WebSocketApp(SOCKET, on_open=on_open, on_close=on_close, on_message=on_message)
ws.run_forever()

以下应该每 1 分钟运行一次以进行分析

class strategy():
    def __init__(self):
        self.closes = deque(maxlen=500)

    def strategy(self, data):
        self.macd, self.macd_signal, self.macd_hist = talib.MACD(data, fastperiod=12, slowperiod=26, signalperiod=9)
        return self.macd, self.macd_signal, self.macd_hist

第一个代码块每秒都会获取数据。 我想让第二个代码块每 1 分钟并行运行一次以进行分析。例如,计算 20 bar 移动平均线并发出买单。

我考虑过异步,但它只运行一次(我也无法让它工作)。

有什么想法吗?非常感谢

【问题讨论】:

    标签: python websocket async-await


    【解决方案1】:

    简单的解决方案是检查新消息到达自收到上一条消息以来经过了多长时间,然后如果超过 1 分钟,则在同一函数中进行分析。

    您提到的另一个解决方案需要使用asyncio websockets library

    服务器:

    import asyncio
    import json
    from random import random
    
    import websockets
    
    
    async def hello(websocket, _):
        while True:
            value = random() * 50 + 50
            value = json.dumps({"v": value})
            await websocket.send(value)
            print(f"sent {value}")
            await asyncio.sleep(1)
    
    
    start_server = websockets.serve(hello, "localhost", 8080)
    
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
    
    

    客户:

    import asyncio
    import json
    from collections import deque
    
    import websockets
    
    q = deque(maxlen=5)
    
    async def calc_sum():
        while True:
            data = list(q)
            print(f"sum {sum(data)}")
            await asyncio.sleep(3.6)
    
    async def hello():
        uri = "ws://localhost:8080"
        async with websockets.connect(uri) as websocket:
            while True:
                value = await websocket.recv()
                value = json.loads(value)
                print(f"recv {value}")
                q.append(value['v'])
    
    event_loop = asyncio.get_event_loop()
    event_loop.create_task(calc_sum())
    event_loop.run_until_complete(hello())
    
    

    【讨论】:

      猜你喜欢
      • 2021-02-28
      • 1970-01-01
      • 1970-01-01
      • 2014-07-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多