本帖最后由 yitoumao 于 2023-3-9 23:32 编辑
[Python] 纯文本查看 复制代码
from concurrent.futures import ThreadPoolExecutor, wait # 引入线程池和等待函数
from datetime import datetime, timedelta # 引入日期时间和时间差
from selenium import webdriver # 引入selenium库中的webdriver
from selenium.webdriver.common.by import By # 引入webdriver中的By
import time # 引入时间模块
import threading # 引入线程模块
import subprocess # 引入子进程模块
# 获取设备名称的函数
def get_device_name(adb):
process = subprocess.Popen(adb, shell=True, stdout=subprocess.PIPE) # 执行adb命令
device_name = process.stdout.read().decode().strip() # 获取设备名称
return device_name
# 继承线程类,实现多线程
class AppiumThread(threading.Thread):
def __init__(self, port, version, adb):
threading.Thread.__init__(self) # 调用父类的构造函数
self.port = port # appium服务的端口号
self.version = version # 安卓系统的版本号
self.adb = adb # adb命令
self.result = None # 线程执行结果
def run(self):
if self.port == 4723: # 判断端口号,获取设备名称
device_name = get_device_name(self.adb[0])
elif self.port == 4725:
device_name = get_device_name(self.adb[1])
else:
raise ValueError(f"请检查输入的端口: {self.port},是否对应的appium服务已启动")
# 配置webdriver参数
desired_caps = {
"platformName": 'Android',
"platformVersion": str(self.version),
"appPackage": 'xxxxxxxx',
'deviceName': device_name,
"appActivity": '.splash.Login',
"noReset": "false"
}
driver = None # 初始化webdriver
while not driver: # 循环直到获取到webdriver
try:
driver = webdriver.Remote(f"http://127.0.0.1:{self.port}/wd/hub", desired_caps) # 获取webdriver
except:
print(f"Appium服务{self.port}未启动,等待5秒钟后重试...")
time.sleep(5)
time.sleep(2) # 等待2秒
driver.find_element(By.ID, 'xxxxxxxxxxx').click() # 点击同意按钮
if self.port == 4723:
driver.implicitly_wait(20) # 隐式等待20秒
else:
time.sleep(5) # 等待5秒
driver.quit() # 关闭webdriver
time.sleep(5) # 等待5秒
self.result = run_agree(self.adb) # 执行其他函数并返回结果
if __name__ == '__main__':
new_time = datetime.now() + timedelta(minutes=1) # 获取1分钟后的时间
adb_list = ['adb -s 34438ffa shell getprop ro.product.model', 'adb -s JTK5T19917014403 shell getprop ro.product.model'] # adb命令列表
threads = [] # 线程列表
success_count = 0 # 成功次数
fail_count = 0 # 失败次数
with ThreadPoolExecutor(max_workers=len(adb_list)) as executor: # 创建线程池
while datetime.now() <= new_time: # 循环直到时间到达
for port in [4723, 4725]:
thread = AppiumThread(port, 13 if port == 4723 else 12, adb_list) # 创建线程
threads.append(thread) # 将线程加入线程列表
executor.submit(thread.run) # 提交线程到线程池中执行
# 等待所有任务完成
wait([thread.result for thread in threads])
for thread in threads: # 遍历线程列表
if thread.result == "True":
success_count += 1 # 统计成功次数
else:
fail_count += 1 # 统计失败次数
print("成功次数:", success_count)
print("失败次数:", fail_count) # 输出结果
Python 想要实现的功能:在一定时间内多线程启动Appium服务进行操作,操作后调用其他方法并返回结果,统计每一次的执行结果,返回True成功+1 返回Flase失败+1 ,最后统计总成功和失败次数 |