admin管理员组

文章数量:1627459

文章目录

    • 1. 需要的库
    • 2. js接口调用部分
      • 每日报平安 post.py
      • 晨午晚检
      • 以上所需字典文件 xg_json_bean.py
    • 3. 程序调用 start.py
    • 4. 定时启动
    • 5. 网页建站
      • 后端调用
        • 添加账号
          • 添加账号到临时列表
          • 更新临时列表到工作列表
        • 小功能
          • 单次填写
          • 检查是否填写过
      • 网站部分

分析过程省略
代码蹩脚结构混乱不想改
webdriver版效率太低废弃了,没有放进来,推荐列表里有其他同学写的
因为建站方案问题,代码中使用了部分绝对路径(例如 C:\web),建议根据实际情况修改

1. 需要的库

下文代码用到的库中只有 requests 是第三方库(pip install requests),其余的 json、time、random、datetime、sys 都是 Python 自带的标准库;如果还有遗漏,根据报错去装,或者直接让 pycharm 自动识别

2. js接口调用部分

每日报平安 post.py

功能为单次提交,服务器没有校验;漏卡时可以把 DataDeal 中的填报日期 TBSJ 改成自定义日期后再提交
单人或少数人使用直接修改此部分,参考单次填写,第3节可忽略不看

import requests
import json
import time
import random
import datetime
from xg_json_bean import JBXX_JSON
from xg_json_bean import MRQK_JSON
from xg_json_bean import DATA_JSON
# Local HTTP proxy setting. It is immediately replaced by the empty mapping on
# the next line, so as written every request is sent without a proxy; comment
# out the second assignment to route traffic through 127.0.0.1:1080.
proxies = {'http': '127.0.0.1:1080'}
proxies = {}
#headers = {"User-Agent" : "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36 QIHU 360SE"}
class AutoRobot:
    """Client that fills in the daily "report safe" form on xgfx.bnuz.edu.

    Typical use is ``AutoRobot().AutoFill(username, password)``: it logs in,
    checks whether the form was already filled, rebuilds the payload from the
    data the server returns and submits it. The individual steps are also
    available as separate methods.
    """

    # Alphabet for the random 32-character WID (each hex digit exactly once,
    # so every digit is equally likely).
    _HEX_DIGITS = '0123456789abcdef'

    # JBXX fields copied from the server record into the submitted payload.
    # Template fields not listed here keep the template's default value.
    _JBXX_FIELDS = (
        'XH', 'XM', 'XBDM_DISPLAY', 'XBDM', 'SFZJH',
        'DWDM_DISPLAY', 'DWDM', 'ZYDM', 'BJDM', 'LXDH',
        'GJDQ_DISPLAY', 'GJDQ', 'SZDQ_DISPLAY', 'SZDQ',
        'RYLB_DISPLAY', 'RYLB', 'JJLXR', 'JJLXRDH',
        'JJLXRJG_DISPLAY', 'JJLXRJG', 'JQQK_DISPLAY', 'JQQK',
        'SFDFHB_DISPLAY', 'SFDFHB', 'JTXXDZ_DISPLAY', 'JTXXDZ',
        'XXDZ', 'XSBH',
    )

    # MRQK fields copied from the server record into the submitted payload.
    _MRQK_FIELDS = ('XSBH', 'MRSZDQ', 'MRSZDQ_DISPLAY', 'MRXXDZ')

    def __init__(self):
        pass

    def Login(self,username,password):
        """Log in and open the daily-report application.

        A new ``requests.Session`` is created and stored on ``self.session``
        for the other methods to use.

        Returns:
            'success' when the login response contains 'success'; otherwise
            one of the messages '用户名或者密码不正确', '验证码不正确' or
            '连接错误请重试', chosen from the text of the response.
        """
        session = requests.Session()
        self.session = session

        userInfo = {'userName': username,
                    'password': password,
                    'isWeekLogin': 'false',
                    'verifyCode': ''}
        loginUrl = "http://xgfx.bnuz.edu/xsdtfw/sys/emapfunauth/loginValidate.do"

        loginData = session.post(loginUrl, userInfo, proxies=proxies).text
        if 'success' not in loginData:
            if '用户名' in loginData:
                return '用户名或者密码不正确'
            if '验证码' in loginData:
                return '验证码不正确'
            return '连接错误请重试'

        # Open the application page inside the logged-in session; the response
        # body is not needed (presumably this initialises the app for the
        # session - TODO confirm).
        appUrl = 'http://xgfx.bnuz.edu/xsdtfw/sys/emaphome/appShow.do?id=d1a203e154ba4216bc84556cefe6f570'
        session.get(appUrl, proxies=proxies)

        return 'success'

    def Cheak(self):
        """Return True when checkFilled.do reports a non-empty 'data' list.

        Requires a prior successful ``Login``. Raises if the response is not
        the expected JSON document.
        """
        session = self.session
        cheakUrl = 'http://xgfx.bnuz.edu/xsdtfw/sys/xsyqxxsjapp/mrbpa/checkFilled.do'
        cheakRes = session.post(cheakUrl, proxies=proxies)
        return len(json.loads(cheakRes.text)['data']) != 0

    def GenenateWID(self):
        """Return a random string of 32 lowercase hexadecimal digits."""
        return ''.join(random.choice(self._HEX_DIGITS) for _ in range(32))

    def LoadData(self):
        """Fetch the user's records and prepare a fresh WID.

        Stores the first row of mrbpabd.do in ``self.jbxxData``, the 'data'
        object of getJbxx.do in ``self.mrqkData`` and a newly generated id in
        ``self.WID``.
        """
        session = self.session

        jbxxUrl = 'http://xgfx.bnuz.edu/xsdtfw/sys/xsyqxxsjapp/modules/mrbpa/mrbpabd.do'
        jbxxRes = session.post(jbxxUrl, proxies=proxies)
        self.jbxxData = json.loads(jbxxRes.text)['datas']['mrbpabd']['rows'][0]

        mrqkUrl = 'http://xgfx.bnuz.edu/xsdtfw/sys/xsyqxxsjapp/mrbpa/getJbxx.do'
        mrqkRes = session.post(mrqkUrl, proxies=proxies)
        self.mrqkData = json.loads(mrqkRes.text)['data']

        self.WID = self.GenenateWID()

    def DataDeal(self):
        """Build the string submitted as the 'data' form field.

        The result is a JSON object whose 'JBXX' and 'MRQK' members are
        themselves JSON documents encoded as strings. Missing or ``None``
        values from the server become ''. The shared templates ``JBXX_JSON``
        and ``MRQK_JSON`` are copied, not modified, so nothing carries over
        from one user to the next.
        """
        jbxx = dict(JBXX_JSON)
        for key in self._JBXX_FIELDS:
            value = self.jbxxData.get(key)
            jbxx[key] = '' if value is None else value

        mrqk = dict(MRQK_JSON)
        for key in self._MRQK_FIELDS:
            value = self.mrqkData.get(key)
            mrqk[key] = '' if value is None else value
        mrqk['WID'] = self.WID
        # Report date is today; temperature is a random value from the list.
        mrqk['TBSJ'] = datetime.datetime.now().strftime('%Y-%m-%d')
        mrqk['TW'] = str(random.choice([36, 36.1, 36.2, 36.3, 36.4, 36.5, 36.6, 36.7, 36.8]))

        # Let json.dumps escape the nested documents: it handles backslashes
        # and control characters as well as quotes. The compact separators
        # keep the outer layout as {"JBXX":"...","MRQK":"..."}.
        return json.dumps(
            {
                'JBXX': json.dumps(jbxx, ensure_ascii=False),
                'MRQK': json.dumps(mrqk, ensure_ascii=False),
            },
            ensure_ascii=False,
            separators=(',', ':'),
        )

    def Submit(self,data):
        """POST *data* (the string from ``DataDeal``) and return the reply's 'msg'."""
        session = self.session
        sumitUrl = 'http://xgfx.bnuz.edu/xsdtfw/sys/xsyqxxsjapp/mrbpa/saveMrbpa.do'
        sumitRes = session.post(sumitUrl, {'data': data}, proxies=proxies)
        return json.loads(sumitRes.text)['msg']

    def AutoFill(self,username,password):
        """Run the whole login / check / load / submit sequence for one user.

        Returns one of: the login error message (when it mentions the user
        name or the verification code), '连接错误', '用户异常', '已填跳过',
        '数据异常', '填写完成' or '上传异常'. Exceptions raised by ``Login``
        or ``Submit`` propagate to the caller.

        The HTTP session is closed on every exit path.
        """
        try:
            return self._fill(username, password)
        finally:
            session = getattr(self, 'session', None)
            if session is not None:
                session.close()

    def _fill(self, username, password):
        """Do the work of ``AutoFill`` without the session clean-up."""
        loginSta = self.Login(username, password)
        if loginSta != 'success':
            if '用户名' in loginSta or '验证码' in loginSta:
                return loginSta
            return '连接错误'

        try:
            fillSte = self.Cheak()
        except Exception as e:
            print(e)
            return '用户异常'
        if fillSte:
            return '已填跳过'

        try:
            self.LoadData()
            data = self.DataDeal()
        except Exception as e:
            print(e)
            return '数据异常'

        submitSta = self.Submit(data)
        if '成功' in submitSta:
            return '填写完成'
        print(submitSta)
        return '上传异常'

