【问题标题】:How to Process requests one by one in flask?如何在烧瓶中一一处理请求?
【发布时间】:2020-06-22 02:48:30
【问题描述】:

我正在开发一个将数据发送到烧瓶后端的应用程序。然后烧瓶将接收到的数据插入到弹性搜索中。在插入到弹性搜索之前,它会检查 id 是否存在。如果 id 存在,则更新,否则将插入索引。

示例代码:

from flask import Flask
from flask import  jsonify, request
import jwt
from elasticsearch import Elasticsearch
app = Flask(__name__)
@app.route('/test',methods=['POST'])
def hello():       
    try:         
        id  = request.form['id']
        database = "sample"
        es =Elasticsearch("localhost",port = 9200)
        cols=es.search(index=database,  body={ "query": {  "match": { "id": id}}})
        present =False
        if cols['hits']['hits']:
            x1=cols['hits']['hits'][0]['_source']
            eid = cols['hits']['hits'][0]['_id']
            present =True        
        if present == False:                          
            newvalues = {"url":"hello",'id':id}
            es.index(index=database, doc_type="logs", body=newvalues)      
        else: #if already there append data                        
            newvalues ={}                           
            es.update(index=database,doc_type='logs',id=eid,body={"doc":newvalues})            
        return jsonify({'status': 'success'})
    except jwt.InvalidTokenError  as e:
        print(e)
        return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
if  __name__=="__main__":
    try:
        app.run(host="localhost",port=5005,debug=True,processes =1)
    except Exception as e:
        print("exception in test",e)

这里的问题是,请求每 5 秒从前端发送一次。所以它有时会发生冲突,(即)每当收到带有 id 的请求并且同时发生 id 的插入过程时。第二个请求假定 id 不存在于数据库中,因此它还会在索引中插入 2 个具有相同 id 的数据。我应该怎么做才能一次插入一个而另一个应该等待?

python - 3.6

已编辑: 尝试使用信号量:

from flask import Flask
from flask import  jsonify, request
import jwt
from elasticsearch import Elasticsearch
import threading
sLock = threading.Semaphore()
app = Flask(__name__)
@app.route('/test',methods=['POST'])
def hello(): 
    sLock.acquire()  
    try:         
        id  = request.form['id']
        database = "sample"
        es =Elasticsearch("localhost",port = 9200)
        cols=es.search(index=database,  body={ "query": {  "match": { "id": id}}})
        present =False
        if cols['hits']['hits']:
            x1=cols['hits']['hits'][0]['_source']
            eid = cols['hits']['hits'][0]['_id']
            present =True        
        if present == False:                          
            newvalues = {"url":"hello",'id':id}
            es.index(index=database, doc_type="logs", body=newvalues)      
        else: #if already there append data                        
            newvalues ={}                           
            es.update(index=database,doc_type='logs',id=eid,body={"doc":newvalues})
        sLock.release()            
        return jsonify({'status': 'success'})
    except jwt.InvalidTokenError  as e:
        print(e)
        return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
if  __name__=="__main__":
    try:
        app.run(host="localhost",port=5005,debug=True,processes =1)
    except Exception as e:
        print("exception in test",e)

提前致谢!

【问题讨论】:

  • 欢迎来到并发的美妙世界。防止对共享资源的并发访问通常使用locks
  • @brunodesthuilliers 是的,我尝试使用信号量,但它只允许第一个请求并阻止所有其他请求,即使第一个请求已完成。
  • 那你为什么不发布你的代码并尝试使用信号量呢?您当然需要 1/ 确保释放锁 2/ 如果资源已被锁定,请等到锁被释放。
  • @brunodesthuilliers 编辑了我的帖子,是的,我在处理后发布了它。
  • 好的,这里有三个问题。第一个,wsgi 应用程序通常部署为多个并发进程,因此线程(仅影响当前进程)在生产中不起作用。您需要一个适用于并发进程的锁定系统。第二个:如果你无法获得锁,你想等待它,而不是退出。最后,当您获得锁时,您要确保始终释放锁无论发生什么 - 如果发生异常,您不会释放它。

标签: python python-3.x elasticsearch flask process


【解决方案1】:

您可以使用 mget 方法并设置时间阈值。通过这种方式,您不会发送一个时间请求,而是发送一个带有 id 列表的请求 - doc here

from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
from elasticsearch import helpers    

idL = [] # create it before the flask route declaration
threshold = 5 #set a threshold in the same way
now = datetime.now()
delta = timedelta(seconds=30) # set a time threshold of 1 minute

def update(result):
    for success, info in helpers.parallel_bulk(client= es, actions=createUpdateElem(result ):
        if not success:
            print(info)
def index(result):
    for success, info in helpers.parallel_bulk(client= es, actions=createIndexElem(result ):
        if not success:
            print(info)

def createIndexElem(result):
     for elem in result:
         yield {
           '_op_type': 'index',
           '_index': 'database',
           '_id': elem,
           '_source': {'question': 'The life, universe and everything.'}
          }


def createUpdateElem(result):
    for elem in result:
         yield {
           '_op_type': 'update',
           '_index': 'database',
           '_id': elem,
           'doc': {'question': 'The life, universe and everything.'}
          }


def checkResponse(response, idL):
    updateL = []
    for elem in response['docs']:
        if elem['_id'] in idL:
            updateL.append(elem['_id'])
    toBeIndexed = list(set(idL) - set(updateL))
    return toBeIndexed,updateL




def multiget(idL):        
     response = es.mget(index = 'database',body = {'ids': idL})
     doc2BeIndicized = checkResponse(response, idL)
     now = datetime.now()
     idL = []
     return doc2BeIndicized


 @app.route('/test',methods=['POST'])
 def hello():       
    try:  

id  = request.form['id']
idL.append(id)
if len(idL) > threshold:
    result = multiget(idL)
    if result:
        indexed, updated = result
        if updated:
            update(updated)
        if indexed:
            index(indexed)
elif (now + delta) > datetime.now():
    result = multiget(idL)
    if result:
        indexed, updated = result
        if updated:
            update(updated)
        if indexed:
            index(indexed)
else:
     continue

以同样的方式,您可以索引或更新具有批量的文档列表 - 或在服务中更好的并行批量,因为它使用多线程。文档here。请记住,您需要解析 mget 调用的响应,因为在列表中可能只有某些元素存在于 es 中,而其他元素则不存在

【讨论】:

  • 这里现在是什么 + delta delta ??
  • 现在定义或在 python 脚本的开头,或在新的更新/指示之后。我创建了一个 30 秒的时间增量,然后检查 var now + 30 是否小于实际时间。因此,如果文档数量少于阈值数量,则它仅指示 30 秒内 1 次
  • 对不起,我更改了我的 var 名称,它不是 delta。我现在更正
  • 是的,我更正了它,但现在我面临这个错误:不支持的操作数类型 -: 'list' 和 'list'
  • 未插入索引,出现文档丢失错误
猜你喜欢
  • 1970-01-01
  • 2014-07-23
  • 2021-02-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-07
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多