【发布时间】:2020-06-22 02:48:30
【问题描述】:
我正在开发一个将数据发送到烧瓶后端的应用程序。然后烧瓶将接收到的数据插入到弹性搜索中。在插入到弹性搜索之前,它会检查 id 是否存在。如果 id 存在,则更新,否则将插入索引。
示例代码:
from flask import Flask
from flask import jsonify, request
import jwt
from elasticsearch import Elasticsearch
app = Flask(__name__)
@app.route('/test',methods=['POST'])
def hello():
try:
id = request.form['id']
database = "sample"
es =Elasticsearch("localhost",port = 9200)
cols=es.search(index=database, body={ "query": { "match": { "id": id}}})
present =False
if cols['hits']['hits']:
x1=cols['hits']['hits'][0]['_source']
eid = cols['hits']['hits'][0]['_id']
present =True
if present == False:
newvalues = {"url":"hello",'id':id}
es.index(index=database, doc_type="logs", body=newvalues)
else: #if already there append data
newvalues ={}
es.update(index=database,doc_type='logs',id=eid,body={"doc":newvalues})
return jsonify({'status': 'success'})
except jwt.InvalidTokenError as e:
print(e)
return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
if __name__=="__main__":
try:
app.run(host="localhost",port=5005,debug=True,processes =1)
except Exception as e:
print("exception in test",e)
这里的问题是,请求每 5 秒从前端发送一次。所以它有时会发生冲突,(即)每当收到带有 id 的请求并且同时发生 id 的插入过程时。第二个请求假定 id 不存在于数据库中,因此它还会在索引中插入 2 个具有相同 id 的数据。我应该怎么做才能一次插入一个而另一个应该等待?
python - 3.6
已编辑: 尝试使用信号量:
from flask import Flask
from flask import jsonify, request
import jwt
from elasticsearch import Elasticsearch
import threading
sLock = threading.Semaphore()
app = Flask(__name__)
@app.route('/test',methods=['POST'])
def hello():
sLock.acquire()
try:
id = request.form['id']
database = "sample"
es =Elasticsearch("localhost",port = 9200)
cols=es.search(index=database, body={ "query": { "match": { "id": id}}})
present =False
if cols['hits']['hits']:
x1=cols['hits']['hits'][0]['_source']
eid = cols['hits']['hits'][0]['_id']
present =True
if present == False:
newvalues = {"url":"hello",'id':id}
es.index(index=database, doc_type="logs", body=newvalues)
else: #if already there append data
newvalues ={}
es.update(index=database,doc_type='logs',id=eid,body={"doc":newvalues})
sLock.release()
return jsonify({'status': 'success'})
except jwt.InvalidTokenError as e:
print(e)
return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
if __name__=="__main__":
try:
app.run(host="localhost",port=5005,debug=True,processes =1)
except Exception as e:
print("exception in test",e)
提前致谢!
【问题讨论】:
-
欢迎来到并发的美妙世界。防止对共享资源的并发访问通常使用locks。
-
@brunodesthuilliers 是的,我尝试使用信号量,但它只允许第一个请求并阻止所有其他请求,即使第一个请求已完成。
-
那你为什么不发布你的代码并尝试使用信号量呢?您当然需要 1/ 确保释放锁 2/ 如果资源已被锁定,请等到锁被释放。
-
@brunodesthuilliers 编辑了我的帖子,是的,我在处理后发布了它。
-
好的,这里有三个问题。第一个,wsgi 应用程序通常部署为多个并发进程,因此线程(仅影响当前进程)在生产中不起作用。您需要一个适用于并发进程的锁定系统。第二个:如果你无法获得锁,你想等待它,而不是退出。最后,当您获得锁时,您要确保始终释放锁无论发生什么 - 如果发生异常,您不会释放它。
标签: python python-3.x elasticsearch flask process