晨午晚检

这里只给出 robot 类,沿用上文 post.py 开头的 import 和 proxies 设置;服务器没有重复校验,多次提交会产生多条记录

class AutoRobot:
    """Client that submits the morning / noon / evening temperature check.

    Use ``AutoRobot().AutoFill(username, password, time)`` where *time* is the
    slot index: ``Cheak`` reports slots in the order 0 = morning ('晨'),
    1 = noon ('午'), 2 = evening ('晚').
    """

    def __init__(self):
        pass

    def Login(self,username,password):
        """Log in and register the mobile application for the session.

        A new ``requests.Session`` is stored on ``self.session``.

        Returns:
            The raw text of the login response. It contains 'success' when
            the login worked; callers inspect the text to classify failures.
        """
        session = requests.Session()
        self.session = session

        userInfo = {'userName': username,
                    'password': password,
                    'isWeekLogin': 'false',
                    'verifyCode': ''}
        loginUrl = "http://xgfx.bnuz.edu/xsdtfw/sys/emapfunauth/loginValidate.do"

        loginData = session.post(loginUrl, userInfo, proxies=proxies).text
        if 'success' not in loginData:
            return loginData

        # Request the app's menu info inside the logged-in session; the
        # response body is not needed (presumably this initialises the app
        # for the session - TODO confirm).
        appUrl = 'http://xgfx.bnuz.edu/xsdtfw/sys/swpubapp/MobileCommon/getMenuInfo.do'
        appInfo = {
            'data' : '{"APPID":"5811260348942403","APPNAME":"swmxsyqxxsjapp"}'
        }
        session.post(appUrl, appInfo, proxies=proxies)

        return loginData

    def Cheak(self):
        """Return which of today's slots already have a record.

        Returns:
            A list ``[morning, noon, evening]`` of booleans. A slot is True
            when one of the listed records has a 'PCMC' value containing
            '晨', '午' or '晚' respectively.
        """
        session = self.session
        cheakUrl = 'http://xgfx.bnuz.edu/xsdtfw/sys/swmxsyqxxsjapp/modules/schoolTemperature/getStuTwList.do'
        nowTime = datetime.datetime.now().strftime('%Y-%m-%d')
        # NOTE(review): the date is inserted without quotes, so this is not
        # strictly valid JSON. Kept as is because the server's handling of it
        # cannot be seen from here - confirm before changing.
        cheakInfo = {'data':'{"pageNumber": 1,"pageSize": 10,"CLRQ":'+nowTime+'}'}
        cheakRes = session.post(cheakUrl, cheakInfo, proxies=proxies)
        cheakData = json.loads(cheakRes.text)['data']

        WIDUrl = 'http://xgfx.bnuz.edu/xsdtfw/sys/swmxsyqxxsjapp/modules/schoolTemperature/getStuTwDetail.do'
        stateWID = [False, False, False]
        for cheakWID in cheakData:
            # Fetch each record's detail to learn which slot it belongs to.
            WIDInfo = {
                'data': '{"WID":"'+ cheakWID['WID'] +'"}'
            }
            WIDRes = session.post(WIDUrl, WIDInfo, proxies=proxies)
            WIDData = json.loads(WIDRes.text)['data']['PCMC']
            if '晨' in WIDData:
                stateWID[0] = True
            elif '午' in WIDData:
                stateWID[1] = True
            elif '晚' in WIDData:
                stateWID[2] = True
        return stateWID

    def GetData(self):
        """Store the 'data' member of queryTemperatureInfo.do in ``self.dataData``."""
        session = self.session
        dataUrl = 'http://xgfx.bnuz.edu/xsdtfw/sys/swmxsyqxxsjapp/modules/schoolTemperature/queryTemperatureInfo.do'
        dataRes = session.post(dataUrl, {'data':'{}'}, proxies=proxies)
        self.dataData = json.loads(dataRes.text)['data']

    def DataDeal(self):
        """Pick the record for slot ``self.time`` and add a random 'CLTW' temperature.

        ``self.dataData`` is replaced by that single record (a dict).
        """
        record = self.dataData[self.time]
        record.update({'CLTW': str(random.choice([36.2, 36.3, 36.4, 36.5, 36.6, 36.7, 36.8, 36.9]))})
        self.dataData = record

    def Submit(self):
        """POST ``self.dataData`` as JSON, close the session and return the reply's 'msg'."""
        session = self.session
        submitStr = json.dumps(self.dataData, ensure_ascii=False)
        sumitUrl = 'http://xgfx.bnuz.edu/xsdtfw/sys/swmxsyqxxsjapp/modules/schoolTemperature/saveSchoolTempl.do'
        sumitRes = session.post(sumitUrl, {'data': submitStr}, proxies=proxies)
        sumitData = json.loads(sumitRes.text)
        self.session.close()
        return sumitData['msg']

    def AutoFill(self,username,password,time):
        """Log in and submit the temperature record for slot *time* (0, 1 or 2).

        Returns the login response text (when it mentions the user name or
        the verification code), '连接错误', '已填跳过', '用户组错误' or
        '填写完成'. Exceptions from the individual steps propagate.

        The HTTP session is closed on every exit path.
        """
        self.time = time
        try:
            return self._fill(username, password, time)
        finally:
            session = getattr(self, 'session', None)
            if session is not None:
                session.close()

    def _fill(self, username, password, slot):
        """Do the work of ``AutoFill`` without the session clean-up."""
        loginSta = self.Login(username, password)
        if 'success' not in loginSta:
            if '用户名' in loginSta or '验证码' in loginSta:
                return loginSta
            print(loginSta)
            return '连接错误'

        if self.Cheak()[slot]:
            return '已填跳过'

        self.GetData()
        if len(self.dataData) == 0:
            return '用户组错误'

        self.DataDeal()
        # NOTE(review): the message returned by Submit() is not inspected, so
        # a rejected submission is still reported as '填写完成'.
        self.Submit()
        return '填写完成'

