吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 3446|回复: 8
收起左侧

[Python 转载] 【源码】TaskManger V.1.2批量ssh,sftp,支持多线程。

[复制链接]
We. 发表于 2021-4-30 15:39
本帖最后由 We. 于 2021-5-19 12:02 编辑

TaskManager V.1.2


更新了:
优化了sftp
支持传目录了,目录不存在自动创建。


先看源码中的注释说明再用

GitHub: https://github.com/Scorpio-F/DevOps-Tools
觉得还可以麻烦给点个星星。

源码:
[Python] 纯文本查看 复制代码
import paramiko
import os
import threading
import random
import time
import asyncio
from stat import S_ISDIR

"""
概述:
任务管理

使用说明:
使用ssh或者sftp,实例化Tasks类,并调用其中的ssh()方法刷命令,sftp_get/put()方法传文件/目录。
使用异步ping,实例化Tasks类,并调用asynchronous_ping()方法即可。
ip地址和命令可以提前复制在文件(一条一行)中然后调用get_info()方法读取

# Example:

import TaskManager
ip = TaskManager.get_info('/home/ipaddress/all_ip')
command = TaskManager.get_info('/home/command/command')
x = TaskManager.Tasks(ip=ip, command=command)
x.ssh(semaphore=10)

传参说明:
Tasks类初始化必须要传入ip地址列表和命令列表。 注意! 一个ip地址对应一个命令列表!如无需命令列表则command=None即可。
semaphore参数是信号量:多线程同时运行的最大线程数,默认为5。
source_path参数是本地文件/目录路径。
remote_path参数是远程文件/目录路径.
下载或者上传目录时,目录不存在会自动创建。

其他说明:
ssh执行完命令的结果日志默认存在/tmp/TaskManager/Results下。
ssh执行完命令的执行成功的ip、失败的ip、错误的日志默认存在/tmp/TaskManager/Info下。
ssh默认读取/root/.ssh/id_rsa下的密钥, username默认为root, 端口号默认22
"""


class TaskManager:
    def __init__(self):
        """
        初始化
        """
        self.username = 'root'
        self.port = 22
        self.pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
        self.datetime = time.strftime("%Y-%m-%d", time.localtime())
        self.Task_path = '/tmp/TaskManager/'

        self.Results_path = self.Task_path + '%s/Results/' % self.datetime
        self.Info_path = self.Task_path + '%s/Info/' % self.datetime
        self.Task_date_path = self.Task_path + self.datetime

    def sftp_client(self, ip):
        """
        sftp
        基于paramiko的sftp
        """
        client = paramiko.Transport(ip, self.port)
        client.connect(username=self.username, pkey=self.pkey)
        sftp = paramiko.SFTPClient.from_transport(client)
        return sftp

    def ssh_client(self, ip, command):
        """
        ssh
        基于paramiko的ssh
        """

        try:
            now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            print(now_time)

            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(ip, self.port, self.username, pkey=self.pkey, banner_timeout=200)
            print('正在连接:%s' % ip)
            stdin, stdout, stderr = client.exec_command(command, get_pty=True)
            print('正在执行:%s' % command)
            result = stdout.read().decode('utf-8')

            with open(self.Results_path + '%s.log' % ip, 'a') as f:
                f.write('%s' % result)
                f.write('\n')

            with open(self.Info_path + 'access_ip', 'a') as access:
                access.write(ip)
                access.write('\n')
            client.close()
            print('执行完毕')

        except Exception as e:
            with open(self.Info_path + 'Error_log', 'a') as error:
                error.write('%s:%s' % (ip, e))
                error.write('\n')

            with open(self.Info_path + 'failed_ip', 'a') as failed:
                failed.write(ip)
                failed.write('\n')
            print('%s无法正常连接' % ip)

    def put_file(self, ip, remote_path, source_path):
        """
        上传文件、目录。
        :param ip: ip地址
        :param remote_path: 远程路径
        :param source_path: 本地路径
        :return: 不返回值
        """
        sftp = self.sftp_client(ip)

        try:
            S_ISDIR(sftp.stat(remote_path).st_mode)
        except Exception as e:
            if 'No such' in str(e):
                sftp.mkdir(remote_path)
            else:
                print(e)

        if os.path.exists(source_path) is True:
            for root, dirs, files in os.walk(source_path):
                sftp.mkdir(remote_path+'/'+root)
                for file in files:
                    sftp.put(localpath=root+'/'+file, remotepath=remote_path+'/'+root+'/'+file)
        else:
            print('请传入一个正确的目录')
        sftp.close()

    def get_file(self, ip, remote_path, destination_path):
        """
        下载文件,目录
        :param ip: ip地址
        :param remote_path: 远程目录/文件的路径
        :param destination_path: 目的路径
        :return: 不返回值
        """
        sftp = self.sftp_client(ip)

        if os.path.exists(destination_path) is False:
            os.makedirs(destination_path, exist_ok=True)

        try:  # 判断传入的路径是否为目录
            if S_ISDIR(sftp.stat(remote_path).st_mode) is True:  # 传入的路径为目录
                remote_directory = []
                self.check_directory(remote_path=remote_path, remote_directory=remote_directory, sftp=sftp)
                remote_directory.append(remote_path)

                if len(remote_directory) != 0:
                    for directory1 in remote_directory:
                        download_path1 = destination_path + directory1.replace(remote_path, '')
                        os.makedirs(download_path1, exist_ok=True)

                    for directory in remote_directory:
                        all_files = sftp.listdir(directory)
                        for files in all_files:
                            get_path = directory + '/' + files
                            download_path = destination_path + directory.replace(remote_path, '') + '/' + files

                            if S_ISDIR(sftp.stat(get_path).st_mode) is False:
                                sftp.get(remotepath=get_path, localpath=download_path)

            if S_ISDIR(sftp.stat(remote_path).st_mode) is False:  # 传入的为文件
                sftp.get(remote_path, destination_path)

        except IOError as e:
            print("%s,请传入一个正确的远程路径" % e)
        sftp.close()

    def check_directory(self, remote_path, sftp, remote_directory):
        """
        递归遍历远程路径的目录
        :param remote_path: 远程路径
        :param sftp: 必须先实例化一个sftp再传入
        :param remote_directory: 返回一个列表包含远程目录
        :return: 不返回值
        """
        x = sftp.listdir(remote_path)
        for i in x:
            try:
                if S_ISDIR(sftp.stat(remote_path + '/' + i).st_mode) is True:
                    self.check_directory(remote_path + '/' + i, sftp, remote_directory)
                    remote_directory.append(remote_path + '/' + i)

            except IOError:
                pass


