flipped-1121

说明

由于子墨大佬的教程已足够详细,此处不再赘述。传送门

  • 此为AUST(安徽理工大学)专版。其实就是为了方便校友
  • 基于腾讯云函数
  • 使用了自建api。大佬造好的轮子。
  • 消息通知改为Server酱

所用代码

config.yml

#登陆相关配置
login:
  #开放的模拟登陆api服务器地址
  api: "http://cpdaily.kangblogs.top:8080/wisedu-unified-login-api-v1.0/api/login" #  有条件的可以自建
#用户组配置
users:
  #单个用户配置
  - user:
      #username 学号或者工号
      username: '201********'
      #password 密码
      password: '********'
      #address 地址,定位信息
      address: 中国安徽省淮南市田家庵区
      #email 接受通知消息的邮箱
      email: 
      #school 学校全称
      school: 安徽理工大学
      #lon 经度
      lon: '117.029931'
      #lat 纬度
      lat: '32.555161'
      #abnormalReason 反馈信息
      abnormalReason:
      #photo 签到照片,不需要可不填,或者直接删除
      photo: 
  #多用户配置,将下面的注释去掉即可;如果表单内容有图片,不建议使用多用户配置
#今日校园相关配置
cpdaily:
  #是否检查问题,true检查,false不检查
  check: false
  #附加信息组默认选项配置,没有可留空
  defaults:
  #附加信息组默认选项配置,按顺序,可留空
    - default:
        title: 您目前的体温是多少
        type: 2
        value: 腋下温度37.3℃以下
    - default:
        title: 有无发热、干咳等呼吸道症状
        type: 2
        value: 无
    - default:
        title: 有无呕吐、腹泻等消化道症状
        type: 2
        value: 无
    - default:
        title: 有无其他身体不适症状
        type: 2
        value: 无

generate.py

# -*- coding: utf-8 -*-
import index as app
import yaml

# Reuse the configuration object already loaded by the index module.
config = app.config


# Interactively generate the default answers for the extra sign-in questions
def generate():
    """Ask the user to answer each extra question and print the result as YAML.

    Uses the first entry of ``config['users']`` to log in through the
    ``index`` module, fetches the detail of the latest unsigned task and, for
    every entry of its ``extraField`` list, prints the available options and
    reads the chosen option number from standard input.  The collected
    answers are dumped with ``yaml.dump`` as a list of ``{'default': {...}}``
    mappings.

    Exits with status -1 when the task has no extra questions or when the
    entered option number is not a valid choice.
    """
    user = config['users'][0]
    apis = app.getCpdailyApis(user)
    session = app.getSession(user, apis)
    params = app.getUnSignedTasks(session, apis)
    task = app.getDetailTask(session, params, apis)
    extraFields = task['extraField']
    if len(extraFields) < 1:
        app.log('没有附加问题需要填写')
        exit(-1)
    defaults = []
    for number, extraField in enumerate(extraFields, start=1):
        extraFieldItems = extraField['extraFieldItems']
        print('额外问题%d ' % number + extraField['title'])
        for optionNumber, extraFieldItem in enumerate(extraFieldItems, start=1):
            print('\t%d ' % optionNumber + extraFieldItem['content'])
        try:
            choose = int(input("请输入对应的序号:"))
        except ValueError:
            # Non-numeric input is reported the same way as an out-of-range number.
            choose = 0
        if choose < 1 or choose > len(extraFieldItems):
            app.log('输入错误')
            exit(-1)
        chosen = extraFieldItems[choose - 1]
        one = {'title': extraField['title'], 'value': chosen['content']}
        # Only the option that was actually chosen decides whether extra text
        # is required (previously the last listed option was inspected).
        if chosen['isOtherItems'] == 1:
            one['other'] = input('\t' + chosen['content'] + ',请输入额外文本:')
        defaults.append({'default': one})
    print('======================分隔线======================')
    print(yaml.dump(defaults, allow_unicode=True))

# Run the interactive generator only when this file is executed as a script.
if __name__ == "__main__":
    generate()

index.py

# -*- coding: utf-8 -*-
import sys
import json
import uuid
import oss2
import yaml
import base64
import requests
from pyDes import des, CBC, PAD_PKCS5
from datetime import datetime, timedelta, timezone
from urllib.parse import urlparse
from urllib3.exceptions import InsecureRequestWarning

# Debug mode. The HTTP calls in this file pass ``verify=not debug``, so when
# this flag is on the urllib3 InsecureRequestWarning is silenced as well.
debug = False
if debug:
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