以上所需字典文件 xg_json_bean.py

import json
# Template for the JBXX part of the daily-report payload.
# Every field defaults to the empty string; key order is significant because
# the dict is serialised as-is.
JBXX_JSON = dict.fromkeys(
    (
        'XH', 'XM', 'XBDM_DISPLAY', 'XBDM', 'SFZJH', 'XZNJ',
        'DWDM_DISPLAY', 'DWDM', 'ZYDM', 'BJDM', 'LXDH',
        'GJDQ_DISPLAY', 'GJDQ', 'SZDQ_DISPLAY', 'SZDQ',
        'RYLB_DISPLAY', 'RYLB', 'JJLXR', 'JJLXRDH',
        'JJLXRJG_DISPLAY', 'JJLXRJG', 'JQQK_DISPLAY', 'JQQK',
        'GCKSRQ', 'GCJSRQ', 'SFDFHB_DISPLAY', 'SFDFHB', 'DFHTJHBSJ',
        'JTXXDZ_DISPLAY', 'JTXXDZ', 'XXDZ', 'ZDRQJCQK', 'JTXC',
        'JQQTQK', 'XSBH',
    ),
    '',
)

# Template for the MRQK part of the daily-report payload.
# All fields start empty; the five yes/no questions below are then preset.
MRQK_JSON = dict.fromkeys(
    (
        'WID', 'BRJKZT_DISPLAY', 'BRJKZT', 'SFJZ_DISPLAY', 'SFJZ',
        'JTCYJKZK_DISPLAY', 'JTCYJKZK', 'XLZK_DISPLAY', 'XLZK', 'QTQK',
        'XSBH', 'TBSJ', 'TW', 'MRSZDQ_DISPLAY', 'MRSZDQ', 'MRXXDZ', 'BY1',
        'SFFRHKS_DISPLAY', 'SFFRHKS', 'SFFRHKSQKSM',
        'SFQGJCHB_DISPLAY', 'SFQGJCHB', 'SFQGJCHBSQKSM',
        'SFJCQZ_DISPLAY', 'SFJCQZ', 'SFJCQZQKSM',
        'SFQZHYS_DISPLAY', 'SFQZHYS', 'SFQZHYSQKSM',
        'SFBGL_DISPLAY', 'SFBGL', 'SFBGLQKSM',
        'JKZM',
    ),
    '',
)
# Each question is preset to display text '否' with code '0'. Updating keys
# that already exist leaves the key order untouched.
MRQK_JSON.update(
    (question + suffix, preset)
    for question in ('SFFRHKS', 'SFQGJCHB', 'SFJCQZ', 'SFQZHYS', 'SFBGL')
    for suffix, preset in (('_DISPLAY', '否'), ('', '0'))
)

