We. 发表于 2021-4-30 15:39

【源码】TaskManger V.1.2批量ssh,sftp,支持多线程。

本帖最后由 We. 于 2021-5-19 12:02 编辑

TaskManager V.1.2


更新了:
优化了sftp
支持传目录了,目录不存在自动创建。


先看源码中的注释说明再用

GitHub: https://github.com/Scorpio-F/DevOps-Tools
觉得还可以麻烦给点个星星。

源码:
import paramiko
import os
import threading
import random
import time
import asyncio
from stat import S_ISDIR

"""
概述:
任务管理

使用说明:
使用ssh或者sftp,实例化Tasks类,并调用其中的ssh()方法刷命令,sftp_get/put()方法传文件/目录。
使用异步ping,实例化Tasks类,并调用asynchronous_ping()方法即可。
ip地址和命令可以提前复制在文件(一条一行)中然后调用get_info()方法读取

# Example:

import TaskManager
ip = TaskManager.get_info('/home/ipaddress/all_ip')
command = TaskManager.get_info('/home/command/command')
x = TaskManager.Tasks(ip=ip, command=command)
x.ssh(semaphore=10)

传参说明:
Tasks类初始化必须要传入ip地址列表和命令列表。 注意! 一个ip地址对应一个命令列表!如无需命令列表则command=None即可。
semaphore参数是信号量:多线程同时运行的最大线程数,默认为5。
source_path参数是本地文件/目录路径。
remote_path参数是远程文件/目录路径.
下载或者上传目录时,目录不存在会自动创建。

其他说明:
ssh执行完命令的结果日志默认存在/tmp/TaskManager/Results下。
ssh执行完命令的执行成功的ip、失败的ip、错误的日志默认存在/tmp/TaskManager/Info下。
ssh默认读取/root/.ssh/id_rsa下的密钥, username默认为root, 端口号默认22
"""


class TaskManager:
    def __init__(self):
      """
      初始化
      """
      self.username = 'root'
      self.port = 22
      self.pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
      self.datetime = time.strftime("%Y-%m-%d", time.localtime())
      self.Task_path = '/tmp/TaskManager/'

      self.Results_path = self.Task_path + '%s/Results/' % self.datetime
      self.Info_path = self.Task_path + '%s/Info/' % self.datetime
      self.Task_date_path = self.Task_path + self.datetime

    def sftp_client(self, ip):
      """
      sftp
      基于paramiko的sftp
      """
      client = paramiko.Transport(ip, self.port)
      client.connect(username=self.username, pkey=self.pkey)
      sftp = paramiko.SFTPClient.from_transport(client)
      return sftp

    def ssh_client(self, ip, command):
      """
      ssh
      基于paramiko的ssh
      """

      try:
            now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            print(now_time)

            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(ip, self.port, self.username, pkey=self.pkey, banner_timeout=200)
            print('正在连接:%s' % ip)
            stdin, stdout, stderr = client.exec_command(command, get_pty=True)
            print('正在执行:%s' % command)
            result = stdout.read().decode('utf-8')

            with open(self.Results_path + '%s.log' % ip, 'a') as f:
                f.write('%s' % result)
                f.write('\n')

            with open(self.Info_path + 'access_ip', 'a') as access:
                access.write(ip)
                access.write('\n')
            client.close()
            print('执行完毕')

      except Exception as e:
            with open(self.Info_path + 'Error_log', 'a') as error:
                error.write('%s:%s' % (ip, e))
                error.write('\n')

            with open(self.Info_path + 'failed_ip', 'a') as failed:
                failed.write(ip)
                failed.write('\n')
            print('%s无法正常连接' % ip)

    def put_file(self, ip, remote_path, source_path):
      """
      上传文件、目录。
      :param ip: ip地址
      :param remote_path: 远程路径
      :param source_path: 本地路径
      :return: 不返回值
      """
      sftp = self.sftp_client(ip)

      try:
            S_ISDIR(sftp.stat(remote_path).st_mode)
      except Exception as e:
            if 'No such' in str(e):
                sftp.mkdir(remote_path)
            else:
                print(e)

      if os.path.exists(source_path) is True:
            for root, dirs, files in os.walk(source_path):
                sftp.mkdir(remote_path+'/'+root)
                for file in files:
                  sftp.put(localpath=root+'/'+file, remotepath=remote_path+'/'+root+'/'+file)
      else:
            print('请传入一个正确的目录')
      sftp.close()

    def get_file(self, ip, remote_path, destination_path):
      """
      下载文件,目录
      :param ip: ip地址
      :param remote_path: 远程目录/文件的路径
      :param destination_path: 目的路径
      :return: 不返回值
      """
      sftp = self.sftp_client(ip)

      if os.path.exists(destination_path) is False:
            os.makedirs(destination_path, exist_ok=True)

      try:# 判断传入的路径是否为目录
            if S_ISDIR(sftp.stat(remote_path).st_mode) is True:# 传入的路径为目录
                remote_directory = []
                self.check_directory(remote_path=remote_path, remote_directory=remote_directory, sftp=sftp)
                remote_directory.append(remote_path)

                if len(remote_directory) != 0:
                  for directory1 in remote_directory:
                        download_path1 = destination_path + directory1.replace(remote_path, '')
                        os.makedirs(download_path1, exist_ok=True)

                  for directory in remote_directory:
                        all_files = sftp.listdir(directory)
                        for files in all_files:
                            get_path = directory + '/' + files
                            download_path = destination_path + directory.replace(remote_path, '') + '/' + files

                            if S_ISDIR(sftp.stat(get_path).st_mode) is False:
                              sftp.get(remotepath=get_path, localpath=download_path)

            if S_ISDIR(sftp.stat(remote_path).st_mode) is False:# 传入的为文件
                sftp.get(remote_path, destination_path)

      except IOError as e:
            print("%s,请传入一个正确的远程路径" % e)
      sftp.close()

    def check_directory(self, remote_path, sftp, remote_directory):
      """
      递归遍历远程路径的目录
      :param remote_path: 远程路径
      :param sftp: 必须先实例化一个sftp再传入
      :param remote_directory: 返回一个列表包含远程目录
      :return: 不返回值
      """
      x = sftp.listdir(remote_path)
      for i in x:
            try:
                if S_ISDIR(sftp.stat(remote_path + '/' + i).st_mode) is True:
                  self.check_directory(remote_path + '/' + i, sftp, remote_directory)
                  remote_directory.append(remote_path + '/' + i)

            except IOError:
                pass


