Python代码requests改成 grequests
本帖最后由 sun4ay 于 2023-7-24 08:47 编辑
想改成每秒钟发包10次,把 requests 改成 grequests;另外,等待时间的处理能不能优化一下?
import time
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import requests
from fake_useragent import UserAgent
import grequests
# Endpoint every request is posted to.
url = 'https://www.baidu.com'
ua = UserAgent()
headers = {
    #'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.43',
    # NOTE(review): hard-coded credential -- replace it with the token of
    # your own account and avoid publishing it with the script.
    'Tyauthtoken': '4FE86EA2C64241BCA0220AFB0C517C2F',
    # ``ua.random`` is read once, when this dict is built.
    "User-Agent": ua.random,
}
# Request payload.  The original post had the two bare values 1231231 and
# 123123 here with no separator, which is a syntax error; an empty JSON
# object is used as the placeholder until the real payload is filled in.
# NOTE(review): the name shadows the stdlib ``json`` module; it is kept
# because ``execute_command`` refers to it by this name.
json = {}
def execute_command(num):
    """Send the POST request repeatedly until the server reports success.

    Each attempt posts the module-level ``json`` payload to ``url`` with
    ``headers``, prints the decoded JSON body and the current UTC+8 time,
    and then inspects the ``status`` field of the body:

    * ``301`` -- the attempt failed; retry after 0.1 s.
    * ``500`` -- the attempt failed; retry after 1 s.
    * anything else -- treated as success and the function returns.

    Args:
        num: Number shown in the first failure message; it is incremented
            after every failed attempt.

    Raises:
        RuntimeError: If the request produced no response at all
            (``grequests.map`` yields ``None`` for a request that failed).
    """
    # Fixed UTC+8 offset used for the timestamps printed with each attempt.
    sha_tz = timezone(timedelta(hours=8), name='Asia/Shanghai')

    # A loop instead of self-recursion, so a long run of failures cannot
    # exhaust the interpreter's recursion limit.
    while True:
        # grequests has no ``requests`` attribute; build the request with
        # ``grequests.post`` and let ``grequests.map`` send it.
        data = grequests.map(
            [grequests.post(url=url, headers=headers, json=json)]
        )[0]
        if data is None:
            raise RuntimeError(
                "request #{} produced no response".format(num))

        body = data.json()
        print(body)
        status = body['status']

        beijing_now = datetime.now(sha_tz)
        print(beijing_now)
        print(beijing_now.strftime('%H:%M:%S:%f'))

        if status == 301:
            delay = 0.1
        elif status == 500:
            delay = 1
        else:
            print('成功')
            return

        print("第{}请求失败".format(num))
        num += 1
        time.sleep(delay)
def wait_until_8pm():
    """Block until today's local time reaches 07:02:59.504500.

    The deadline is derived from ``datetime.now()`` when the function is
    called, so it returns immediately if that moment has already passed
    today.  Despite the function's name, the hard-coded deadline is
    07:02:59.504500, not 8 pm.
    """
    deadline = datetime.now().replace(
        hour=7, minute=2, second=59, microsecond=504500)
    # Re-check the clock every 0.1 ms until the deadline is reached.
    while datetime.now() < deadline:
        time.sleep(0.0001)
if __name__ == "__main__":
    # Wait for the scheduled moment, then start with attempt number 1.
    wait_until_8pm()
    execute_command(1)
