xiaolinge566 发表于 2024-9-25 13:09

批量检测互联网IP+端口的连通性

本帖最后由 xiaolinge566 于 2024-9-26 08:44 编辑

由于运维工作中需要对不同的系统应用的互联网IP和端口进行检测是否连通,所以写了这个python脚本。分享给有需要的伙伴。
1.在iplist.csv中维护好IP、port、所属系统、站点描述、系统站点等信息;

2.检测结果输出到iplist_out.csv中。
以下是源代码部分


```python
import socket
import csv
import time
socket.setdefaulttimeout(3)


class NetworkCheck():
    def __init__(self,ipfile,outfile):
      self.ipfile = ipfile
      self.outfile = outfile
      #self.timestr = time.strftime('%Y-%m-%d %H:%M:%S %p')
    #------IP端口检测-----
    def socketTest(self,ip,port):
      #res = ''
      port = int(port)
      timestr = time.strftime('%Y-%m-%d %H:%M:%S %p')
      #print('%s - CONNECTING %s:%i' % (timestr, ip, port))
      #print('%s - CONNECTING %s:%i \n' % (timestr, ip, port))
      if port > 65536:
            print('--Error: Port should less than 65535')
            res = 'Error'
      else:
            try:
                for num in range(3):    # 循环检测3次
                  s = socket.socket()
                  result = s.connect_ex((ip, port))
                  if result == 0:
                        print('{}:{}--OK'.format(ip,port))
                        res = 'OK'
                        break   # 如果检测成功,则跳出循环
                  else:
                        print('{}:{}--FAIL'.format(ip,port))
                        res = 'FAIL'
                        continue    # 如果检测不通则间隔10秒,继续检测
            except Exception as e:
                print('异常未执行:{}'.format(e))
                res = '异常未执行'
            finally:
                s.close()
      return res
    #----------执行函数------------------
    def main(self):
      csvfile = csv.reader(open(self.ipfile, 'r'))
      ipInput = []
      ipOut = []
      # READ FROM INPUT CSV FILE
      for row in csvfile:
            if row.lower() == 'ip':
                continue
            else:
                ipInput.append((row, row, row, row, row))

      # TEST CONNECTION
      for ip, port, soft, detial, node in ipInput:
            timestr = time.strftime('%Y-%m-%d %H:%M:%S %p')# 当前时间
            res = self.socketTest(ip, port)
            #print(res)
            if res == 'FAIL':
                ipOut.append()

      # WRITE TO OUTPUT CSV FILE

      out = open(self.outfile, 'w', newline='')
      csv_write = csv.writer(out, dialect='excel')
      csv_write.writerow(['所属系统', '系统站点', '站点描述', 'ip', 'port', '测试结果', '测试时间'])
      for item in ipOut:
            csv_write.writerow(item)
      out.close()

      print("OUTPUT FILE: %s\n" % self.outfile)
#------主函数-----
if __name__ == '__main__':
    ipfile = 'iplist.csv'
    outfile = 'iplist_out.csv'
    start_time = time.time()
    so = NetworkCheck(ipfile,outfile)
    re = so.main()
```

lzy1995 发表于 2024-9-30 10:41

这是根据版主的代码改进的代码,可以直接在窗口显示
import socket
import csv
import time

socket.setdefaulttimeout(3)

class NetworkCheck():
    def __init__(self, ipfile):
      self.ipfile = ipfile

    # ------IP 端口检测-----
    def socketTest(self, ip, port):
      port = int(port)
      timestr = time.strftime('%Y-%m-%d %H:%M:%S %p')
      if port > 65536:
            print('--Error: Port should less than 65535')
            res = 'Error'
      else:
            try:
                for num in range(3):    # 循环检测 3 次
                  s = socket.socket()
                  result = s.connect_ex((ip, port))
                  if result == 0:
                        print(f'{timestr}: {ip}:{port} -- OK')
                        res = 'OK'
                        break   # 如果检测成功,则跳出循环
                  else:
                        print(f'{timestr}: {ip}:{port} -- FAIL')
                        res = 'FAIL'
                        continue    # 如果检测不通则间隔 10 秒,继续检测
            except Exception as e:
                print(f'异常未执行:{e}')
                res = '异常未执行'
            finally:
                s.close()
      return res

    # --------执行函数------------------
    def main(self):
      csvfile = csv.reader(open(self.ipfile, 'r'))
      ipInput = []
      # READ FROM INPUT CSV FILE
      for row in csvfile:
            if row.lower() == 'ip':
                continue
            else:
                ipInput.append((row, row))

      # TEST CONNECTION
      for ip, port in ipInput:
            res = self.socketTest(ip, port)

if __name__ == '__main__':
    ipfile = 'iplist.csv'
    start_time = time.time()
    so = NetworkCheck(ipfile)
    so.main()
    input("程序执行完毕,按回车键退出...")

xiaolinge566 发表于 2024-9-26 08:27

jun269 发表于 2024-9-25 16:39
楼主给打包并搞成UI界面就完美了

我考虑后面做一个。目前初衷是做成监控工具的模式,这个是部分代码。最后实现是通过邮件获取异常结果发送出去。

xiaolinge566 发表于 2024-9-25 14:42

md编辑器怎么有的代码包含不了

侃遍天下无二人 发表于 2024-9-25 15:11

xiaolinge566 发表于 2024-9-25 14:42
md编辑器怎么有的代码包含不了

语法问题,md的代码应该包裹在
```language
// code
```
里面,你这里要把language替换成python
至于被包裹的那一部分代码,应该是你在python里恰好有一段语法被解析成md代码块了,属于巧合

zhangwei0988 发表于 2024-9-25 15:27

加个协程?支持批量IP地址C段扫描?

jun269 发表于 2024-9-25 16:39

楼主给打包并搞成UI界面就完美了

lnshijia 发表于 2024-9-25 17:30

感谢,使用起来可以

xmp788 发表于 2024-9-25 17:48

python操作文件,指明文件路径时,建议你们不要这样写,这样写不同编辑器,解释得到的路径不一样,这样写pycharm是当前文件目录,而vscode和python自带的编辑器指向是python安装目录

KirinRT 发表于 2024-9-25 20:10

python源码都给了可以自己随便改

不过楼上提到的文件路径这个问题,确实可以注意一下

xiaolinge566 发表于 2024-9-26 08:19

侃遍天下无二人 发表于 2024-9-25 15:11
语法问题,md的代码应该包裹在
```language
// code


感谢指教,我下次试试
页: [1] 2
查看完整版本: 批量检测互联网IP+端口的连通性