# Both templates grouped under the member names used by the payload.
DATA_JSON = dict(JBXX=JBXX_JSON, MRQK=MRQK_JSON)

3. 程序调用 start.py

此段带账号管理,无多人或建站需求可以忽略

import sys
import time
import random
import datetime
from post import AutoRobot


def Start(addr):
    """Fill in the form for every account listed in ``addr + 'UserList.txt'``.

    Each line of the file holds a user name and a password separated by
    whitespace. Blank or malformed lines are skipped (and are therefore
    dropped if the file is rewritten to remove invalid accounts).

    Args:
        addr: directory prefix of UserList.txt, including the trailing
            path separator.

    Returns:
        True when at least one account should be retried later
        (verification-code or connection errors, or an unexpected
        exception); False otherwise, including when the list is empty.
    """
    robot = AutoRobot()
    with open(addr + 'UserList.txt', 'r', encoding='utf-8') as f:
        users_info = [
            fields
            for fields in (line.split() for line in f)
            if len(fields) == 2
        ]
    if not users_info:
        print("void\r\n")
        return False

    invalidAccount = []
    retry = False
    for user, pwd in users_info:
        try:
            sta = robot.AutoFill(user, pwd)
            if '用户名' in sta:
                # Wrong user name or password: remove the account afterwards.
                invalidAccount.append(user)
            elif '验证码' in sta or '连接错误' in sta:
                retry = True
        except Exception as e:
            print(e)
            sta = '程序异常'
            # The robot may have failed before it created a session.
            session = getattr(robot, 'session', None)
            if session is not None:
                session.close()
            retry = True
        print(user, sta)
        Log(datetime.datetime.now().strftime('%H:%M') + '\t' + user + '\t' + sta + '\n')
    DeletInvalidAccount(addr, users_info, invalidAccount)
    return retry

