Scheduled task statistics query
1. Either the scheduled task or Elasticsearch itself can be knocked out of service unexpectedly. When the service comes back, a compensation query is run so that the data from the outage period still gets aggregated (see the bookkeeping sketch after point 2).
2. A single statistics run can also fail because of a bug in the program or a network problem. Failed time slots are written to a failure table; every run reads all pending slots from that table and keeps retrying them until the query succeeds, as sketched below.
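Both recovery paths depend on bookkeeping of which hours were aggregated successfully and which are still pending. The script below only calls into dbtools.Task / dbtools.TaskOps and calastTime.ProccessData.run_buchang() / run_failtask(), so their internals are not shown; the following is a minimal sketch of what that bookkeeping could look like, assuming a simple SQLite table whose columns mirror the Task(name=..., time=..., status=...) calls. The table layout and helper names are illustrative assumptions, not the project's actual schema.

# Hypothetical sketch of the task/failure bookkeeping behind the compensation and retry passes.
# Column names mirror the dbtools.Task(name=..., time=..., status=...) calls in the script;
# the real schema in common/dbtools.py may differ.
import datetime
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS task (
    name   TEXT NOT NULL,       -- job name, e.g. 'smsService'
    time   TEXT NOT NULL,       -- hour the statistics run covers, 'YYYY-MM-DD HH:MM:SS'
    status INTEGER NOT NULL,    -- 1 = success, 0 = failed (needs retry)
    PRIMARY KEY (name, time)
);
"""

def missed_hours(conn, name, now=None):
    """Compensation window: every whole hour after the last successful run, up to the current hour."""
    now = now or datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
    row = conn.execute("SELECT MAX(time) FROM task WHERE name = ? AND status = 1", (name,)).fetchone()
    last = datetime.datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S") if row and row[0] else now
    hours, t = [], last + datetime.timedelta(hours=1)
    while t < now:
        hours.append(t)
        t += datetime.timedelta(hours=1)
    return hours

def failed_hours(conn, name):
    """Retry pass: every hour still marked as failed."""
    rows = conn.execute("SELECT time FROM task WHERE name = ? AND status = 0", (name,)).fetchall()
    return [datetime.datetime.strptime(r[0], "%Y-%m-%d %H:%M:%S") for r in rows]

With a table like this, the compensation pass reduces to missed_hours(...) and the retry pass to failed_hours(...), which is what the two passes at the bottom of the script expect run_buchang() and run_failtask() to return.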
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import datetime
import time
import sys
import os
import json

sys.path.append('../')
from common import esConfig
from common import timetools
from common import calastTime
from common.loggerConfig import loggerConfig
from common import constConfig
from common import dbtools

indexname_prefix = "sms-service-"
logfile = "../log/sms_service.log"
logpath = os.path.abspath(logfile)
log = loggerConfig("smsService", logpath)
logger = log.create_logger()

# Hourly aggregation query: nested terms buckets APP_ID -> code -> CHANNEL_ID -> SWJG_DM -> STATUS,
# with an hourly date_histogram at the innermost level and a cardinality count of RECORD_ID at every level.
# The gte/lte placeholders in the range filter are filled in for each hour before the query is executed.
body = {
    "aggs": {
        "3": {
            "terms": {"field": "APP_ID", "size": 5000, "order": {"1": "desc"}},
            "aggs": {
                "1": {"cardinality": {"field": "RECORD_ID"}},
                "4": {
                    "terms": {"field": "code", "size": 5000, "order": {"1": "desc"}},
                    "aggs": {
                        "1": {"cardinality": {"field": "RECORD_ID"}},
                        "5": {
                            "terms": {"field": "CHANNEL_ID", "size": 5000, "order": {"1": "desc"}},
                            "aggs": {
                                "1": {"cardinality": {"field": "RECORD_ID"}},
                                "6": {
                                    "terms": {"field": "SWJG_DM", "size": 5000, "order": {"1": "desc"}},
                                    "aggs": {
                                        "1": {"cardinality": {"field": "RECORD_ID"}},
                                        "7": {
                                            "terms": {"field": "STATUS", "size": 5000, "order": {"1": "desc"}},
                                            "aggs": {
                                                "1": {"cardinality": {"field": "RECORD_ID"}},
                                                "8": {
                                                    "date_histogram": {"field": "mydate", "interval": "1h",
                                                                       "time_zone": "Asia/Shanghai", "min_doc_count": 1},
                                                    "aggs": {"1": {"cardinality": {"field": "RECORD_ID"}}}
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    },
    "size": 0,
    "stored_fields": ["*"],
    "docvalue_fields": [{"field": "@timestamp", "format": "date_time"},
                        {"field": "mydate", "format": "date_time"}],
    "query": {"bool": {"must": [{"match_phrase": {"metricsName": {"query": "SmsService"}}},
                                {"range": {"mydate": {"gte": 0, "lte": 0, "format": "epoch_millis"}}}]}},
    "timeout": "30000ms"
}


def handle_res(res, index_date):
    """Flatten the nested aggregation buckets and write one document per combination into the result index."""
    outlist = []
    dnow = datetime.datetime.now().strftime('%Y-%m-%d %H') + ":00:00"
    for i3 in res["aggregations"]["3"]["buckets"]:
        for i4 in i3["4"]["buckets"]:
            for i5 in i4["5"]["buckets"]:
                for i6 in i5["6"]["buckets"]:
                    for i7 in i6["7"]["buckets"]:
                        for i8 in i7["8"]["buckets"]:
                            # Strip the timezone suffix before reformatting the bucket timestamp.
                            timestr = i8["key_as_string"][:-6]
                            newtime = timetools.formartTime(timestr)
                            outlist.append({"appId": i3["key"], "code": i4["key"], "channelId": i5["key"],
                                            "swjdDm": i6["key"], "status": i7["key"],
                                            "fszl": i8["doc_count"], "fsl": i8["1"]["value"],
                                            "createTime": newtime, "statisticalTime": dnow})
    indexname = indexname_prefix + index_date
    if esConfig.es.indices.exists(index=indexname):
        logger.info("index %s already exists" % (indexname), extra={'result': '', 'querytime': ''})
    else:
        esConfig.es.indices.create(index=indexname)
        logger.info("index %s created successfully" % (indexname), extra={'result': '', 'querytime': ''})
    for data in outlist:
        esConfig.es.index(index=indexname, doc_type="doc", body=data)
        logger.info("%s added to index %s" % (json.dumps(data), indexname),
                    extra={'result': 'data', 'querytime': ''})


def handler_hours(hours):
    """Run the statistics query for every hour in `hours`, retrying each hour up to MAX_RETRY times."""
    cald = calastTime.CalcDaysHours(beginTime=None, endTime=None, logger=logger)
    for i in hours:
        retryCount = constConfig.MAX_RETRY  # reset the retry budget for every hour
        while retryCount >= 1:
            try:
                index_date = i.strftime("%Y-%m-%d")
                res = cald.get_data_from_index(i, body)
                handle_res(res, index_date)
                # The log records the last successfully processed hour, which is not necessarily the current hour.
                logger.info("successfully aggregated %d records" % (res['hits']['total']),
                            extra={'result': 'success', 'querytime': i})
                successtask = dbtools.Task(name='smsService', time=i, status=1)
                dbtools.TaskOps().delete_FailTask(name='smsService', time=i)
                dbtools.TaskOps().Add_Task(successtask)
                time.sleep(10)
                break
            except Exception as e:
                retryCount -= 1
                if retryCount > 0:
                    logger.error("statistics attempt %d failed with %s, %d retries left"
                                 % (constConfig.MAX_RETRY - retryCount, str(e), retryCount),
                                 extra={'result': 'fail', 'querytime': i})
                    time.sleep(60)
                    continue
                else:
                    # All retries exhausted: record the hour in the failure table so a later run can pick it up.
                    failtask = dbtools.Task(name='smsService', time=i, status=0)
                    dbtools.TaskOps().delete_FailTask(name='smsService', time=i)
                    dbtools.TaskOps().Add_Task(failtask)
                    logger.error("all %d attempts failed, giving up on this hour" % (constConfig.MAX_RETRY),
                                 extra={'result': 'fail', 'querytime': i})
                    break


pro = calastTime.ProccessData("smsService", body, logger)

# Compensation pass: query the hours missed while the job or Elasticsearch was down.
logger.info("starting compensation statistics", extra={'result': 'start', 'querytime': ''})
lost_hours = pro.run_buchang()
handler_hours(lost_hours)
logger.info("compensation statistics finished", extra={'result': '', 'querytime': ''})

# Retry pass: re-run every hour recorded as failed in the failure table.
logger.info("re-running failed statistics tasks", extra={'result': '', 'querytime': ''})
error_hours = pro.run_failtask()
handler_hours(error_hours)
logger.info("failed-task statistics finished", extra={'result': 'end', 'querytime': ''})
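One detail the script delegates entirely to calastTime.CalcDaysHours.get_data_from_index(i, body) is how the gte/lte placeholders (both 0 in the template) are replaced with the epoch-millisecond bounds of the hour being processed. A minimal sketch of that substitution, assuming a plain elasticsearch-py client and a hypothetical helper name:

# Hypothetical sketch: fill the epoch-millisecond bounds for one hour into the query
# template and run the search. `es` is an elasticsearch-py client instance; the helper
# name and the index argument are illustrative assumptions, not part of the original script.
import copy
import datetime
import time

def query_one_hour(es, index, hour, body):
    """Run the aggregation query for the window [hour, hour + 1h)."""
    start = hour.replace(minute=0, second=0, microsecond=0)
    end = start + datetime.timedelta(hours=1)
    hourly = copy.deepcopy(body)            # do not mutate the shared template
    rng = hourly["query"]["bool"]["must"][1]["range"]["mydate"]
    rng["gte"] = int(time.mktime(start.timetuple()) * 1000)      # epoch_millis lower bound
    rng["lte"] = int(time.mktime(end.timetuple()) * 1000) - 1    # inclusive upper bound
    return es.search(index=index, body=hourly)

Deep-copying the template keeps the shared body dict unchanged between hours; the real wrapper class may implement this differently, but the epoch_millis format in the template implies a substitution of this shape.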