吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1010|回复: 14
收起左侧

[求助] Python代码requests改成 grequests

  [复制链接]
sun4ay 发表于 2023-7-24 08:15
25吾爱币
本帖最后由 sun4ay 于 2023-7-24 08:47 编辑

改成每秒钟发包10次  requests改成 grequests 还有就是时间能不能优化一下
[Python] 纯文本查看 复制代码
import time
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import requests
from fake_useragent import UserAgent
import grequests
url = 'https://www.baidu.com'
ua = UserAgent()
headers = {
  #  'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.43',
    'Tyauthtoken': '4FE86EA2C64241BCA0220AFB0C517C2F', # 改账号密码
    "User-Agent": ua.random
}


json = {


1231231
123123
}
def execute_command(num):
    #data = requests.post(url=url, headers=headers, json=json)
    data = grequests.requests("post",url=url, headers=headers, json=json)
    print(data.json())
    status = data.json()['status']
    utc_now = datetime.utcnow().replace(tzinfo=timezone.utc)


    SHA_TZ = timezone(
        timedelta(hours=8),
        name='Asia/Shanghai',
    )
    beijing_now = utc_now.astimezone(SHA_TZ)
    print(beijing_now)
    now_fmt = beijing_now.strftime('%H:%M:%S:%f')
    print(now_fmt)


    if status == 301:
        print("第{}请求失败".format(num))
        num = num + 1
        time.sleep(0.1)
        execute_command(num)
    elif status == 500:
        print("第{}请求失败".format(num))
        num = num + 1
        time.sleep(1)
        execute_command(num)
    else:
        print('成功')




def wait_until_8pm():
    target_time = datetime.now().replace(hour=7, minute=2, second=59, microsecond=504500)
    while True:
        now = datetime.now()
        if now >= target_time:
            break
        time.sleep(0.0001)




if __name__ == "__main__":
    wait_until_8pm()
    execute_command(1)

最佳答案

查看完整内容

[mw_shl_code=python,true]# -*- coding: utf-8 -*- import time from datetime import datetime from datetime import timedelta from datetime import timezone import grequests from fake_useragent import UserAgent url = 'https://www.baidu.com' ua = UserAgent() headers = { 'Tyauthtoken': '4FE86EA2C64241BCA0220AFB0C517C2F', "User-Agent": ua.random } json = { # 1231231 # ...

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

hxhnazx 发表于 2023-7-24 08:15
[Python] 纯文本查看 复制代码
# -*- coding: utf-8 -*-

import time
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import grequests
from fake_useragent import UserAgent

url = 'https://www.baidu.com'
ua = UserAgent()
headers = {
    'Tyauthtoken': '4FE86EA2C64241BCA0220AFB0C517C2F',
    "User-Agent": ua.random
}

json = {
    # 1231231
    # 123123
}


def execute_command(num=1):
    reqs = [grequests.post(url=url, headers=headers, json=json)
            for _ in range(num)]
    resps = grequests.map(reqs)
    for i, data in enumerate(resps):
        content_type = data.headers.get('Content-Type', '').lower()
        if data.status_code == 200 and content_type.startswith('application/json'):
            json_data = data.json()
            print(json_data)
            if 'status' in json_data:
                status = json_data['status']
                utc_now = datetime.utcnow().replace(tzinfo=timezone.utc)

                SHA_TZ = timezone(
                    timedelta(hours=8),
                    name='Asia/Shanghai',
                )
                beijing_now = utc_now.astimezone(SHA_TZ)
                print(beijing_now)
                now_fmt = beijing_now.strftime('%H:%M:%S:%f')
                print(now_fmt)

                if status == 301:
                    print(f"第{i + 1}个请求失败,状态码为301")
                    time.sleep(0.1)
                elif status == 500:
                    print(f"第{i + 1}个请求失败,状态码为500")
                    time.sleep(1)
                else:
                    print('成功')
                    return
            else:
                print(f"Error: json 响应里没有 'status' 字段")
        else:
            print(f"Error: 状态码 {data.status_code}, 响应格式 {content_type}")


def wait_until_8pm():
    target_time = datetime.now().replace(hour=7, minute=2, second=59, microsecond=504500)
    while True:
        now = datetime.now()
        if now >= target_time:
            break
        time.sleep(0.0001)


