1. 调度对象封装
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys,os,logging
import asyncio
import datetime
from pytz import timezone
from api_monitor.utils import scheduler_config
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.events import EVENT_ALL
def default_func():
    """No-op job body used for the placeholder ``default`` job."""
    pass
class APIScheduler():
    """Thin wrapper around an APScheduler ``AsyncIOScheduler``.

    The wrapper can be used in two roles:

    * producer: call :meth:`add_job` to write jobs into the job store
      without ever starting the scheduler;
    * consumer: call :meth:`star` to start the scheduler and run the event
      loop so that stored jobs are executed.
    """

    def __init__(self, executors=scheduler_config.executors,
                 jobstores=scheduler_config.jobstores,
                 job_defaults=scheduler_config.job_defaults):
        """
        Build the underlying scheduler.

        :param executors: mapping of executor alias -> executor instance
        :param jobstores: mapping of job store alias -> job store instance
        :param job_defaults: default job options passed to the scheduler
        """
        # Install a fresh event loop as the current loop before the
        # scheduler is created.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self._sched = AsyncIOScheduler(executors=executors,
                                       jobstores=jobstores,
                                       job_defaults=job_defaults,
                                       timezone=timezone('Asia/Shanghai'))
        self.default_id = 'default'
        # Mark the scheduler as running by default (state 1) so that
        # add_job() writes straight into the job store.
        self._sched.state = 1
        # Listener registration is currently disabled:
        # self._sched.add_listener(self.listener, EVENT_ALL)
        self._sched._logger = scheduler_config.logger()
        # Work around jobs loaded from redis having no scheduler attribute.
        for jobstore in jobstores:
            jobstores[jobstore]._scheduler = self._sched

    def listener(self, event):
        """Print whether the job behind ``event`` raised an exception."""
        if event.exception:
            print('CHUCUO')
        else:
            print('正常。。。')

    def add_job(self, **kwargs):
        """
        Add a job to the job store.

        :param kwargs:
            func – callable (or a textual reference to one) to run at the given time
            trigger (str|apscheduler.triggers.base.BaseTrigger) – trigger that determines when func is called
            args (list|tuple) – list of positional arguments to call func with
            kwargs (dict) – dict of keyword arguments to call func with
            id (str|unicode) – explicit identifier for the job (for modifying it later)
            name (str|unicode) – textual description of the job
            misfire_grace_time (int) – seconds after the designated runtime that the job is still allowed to be run
            coalesce (bool) – run once instead of many times if the scheduler determines that the job should be run more than once in succession
            max_instances (int) – maximum number of concurrently running instances allowed for this job
            next_run_time (datetime) – when to first run the job, regardless of the trigger (pass None to add the job as paused)
            jobstore (str|unicode) – alias of the job store to store the job in
            executor (str|unicode) – alias of the executor to run the job with
            replace_existing (bool) – True to replace an existing job with the same id (but retain the number of runs from the existing one)
        :return: the job object returned by the underlying scheduler
        """
        # The job is only written to the store (via _real_add_job) while the
        # scheduler state is 1.
        return self._sched.add_job(**kwargs)

    def _add_default_job(self, job_id='default', jobstore='default'):
        """Register the no-op placeholder job that fires every second."""
        self.add_job(
            func=default_func, trigger='interval', seconds=1,
            jobstore=jobstore, next_run_time=datetime.datetime.now(),
            id=job_id, replace_existing=True, misfire_grace_time=3,
            coalesce=True, max_instances=1
        )

    def remove_job(self, job_id, jobstore='default'):
        """
        Remove a job by its id.

        :param job_id: id of the job to remove
        :param jobstore: alias of the job store holding the job
        :return:
        """
        self._sched.remove_job(job_id=job_id, jobstore=jobstore)

    def remove_all_jobs(self, jobstore='default', default_jobid='default'):
        """
        Remove every job in ``jobstore`` except the placeholder job.

        :param jobstore: alias of the job store to clear
        :param default_jobid: id of the job that must be kept
        :return:
        """
        valid_job_ids = [job.id
                         for job in self._sched.get_jobs(jobstore=jobstore)
                         if job.id != default_jobid]
        try:
            for job_id in valid_job_ids:
                self.remove_job(job_id, jobstore)
        except AttributeError:
            # Best effort: report the failure and stop removing.
            # Exception.with_traceback() needs a traceback argument, so the
            # active traceback is printed with the traceback module instead.
            import traceback
            traceback.print_exc()

    def pause_job(self, job_id, jobstore='default'):
        """
        Pause a job by its id.

        :param job_id: id of the job to pause
        :param jobstore: alias of the job store holding the job
        :return:
        """
        self._sched.pause_job(job_id=job_id, jobstore=jobstore)

    def resume_job(self, job_id, jobstore='default'):
        """
        Resume a paused job by its id.

        :param job_id: id of the job to resume
        :param jobstore: alias of the job store holding the job
        :return:
        """
        self._sched.resume_job(job_id=job_id, jobstore=jobstore)

    def star(self, add_default_job=True, jobstore='default'):
        """
        Start processing jobs; blocks in the event loop until it stops.

        :param add_default_job: when true, register the placeholder job first
        :param jobstore: alias of the job store for the placeholder job
        :return:
        """
        try:
            if add_default_job:
                self._add_default_job(jobstore=jobstore)
            # Undo the state=1 set in __init__: APScheduler's start() only
            # accepts a scheduler that is in the stopped state (0).
            self._sched.state = 0
            self._sched.start()
            loop = asyncio.get_event_loop()
            loop.run_forever()
        finally:
            # Only shut down a scheduler that is actually running; shutting
            # down a stopped one raises and would hide the original error.
            if self._sched.running:
                self.shutdown()

    def shutdown(self, wait=True):
        """
        Shut the underlying scheduler down.

        :param wait: forwarded to the scheduler's ``shutdown``; true waits
            for running jobs to finish
        """
        self._sched.shutdown(wait=wait)