# Read the YAML configuration file
def getYmlConfig(yaml_file='config.yml'):
    """Load *yaml_file* (UTF-8) and return its top-level mapping as a dict.

    The file is read inside a ``with`` block so the handle is closed even if
    reading fails.
    """
    with open(yaml_file, 'r', encoding="utf-8") as file:
        file_data = file.read()
    # NOTE(review): FullLoader is kept from the original code; the file is a
    # local, user-maintained config. Consider yaml.safe_load if that changes.
    config = yaml.load(file_data, Loader=yaml.FullLoader)
    return dict(config)


# Global configuration, loaded from config.yml when the module is imported
config = getYmlConfig(yaml_file='config.yml')


# Get the current UTC time and format it as Beijing time (UTC+8)
def getTimeStr():
    """Return the current time in UTC+8 formatted as ``YYYY-MM-DD HH:MM:SS``."""
    # datetime.now(timezone.utc) yields an aware datetime directly and replaces
    # the deprecated datetime.utcnow().replace(tzinfo=...) combination.
    utc_dt = datetime.now(timezone.utc)
    bj_dt = utc_dt.astimezone(timezone(timedelta(hours=8)))
    return bj_dt.strftime("%Y-%m-%d %H:%M:%S")


# Print a timestamped message and flush the output buffer immediately
def log(content):
    """Print ``content`` prefixed with the string returned by ``getTimeStr()``.

    ``content`` is converted with ``str()``; standard output is flushed right
    after the message is written.
    """
    print(getTimeStr() + ' ' + str(content), flush=True)


# Resolve the Cpdaily (今日校园) endpoints for the user's school
def getCpdailyApis(user):
    """Return a dict with the ``'login-url'`` and ``'host'`` of the user's school.

    The school list is fetched from mobile.campushoy.com and searched for an
    entry whose ``name`` equals ``user['user']['school']``.  The tenant info of
    that school is then fetched, and for each of its ``ampUrl`` / ``ampUrl2``
    values that contains ``campusphere`` or ``cpdaily`` the host is requested
    once so the scheme of the final (possibly redirected) URL can be used to
    build the login URL.  When both values qualify, the one from ``ampUrl2``
    wins because it is processed last.

    Exits with status -1 when the school is not found or its ``joinType`` is
    ``'NONE'``.  The resulting dict is logged before it is returned.
    """
    user = user['user']
    schools = requests.get(url='https://mobile.campushoy.com/v6/config/guest/tenant/list',
                           verify=not debug).json()['data']
    school = next((one for one in schools if one['name'] == user['school']), None)
    if school is None:
        log(user['school'] + ' 未找到该院校信息,请检查是否是学校全称错误')
        exit(-1)
    if school['joinType'] == 'NONE':
        log(user['school'] + ' 未加入今日校园')
        exit(-1)

    res = requests.get(url='https://mobile.campushoy.com/v6/config/guest/tenant/info',
                       params={'ids': school['id']}, verify=not debug)
    data = res.json()['data'][0]
    idsUrl = data['idsUrl']

    apis = {}
    for key in ('ampUrl', 'ampUrl2'):
        ampUrl = data[key]
        if 'campusphere' not in ampUrl and 'cpdaily' not in ampUrl:
            continue
        parse = urlparse(ampUrl)
        host = parse.netloc
        # Follow redirects so the scheme of the final URL is the one used in
        # the login link; honour the debug switch like every other request.
        res = requests.get(parse.scheme + '://' + host, verify=not debug)
        parse = urlparse(res.url)
        apis['login-url'] = (idsUrl + '/login?service=' + parse.scheme + r"%3A%2F%2F"
                             + host + r'%2Fportal%2Flogin')
        apis['host'] = host
    log(apis)
    return apis


# Log in and build a session carrying the returned cookies
def getSession(user, apis):
    """Log in through the configured login API and return a ``requests`` session.

    The school's login URL together with the username and password from
    ``user['user']`` is posted to ``config['login']['api']``.  The ``cookies``
    field of the JSON response (a ``name=value; name=value`` string) is parsed
    and installed as the cookie jar of a new session.

    Exits with status -1, after logging the whole response, when the response
    carries no cookies.
    """
    user = user['user']
    params = {
        'login_url': apis['login-url'],
        'needcaptcha_url': '',
        'captcha_url': '',
        'username': user['username'],
        'password': user['password']
    }

    # Simulate the login through the open login API configured in config.yml.
    res = requests.post(url=config['login']['api'], data=params, verify=not debug)
    result = res.json()
    # .get() lets a response without a 'cookies' key reach the error path
    # below instead of raising KeyError.
    cookieStr = str(result.get('cookies'))
    # NOTE(review): this writes the session cookies to the log output.
    log(cookieStr)
    if cookieStr == 'None':
        log(result)
        exit(-1)

    # Parse the cookie string; segments without '=' (for example the empty
    # segment left by a trailing ';') are skipped instead of raising.
    cookies = {}
    for segment in cookieStr.split(';'):
        name, separator, value = segment.strip().partition('=')
        if not separator:
            continue
        cookies[name] = value
    session = requests.session()
    session.cookies = requests.utils.cookiejar_from_dict(cookies, cookiejar=None, overwrite=True)
    return session


