【问题标题】:Run a function in background using thread in Flask使用 Flask 中的线程在后台运行函数
【发布时间】:2021-05-19 07:42:37
【问题描述】:

我正在尝试使用烧瓶实现限价订单,我现在正在处理后端部分。我是烧瓶新手,所以我还在学习,我不太了解交易后端的工作原理,但我正在尝试通过这个小项目学习。

我在我的应用程序中创建了 3 个端点,它们添加订单、删除订单并给出订单状态的响应,这三个端点工作正常,用邮递员检查了它们。现在我正在尝试在后台运行一个函数,该函数将不断检查保存所有新订单的 json 文件中的新订单(买/卖)。如果用户的买单与其他用户的卖单匹配,它将一一挑选它们并根据价格找到匹配项,它将处理并将其存储在我想要返回或存储所有成功订单给用户的字典中。 这是我创建的类的代码:

import json
import bisect
import random
import os

class Process(object):

  def __init__(self):

    self.trade_book = []
    self.bid_prices = []
    self.ask_prices = []
    self.ask_book = {}
    self.bid_book = {}
    self.confirm_traded = []
    self.orders_history = {}
    self.traded = False
    self.counter = 0
    

def save_userdata(self,order, newId):
    
    orderid = order['order']['trader'] +"_"+ str(newId)

    user_list = order
    newJson = {
        "orders":[
            { orderid: order['order']}
        ]
    }
    with open('data_user.json', 'a+') as jsonFile:
        with open('data_user.json', 'r') as readableJson:
            try:
                 jsonObj = json.load(readableJson)
            except Exception as e:
                jsonObj = {}
        if jsonObj == {}:
            json.dump(newJson, jsonFile)
        else:
            with open('data_user.json', 'w+') as writeFile:
                exists = False
                for item in jsonObj['orders']:
                    if item.get(orderid, None) is not None:
                        item[orderid] = order['order']
                        exists = True
                        break
                if not exists:
                    jsonObj['orders'].append(newJson['orders'][0])
                json.dump(jsonObj, writeFile)
        
        return orderid


  def get_userdata(self):
    with open('data_user.json', 'r') as readableJson:
        return json.load(readableJson)


  def removeOrder(self, orderid):

    order_id = list(orderid.values())[0]
    with open('data_user.json') as data_file:
        data = json.load(data_file)
    newData = []
    for item in data['orders']:
        if item.get(order_id, None) is not None:
            del item[order_id]
        else:
            newData.append(item)
    data['orders'] = newData
    with open('data_user.json', 'w') as data_file:
        data = json.dump(data, data_file)

    return order_id



 def add_order_to_book(self, order):

    index = list(order.keys())[0]
    book_order = order[index]
    print(index)

    if order[index]['side'] == 'buy':
        book_prices = self.bid_prices
        book = self.bid_book
    
    else: #order[index]['side'] == 'sell'
        book_prices = self.ask_prices
        book = self.ask_book
    
    if order[index]['price'] in book_prices:
        book[order[index]['price']]['num_orders'] += 1
        book[order[index]['price']]['size'] += order[index]['quantity']
        book[order[index]['price']]['order_ids'].append(index)
        book[order[index]['price']]['orders'][index] = book_order
        

    else:
        bisect.insort(book_prices, order[index]['price'])
        book[order[index]['price']] = {'num_orders': 1, 'size': order[index]['quantity'],'order_ids': 
     [index],
                               'orders': {index: book_order}}



def confirm_trade(self,order_id, timestamp, order_quantity, order_price, order_side):
     
    
    trader = order_id.partition('_')[0]
    self.confirm_traded.append({ 'trader': trader,'quantity': order_quantity, 'side': order_side, 
 'price': order_price,
                                'status': 'Successful'})
    
    
    return self.confirm_traded