# -*- coding: utf-8 -*-
import time
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import grequests
from fake_useragent import UserAgent
# Endpoint every request is posted to.
url = 'https://www.baidu.com'
ua = UserAgent()
# Headers shared by all requests; ``ua.random`` is read once, right here.
headers = {
    'Tyauthtoken': '4FE86EA2C64241BCA0220AFB0C517C2F',
    "User-Agent": ua.random,
}
# Empty JSON payload; the post listed the placeholder values 1231231 and
# 123123 here as comments only.
json = {}
def execute_command(num=1):
    """Send ``num`` POST requests concurrently and report each response.

    All requests are created up front and dispatched together through
    ``grequests.map``.  The responses are then examined in order:

    * no response at all -> an error line is printed;
    * HTTP status other than 200, or a non-JSON content type -> an error
      line is printed;
    * JSON body without a ``status`` field -> an error line is printed;
    * ``status`` 301 / 500 -> a failure line is printed, followed by a
      0.1 s / 1 s pause;
    * any other ``status`` -> the success message is printed and the
      function returns without looking at the remaining responses.

    Args:
        num: How many requests to send in the batch.
    """
    reqs = [grequests.post(url=url, headers=headers, json=json)
            for _ in range(num)]
    # Fixed UTC+8 offset used for the timestamps printed per response.
    sha_tz = timezone(timedelta(hours=8), name='Asia/Shanghai')

    for i, data in enumerate(grequests.map(reqs)):
        # grequests.map puts None in the slot of a request that failed;
        # touching ``data.headers`` on it would raise AttributeError.
        if data is None:
            print(f"Error: 第{i + 1}个请求没有收到响应")
            continue

        content_type = data.headers.get('Content-Type', '').lower()
        is_json = content_type.startswith('application/json')
        if data.status_code != 200 or not is_json:
            print(f"Error: 状态码 {data.status_code}, 响应格式 {content_type}")
            continue

        json_data = data.json()
        print(json_data)
        # A JSON body may also be a list or a scalar; only an object can
        # carry the ``status`` key.
        if not isinstance(json_data, dict) or 'status' not in json_data:
            print("Error: json 响应里没有 'status' 字段")
            continue
        status = json_data['status']

        beijing_now = datetime.now(sha_tz)
        print(beijing_now)
        print(beijing_now.strftime('%H:%M:%S:%f'))

        # NOTE(review): the whole batch has already been sent by the time
        # this loop runs, so these pauses only delay the processing of the
        # remaining responses; kept to preserve the existing behaviour.
        if status == 301:
            print(f"第{i + 1}个请求失败,状态码为301")
            time.sleep(0.1)
        elif status == 500:
            print(f"第{i + 1}个请求失败,状态码为500")
            time.sleep(1)
        else:
            print('成功')
            return
def wait_until_8pm():
    """Block until today's local time reaches 07:02:59.504500.

    The deadline is derived from ``datetime.now()`` when the function is
    called, so it returns immediately if that moment has already passed
    today.  Despite the function's name, the hard-coded deadline is
    07:02:59.504500, not 8 pm.
    """
    deadline = datetime.now().replace(
        hour=7, minute=2, second=59, microsecond=504500)
    # Re-check the clock every 0.1 ms until the deadline is reached.
    while datetime.now() < deadline:
        time.sleep(0.0001)
if __name__ == "__main__":
wait_until_8pm()
start_time = time.perf_counter()
execute_command(10)
end_time = time.perf_counter()
print(f"耗时: {end_time - start_time:.6f} 秒")
多线程试试
下次发帖用下代码编辑器
要利用grequests并发发请求:
import grequests
# Requests to dispatch together.
req_list = [
    grequests.get('http://httpbin.org/get?a=1&b=2'),
    grequests.post('http://httpbin.org/post', data={'a': 1, 'b': 2}),
    # PUT has its own httpbin endpoint; ``/post`` only accepts POST.
    grequests.put('http://httpbin.org/put', json={'a': 1, 'b': 2}),
]
# Send everything concurrently; returns once the last request has finished.
res_list = grequests.map(req_list)
# ``res_list`` is a list with one entry per request (None where a request
# failed), so the first response has to be picked out by index.
first_response = res_list[0]
if first_response is not None:
    print(first_response.text)  # print the first request's response text