# Fetch the latest unsigned sign-in task
def getUnSignedTasks(session, apis):
    """Return the ``signInstanceWid`` and ``signWid`` of the first unsigned task.

    The daily sign-in endpoint on ``apis['host']`` is requested twice with an
    empty JSON body: the first response is discarded (it only serves to obtain
    the MOD_AUTH_CAS cookie), the second one carries the task list.

    Exits with status -1 when ``datas.unSignedTasks`` is empty.
    """
    # The original dict also carried a lower-case 'content-type' entry.
    # Assuming requests matches header names case-insensitively and lets the
    # later entry win, only this value was ever sent; it keeps the position
    # the dropped entry had.
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36',
        'Content-Type': 'application/json;charset=UTF-8',
        'Accept-Encoding': 'gzip,deflate',
        'Accept-Language': 'zh-CN,en-US;q=0.8'
    }
    url = 'https://{host}/wec-counselor-sign-apps/stu/sign/getStuSignInfosInOneDay'.format(host=apis['host'])
    # First request: only needed to obtain MOD_AUTH_CAS.
    session.post(url=url, headers=headers, data=json.dumps({}), verify=not debug)
    # Second request: returns the actual sign-in tasks.
    res = session.post(url=url, headers=headers, data=json.dumps({}), verify=not debug)
    unSignedTasks = res.json()['datas']['unSignedTasks']
    if len(unSignedTasks) < 1:
        log('当前没有未签到任务')
        exit(-1)
    latestTask = unSignedTasks[0]
    return {
        'signInstanceWid': latestTask['signInstanceWid'],
        'signWid': latestTask['signWid']
    }


# Fetch the detail of a sign-in task
def getDetailTask(session, params, apis):
    """Return the ``datas`` object of the ``detailSignInstance`` response.

    ``params`` is sent as the JSON body of a POST to ``apis['host']`` (in this
    file it is the dict returned by ``getUnSignedTasks``).
    """
    # The original dict also carried a lower-case 'content-type' entry.
    # Assuming requests matches header names case-insensitively and lets the
    # later entry win, only this value was ever sent; it keeps the position
    # the dropped entry had.
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36',
        'Content-Type': 'application/json;charset=UTF-8',
        'Accept-Encoding': 'gzip,deflate',
        'Accept-Language': 'zh-CN,en-US;q=0.8'
    }
    url = 'https://{host}/wec-counselor-sign-apps/stu/sign/detailSignInstance'.format(host=apis['host'])
    res = session.post(url=url, headers=headers, data=json.dumps(params), verify=not debug)
    return res.json()['datas']


# Fill in the sign-in form
def fillForm(task, session, user, apis):
    """Build the form dict that is submitted for *task*.

    * When ``task['isPhoto'] == 1`` the picture at ``user['user']['photo']`` is
      uploaded and its preview URL stored in ``signPhotoUrl``; otherwise that
      field is an empty string.
    * When ``task['isNeedExtra'] == 1`` every extra question of the task is
      answered from the entry at the same position in
      ``config['cpdaily']['defaults']``: each option whose ``content`` equals
      the configured ``value`` is selected, and for an "other" option the
      configured ``other`` text is sent instead of the value.
    * Position, coordinates and abnormal reason are copied from the user
      configuration; ``signInstanceWid`` and ``isMalposition`` from the task.

    Exits with status -1 when a question has no matching default entry, or
    when checking is enabled and the configured title differs from the
    question title.
    """
    user = user['user']
    form = {}
    if task['isPhoto'] == 1:
        fileName = uploadPicture(session, user['photo'], apis)
        form['signPhotoUrl'] = getPictureUrl(session, fileName, apis)
    else:
        form['signPhotoUrl'] = ''
    if task['isNeedExtra'] == 1:
        # A 'defaults' key that is left empty in the config loads as None.
        defaults = config['cpdaily']['defaults'] or []
        extraFieldItemValues = []
        for i, extraField in enumerate(task['extraField']):
            if i >= len(defaults):
                # Fewer defaults than questions: report it as a config error
                # instead of failing with IndexError.
                log('第%d个默认配置项错误,请检查' % (i + 1))
                exit(-1)
            default = defaults[i]['default']
            if config['cpdaily']['check'] and default['title'] != extraField['title']:
                log('第%d个默认配置项错误,请检查' % (i + 1))
                exit(-1)
            for extraFieldItem in extraField['extraFieldItems']:
                if extraFieldItem['content'] != default['value']:
                    continue
                # "Other" options carry the free text instead of the option value.
                if extraFieldItem['isOtherItems'] == 1:
                    answer = default['other']
                else:
                    answer = default['value']
                extraFieldItemValues.append({'extraFieldItemValue': answer,
                                             'extraFieldItemWid': extraFieldItem['wid']})
        # Answers for the sign-in's additional questions.
        form['extraFieldItems'] = extraFieldItemValues
    form['signInstanceWid'] = task['signInstanceWid']
    form['longitude'] = user['lon']
    form['latitude'] = user['lat']
    form['isMalposition'] = task['isMalposition']
    form['abnormalReason'] = user['abnormalReason']
    form['position'] = user['address']
    form['uaIsCpadaily'] = True
    return form


