说明
由于子墨大佬的教程已足够详细,不再赘叙。传送门
- 此为
AUST (安徽理工大学)专版。其实就是为了方便校友 - 基于腾讯云函数
- 使用了自建api。大佬造好的轮子。
- 消息通知改为Server酱
所用代码
config.yml
#登陆相关配置
login:
#开放的模拟登陆api服务器地址
api: "http://cpdaily.kangblogs.top:8080/wisedu-unified-login-api-v1.0/api/login" # 有条件的可以自建
#用户组配置
users:
#单个用户配置
- user:
#username 学号或者工号
username: \'201********\'
#password 密码
password: \'********\'
#address 地址,定位信息
address: 中国安徽省淮南市田家庵区
#email 接受通知消息的邮箱
email:
#school 学校全称
school: 安徽理工大学
#lon 经度
lon: \'117.029931\'
#lat 纬度
lat: \'32.555161\'
#abnormalReason 反馈信息
abnormalReason:
#photo 签到照片,不需要可不填,或者直接删除
photo:
#多用户配置,将下面的注释去掉即可,如果有表单内容有图片,不建议使用多用户配置
#今日校园相关配置
cpdaily:
#是否检查问题,true检查,false不检查
check: false
#附加信息组默认选项配置,没有可留空
defaults:
#附加信息组默认选项配置,按顺序,可留空
- default:
title: 您目前的体温是多少
type: 2
value: 腋下温度37.3℃以下
- default:
title: 有无发热、干咳等呼吸道症状
type: 2
value: 无
- default:
title: 有无呕吐、腹泻等消化道症状
type: 2
value: 无
- default:
title: 有无其他身体不适症状
type: 2
value: 无
generate.py
# -*- coding: utf-8 -*-
import index as app
import yaml
config = app.config
# 生成默认配置
def generate():
user = config[\'users\'][0]
apis = app.getCpdailyApis(user)
session = app.getSession(user, apis)
params = app.getUnSignedTasks(session, apis)
task = app.getDetailTask(session, params, apis)
extraFields = task[\'extraField\']
if len(extraFields) < 1:
app.log(\'没有附加问题需要填写\')
exit(-1)
defaults = []
for i in range(0, len(extraFields)):
extraField = extraFields[i]
extraFieldItems = extraField[\'extraFieldItems\']
print(\'额外问题%d \' % (i + 1) + extraField[\'title\'])
default = {}
one = {}
for j in range(0, len(extraFieldItems)):
extraFieldItem = extraFieldItems[j]
print(\'\t%d \' % (j + 1) + extraFieldItem[\'content\'])
choose = int(input("请输入对应的序号:"))
if choose < 1 or choose > len(extraFieldItems):
app.log(\'输入错误\')
exit(-1)
one[\'title\'] = extraField[\'title\']
one[\'value\'] = extraFieldItems[choose - 1][\'content\']
if extraFieldItem[\'isOtherItems\'] == 1:
text = input(\'\t\' + extraFieldItems[choose - 1][\'content\'] + \',请输入额外文本:\')
one[\'other\'] = text
default[\'default\'] = one
defaults.append(default)
print(\'======================分隔线======================\')
print(yaml.dump(defaults, allow_unicode=True))
if __name__ == "__main__":
generate()
index.py
# -*- coding: utf-8 -*-
import sys
import json
import uuid
import oss2
import yaml
import base64
import requests
from pyDes import des, CBC, PAD_PKCS5
from datetime import datetime, timedelta, timezone
from urllib.parse import urlparse
from urllib3.exceptions import InsecureRequestWarning
# debug模式
debug = False
if debug:
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# 读取yml配置
def getYmlConfig(yaml_file=\'config.yml\'):
file = open(yaml_file, \'r\', encoding="utf-8")
file_data = file.read()
file.close()
config = yaml.load(file_data, Loader=yaml.FullLoader)
return dict(config)
# 全局配置
config = getYmlConfig(yaml_file=\'config.yml\')
# 获取当前utc时间,并格式化为北京时间
def getTimeStr():
utc_dt = datetime.utcnow().replace(tzinfo=timezone.utc)
bj_dt = utc_dt.astimezone(timezone(timedelta(hours=8)))
return bj_dt.strftime("%Y-%m-%d %H:%M:%S")
# 输出调试信息,并及时刷新缓冲区
def log(content):
print(getTimeStr() + \' \' + str(content))
sys.stdout.flush()
# 获取今日校园api
def getCpdailyApis(user):
headers = { \'User-Agent\': \'Mozilla/5.0 (iPad; U; CPU OS 3_2_2 like Mac OS X; en-us) AppleWebKit/531.21.10 (KHTML, like Gecko) Version/4.0.4 Mobile/7B500 Safari/531.21.10\' }
apis = {}
user = user[\'user\']
schools = requests.get(url=\'https://mobile.campushoy.com/v6/config/guest/tenant/list\', verify=not debug).json()[
\'data\']
flag = True
for one in schools:
if one[\'name\'] == user[\'school\']:
if one[\'joinType\'] == \'NONE\':
log(user[\'school\'] + \' 未加入今日校园\')
exit(-1)
flag = False
params = {
\'ids\': one[\'id\']
}
res = requests.get(url=\'https://mobile.campushoy.com/v6/config/guest/tenant/info\', params=params,
verify=not debug)
data = res.json()[\'data\'][0]
joinType = data[\'joinType\']
idsUrl = data[\'idsUrl\']
ampUrl = data[\'ampUrl\']
if \'campusphere\' in ampUrl or \'cpdaily\' in ampUrl:
parse = urlparse(ampUrl)
host = parse.netloc
res = requests.get(parse.scheme + \'://\' + host)
parse = urlparse(res.url)
apis[
\'login-url\'] = idsUrl + \'/login?service=\' + parse.scheme + r"%3A%2F%2F" + host + r\'%2Fportal%2Flogin\'
apis[\'host\'] = host
ampUrl2 = data[\'ampUrl2\']
if \'campusphere\' in ampUrl2 or \'cpdaily\' in ampUrl2:
parse = urlparse(ampUrl2)
host = parse.netloc
res = requests.get(parse.scheme + \'://\' + host)
parse = urlparse(res.url)
apis[
\'login-url\'] = idsUrl + \'/login?service=\' + parse.scheme + r"%3A%2F%2F" + host + r\'%2Fportal%2Flogin\'
apis[\'host\'] = host
break
if flag:
log(user[\'school\'] + \' 未找到该院校信息,请检查是否是学校全称错误\')
exit(-1)
log(apis)
return apis
# 登陆并获取session
def getSession(user, apis):
user = user[\'user\']
params = {
# \'login_url\': \'http://authserverxg.swu.edu.cn/authserver/login?service=https://swu.cpdaily.com/wec-counselor-sign-apps/stu/sign/getStuSignInfosInOneDay\',
\'login_url\': apis[\'login-url\'],
\'needcaptcha_url\': \'\',
\'captcha_url\': \'\',
\'username\': user[\'username\'],
\'password\': user[\'password\']
}
cookies = {}
# 借助上一个项目开放出来的登陆API,模拟登陆
res = requests.post(url=config[\'login\'][\'api\'], data=params, verify=not debug)
# cookieStr可以使用手动抓包获取到的cookie,有效期暂时未知,请自己测试
# cookieStr = str(res.json()[\'cookies\'])
cookieStr = str(res.json()[\'cookies\'])
log(cookieStr)
if cookieStr == \'None\':
log(res.json())
exit(-1)
# log(cookieStr)
# 解析cookie
for line in cookieStr.split(\';\'):
name, value = line.strip().split(\'=\', 1)
cookies[name] = value
session = requests.session()
session.cookies = requests.utils.cookiejar_from_dict(cookies, cookiejar=None, overwrite=True)
return session
# 获取最新未签到任务
def getUnSignedTasks(session, apis):
headers = {
\'Accept\': \'application/json, text/plain, */*\',
\'User-Agent\': \'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36\',
\'content-type\': \'application/json\',
\'Accept-Encoding\': \'gzip,deflate\',
\'Accept-Language\': \'zh-CN,en-US;q=0.8\',
\'Content-Type\': \'application/json;charset=UTF-8\'
}
# 第一次请求每日签到任务接口,主要是为了获取MOD_AUTH_CAS
res = session.post(
url=\'https://{host}/wec-counselor-sign-apps/stu/sign/getStuSignInfosInOneDay\'.format(host=apis[\'host\']),
headers=headers, data=json.dumps({}), verify=not debug)
# 第二次请求每日签到任务接口,拿到具体的签到任务
res = session.post(
url=\'https://{host}/wec-counselor-sign-apps/stu/sign/getStuSignInfosInOneDay\'.format(host=apis[\'host\']),
headers=headers, data=json.dumps({}), verify=not debug)
if len(res.json()[\'datas\'][\'unSignedTasks\']) < 1:
log(\'当前没有未签到任务\')
exit(-1)
# log(res.json())
latestTask = res.json()[\'datas\'][\'unSignedTasks\'][0]
return {
\'signInstanceWid\': latestTask[\'signInstanceWid\'],
\'signWid\': latestTask[\'signWid\']
}
# 获取签到任务详情
def getDetailTask(session, params, apis):
headers = {
\'Accept\': \'application/json, text/plain, */*\',
\'User-Agent\': \'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36\',
\'content-type\': \'application/json\',
\'Accept-Encoding\': \'gzip,deflate\',
\'Accept-Language\': \'zh-CN,en-US;q=0.8\',
\'Content-Type\': \'application/json;charset=UTF-8\'
}
res = session.post(
url=\'https://{host}/wec-counselor-sign-apps/stu/sign/detailSignInstance\'.format(host=apis[\'host\']),
headers=headers, data=json.dumps(params), verify=not debug)
data = res.json()[\'datas\']
return data
# 填充表单
def fillForm(task, session, user, apis):
user = user[\'user\']
form = {}
if task[\'isPhoto\'] == 1:
fileName = uploadPicture(session, user[\'photo\'], apis)
form[\'signPhotoUrl\'] = getPictureUrl(session, fileName, apis)
else:
form[\'signPhotoUrl\'] = \'\'
if task[\'isNeedExtra\'] == 1:
extraFields = task[\'extraField\']
defaults = config[\'cpdaily\'][\'defaults\']
extraFieldItemValues = []
for i in range(0, len(extraFields)):
default = defaults[i][\'default\']
extraField = extraFields[i]
if config[\'cpdaily\'][\'check\'] and default[\'title\'] != extraField[\'title\']:
log(\'第%d个默认配置项错误,请检查\' % (i + 1))
exit(-1)
extraFieldItems = extraField[\'extraFieldItems\']
for extraFieldItem in extraFieldItems:
if extraFieldItem[\'content\'] == default[\'value\']:
extraFieldItemValue = {\'extraFieldItemValue\': default[\'value\'],
\'extraFieldItemWid\': extraFieldItem[\'wid\']}
# 其他,额外文本
if extraFieldItem[\'isOtherItems\'] == 1:
extraFieldItemValue = {\'extraFieldItemValue\': default[\'other\'],
\'extraFieldItemWid\': extraFieldItem[\'wid\']}
extraFieldItemValues.append(extraFieldItemValue)
# log(extraFieldItemValues)
# 处理带附加选项的签到
form[\'extraFieldItems\'] = extraFieldItemValues
# form[\'signInstanceWid\'] = params[\'signInstanceWid\']
form[\'signInstanceWid\'] = task[\'signInstanceWid\']
form[\'longitude\'] = user[\'lon\']
form[\'latitude\'] = user[\'lat\']
form[\'isMalposition\'] = task[\'isMalposition\']
form[\'abnormalReason\'] = user[\'abnormalReason\']
form[\'position\'] = user[\'address\']
form[\'uaIsCpadaily\'] = True
return form
# 上传图片到阿里云oss
def uploadPicture(session, image, apis):
url = \'https://{host}/wec-counselor-sign-apps/stu/sign/getStsAccess\'.format(host=apis[\'host\'])
res = session.post(url=url, headers={\'content-type\': \'application/json\'}, data=json.dumps({}), verify=not debug)
datas = res.json().get(\'datas\')
fileName = datas.get(\'fileName\')
accessKeyId = datas.get(\'accessKeyId\')
accessSecret = datas.get(\'accessKeySecret\')
securityToken = datas.get(\'securityToken\')
endPoint = datas.get(\'endPoint\')
bucket = datas.get(\'bucket\')
bucket = oss2.Bucket(oss2.Auth(access_key_id=accessKeyId, access_key_secret=accessSecret), endPoint, bucket)
with open(image, "rb") as f:
data = f.read()
bucket.put_object(key=fileName, headers={\'x-oss-security-token\': securityToken}, data=data)
res = bucket.sign_url(\'PUT\', fileName, 60)
# log(res)
return fileName
# 获取图片上传位置
def getPictureUrl(session, fileName, apis):
url = \'https://{host}/wec-counselor-sign-apps/stu/sign/previewAttachment\'.format(host=apis[\'host\'])
data = {
\'ossKey\': fileName
}
res = session.post(url=url, headers={\'content-type\': \'application/json\'}, data=json.dumps(data), verify=not debug)
photoUrl = res.json().get(\'datas\')
return photoUrl
# DES加密
def DESEncrypt(s, key=\'b3L26XNL\'):
key = key
iv = b"\x01\x02\x03\x04\x05\x06\x07\x08"
k = des(key, CBC, iv, pad=None, padmode=PAD_PKCS5)
encrypt_str = k.encrypt(s)
return base64.b64encode(encrypt_str).decode()
# 提交签到任务
def submitForm(session, user, form, apis):
user = user[\'user\']
# Cpdaily-Extension
extension = {
"lon": user[\'lon\'],
"model": "OPPO R11 Plus",
"appVersion": "8.1.14",
"systemVersion": "4.4.4",
"userId": user[\'username\'],
"systemName": "android",
"lat": user[\'lat\'],
"deviceId": str(uuid.uuid1())
}
headers = {
# \'tenantId\': \'1019318364515869\',
\'User-Agent\': \'Mozilla/5.0 (Linux; Android 4.4.4; OPPO R11 Plus Build/KTU84P) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/33.0.0.0 Safari/537.36 okhttp/3.12.4\',
\'CpdailyStandAlone\': \'0\',
\'extension\': \'1\',
\'Cpdaily-Extension\': DESEncrypt(json.dumps(extension)),
\'Content-Type\': \'application/json; charset=utf-8\',
\'Accept-Encoding\': \'gzip\',
# \'Host\': \'swu.cpdaily.com\',
\'Connection\': \'Keep-Alive\'
}
res = session.post(url=\'https://{host}/wec-counselor-sign-apps/stu/sign/submitSign\'.format(host=apis[\'host\']),
headers=headers, data=json.dumps(form), verify=not debug)
message = res.json()[\'message\']
if message == \'SUCCESS\':
log(\'自动签到成功\')
sendMessageByWeChat(\'自动签到成功\', \'SUCCESS