2. 调度任务存储配置
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor
from apscheduler.jobstores.redis import RedisJobStore
# Executor aliases -> executor instances used to run jobs.
executors = {
    'default': ThreadPoolExecutor(200),
    'processpool': ProcessPoolExecutor(5)
}

# Job store aliases -> job store instances; jobs are persisted in redis db 0.
jobstores = {
    'default': RedisJobStore(db=0,
                             jobs_key='apscheduler.jobs',
                             run_times_key='apscheduler.run_times',
                             host='127.0.0.1',
                             port=6379,  # numeric port instead of the string '6379'
                             password='',
                             ),
}

# Defaults applied to every job that does not override them.
job_defaults = {
    'coalesce': True,
    'max_instances': 3
}
def logger(log_file='/tmp/scheduler.log'):
    """
    Configure root logging to append to ``log_file`` and return ``logging``.

    :param log_file: path of the log file handed to ``logging.basicConfig``
    :return: the ``logging`` module itself, so callers can use
        ``.info()``, ``.error()`` and friends on the returned object
    """
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        filename=log_file,
                        filemode='a')
    return logging
3. 任务函数
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pycurl
from urllib import parse
from io import StringIO,BytesIO
import json
import os,sys
import pymysql
import time
# Connection settings for the MySQL database that stores probe history.
db_host = "xxxxxx"
db_user = "xxxx"
db_passwd = "xxxxxx"
db_database = "xxxxxx"
# Keyword arguments are used because PyMySQL 1.0+ no longer accepts
# positional arguments in connect().
conn = pymysql.connect(host=db_host, user=db_user,
                       password=db_passwd, database=db_database)
cursor = conn.cursor()
# Columns written for every probe, in insert order (create_time is appended).
_HISTORY_COLUMNS = (
    'http_code', 'totle_response_time', 'dns_time', 'connect_time',
    'redriect_time', 'ssl_time', 'size_download', 'speed_down',
    'content', 'err_message', 'api_monitor_id', 'api_monitor_node_id',
)

# Parameterized INSERT: values are bound by the driver, never formatted in.
_INSERT_HISTORY_SQL = (
    "insert into api_monitor_apimonitorhistory ({columns}, create_time) "
    "values ({marks})"
).format(columns=", ".join(_HISTORY_COLUMNS),
         marks=", ".join(["%s"] * (len(_HISTORY_COLUMNS) + 1)))


def _save_history(api_info):
    """Insert one probe result into api_monitor_apimonitorhistory."""
    current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    values = tuple(api_info.get(column) for column in _HISTORY_COLUMNS)
    try:
        cursor.execute(_INSERT_HISTORY_SQL, values + (current_time,))
        conn.commit()
    except Exception as e:
        # Best effort: a failed insert must not break the scheduled job.
        print(e)
        conn.rollback()