# Upload a picture to Aliyun OSS
def uploadPicture(session, image, apis):
    """Upload the file at path *image* and return the object key it was stored under.

    Temporary credentials, target bucket, endpoint and object key are obtained
    from the ``getStsAccess`` endpoint on ``apis['host']``; the file content is
    then written with ``put_object``, passing the security token as the
    ``x-oss-security-token`` header.
    """
    url = 'https://{host}/wec-counselor-sign-apps/stu/sign/getStsAccess'.format(host=apis['host'])
    res = session.post(url=url, headers={'content-type': 'application/json'}, data=json.dumps({}),
                       verify=not debug)
    datas = res.json().get('datas')
    fileName = datas.get('fileName')
    auth = oss2.Auth(access_key_id=datas.get('accessKeyId'),
                     access_key_secret=datas.get('accessKeySecret'))
    # Separate name for the Bucket object; the original rebound the bucket-name string.
    ossBucket = oss2.Bucket(auth, datas.get('endPoint'), datas.get('bucket'))
    with open(image, "rb") as f:
        data = f.read()
    ossBucket.put_object(key=fileName, headers={'x-oss-security-token': datas.get('securityToken')},
                         data=data)
    # The original also called sign_url() here, but its result was never used.
    return fileName


# Get the URL of an uploaded picture
def getPictureUrl(session, fileName, apis):
    """Return the ``datas`` value of the ``previewAttachment`` response for *fileName*.

    ``fileName`` is sent as ``ossKey`` in the JSON body of a POST to
    ``apis['host']``.
    """
    url = 'https://{host}/wec-counselor-sign-apps/stu/sign/previewAttachment'.format(host=apis['host'])
    payload = json.dumps({'ossKey': fileName})
    res = session.post(url=url, headers={'content-type': 'application/json'}, data=payload,
                       verify=not debug)
    return res.json().get('datas')


# DES encryption
def DESEncrypt(s, key='b3L26XNL'):
    """Encrypt *s* with DES in CBC mode and return the result Base64-encoded.

    Uses the fixed IV ``01 02 03 04 05 06 07 08`` and PKCS5 padding; the
    Base64 output is returned as ``str``.
    """
    iv = b"\x01\x02\x03\x04\x05\x06\x07\x08"
    cipher = des(key, CBC, iv, pad=None, padmode=PAD_PKCS5)
    return base64.b64encode(cipher.encrypt(s)).decode()


# 提交签到任务
def submitForm(session, user, form, apis):
    user = user[\'user\']
    # Cpdaily-Extension
    extension = {
        "lon": user[\'lon\'],
        "model": "OPPO R11 Plus",
        "appVersion": "8.1.14",
        "systemVersion": "4.4.4",
        "userId": user[\'username\'],
        "systemName": "android",
        "lat": user[\'lat\'],
        "deviceId": str(uuid.uuid1())
    }

    headers = {
        # \'tenantId\': \'1019318364515869\',
        \'User-Agent\': \'Mozilla/5.0 (Linux; Android 4.4.4; OPPO R11 Plus Build/KTU84P) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/33.0.0.0 Safari/537.36 okhttp/3.12.4\',
        \'CpdailyStandAlone\': \'0\',
        \'extension\': \'1\',
        \'Cpdaily-Extension\': DESEncrypt(json.dumps(extension)),
        \'Content-Type\': \'application/json; charset=utf-8\',
        \'Accept-Encoding\': \'gzip\',
        # \'Host\': \'swu.cpdaily.com\',
        \'Connection\': \'Keep-Alive\'
    }
    res = session.post(url=\'https://{host}/wec-counselor-sign-apps/stu/sign/submitSign\'.format(host=apis[\'host\']),
                       headers=headers, data=json.dumps(form), verify=not debug)
    message = res.json()[\'message\']
    if message == \'SUCCESS\':
        log(\'自动签到成功\')
        sendMessageByWeChat(\'自动签到成功\', \'SUCCESS

分类:

技术点:

相关文章: