# 有点意思,我也来一份手机验证码登录代码
# Created by 咸鱼 at 2020/9/27 8:48 下午
"""
requirement:
httpx==0.15.4
pycrypto==2.6.1
"""
import os
import time
import base64
import asyncio
from datetime import datetime
from Crypto.Cipher import AES
from httpx import AsyncClient
from typing import Coroutine, Optional
from http.cookiejar import MozillaCookieJar
# Browser-like request headers passed to the 114yygh API calls in this file.
headers = {
    'User-Agent': (
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/85.0.4183.121 Safari/537.36'
    ),
    'Referer': 'https://www.114yygh.com/',
    'Request-Source': 'PC',
    'Origin': 'https://www.114yygh.com',
    'Host': 'www.114yygh.com',
    'Content-Type': 'application/json;charset=UTF-8',
}
def encrypt(text: str) -> str:
    """Encrypt ``text`` with AES-128 in ECB mode and return URL-safe Base64.

    The plaintext is UTF-8 encoded, PKCS#7-padded to the AES block size,
    encrypted with the fixed 16-byte key below, and Base64-encoded using
    the URL-safe alphabet (``-`` and ``_`` instead of ``+`` and ``/``).

    :param text: plaintext to encrypt
    :return: URL-safe Base64 text of the ciphertext
    """
    key = b'hyde2019hyde2019'
    block_size = AES.block_size

    # Pad the encoded bytes, not the str: the cipher works on bytes, so the
    # pad length must be derived from the byte count or multi-byte
    # characters would produce a misaligned block.
    raw = text.encode('utf8')
    pad_len = block_size - len(raw) % block_size
    padded = raw + bytes([pad_len]) * pad_len

    # Key and data are passed as bytes so the call works on both pycrypto
    # and PyCryptodome (the latter rejects str arguments).
    cipher = AES.new(key, AES.MODE_ECB)
    return base64.urlsafe_b64encode(cipher.encrypt(padded)).decode('utf8')
async def check_status(client: AsyncClient) -> bool:
    """Report whether the session carried by ``client`` is logged in.

    Requests the user-info endpoint and treats the session as logged in
    only when the JSON reply has ``resCode`` equal to 0 and a truthy
    ``data`` field.

    :param client: HTTP client used to issue the request
    :return: ``True`` when the reply indicates a logged-in user, else ``False``
    """
    timestamp_ms = int(time.time() * 1000)
    endpoint = f'https://www.114yygh.com/web/user/info?_time={timestamp_ms}'
    response = await client.get(endpoint, headers=headers)
    body = response.json()
    logged_in = body.get('resCode') == 0 and body.get('data')
    return bool(logged_in)
async def login(username: str) -> AsyncClient:
    """Log in to 114yygh with a phone number and an SMS verification code.

    Cookies are cached in ``<username>.ck``. If that file exists it is
    loaded first, and when ``check_status`` reports the session as still
    valid the client is returned without requesting a new code. Otherwise
    an SMS code is requested, read from stdin, and posted (AES-encrypted
    together with the phone number) to the login endpoint; on success the
    cookie jar is saved back to the file.

    :param username: phone number; also used as the cookie file stem
    :return: an ``AsyncClient`` bound to the cookie jar of the session
    :raises RuntimeError: when the login reply's ``resCode`` is not 0; the
        message is the reply's ``msg`` field (empty string if absent)
    """
    cookie_file = f'{username}.ck'
    cookies = MozillaCookieJar(cookie_file)
    if os.path.exists(cookie_file):
        cookies.load()
    client = AsyncClient(cookies=cookies)
    try:
        if await check_status(client):
            return client

        await client.get('https://www.114yygh.com/')
        url = (
            'https://www.114yygh.com/web/common/verify-code/get'
            f'?_time={int(time.time() * 1000)}&mobile={username}&smsKey=LOGIN'
        )
        await client.get(url, headers=headers)

        # NOTE: input() blocks the event loop until the user answers.
        code = input(f'{username}手机验证码: ')

        url = f'https://www.114yygh.com/web/login?_time={int(time.time() * 1000)}'
        payload = {
            'mobile': encrypt(username),
            'code': encrypt(code),
        }
        # Send real JSON: str(payload) is a Python repr with single quotes,
        # which is not valid JSON despite the JSON Content-Type header.
        res = await client.post(url, json=payload, headers=headers)
        res = res.json()
        if res.get('resCode') != 0:
            raise RuntimeError(res.get('msg', ''))

        # NOTE(review): save() skips session cookies unless
        # ignore_discard=True is passed -- confirm the site's login cookie
        # actually survives a save/load round trip.
        cookies.save()
        return client
    except BaseException:
        # The caller never receives the client on failure, so close it here
        # instead of leaking its connection pool.
        await client.aclose()
        raise