class TasksThread(threading.Thread):
    """
    多线程
    封装信号量
    """

    def __init__(self, args=(), group=None, target=None, name=None, kwargs=None, *, daemon=None, semaphore):
      super(TasksThread, self).__init__(target=target, args=args, kwargs=kwargs)
      self.args = args
      self.kwargs = kwargs
      self.target = target
      self.semaphore = threading.BoundedSemaphore(semaphore)

    def run(self):
      self.semaphore.acquire()
      self.target(*self.args)
      self.semaphore.release()


class Tasks(TaskManager):
    """
    封装上面两个类,便于调用
    """

    def __init__(self, ip, command):
      super(Tasks, self).__init__()
      self.ip = ip
      self.command = command

      if os.path.exists(self.Task_path) is False:
            os.mkdir(self.Task_path)

      if os.path.exists(self.Task_date_path) is False:
            os.mkdir(self.Task_date_path)

      if os.path.exists(self.Results_path) is False:
            os.mkdir(self.Results_path)

      if os.path.exists(self.Info_path) is False:
            os.mkdir(self.Info_path)

    def ssh(self, semaphore=5):
      """
      多对一
      """
      lst = [TasksThread(target=TaskManager().ssh_client, args=(i, ';'.join(self.command),), semaphore=semaphore) for
               i in self.ip]
      for i in lst:
            i.start()

    def sftp_get(self, remote_path, destination_path, semaphore=5):
      """
      多对一
      """
      lst =
      for i in lst:
            i.start()

    def sftp_put(self, remote_path, source_path, semaphore=5):
      """
      一对多
      """
      lst = [TasksThread(target=TaskManager().put_file, args=(i, remote_path, source_path,), semaphore=semaphore)
               for i in self.ip]
      for i in lst:
            i.start()

    # 异步ping
    async def ping(self, ip):
      await asyncio.create_subprocess_exec('ping', '-c', '5', '-w', '5', ip)

    async def tasks_ping(self):
      task =
      await asyncio.wait(task)

    def asynchronous_ping(self):
      start_time = time.time()
      loop = asyncio.get_event_loop()
      loop.run_until_complete(self.tasks_ping())
      loop.close()
      print('总共耗时: %.2f' % (time.time() - start_time))


def compare_list(list1, list2):
    difference_list = []
    for i in list1:
      if i in list2:
            difference_list.append(i)
    return difference_list


def get_info(path):
    """
    从文件中读取信息,按行读取
    可以读ip列表,也可以读取command
    需要传入文件路径
    返回一个列表可以传入至Task初始化用。
    """
    info_list =
    return info_list


def password_random_generator():
    """
    随机密码生成器, 默认生成12位密码包含数字大小写字母符号
    """
    upper_letters = random.sample(, 3)
    lower_letters = random.sample(, 3)
    num = random.sample(, 3)
    symbol = random.sample(['!', '@', '#', '$', '%', '*', '&', '-', '+'], 3)
    password = upper_letters + lower_letters + num + symbol
    random.shuffle(password)
    return ''.join(password)

lamjiarong 发表于 2021-4-30 17:09

谢谢楼主分享,请问 python版本有要求吗?

We. 发表于 2021-4-30 17:12

lamjiarong 发表于 2021-4-30 17:09
谢谢楼主分享,请问 python版本有要求吗?

我用的python3.7, python2肯定不行。主要是paramiko这个模块是要自己装的,其他两个是内置的。
paramiko能装上就能用这个脚本。

alongzhenggang 发表于 2022-3-28 04:04

GitHub好难链接

We. 发表于 2022-3-28 09:28

alongzhenggang 发表于 2022-3-28 04:04
GitHub好难链接

这种狗屎一样的代码别看了, 我已经删了。

alongzhenggang 发表于 2022-3-28 09:42

We. 发表于 2022-3-28 09:28
这种狗屎一样的代码别看了, 我已经删了。

别自谦 真的   我看不懂{:301_999:}

We. 发表于 2022-3-31 10:04

alongzhenggang 发表于 2022-3-28 09:42
别自谦 真的   我看不懂

看不懂就对了, 这种水平的代码不值得学习。

zm55555 发表于 2022-3-31 10:18

谢谢分享!

alongzhenggang 发表于 2022-4-1 00:24

We. 发表于 2022-3-31 10:04
看不懂就对了, 这种水平的代码不值得学习。

{:1_926:}我是认真的(//∇//)26个都认识那种
页: [1]
查看完整版本: 【源码】TaskManger V.1.2批量ssh,sftp,支持多线程。