def Log(log):
    """Append *log* to today's file ``.\\LOG\\signlog-<MM-DD>.txt``.

    The LOG directory is created when it does not exist yet, and the file is
    closed even if the write fails.
    """
    import os  # local import: the surrounding script does not import os

    nowTime = datetime.datetime.now().strftime('%m-%d')  # today
    logPath = ".\\LOG\\signlog-{}.txt".format(nowTime)
    # dirname is empty on platforms where '\\' is not a path separator.
    logDir = os.path.dirname(logPath)
    if logDir:
        os.makedirs(logDir, exist_ok=True)
    with open(logPath, 'a', encoding='utf-8') as f:
        f.write(log)

def DeletInvalidAccount(addr, srcList, account):
    """Rewrite ``addr + 'UserList.txt'`` without the accounts named in *account*.

    Args:
        addr: directory prefix of UserList.txt, including the trailing
            path separator.
        srcList: iterable of ``(user, password)`` pairs currently in the list.
        account: user names to drop. When empty, the file is left untouched.
    """
    if not account:
        return
    print('发现密码错误')
    with open(addr + 'UserList.txt', "w", encoding="utf-8") as f:
        f.writelines(
            user + ' ' + pwd + '\n'
            for user, pwd in srcList
            if user not in account
        )

#if __name__ == "__main__":
#    Start('C:\web\\')