if __name__ == "__main__":
    wait_until_8pm()
    start_time = time.perf_counter()
    execute_command(10)
    end_time = time.perf_counter()
    print(f"耗时: {end_time - start_time:.6f} 秒")
qq632280928 发表于 2023-7-24 08:22
一只大菜猫 发表于 2023-7-24 08:38
sunsjw 发表于 2023-7-24 09:01
要利用grequests并发发请求
[Python] 纯文本查看 复制代码
import grequests

req_list = [   # 请求列表
    grequests.get('http://httpbin.org/get?a=1&b=2'),
    grequests.post('http://httpbin.org/post', data={'a':1,'b':2}),
    grequests.put('http://httpbin.org/post', json={'a': 1, 'b': 2}),
]

res_list = grequests.map(req_list)    # 并行发送,等最后一个运行完后返回
print(res_list[0].text)  # 打印第一个请求的响应文本

免费评分

参与人数 1吾爱币 +1 热心值 +1 收起 理由
yaoziqi + 1 + 1 学习了,知识又扩充了

查看全部评分

 楼主| sun4ay 发表于 2023-7-24 10:38
本帖最后由 sun4ay 于 2023-7-24 10:53 编辑

嗯我开的抓包工具
sunsjw 发表于 2023-7-24 10:53
sun4ay 发表于 2023-7-24 10:38
[mw_shl_code=python,true]C:%users\Administrator\PycharmProjects\pythonProject\venv\Scripts\python. ...

for i, data in enumerate(resps):

#改成:
for  data in resps:
rwj1990 发表于 2023-7-24 11:25
import time
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import requests
from fake_useragent import UserAgent
import grequests

url = 'https://www.baidu.com'
ua = UserAgent()
headers = {
    'Tyauthtoken': '4FE86EA2C64241BCA0220AFB0C517C2F', # 改账号密码
    "User-Agent": ua.random
}

json_data = {  # 修改成正确的JSON格式
    "key1": "value1",
    "key2": "value2"
}

def execute_command(num):
    for _ in range(10):
        data = grequests.post(url=url, headers=headers, json=json_data)
        response = data.send()
        print(response.json())
        status = response.json()['status']
        if status == 301 or status == 500:
            print("第{}请求失败".format(num))
            num += 1
            time.sleep(0.1)  # 控制每秒钟发包10次
        else:
            print('成功')

def wait_until_8pm():
    target_time = datetime.now().replace(hour=7, minute=2, second=59, microsecond=504500)
    while True:
        now = datetime.now()
        if now >= target_time:
            break
        time.sleep(0.0001)

if __name__ == "__main__":
    wait_until_8pm()
    execute_command(1)
 楼主| sun4ay 发表于 2023-7-24 11:37
rwj1990 发表于 2023-7-24 11:25
import time
from datetime import datetime
from datetime import timedelta

[Python] 纯文本查看 复制代码
C:\Users\Administrator\PycharmProjects\pythonProject\venv\Scripts\python.exe C:\Users\Administrator\PycharmProjects\pythonProject\123456.py 
Traceback (most recent call last):
  File "C:\Users\Administrator\PycharmProjects\pythonProject\123456.py", line 340, in <module>
    execute_command(1)
  File "C:\Users\Administrator\PycharmProjects\pythonProject\123456.py", line 321, in execute_command
    print(response.json())
AttributeError: 'AsyncRequest' object has no attribute 'json'
 楼主| sun4ay 发表于 2023-7-24 11:38
rwj1990 发表于 2023-7-24 11:25
import time
from datetime import datetime
from datetime import timedelta

[Python] 纯文本查看 复制代码
C:\Users\Administrator\PycharmProjects\pythonProject\venv\Scripts\python.exe C:\Users\Administrator\PycharmProjects\pythonProject\123456.py 
Traceback (most recent call last):
  File "C:\Users\Administrator\PycharmProjects\pythonProject\123456.py", line 340, in <module>
    execute_command(1)
  File "C:\Users\Administrator\PycharmProjects\pythonProject\123456.py", line 321, in execute_command
    print(response.json())
AttributeError: 'AsyncRequest' object has no attribute 'json'
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-11-24 20:44

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表