【问题标题】:Fetching data in realtime from database in python从python中的数据库实时获取数据
【发布时间】:2021-03-10 07:35:42
【问题描述】:

我在 Python 中有一个用于多处理的类,它创建了 3 个不同的进程。第一个过程用于检查我的硬件是否有任何信号并将其推送到队列中,第二个过程用于将数据从队列中取出并将其推送到数据库中,第三个过程用于将数据从数据库中取出并将其推送到服务器上。

obj = QE()
stdFunct = standardFunctions()

watchDogProcess = multiprocessing.Process(target=obj.watchDog)
watchDogProcess.start()
        
pushToDBSProcess = multiprocessing.Process(target=obj.pushToDBS)
pushToDBSProcess.start()
        
pushToCloud = multiprocessing.Process(target=stdFunct.uploadCycleTime)
pushToCloud.start()
        
watchDogProcess.join()
pushToDBSProcess.join()
pushToCloud.join()

我的前两个进程运行良好,但我正在努力处理第三个进程。以下是我的第三个流程的代码:

def uploadCycleTime(self):
    while True:
        uploadCycles = []
        lastUpPointer = "SELECT id FROM lastUploaded"
        lastUpPointer = self.dbFetchone(lastUpPointer)
        lastUpPointer = lastUpPointer[0]
        # print("lastUploaded :"+str(lastUpPointer))
        cyclesToUploadSQL = "SELECT id,machineId,startDateTime,endDateTime,type FROM cycletimes WHERE id > "+str(lastUpPointer)
        cyclesToUpload = self.dbfetchMany(cyclesToUploadSQL,15)
        cyclesUploadLength = len(cyclesToUpload)
        if(cyclesUploadLength>0):
            for cycles in cyclesToUpload:
                uploadCycles.append({"dataId":cycles[0],"machineId":cycles[1],"startDateTime":cycles[2].strftime('%Y-%m-%d %H:%M:%S.%f'),"endDateTime":cycles[3].strftime('%Y-%m-%d %H:%M:%S.%f'),"type":cycles[4]})        
            # print("length : "+str(cyclesUploadLength))
            lastUpPointer = uploadCycles[cyclesUploadLength-1]["dataId"]
            
            uploadCycles = json.dumps(uploadCycles)
            api = self.dalUrl+"/cycle-times"
            uploadResponse = self.callPostAPI(api,str(uploadCycles))
            print(lastUpPointer)
            changePointerSQL = "UPDATE lastUploaded SET id="+str(lastUpPointer)
            try:
                changePointerSQL = self.dbAbstraction(changePointerSQL)
            except Exception as errorPointer:
                print("Pointer change Error : "+str(errorPointer))
            time.sleep(2)

现在我正在保存一个指针来记住最后上传的 id,并从那里继续上传 15 个数据包。当数据库中存在数据时,代码运行良好,但是如果在启动进程并随后发送数据时没有存在,则它无法从数据库中获取数据。

我尝试实时打印长度,它一直给我 0,尽管数据不断被实时推送到数据库中。

【问题讨论】:

    标签: python mysql sql python-3.x python-2.7


    【解决方案1】:

    在我的上传过程中,我错过了一次 commit()

    def dbFetchAll(self,dataString):
        # dataToPush = self.cycletimeQueue.get()
        # print(dataToPush)
        dbTry = 1
        try:
            while(dbTry == 1): # This while is to ensure the data has been pushed
                sql = dataString
                self.conn.execute(sql)
                response = self.conn.fetchall()
                dbTry = 0
                return response
                # print(self.conn.rowcount, "record inserted.")
        except Exception as error:
            print ("Error : "+str(error))
            return dbTry
    
        ***finally:
            self.mydb.commit()***
    

    【讨论】:

      猜你喜欢
      • 2019-01-17
      • 1970-01-01
      • 1970-01-01
      • 2021-03-04
      • 1970-01-01
      • 1970-01-01
      • 2019-07-12
      • 2019-09-30
      • 2011-08-26
      相关资源
      最近更新 更多