4. 定时启动

可以设置系统任务定时启动
因为挂在服务器不用考虑太多,直接写了个程序,顺便加上简单的出错重试机制

import sys
sys.path.append('C:/web')
import time
import datetime
from start import Start

retry = True  # overwritten with the result of Start() in the main loop
delayTime = 0;  # minutes added to CH:CM to get the next run time (0 = run at CH:CM)
THO = 0  # not referenced anywhere else in the visible code
TMO = 0  # last seen ten-minute bucket of the current minute (nowM // 10)
CH = 10  # base hour of the daily run
CM = 0  # base minute of the daily run

def CountTime(hm,nh,nm,de):
    """Add a delay of *de* minutes to the time *nh*:*nm*.

    Args:
        hm: truthy to get the resulting hour, falsy to get the minute.
        nh: starting hour.
        nm: starting minute.
        de: delay in minutes.

    Returns:
        The hour or the minute of the shifted time, depending on *hm*. The
        hour is not wrapped at 24.
    """
    hours, minutes, remaining = nh, nm, de
    # Convert whole hours of the delay; at most 60 minutes remain afterwards.
    while remaining > 60:
        remaining -= 60
        hours += 1
    minutes += remaining
    # A single carry is enough for the remaining minutes.
    if minutes >= 60:
        minutes, hours = minutes - 60, hours + 1
    return hours if hm else minutes
def getDelay():
    """Return the delay, in minutes after CH:CM, that falls 30 minutes from now.

    The clock is read once so that the hour and the minute always belong to
    the same instant.
    """
    now = datetime.datetime.now()
    return ((now.hour - CH) * 60) + (now.minute - CM) + 30
    
def isTime(H,M,D):
    """Return True when *H*:*M* equals the base time CH:CM shifted by *D* minutes."""
    due_hour = CountTime(True, CH, CM, D)
    due_minute = CountTime(False, CH, CM, D)
    return (H, M) == (due_hour, due_minute)
#Start('C:\my\\')
#updateUser()
#Start('C:\web\\')#test
# Main loop: poll the clock every 30 seconds and run Start() at CH:CM, or
# delayTime minutes later when a retry is pending.
while True:
    # Read the clock once so hour and minute belong to the same instant.
    now = datetime.datetime.now()
    nowH, nowM = now.hour, now.minute

    # Track the current ten-minute bucket (only useful for debug output).
    TMO = nowM // 10

    # Needed when running the website: merge newly added users at midnight.
    #if nowH == 0 and nowM == 0:
    #    print('更新用户')
    #    updateUser()
    #    time.sleep(60)
    if isTime(nowH, nowM, delayTime):
        print(now.strftime('%m-%d  %H:%M') + '开始填写')
        delayTime = 0
        retry = Start('C:\\web\\')
        if retry:
            # Schedule another attempt 30 minutes from now.
            delayTime = getDelay()
        print(datetime.datetime.now().strftime('%m-%d  %H:%M') + '填写结果' + str(not retry))
        # Skip the rest of this minute so the same slot is not run twice.
        time.sleep(60)

    # Give up retrying once the delay exceeds 12 hours.
    if delayTime > 720:
        delayTime = 0

    time.sleep(30)
    #if nowH == 23 or nowH == 0 or nowH == 1 or nowH == 2:
    #    time.sleep(1)
    #else:
    #    time.sleep(abs(nowH - 8) * 10)

