定时任务统计查询

     1.无论是定时任务还是es本身都有可能被异常停止服务

         当服务恢复正常的时候进行补偿查询，把服务中断期间的数据进行统计查询。

     2.单次统计也可能由于程序本身和网络连接问题发生异常

         把异常的统计时间段存入异常表；每次运行都从表中取出所有异常查询点，不断重试查询，直到返回成功为止。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import datetime
import time
import sys
import os
import json
sys.path.append('../')

from common import esConfig
from common import timetools
from common import calastTime
from common.loggerConfig import loggerConfig
from common import constConfig
from common import dbtools


# Prefix of the daily output index: stats documents are written to
# "sms-service-<YYYY-MM-DD>".
indexname_prefix="sms-service-"
# Log file lives next to the project, resolved to an absolute path so the
# logger works regardless of the current working directory.
logfile="../log/sms_service.log"
logpath=os.path.abspath(logfile)
log=loggerConfig("smsService",logpath)
logger=log.create_logger()



# Elasticsearch aggregation request reused for every hourly query.
# Nested terms aggregations drill down APP_ID -> code -> CHANNEL_ID -> SWJG_DM
# -> STATUS -> hourly date_histogram on "mydate"; every level also computes a
# cardinality of RECORD_ID (the de-duplicated send count). size:0 suppresses
# hits; only aggregations are returned. The query filters metricsName ==
# "SmsService" plus a "mydate" range whose gte/lte are placeholders (0) —
# presumably filled in per hour by calastTime before each request; verify.
body={"aggs":{"3":{"terms":{"field":"APP_ID","size":5000,"order":{"1":"desc"}},"aggs":{"1":{"cardinality":{"field":"RECORD_ID"}},"4":{"terms":{"field":"code","size":5000,"order":{"1":"desc"}},"aggs":{"1":{"cardinality":{"field":"RECORD_ID"}},"5":{"terms":{"field":"CHANNEL_ID","size":5000,"order":{"1":"desc"}},"aggs":{"1":{"cardinality":{"field":"RECORD_ID"}},"6":{"terms":{"field":"SWJG_DM","size":5000,"order":{"1":"desc"}},"aggs":{"1":{"cardinality":{"field":"RECORD_ID"}},"7":{"terms":{"field":"STATUS","size":5000,"order":{"1":"desc"}},"aggs":{"1":{"cardinality":{"field":"RECORD_ID"}},"8":{"date_histogram":{"field":"mydate","interval":"1h","time_zone":"Asia/Shanghai","min_doc_count":1},"aggs":{"1":{"cardinality":{"field":"RECORD_ID"}}}}}}}}}}}}}}},"size":0,"stored_fields":["*"],"docvalue_fields":[{"field":"@timestamp","format":"date_time"},{"field":"mydate","format":"date_time"}],"query":{"bool":{"must":[{"match_phrase":{"metricsName":{"query":"SmsService"}}},{"range":{"mydate":{"gte":0,"lte":0,"format":"epoch_millis"}}}]}},"timeout":"30000ms"}

def handle_res(res, index_date):
    """Flatten the nested aggregation response and index the rows.

    Walks the bucket tree produced by the module-level ``body`` query
    (APP_ID -> code -> CHANNEL_ID -> SWJG_DM -> STATUS -> hourly histogram),
    turns each leaf into a flat document, then writes every document into the
    daily index ``sms-service-<index_date>`` (created on demand).

    Parameters:
        res        -- Elasticsearch response dict containing ``aggregations``
                      with the nested bucket keys "3".."8".
        index_date -- date string (``YYYY-MM-DD``) appended to the index prefix.
    """
    # Statistics timestamp: the current hour with minutes/seconds zeroed.
    dnow = datetime.datetime.now().strftime('%Y-%m-%d %H') + ":00:00"
    outlist = []
    for i3 in res["aggregations"]["3"]["buckets"]:
        for i4 in i3["4"]["buckets"]:
            for i5 in i4["5"]["buckets"]:
                for i6 in i5["6"]["buckets"]:
                    for i7 in i6["7"]["buckets"]:
                        for i8 in i7["8"]["buckets"]:
                            # Strip the trailing timezone offset (e.g. "+08:00")
                            # from key_as_string before reformatting.
                            timestr = i8["key_as_string"][:-6]
                            newtime = timetools.formartTime(timestr)
                            outlist.append({
                                "appId": i3["key"],
                                "code": i4["key"],
                                "channelId": i5["key"],
                                "swjdDm": i6["key"],
                                "status": i7["key"],
                                "fszl": i8["doc_count"],           # total docs in the bucket
                                "fsl": i8["1"]["value"],           # distinct RECORD_ID count
                                "createTime": newtime,
                                "statisticalTime": dnow,
                            })
    indexname = indexname_prefix + index_date
    if esConfig.es.indices.exists(index=indexname):
        logger.info("%s 索引已经存在" % (indexname), extra={'result': '', "querytime": ""})
    else:
        esConfig.es.indices.create(index=indexname)
        logger.info("%s 索引被成功创建" % (indexname), extra={'result': '', "querytime": ""})

    for data in outlist:
        # Fire-and-forget insert; the response is not needed (the unused
        # assignment was removed).
        esConfig.es.index(index=indexname, doc_type="doc", body=data)
        logger.info("%s被成功添加到%s索引当中" % (json.dumps(data), indexname), extra={'result': 'data', "querytime": ""})