def process_trade_orders(self, order):
    
    self.traded = False
    index = list(order.keys())[0]
    if order[index]['side'] == 'buy':
        book = self.ask_book
        if order[index]['price'] in self.ask_prices:
            remainder = order[index]['quantity']
            while remainder > 0:
                book_order_id = book[order[index]['price']]['order_ids'][0]
                book_order = book[order[index]['price']]['orders'][book_order_id]

                if remainder >= book_order['quantity']:
                    self.trade_book.append({'order_id': book_order_id, 'timestamp': order[index]['timestamp'], 
                        'price': order[index]['price'],
                        'quantity': order[index]['quantity'], 'side': book_order['side']})
                    self.confirm_trade(index, order[index]['timestamp'], order[index]['quantity'], order[index]['price'], order[index]['side'])
                    self.traded = True

                    remainder = remainder - book_order['quantity']
                    self.save_historty_orders(index, order[index])
                    break
                            

                else:
                    self.traded = True
                    self.trade_book.append({'order_id': index, 'timestamp': order[index]['timestamp'], 
                        'price': order[index]['price'],
                        'quantity': order[index]['quantity'], 'side': order[index]['side']})
                    self.confirm_trade(index, order[index]['timestamp'], order[index]['quantity'], order[index]['price'], order[index]['side'])
                    self.save_historty_orders(index, order[index])
                    break

        else:
            self.add_order_to_book(order)
            self.save_historty_orders(index, order[index])


    
    else: #order['side'] == 'sell'
        book = self.bid_book
        if order[index]['price'] in self.bid_prices:
            remainder = order[index]['quantity']
            while remainder > 0:
                book_order_id = book[order[index]['price']]['order_ids'][0]
                book_order = book[order[index]['price']]['orders'][book_order_id]

                if remainder >= book_order['quantity']:

                    self.trade_book.append({'order_id': book_order_id, 'timestamp': order[index]['timestamp'], 
                        'price': order[index]['price'],
                        'quantity': order[index]['quantity'], 'side': order[index]['side']})
                    self.traded = True
                    self.confirm_trade(index, order[index]['timestamp'], order[index]['quantity'], order[index]['price'], order[index]['side'])

                    remainder = remainder - book_order['quantity']
                    self.save_historty_orders(index, order[index])
                    break


                else:
                    self.traded = True
                    self.trade_book.append({'order_id': book_order_id, 'timestamp': order[index]['timestamp'], 
                        'price': order[index]['price'],
                        'quantity': order[index]['quantity'], 'side': order[index]['side']})
                    self.confirm_trade(index, order[index]['timestamp'], order[index]['quantity'], order[index]['price'], order[index]['side'])
                    self.save_historty_orders(index, order[index])
                    break

        else:
            self.add_order_to_book(order)
            self.save_historty_orders(index, order[index])
            

这个类过程我在我的 app.py 中创建对象并在函数 processing() 中调用函数 process_trade_orders:

    app = Flask(__name__)
    app.config['DEBUG'] = True

    newUser = Process()
    succorder = Success()
    #sched = BackgroundScheduler()

    def generate_orderid():
        num = 0
        while num < 1000:
            yield num
            num = num + 1

    genid = generate_orderid()
  
   proc = Process()
   sucorder = Success()

   #Processing orders to find if they have a match
   def processing():

       get_orders_data = proc.get_userdata()
       print(get_orders_data)
       print("\n")

       for data in get_orders_data['orders']:

           index = list(data.keys())[0]
           if data[index]['status'] == 'Successful':
               sucorder.add_trader_orders(data[index],index)
           else:
               proc.process_trade_orders(data)


   # sched = BackgroundScheduler()
   # sched.add_job(func = processing, trigger="interval", seconds = 2)
   # sched.start()

我确实使用了 APSbackground-scheduler,但我想使用线程。我正在考虑在无限循环中运行一个主线程作为守护进程,并使用工作线程在 app.py 中运行这个函数 processing() ,每隔几秒就会调用一次以检查是否有任何成功的订单,它将返回值到主线程和每个新的字典列表,我可以向用户返回一个响应或其他方式,关于这个成功的订单得到匹配。

请注意,这将以 5 秒的短时间间隔运行,并且将添加多个添加命令,并将持续异步运行检查,因此我不确定如何返回这些值。我只是很困惑,所以如果有人能帮助我,将不胜感激。

【问题讨论】:

    标签: python json flask python-multithreading


    【解决方案1】:

    如果你想创建一个在后台运行的线程函数,只需使用threading 模块,如下所示:

    from threading import Thread
    def bg_func():
        doSomething
    t = Thread(target=bg_func)
    t.start() # will start the function and continue, even if the function still runs
    doSomethingelseAtSameTime # runs with bg_func
    

    你也可以有多个后台线程。

    查看文档了解更多信息。

    【讨论】:

    • 我以前试过这个,但我想与主线程共享我的数据,以便我可以给用户一些响应或返回值。由于此功能将在每 5 秒后连续运行,我担心数据会经常更新,那么我该如何使用它。我在上面提到了一种我正在考虑的方式,比如使用主线程和运行工作线程。请你检查我的问题一次。谢谢!!
    • 我对在烧瓶中实施限价单和初学者也是新手,所以我有点困惑。
    • 如果您使用类或(坏的)全局变量,该函数也可以工作。没有什么是分开的,你只是一次做多件事。
    猜你喜欢
    • 2018-05-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-07
    相关资源
    最近更新 更多