5. 网页建站

后端调用

添加账号
添加账号到临时列表

后端调用入口,命令行调用

import sys
sys.path.append('C:\\web')
#from Logfile import Log

if __name__ == "__main__":
    # Usage: <script> <username> <password>
    # Appends the account to the pending list and prints '完成' on success or
    # '失败' on any error (including missing arguments).
    try:
        #Log(str(sys.argv[1]) + '\t Add\n')
        username, password = str(sys.argv[1]), str(sys.argv[2])
        # Same file name, including case, as the script that merges the list.
        with open('C:\\web\\UserListNew.txt', 'a', encoding='utf-8') as f:
            f.write(username + '\t' + password + '\n')
    except Exception:
        print('失败')
    else:
        print('完成')

更新临时列表到工作列表

调用位置为上一节定时任务

import sys
import datetime

if __name__ == "__main__":
    # Merge the pending accounts (UserListNew.txt) into the working list
    # (UserList.txt), after appending the working list to a dated backup.
    nowTime = datetime.datetime.now().strftime('%Y-%m-%d')

    # Back up the current list and collect the user names already present.
    # Lines with fewer than two fields are skipped.
    known_users = set()
    with open('C:\\web\\UserList.txt', 'r', encoding='utf-8') as fl, \
            open("C:\\web\\UserBak-{}.txt".format(nowTime), 'a', encoding='utf-8') as fb:
        for x in fl:
            line = x.split()
            if len(line) < 2:
                continue
            known_users.add(line[0])
            fb.write(line[0] + ' ' + line[1] + '\n')

    # Read the pending accounts. When a user was submitted several times the
    # last password wins. Lines that are not exactly "user password" are
    # ignored. A missing pending file simply means there is nothing to merge.
    pending = {}
    try:
        with open('C:\\web\\UserListNew.txt', 'r', encoding='utf-8') as f:
            for x in f:
                fields = x.split()
                if len(fields) == 2:
                    pending[fields[0]] = fields[1]
    except FileNotFoundError:
        pass

    # Append only the accounts that are not in the working list yet.
    with open('C:\\web\\UserList.txt', "a", encoding="utf-8") as fs:
        for user, pwd in pending.items():
            if user in known_users:
                continue
            fs.write(user + ' ' + pwd + '\n')

    # Empty the pending list.
    with open('C:\\web\\UserListNew.txt', 'w', encoding='utf-8'):
        pass
小功能

单次填写
import sys
sys.path.append('C:\\web')
#from Logfile import Log
from post import AutoRobot

if __name__ == "__main__":
    # Usage: <script> <username> <password>
    # Fills in the form once for the given account and prints the result, or
    # the error text if anything fails.
    #Log(str(sys.argv[1]) + '\t oneTime\n')
    try:
        robot = AutoRobot()
        account, secret = str(sys.argv[1]), str(sys.argv[2])
        outcome = robot.AutoFill(account, secret)
        print(outcome)
    except Exception as err:
        print(str(err))

检查是否填写过
import sys
sys.path.append('C:\\web')
#from Logfile import Log
from post import AutoRobot

if __name__ == "__main__":
    # Usage: <script> <username> <password>
    # Prints '已经填写' / '未填写', the login response when the login fails,
    # or the error text when something unexpected happens.
    #Log(str(sys.argv[1]) + '\t cheakFill\n')
    robot = AutoRobot()
    try:
        loginSta = robot.Login(str(sys.argv[1]), str(sys.argv[2]))
        if 'success' not in loginSta:
            # Report the login failure directly; otherwise the check would
            # fail later with an unrelated parsing error.
            print(loginSta)
        elif robot.Cheak():
            print('已经填写')
        else:
            print('未填写')
    except Exception as e:
        print(str(e))
    finally:
        # Login() may have failed before it created the session.
        session = getattr(robot, 'session', None)
        if session is not None:
            session.close()

。。。。。。

网站部分

实际网站部分暂时不想整理

本文标签: 系统BNUZ