本文实现了通过企业微信发送消息的功能,平时用于运维告警还是不错的:相对于邮件来说,实时性更高。不过企业微信的配置比较麻烦,此处不做过多解释。
企业微信 API 的详细说明请参考:http://work.weixin.qq.com/api/doc#10167
话不多说,直接上代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# @Time    : 2018/4/25 17:06
# @Author  : zms
# @Site    :
# @File    : WeChat.py
# @Software: PyCharm Community Edition

"""Send text messages through the WeChat Work (Enterprise WeChat) API.

The access token is cached on disk next to a timestamp so that repeated
invocations (e.g. from monitoring alerts) do not request a new token
every time.
"""

import json
import os
import sys
import time

import requests

if sys.version_info[0] == 2:
    # Python 2 only: make implicit str <-> unicode conversions use UTF-8 so
    # that non-ASCII byte-string messages can be handled. ``reload`` is a
    # builtin on Python 2 and does not exist on Python 3.
    reload(sys)  # noqa: F821
    sys.setdefaultencoding('utf8')


class WeChat(object):
    """Minimal client for sending text messages via WeChat Work."""

    TOKEN_URL = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
    SEND_URL = 'https://qyapi.weixin.qq.com/cgi-bin/message/send'

    # Single cache location used for BOTH reading and writing the token.
    TOKEN_CACHE = './tmp/access_token.conf'

    # The gettoken API documents a token lifetime of 7200 seconds; stop
    # trusting the cached token a little earlier so it cannot expire
    # between the cache check and the actual send.
    TOKEN_LIFETIME = 7200
    TOKEN_SAFETY_MARGIN = 60

    # Seconds to wait for the HTTP API before giving up, so that an
    # alerting job cannot hang forever on a stalled connection.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # Placeholders: fill in the values of your own WeChat Work account.
        self.CORPID = '***********'
        self.CORPSECRET = '*********************************'
        self.AGENTID = '**************'
        self.TOUSER = "**********"  # recipient user name

    def _get_access_token(self):
        """Request a fresh access token from the gettoken endpoint.

        Returns:
            The ``access_token`` string from the JSON response.

        Raises:
            KeyError: if the response carries no ``access_token`` (for
                example because the API reported an error).
            requests.RequestException: on network failure or timeout.
        """
        values = {
            'corpid': self.CORPID,
            'corpsecret': self.CORPSECRET,
        }
        req = requests.post(self.TOKEN_URL, params=values,
                            timeout=self.REQUEST_TIMEOUT)
        data = json.loads(req.text)
        if "access_token" not in data:
            # Same exception type as a plain failed lookup, but the message
            # carries the API's response to make the failure diagnosable.
            raise KeyError('access_token missing from gettoken response: %r'
                           % (data,))
        return data["access_token"]

    def _read_cached_token(self):
        """Return the cached token if it is present and still fresh.

        Returns:
            The cached token string, or ``None`` when the cache file is
            missing, unreadable, malformed, or too old.
        """
        try:
            with open(self.TOKEN_CACHE, 'r') as f:
                stamp, access_token = f.read().split()
            age = time.time() - float(stamp)
        except (IOError, OSError, ValueError):
            # Missing file, wrong number of fields, or a non-numeric
            # timestamp: all are treated as a cache miss.
            return None
        if 0 < age < self.TOKEN_LIFETIME - self.TOKEN_SAFETY_MARGIN:
            return access_token
        return None

    def _write_cached_token(self, access_token):
        """Store ``access_token`` with the current time in the cache file."""
        cache_dir = os.path.dirname(self.TOKEN_CACHE)
        if cache_dir and not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        with open(self.TOKEN_CACHE, 'w') as f:
            f.write('\t'.join([str(time.time()), access_token]))

    def get_access_token(self):
        """Return a usable access token, refreshing the cache if needed.

        Returns:
            The access token string.
        """
        access_token = self._read_cached_token()
        if access_token is not None:
            return access_token
        # Fetch BEFORE opening the cache file for writing, so a failed
        # request cannot leave an empty or truncated cache behind.
        access_token = self._get_access_token()
        self._write_cached_token(access_token)
        return access_token

    def send_data(self, message):
        """Send ``message`` as a text message to ``self.TOUSER``.

        Args:
            message: the text to send (text string, or UTF-8 byte string).

        Returns:
            The raw response body returned by the message/send endpoint.
        """
        if isinstance(message, bytes) and not isinstance(message, str):
            # Python 3 byte strings must be decoded before serialization;
            # on Python 2 ``bytes is str`` and json handles UTF-8 itself.
            message = message.decode('utf-8')

        # The request body carries agentid as a bare number whenever the
        # configured value is numeric.
        try:
            agent_id = int(self.AGENTID)
        except (TypeError, ValueError):
            agent_id = self.AGENTID

        send_values = {
            "touser": self.TOUSER,
            "msgtype": "text",
            "agentid": agent_id,
            "text": {
                "content": message
            },
            "safe": "0"
        }
        # json.dumps escapes quotes, backslashes and newlines in the
        # message; its default ASCII-only output is safe to post under
        # both Python 2 and Python 3.
        send_data = json.dumps(send_values).encode('utf-8')
        r = requests.post(self.SEND_URL,
                          params={'access_token': self.get_access_token()},
                          data=send_data,
                          timeout=self.REQUEST_TIMEOUT)
        return r.content


if __name__ == '__main__':
    wx = WeChat()
    wx.send_data("test")