# Forum footer that was fused onto the line above: last edited by sun4ay on 2023-7-24 10:53
嗯我开的抓包工具 sun4ay 发表于 2023-7-24 10:38
C:\Users\Administrator\PycharmProjects\pythonProject\venv\Scripts\python. ...
for i, data in enumerate(resps):
#改成:
for data in resps:
import time
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import requests
from fake_useragent import UserAgent
import grequests
# Endpoint every request is posted to.
url = 'https://www.baidu.com'
ua = UserAgent()
headers = {
    # Replace with the credentials of your own account.
    'Tyauthtoken': '4FE86EA2C64241BCA0220AFB0C517C2F',
    "User-Agent": ua.random,
}
# Payload rewritten as a well-formed JSON object.
json_data = dict(key1="value1", key2="value2")
def execute_command(num):
    """Send ten POST requests one after another, printing each outcome.

    Every iteration posts the module-level ``json_data`` to ``url`` with
    ``headers``, prints the decoded JSON body and checks its ``status``
    field: ``301`` or ``500`` counts as a failure (failure message printed,
    ``num`` incremented, 0.1 s pause); any other value prints the success
    message.  A request that yields no response at all is handled as a
    failure as well.

    Args:
        num: Number shown in the first failure message; it is incremented
            after every failed attempt.
    """
    for _ in range(10):
        # ``AsyncRequest.send()`` returns the AsyncRequest itself, not the
        # HTTP response, so calling ``.json()`` on its result raises
        # AttributeError.  The response lives on the ``response``
        # attribute, which stays None when sending failed.
        response = grequests.post(
            url=url, headers=headers, json=json_data
        ).send().response

        if response is not None:
            body = response.json()
            print(body)
            if body['status'] not in (301, 500):
                # NOTE(review): the loop keeps going after a success and
                # does not pause here -- confirm whether it should stop.
                print('成功')
                continue

        print("第{}请求失败".format(num))
        num += 1
        time.sleep(0.1)  # throttle to roughly ten requests per second
def wait_until_8pm():
    """Block until today's local time reaches 07:02:59.504500.

    The deadline is derived from ``datetime.now()`` when the function is
    called, so it returns immediately if that moment has already passed
    today.  Despite the function's name, the hard-coded deadline is
    07:02:59.504500, not 8 pm.
    """
    deadline = datetime.now().replace(
        hour=7, minute=2, second=59, microsecond=504500)
    # Re-check the clock every 0.1 ms until the deadline is reached.
    while datetime.now() < deadline:
        time.sleep(0.0001)
if __name__ == "__main__":
    # Wait for the scheduled moment, then start with attempt number 1.
    wait_until_8pm()
    execute_command(1)
rwj1990 发表于 2023-7-24 11:25
import time
from datetime import datetime
from datetime import timedelta
C:\Users\Administrator\PycharmProjects\pythonProject\venv\Scripts\python.exe C:\Users\Administrator\PycharmProjects\pythonProject\123456.py
Traceback (most recent call last):
File "C:\Users\Administrator\PycharmProjects\pythonProject\123456.py", line 340, in <module>
execute_command(1)
File "C:\Users\Administrator\PycharmProjects\pythonProject\123456.py", line 321, in execute_command
print(response.json())
AttributeError: 'AsyncRequest' object has no attribute 'json'
rwj1990 发表于 2023-7-24 11:25
import time
from datetime import datetime
from datetime import timedelta
C:\Users\Administrator\PycharmProjects\pythonProject\venv\Scripts\python.exe C:\Users\Administrator\PycharmProjects\pythonProject\123456.py
Traceback (most recent call last):
File "C:\Users\Administrator\PycharmProjects\pythonProject\123456.py", line 340, in <module>
execute_command(1)
File "C:\Users\Administrator\PycharmProjects\pythonProject\123456.py", line 321, in execute_command
print(response.json())
AttributeError: 'AsyncRequest' object has no attribute 'json'
页:
[1]
2