def handler_hours(hours):
    """Run the hourly statistics query for every time point in *hours*.

    Each hour gets up to ``constConfig.MAX_RETRY`` attempts. On success the
    aggregation is flattened/indexed via ``handle_res`` and a success Task
    (status=1) replaces any previous failure record. After exhausting all
    retries a failure Task (status=0) is stored so the hour can be retried on
    the next run of this script.

    Parameters:
        hours -- iterable of datetime objects, one per hour to (re)compute.
    """
    cald = calastTime.CalcDaysHours(beginTime=None, endTime=None, logger=logger)
    for i in hours:
        # BUGFIX: reset the retry budget for EVERY hour. Previously it was
        # initialised once before the loop (and only restored on total
        # failure), so an hour that succeeded after a few failed attempts
        # left the following hours with a reduced budget.
        retryCount = constConfig.MAX_RETRY
        while retryCount >= 1:
            try:
                index_date = i.strftime("%Y-%m-%d")
                res = cald.get_data_from_index(i, body)
                handle_res(res, index_date)
                # The log records the last hour that was successfully
                # processed — not necessarily the current wall-clock time.
                logger.info("成功统计%d条记录" % (res['hits']['total']), extra={'result': 'success', "querytime": i})
                successtask = dbtools.Task(name='smsService', time=i, status=1)
                # Replace any stale failure record for this hour, then
                # persist the success marker.
                dbtools.TaskOps().delete_FailTask(name='smsService', time=i)
                dbtools.TaskOps().Add_Task(successtask)
                time.sleep(10)  # throttle between hourly queries
                break
            except Exception as e:
                # Broad catch is intentional: any ES/network/program error
                # for this hour is logged and retried, not fatal to the run.
                retryCount -= 1
                if retryCount > 0:
                    logger.error("%d次统计发生异常%s,还剩%d次重试" % (retryCount + 1, str(e), retryCount),
                                 extra={'result': 'fail', "querytime": i})
                    time.sleep(60)  # back off before retrying
                else:
                    # All retries exhausted: record the hour in the failure
                    # table so the next run can pick it up again.
                    failtask = dbtools.Task(name='smsService', time=i, status=0)
                    dbtools.TaskOps().delete_FailTask(name='smsService', time=i)
                    dbtools.TaskOps().Add_Task(failtask)
                    logger.error("%d次统计全部失败,结束异常统计" % (constConfig.MAX_RETRY), extra={'result': 'fail', "querytime": i})
                    break

# --- main: compensation pass first, then re-run previously failed hours ---
processor = calastTime.ProccessData("smsService", body, logger)

# Phase 1: compensation — statistics for hours missed while the scheduled
# job or Elasticsearch itself was down.
logger.info("开始统计补偿任务", extra={'result': 'start', "querytime": ""})
missed_hours = processor.run_buchang()
handler_hours(missed_hours)
logger.info("补偿任务统计结束", extra={'result': '', "querytime": ""})

# Phase 2: retry — every time point recorded in the failure table is
# queried again until it finally succeeds.
logger.info("开始重新统计失败任务", extra={'result': '', "querytime": ""})
failed_hours = processor.run_failtask()
handler_hours(failed_hours)
logger.info("失败任务统计结束", extra={'result': 'end', "querytime": ""})
View Code

相关文章:

  • 2021-08-03
  • 2021-12-04
  • 2021-09-04
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-07-04
猜你喜欢
  • 2021-04-25
  • 2021-12-06
  • 2021-12-13
  • 2021-07-01
  • 2022-12-23
  • 2022-12-23
  • 2021-10-06
相关资源
相似解决方案