class TasksThread(threading.Thread):
    """
    多线程
    封装信号量
    """

    def __init__(self, args=(), group=None, target=None, name=None, kwargs=None, *, daemon=None, semaphore):
        super(TasksThread, self).__init__(target=target, args=args, kwargs=kwargs)
        self.args = args
        self.kwargs = kwargs
        self.target = target
        self.semaphore = threading.BoundedSemaphore(semaphore)

    def run(self):
        self.semaphore.acquire()
        self.target(*self.args)
        self.semaphore.release()


class Tasks(TaskManager):
    """
    封装上面两个类,便于调用
    """

    def __init__(self, ip, command):
        super(Tasks, self).__init__()
        self.ip = ip
        self.command = command

        if os.path.exists(self.Task_path) is False:
            os.mkdir(self.Task_path)

        if os.path.exists(self.Task_date_path) is False:
            os.mkdir(self.Task_date_path)

        if os.path.exists(self.Results_path) is False:
            os.mkdir(self.Results_path)

        if os.path.exists(self.Info_path) is False:
            os.mkdir(self.Info_path)

    def ssh(self, semaphore=5):
        """
        多对一
        """
        lst = [TasksThread(target=TaskManager().ssh_client, args=(i, ';'.join(self.command),), semaphore=semaphore) for
               i in self.ip]
        for i in lst:
            i.start()

    def sftp_get(self, remote_path, destination_path, semaphore=5):
        """
        多对一
        """
        lst = [TasksThread(target=TaskManager().get_file, args=(i, remote_path, destination_path+'/%s' % i,), semaphore=semaphore) for i in self.ip]
        for i in lst:
            i.start()

    def sftp_put(self, remote_path, source_path, semaphore=5):
        """
        一对多
        """
        lst = [TasksThread(target=TaskManager().put_file, args=(i, remote_path, source_path,), semaphore=semaphore)
               for i in self.ip]
        for i in lst:
            i.start()

    # 异步ping
    async def ping(self, ip):
        await asyncio.create_subprocess_exec('ping', '-c', '5', '-w', '5', ip)

    async def tasks_ping(self):
        task = [self.ping(i) for i in self.ip]
        await asyncio.wait(task)

    def asynchronous_ping(self):
        start_time = time.time()
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.tasks_ping())
        loop.close()
        print('总共耗时: %.2f' % (time.time() - start_time))


def compare_list(list1, list2):
    difference_list = []
    for i in list1:
        if i in list2:
            difference_list.append(i)
    return difference_list


def get_info(path):
    """
    从文件中读取信息,按行读取
    可以读ip列表,也可以读取command
    需要传入文件路径
    返回一个列表可以传入至Task初始化用。
    """
    info_list = [i.replace('\n', '') for i in open(path, 'r').readlines()]
    return info_list


def password_random_generator():
    """
    随机密码生成器, 默认生成12位密码包含数字大小写字母符号
    """
    upper_letters = random.sample([chr(i + ord('A')) for i in range(26)], 3)
    lower_letters = random.sample([chr(i + ord('a')) for i in range(26)], 3)
    num = random.sample([str(i) for i in range(10)], 3)
    symbol = random.sample(['!', '@', '#', '$', '%', '*', '&', '-', '+'], 3)
    password = upper_letters + lower_letters + num + symbol
    random.shuffle(password)
    return ''.join(password)

免费评分

参与人数 2吾爱币 +2 热心值 +2 收起 理由
v.n.lee + 1 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!
joneqm + 1 + 1 用心讨论,共获提升!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

lamjiarong 发表于 2021-4-30 17:09
谢谢楼主分享,请问 python版本有要求吗?
 楼主| We. 发表于 2021-4-30 17:12
lamjiarong 发表于 2021-4-30 17:09
谢谢楼主分享,请问 python版本有要求吗?

我用的python3.7, python2肯定不行。  主要是paramiko这个模块是要自己装的,其他两个是内置的。
paramiko能装上就能用这个脚本。
alongzhenggang 发表于 2022-3-28 04:04
 楼主| We. 发表于 2022-3-28 09:28

这种狗屎一样的代码别看了, 我已经删了。
alongzhenggang 发表于 2022-3-28 09:42
We. 发表于 2022-3-28 09:28
这种狗屎一样的代码别看了, 我已经删了。

别自谦 真的   我看不懂
 楼主| We. 发表于 2022-3-31 10:04
alongzhenggang 发表于 2022-3-28 09:42
别自谦 真的   我看不懂

看不懂就对了, 这种水平的代码不值得学习。
zm55555 发表于 2022-3-31 10:18
谢谢分享!
alongzhenggang 发表于 2022-4-1 00:24
We. 发表于 2022-3-31 10:04
看不懂就对了, 这种水平的代码不值得学习。

我是认真的(//∇//)26个都认识那种
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-1-12 19:47

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表