hfut.session 源代码

# -*- coding:utf-8 -*-
"""
会话管理模块, 在 ``requests.Session`` 的基础上添加了一些针对接口的改进
"""
from __future__ import unicode_literals

import time
from collections import deque
from functools import reduce

import requests
import six
from six.moves.urllib import parse

from .exception import SystemLoginFailed, IPBanned, ValidationError
from .log import logger, report_response
from .value import HF, ENV

__all__ = ['BaseSession', 'GuestSession', 'StudentSession']


[文档]class BaseSession(requests.Session): """ 所有接口会话类的基类 """ def __init__(self, campus): """ 所以接口会话类的基类 :param campus: 校区代码, 请使用 ``value`` 模块中的 ``HF``, ``XC`` 分别来区分合肥校区和宣城校区 """ super(BaseSession, self).__init__() self.headers = ENV['DEFAULT_HEADERS'] self.histories = deque(maxlen=ENV['MAX_HISTORIES']) # 初始化时根据合肥选择不同的地址 self.campus = campus.upper() self.host = ENV[self.campus]
[文档] def prepare_request(self, request): parsed = parse.urlparse(request.url) # 非法字符检查 if ENV['REQUEST_ARGUMENTS_CHECK'] and (not parsed.netloc or parsed.netloc == parse.urlparse(self.host).netloc): for k, v in reduce(lambda x, y: x + list(y.items()), (request.params, request.data), []): pattern = ENV['ILLEGAL_CHARACTERS_PATTERN'] result = pattern.search(str(k)) or pattern.search(str(v)) if result: msg = ''.join(['参数中出现非法字符: ', result.group()]) raise ValidationError(msg) if not parsed.netloc: # requests 在准备 url 进行解析, 因此只能在准备前将 url 换成完整的地址 # requests.models.PreparedRequest#prepare_url request.url = parse.urljoin(self.host, request.url) return super(BaseSession, self).prepare_request(request)
[文档] def send(self, request, **kwargs): """ 所有接口用来发送请求的方法, 只是 :meth:`requests.sessions.Session.send` 的一个钩子方法, 用来处理请求前后的工作 """ response = super(BaseSession, self).send(request, **kwargs) if ENV['RAISE_FOR_STATUS']: response.raise_for_status() parsed = parse.urlparse(response.url) if parsed.netloc == parse.urlparse(self.host).netloc: response.encoding = ENV['SITE_ENCODING'] # 快速判断响应 IP 是否被封, 那个警告响应内容长度为 327 或 328, 保留一点余量确保准确 min_length, max_length, pattern = ENV['IP_BANNED_RESPONSE'] if min_length <= len(response.content) <= max_length and pattern.search(response.text): msg = '当前 IP 已被锁定, 如果可以请尝试切换教务系统地址, 否则请在更换网络环境或等待解封后重试!' raise IPBanned(msg) self.histories.append(response) logger.debug(report_response(response, redirection=kwargs.get('allow_redirects'))) return response
def __str__(self): return '<BaseSession(%s)>' % self.campus
[文档]@six.python_2_unicode_compatible class GuestSession(BaseSession): pass def __str__(self): return '<GuestSession(%s)>' % self.campus
[文档]@six.python_2_unicode_compatible class StudentSession(BaseSession): """ 学生教务接口, 继承了 :class:`models.GuestSession` 的所有接口, 因此一般推荐使用这个类 """ def __init__(self, account, password, campus): """ :param account: 学号 :param password: 密码 """ # 先初始化状态才能登陆 super(StudentSession, self).__init__(campus) self.account = account self.password = password self.name = None self.last_request_at = 0
[文档] def request(self, *args, **kwargs): if self.is_expired: self.login() self.last_request_at = time.time() return super(StudentSession, self).request(*args, **kwargs)
def __str__(self): return '<StudentSession(%s, %s)>' % (self.account, self.campus) @property def is_expired(self): """ asp.net 如果程序中没有设置session的过期时间,那么session过期时间就会按照IIS设置的过期时间来执行, IIS中session默认过期时间为20分钟,网站配置 最长24小时,最小15分钟, 页面级>应用程序级>网站级>服务器级. .那么当超过 15 分钟未操作会认为会话已过期需要重新登录 :return: 会话是否过期 """ now = time.time() return (now - self.last_request_at) >= 900
[文档] def login(self): """ 登录账户 """ # 登陆前清空 cookie, 能够防止再次登陆时因携带 cookie 可能提示有未进行教学评估的课程导致接口不可用 self.cookies.clear_session_cookies() if self.campus == HF: login_data = {'IDToken1': self.account, 'IDToken2': self.password} login_url = 'http://ids1.hfut.edu.cn/amserver/UI/Login' super(StudentSession, self).request('post', login_url, data=login_data) method = 'get' url = 'StuIndex.asp' data = None else: method = 'post' url = 'pass.asp' data = {"user": self.account, "password": self.password, "UserStyle": 'student'} # 使用重载的 request 会造成递归调用 response = super(StudentSession, self).request(method, url, data=data, allow_redirects=False) logged_in = response.status_code == 302 if not logged_in: msg = '登陆失败, 请检查你的账号和密码' raise SystemLoginFailed(msg) escaped_name = self.cookies.get('xsxm') if six.PY3: self.name = parse.unquote(escaped_name, ENV['SITE_ENCODING']) else: name = parse.unquote(escaped_name) self.name = name.decode(ENV['SITE_ENCODING'])