async def check_order(client: AsyncClient, firstDeptCode: str,
                      hosCode: str, secondDeptCode: str) -> None:
    """Query the product list and look for an available appointment slot.

    Posts the department/hospital codes to the product-list endpoint and
    scans ``data.calendars`` in the reply for the first entry whose
    ``status`` is ``'AVAILABLE'``. Acting on a hit is still a TODO. A
    timestamped line is printed after every call.

    :param client: logged-in HTTP client
    :param firstDeptCode: first-level department code (meaning unconfirmed
        by the original author)
    :param hosCode: hospital code (meaning unconfirmed by the original author)
    :param secondDeptCode: second-level department code (meaning unconfirmed
        by the original author)
    :return: None
    """
    url = f'https://www.114yygh.com/web/product/list?_time={int(time.time() * 1000)}'
    payload = {
        'firstDeptCode': firstDeptCode,
        'hosCode': hosCode,
        'secondDeptCode': secondDeptCode,
        'week': 1,
    }
    # Send real JSON: str(payload) is a Python repr with single quotes,
    # which is not valid JSON despite the JSON Content-Type header.
    r = await client.post(url, json=payload, headers=headers)

    # Fall back to empty containers when ``data`` or ``calendars`` is missing
    # or null, so such a reply does not raise inside the periodic task.
    data = r.json().get('data') or {}
    for day in data.get('calendars') or []:
        if day.get('status') == 'AVAILABLE':
            # TODO call mail()
            break
    print(f'{datetime.now().strftime("%X")} success call check_order')
# Strong references to in-flight tasks. The event loop keeps only weak
# references, so a task nobody else holds could be garbage-collected
# before it finishes.
_pending_tasks = set()


def listen_event(cors: Coroutine, args: tuple = (),
                 interval: Optional[float] = None) -> None:
    """Schedule ``cors(*args)`` on the event loop, optionally repeating.

    Must be called while an event loop is available (e.g. from inside a
    coroutine or a loop callback). Each invocation starts one task and,
    when ``interval`` is truthy, re-arms itself with ``loop.call_later`` so
    a new task is started every ``interval`` seconds regardless of whether
    the previous one has finished.

    :param cors: coroutine function; despite the annotation it is *called*
        with ``args`` to create the coroutine that is scheduled
    :param args: positional arguments passed to ``cors``
    :param interval: seconds between runs; ``None`` (or 0) runs only once
    :return: None
    """
    loop = asyncio.get_event_loop()
    task = loop.create_task(cors(*args))
    # Hold the task until it completes, then drop the reference.
    _pending_tasks.add(task)
    task.add_done_callback(_pending_tasks.discard)
    if interval:
        loop.call_later(interval, listen_event, cors, args, interval)
async def main():
    """Log in one account and poll for available slots every 30 seconds.

    To watch several accounts, call ``login`` once per phone number and
    register a separate ``listen_event(check_order, ...)`` for each
    returned client.
    """
    # (firstDeptCode, hosCode, secondDeptCode) forwarded to check_order.
    target = ('75fec1a900e3d4c238cf384556de46de', '142', '200051666')
    session = await login('155xxxxx609')
    listen_event(check_order, (session, *target), interval=30)
if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Run main() to completion first so a failure (e.g. a rejected login)
    # propagates and stops the script instead of being hidden inside a
    # fire-and-forget task while the loop idles forever.
    loop.run_until_complete(main())
    # Keep the loop alive for the periodic callbacks registered by main().
    loop.run_forever()
# 忽然想到,我没有女朋友,还高高兴兴写个鸡儿代码?