【源码】TaskManger V.1.2批量ssh,sftp,支持多线程。
本帖最后由 We. 于 2021-5-19 12:02 编辑TaskManager V.1.2
更新了:
优化了sftp
支持传目录了,目录不存在自动创建。
先看源码中的注释说明再用
GitHub: https://github.com/Scorpio-F/DevOps-Tools
觉得还可以麻烦给点个星星。
源码:
import paramiko
import os
import threading
import random
import time
import asyncio
from stat import S_ISDIR
"""
概述:
任务管理
使用说明:
使用ssh或者sftp,实例化Tasks类,并调用其中的ssh()方法刷命令,sftp_get/put()方法传文件/目录。
使用异步ping,实例化Tasks类,并调用asynchronous_ping()方法即可。
ip地址和命令可以提前复制在文件(一条一行)中然后调用get_info()方法读取
# Example:
import TaskManager
ip = TaskManager.get_info('/home/ipaddress/all_ip')
command = TaskManager.get_info('/home/command/command')
x = TaskManager.Tasks(ip=ip, command=command)
x.ssh(semaphore=10)
传参说明:
Tasks类初始化必须要传入ip地址列表和命令列表。 注意! 一个ip地址对应一个命令列表!如无需命令列表则command=None即可。
semaphore参数是信号量:多线程同时运行的最大线程数,默认为5。
source_path参数是本地文件/目录路径。
remote_path参数是远程文件/目录路径.
下载或者上传目录时,目录不存在会自动创建。
其他说明:
ssh执行完命令的结果日志默认存在/tmp/TaskManager/Results下。
ssh执行完命令的执行成功的ip、失败的ip、错误的日志默认存在/tmp/TaskManager/Info下。
ssh默认读取/root/.ssh/id_rsa下的密钥, username默认为root, 端口号默认22
"""
class TaskManager:
def __init__(self):
"""
初始化
"""
self.username = 'root'
self.port = 22
self.pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
self.datetime = time.strftime("%Y-%m-%d", time.localtime())
self.Task_path = '/tmp/TaskManager/'
self.Results_path = self.Task_path + '%s/Results/' % self.datetime
self.Info_path = self.Task_path + '%s/Info/' % self.datetime
self.Task_date_path = self.Task_path + self.datetime
def sftp_client(self, ip):
"""
sftp
基于paramiko的sftp
"""
client = paramiko.Transport(ip, self.port)
client.connect(username=self.username, pkey=self.pkey)
sftp = paramiko.SFTPClient.from_transport(client)
return sftp
def ssh_client(self, ip, command):
"""
ssh
基于paramiko的ssh
"""
try:
now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(now_time)
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, self.port, self.username, pkey=self.pkey, banner_timeout=200)
print('正在连接:%s' % ip)
stdin, stdout, stderr = client.exec_command(command, get_pty=True)
print('正在执行:%s' % command)
result = stdout.read().decode('utf-8')
with open(self.Results_path + '%s.log' % ip, 'a') as f:
f.write('%s' % result)
f.write('\n')
with open(self.Info_path + 'access_ip', 'a') as access:
access.write(ip)
access.write('\n')
client.close()
print('执行完毕')
except Exception as e:
with open(self.Info_path + 'Error_log', 'a') as error:
error.write('%s:%s' % (ip, e))
error.write('\n')
with open(self.Info_path + 'failed_ip', 'a') as failed:
failed.write(ip)
failed.write('\n')
print('%s无法正常连接' % ip)
def put_file(self, ip, remote_path, source_path):
"""
上传文件、目录。
:param ip: ip地址
:param remote_path: 远程路径
:param source_path: 本地路径
:return: 不返回值
"""
sftp = self.sftp_client(ip)
try:
S_ISDIR(sftp.stat(remote_path).st_mode)
except Exception as e:
if 'No such' in str(e):
sftp.mkdir(remote_path)
else:
print(e)
if os.path.exists(source_path) is True:
for root, dirs, files in os.walk(source_path):
sftp.mkdir(remote_path+'/'+root)
for file in files:
sftp.put(localpath=root+'/'+file, remotepath=remote_path+'/'+root+'/'+file)
else:
print('请传入一个正确的目录')
sftp.close()
def get_file(self, ip, remote_path, destination_path):
"""
下载文件,目录
:param ip: ip地址
:param remote_path: 远程目录/文件的路径
:param destination_path: 目的路径
:return: 不返回值
"""
sftp = self.sftp_client(ip)
if os.path.exists(destination_path) is False:
os.makedirs(destination_path, exist_ok=True)
try:# 判断传入的路径是否为目录
if S_ISDIR(sftp.stat(remote_path).st_mode) is True:# 传入的路径为目录
remote_directory = []
self.check_directory(remote_path=remote_path, remote_directory=remote_directory, sftp=sftp)
remote_directory.append(remote_path)
if len(remote_directory) != 0:
for directory1 in remote_directory:
download_path1 = destination_path + directory1.replace(remote_path, '')
os.makedirs(download_path1, exist_ok=True)
for directory in remote_directory:
all_files = sftp.listdir(directory)
for files in all_files:
get_path = directory + '/' + files
download_path = destination_path + directory.replace(remote_path, '') + '/' + files
if S_ISDIR(sftp.stat(get_path).st_mode) is False:
sftp.get(remotepath=get_path, localpath=download_path)
if S_ISDIR(sftp.stat(remote_path).st_mode) is False:# 传入的为文件
sftp.get(remote_path, destination_path)
except IOError as e:
print("%s,请传入一个正确的远程路径" % e)
sftp.close()
def check_directory(self, remote_path, sftp, remote_directory):
"""
递归遍历远程路径的目录
:param remote_path: 远程路径
:param sftp: 必须先实例化一个sftp再传入
:param remote_directory: 返回一个列表包含远程目录
:return: 不返回值
"""
x = sftp.listdir(remote_path)
for i in x:
try:
if S_ISDIR(sftp.stat(remote_path + '/' + i).st_mode) is True:
self.check_directory(remote_path + '/' + i, sftp, remote_directory)
remote_directory.append(remote_path + '/' + i)
except IOError:
pass
class TasksThread(threading.Thread):
"""
多线程
封装信号量
"""
def __init__(self, args=(), group=None, target=None, name=None, kwargs=None, *, daemon=None, semaphore):
super(TasksThread, self).__init__(target=target, args=args, kwargs=kwargs)
self.args = args
self.kwargs = kwargs
self.target = target
self.semaphore = threading.BoundedSemaphore(semaphore)
def run(self):
self.semaphore.acquire()
self.target(*self.args)
self.semaphore.release()
class Tasks(TaskManager):
"""
封装上面两个类,便于调用
"""
def __init__(self, ip, command):
super(Tasks, self).__init__()
self.ip = ip
self.command = command
if os.path.exists(self.Task_path) is False:
os.mkdir(self.Task_path)
if os.path.exists(self.Task_date_path) is False:
os.mkdir(self.Task_date_path)
if os.path.exists(self.Results_path) is False:
os.mkdir(self.Results_path)
if os.path.exists(self.Info_path) is False:
os.mkdir(self.Info_path)
def ssh(self, semaphore=5):
"""
多对一
"""
lst = [TasksThread(target=TaskManager().ssh_client, args=(i, ';'.join(self.command),), semaphore=semaphore) for
i in self.ip]
for i in lst:
i.start()
def sftp_get(self, remote_path, destination_path, semaphore=5):
"""
多对一
"""
lst =
for i in lst:
i.start()
def sftp_put(self, remote_path, source_path, semaphore=5):
"""
一对多
"""
lst = [TasksThread(target=TaskManager().put_file, args=(i, remote_path, source_path,), semaphore=semaphore)
for i in self.ip]
for i in lst:
i.start()
# 异步ping
async def ping(self, ip):
await asyncio.create_subprocess_exec('ping', '-c', '5', '-w', '5', ip)
async def tasks_ping(self):
task =
await asyncio.wait(task)
def asynchronous_ping(self):
start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(self.tasks_ping())
loop.close()
print('总共耗时: %.2f' % (time.time() - start_time))
def compare_list(list1, list2):
difference_list = []
for i in list1:
if i in list2:
difference_list.append(i)
return difference_list
def get_info(path):
"""
从文件中读取信息,按行读取
可以读ip列表,也可以读取command
需要传入文件路径
返回一个列表可以传入至Task初始化用。
"""
info_list =
return info_list
def password_random_generator():
"""
随机密码生成器, 默认生成12位密码包含数字大小写字母符号
"""
upper_letters = random.sample(, 3)
lower_letters = random.sample(, 3)
num = random.sample(, 3)
symbol = random.sample(['!', '@', '#', '$', '%', '*', '&', '-', '+'], 3)
password = upper_letters + lower_letters + num + symbol
random.shuffle(password)
return ''.join(password)
谢谢楼主分享,请问 python版本有要求吗? lamjiarong 发表于 2021-4-30 17:09
谢谢楼主分享,请问 python版本有要求吗?
我用的python3.7, python2肯定不行。主要是paramiko这个模块是要自己装的,其他两个是内置的。
paramiko能装上就能用这个脚本。 GitHub好难链接 alongzhenggang 发表于 2022-3-28 04:04
GitHub好难链接
这种狗屎一样的代码别看了, 我已经删了。 We. 发表于 2022-3-28 09:28
这种狗屎一样的代码别看了, 我已经删了。
别自谦 真的 我看不懂{:301_999:} alongzhenggang 发表于 2022-3-28 09:42
别自谦 真的 我看不懂
看不懂就对了, 这种水平的代码不值得学习。 谢谢分享! We. 发表于 2022-3-31 10:04
看不懂就对了, 这种水平的代码不值得学习。
{:1_926:}我是认真的(//∇//)26个都认识那种
页:
[1]