def curl(url, params, method, header=None, save_data=True, time_out=6, **kwargs):
    """
    Probe an HTTP API with pycurl and optionally store the measurements.

    :param url: target URL
    :param params: request parameters; url-encoded into the query string for
        GET, sent as a JSON body for every other method
    :param method: HTTP method name; only ``get`` (case-insensitive) is
        treated specially
    :param header: list of raw header lines for pycurl; ``None`` means none
    :param save_data: when true, insert the result into the history table
    :param time_out: total request timeout in seconds
    :param kwargs: must contain ``api_id`` and ``node_id``
    :return: dict with the collected measurements and response body
    :raises KeyError: if ``api_id`` or ``node_id`` is missing from kwargs
    """
    # NOTE(review): the module-level conn/cursor are shared by every call;
    # confirm that is safe with the executor this job runs under.
    api_info = {
        'api_monitor_id': kwargs['api_id'],
        'api_monitor_node_id': kwargs['node_id'],
    }
    buffer = BytesIO()
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.HTTPHEADER, list(header) if header else [])
        c.setopt(pycurl.WRITEDATA, buffer)
        # Request timeout
        c.setopt(pycurl.TIMEOUT, time_out)
        if method.lower() == 'get':
            url = url + '?' + parse.urlencode(params)
        else:
            c.setopt(pycurl.POSTFIELDS, json.dumps(params))
        c.setopt(pycurl.URL, url)
        try:
            c.perform()
            api_info['err_message'] = 0
        except pycurl.error as e:
            # Keep the failure text; timings are still collected below.
            api_info['err_message'] = str(e)
        api_info.update(
            {
                'http_code': c.getinfo(pycurl.HTTP_CODE),
                'totle_response_time': c.getinfo(pycurl.TOTAL_TIME),
                'dns_time': c.getinfo(pycurl.NAMELOOKUP_TIME),
                'connect_time': c.getinfo(pycurl.CONNECT_TIME),
                'redriect_time': c.getinfo(pycurl.REDIRECT_TIME),
                'ssl_time': c.getinfo(pycurl.APPCONNECT_TIME),
                'size_download': c.getinfo(pycurl.SIZE_DOWNLOAD),
                'speed_down': c.getinfo(pycurl.SPEED_DOWNLOAD),
                # Bodies that are not valid UTF-8 must not crash the probe.
                'content': buffer.getvalue().decode('utf-8', errors='replace'),
            }
        )
    finally:
        # Always release the curl handle, even if collecting info failed.
        c.close()
    if save_data:
        _save_history(api_info)
    return api_info
4. 执行调度任务
server端添加任务到redis
job_obj = scheduler.APIScheduler()  # build the scheduler wrapper
for node_id in api_monitor_node_id_list:
    # Keyword arguments handed to tasks.curl on every run of this job.
    api_conf = {
        "url": monitor_url,
        "params": params_data,
        "method": request_method,
        "header": http_header,
        "api_id": str(api_obj_id),
        "node_id": node_id
    }
    # Store the job in redis; the scheduler state must be 1 for the job to
    # be written to redis.
    job_obj.add_job(id=str(node_id), func=tasks.curl, kwargs=api_conf,
                    trigger='interval',
                    seconds=5, next_run_time=datetime.datetime.now(),
                    replace_existing=True, misfire_grace_time=3, coalesce=True,
                    max_instances=2,
                    jobstore='default')
agent端监听任务
from api_monitor.utils import scheduler
job_obj = scheduler.APIScheduler()
job_obj.star() # start listening for jobs
#job_obj.remove_job(job_id="default") # remove a job by its id
参考链接: https://zhuanlan.zhihu.com/p/44185271
flask中的应用:https://www.cnblogs.com/zydev/p/13865535.html
可以将 flask-apscheduler 当作一个定时任务平台,平时将任务添加到其中即可。
https://www.cnblogs.com/shenh/p/13366583.html
参考1:https://www.sohu.com/a/407444741_658944
参考2:https://www.cnblogs.com/yueerwanwan0204/p/5480870.html
参考3:https://www.jb51.net/article/184003.htm
相关文章: