import socket
import concurrent.futures
import ipaddress
# Network to scan, in CIDR notation.
subnet = "192.168.112.0/20"
# Ports to probe on every host.
port_range = [135]
# Maximum number of worker threads in the pool.
thread_pool_size = 200
def scan_port(ip, port):
    """Check whether a TCP port accepts connections.

    Attempts a TCP connect to ``(ip, port)`` with a 1-second timeout.

    Args:
        ip: Target host as a string.
        port: TCP port number.

    Returns:
        The string ``"<ip>:<port>"`` when the connection succeeds, otherwise
        ``None`` (connection refused or timed out, or the address/port could
        not be used at all).
    """
    try:
        # The context manager closes the socket on every path. The previous
        # ``finally: if sock:`` raised UnboundLocalError whenever
        # ``socket.socket()`` itself failed, because ``sock`` was never bound.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            # connect_ex returns an error code instead of raising for
            # connection failures; 0 means the connection was established.
            if sock.connect_ex((ip, port)) == 0:
                return "{}:{}".format(ip, port)
    except (OSError, OverflowError, TypeError, ValueError):
        # Best-effort probe: resolution errors, out-of-range ports and
        # malformed arguments all count as "not open". Unlike a bare
        # ``except:``, this lets KeyboardInterrupt/SystemExit propagate.
        return None
    return None
def scan_subnet(subnet):
    """Scan every host address of a network for the configured ports.

    Expands ``subnet`` into its host addresses, prints that address list,
    then probes every (host, port) pair for the ports in the module-level
    ``port_range`` using a thread pool of ``thread_pool_size`` workers.
    Each endpoint that ``scan_port`` reports as open is printed.

    Args:
        subnet: IPv4 network in CIDR notation, e.g. ``"192.168.112.0/20"``.
    """
    # hosts() is already iterable; no intermediate list() copy is needed.
    ips = [str(ip) for ip in ipaddress.IPv4Network(subnet).hosts()]
    print(ips)
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [
            executor.submit(scan_port, ip, port)
            for ip in ips
            for port in port_range
        ]
        # as_completed() blocks until each future finishes, so a preceding
        # wait() is redundant and only delays all output until the slowest
        # probe is done. Results are now printed as soon as they arrive.
        for future in concurrent.futures.as_completed(futures):
            endpoint = future.result()
            if endpoint is not None:
                print(endpoint)
# Script entry point: scan the configured network when run directly.
if __name__ == "__main__":
    scan_subnet(subnet)
import subprocess
import os
import sys
import re
import concurrent.futures
import ipaddress
import socket
# Network to scan, in CIDR notation.
subnet = "192.168.112.0/20"
# Maximum number of worker threads in the pool.
thread_pool_size = 200
# Ports to probe on every host.
port_range = [135]
def scan_port(ip, port):
    """Check whether a TCP port accepts connections.

    Attempts a TCP connect to ``(ip, port)`` with a 1-second timeout.

    Args:
        ip: Target host as a string.
        port: TCP port number.

    Returns:
        The string ``"<ip>:<port>"`` when the connection succeeds, otherwise
        ``None`` (connection refused or timed out, or the address/port could
        not be used at all).
    """
    try:
        # The context manager closes the socket on every path. The previous
        # ``finally: if sock:`` raised UnboundLocalError whenever
        # ``socket.socket()`` itself failed, because ``sock`` was never bound.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            # connect_ex returns an error code instead of raising for
            # connection failures; 0 means the connection was established.
            if sock.connect_ex((ip, port)) == 0:
                return "{}:{}".format(ip, port)
    except (OSError, OverflowError, TypeError, ValueError):
        # Best-effort probe: resolution errors, out-of-range ports and
        # malformed arguments all count as "not open". Unlike a bare
        # ``except:``, this lets KeyboardInterrupt/SystemExit propagate.
        return None
    return None
def scan_subnet(subnet):
    """Probe the configured ports on each of the given hosts.

    Submits one ``scan_port`` task per (host, port) pair, for the ports in
    the module-level ``port_range``, to a thread pool of
    ``thread_pool_size`` workers, and prints every non-``None`` result.

    Args:
        subnet: Despite the name, an iterable of IP address strings (the
            caller passes the list of hosts that answered the ping sweep),
            not a CIDR string.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [
            executor.submit(scan_port, ip, port)
            for ip in subnet
            for port in port_range
        ]
        # as_completed() blocks until each future finishes, so a preceding
        # wait() is redundant and only delays all output until the slowest
        # probe is done. Results are now printed as soon as they arrive.
        for future in concurrent.futures.as_completed(futures):
            endpoint = future.result()
            if endpoint is not None:
                print(endpoint)
def PingIP(ip):
    """Return ``ip`` if the host answers a single ping, otherwise ``None``.

    Runs the system ``ping`` command with the options ``-n 1 -w 20`` and
    treats any output containing "TTL" (case-insensitive) as a reply.

    NOTE(review): ``-n``/``-w`` look like the Windows ``ping`` options (one
    echo request, 20 ms reply timeout) and the output is decoded as GBK,
    presumably for a Simplified-Chinese Windows console. Confirm before
    running on another platform.

    Args:
        ip: Target address as a string.

    Returns:
        ``ip`` when a reply was seen; ``None`` when there was no reply or
        the command could not be started.
    """
    try:
        # No shell: the argv list is executed directly. ``shell=True`` with a
        # list spawned an extra shell per host (very costly across thousands
        # of hosts) and does not pass the arguments to ping on POSIX.
        # run() also waits for the child, so the process is reaped. Unused
        # stdin/stderr go to DEVNULL instead of pipes nobody reads.
        completed = subprocess.run(
            ['ping', '-n', '1', '-w', '20', ip],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            stdin=subprocess.DEVNULL,
        )
    except (OSError, TypeError, ValueError):
        # ping is missing/not executable, or ``ip`` is not a usable
        # argument: treat the host as not alive (best-effort sweep).
        return None
    # errors="replace" keeps a stray undecodable byte from discarding an
    # otherwise valid reply.
    output = completed.stdout.decode("gbk", errors="replace").upper()
    if "TTL" in output:
        return ip
    return None
def checkLive(subnet):
    """Ping-sweep a network, then port-scan the hosts that replied.

    Expands ``subnet`` into its host addresses, runs ``PingIP`` on each one
    in a thread pool of ``thread_pool_size`` workers, prints the list of
    addresses that answered, and passes that list to ``scan_subnet``.

    Args:
        subnet: IPv4 network in CIDR notation, e.g. ``"192.168.112.0/20"``.
    """
    # hosts() is already iterable; no intermediate list() copy is needed.
    ips = [str(ip) for ip in ipaddress.IPv4Network(subnet).hosts()]
    iplist = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [executor.submit(PingIP, ip) for ip in ips]
        # as_completed() already blocks until each future is done, so the
        # former wait() barrier was redundant.
        for future in concurrent.futures.as_completed(futures):
            alive_ip = future.result()
            if alive_ip is not None:
                iplist.append(alive_ip)
    print(iplist)
    scan_subnet(iplist)
# Script entry point: ping-sweep the configured network, then port-scan the
# hosts that replied.
if __name__ == "__main__":
    # The module-level constant is named ``subnet``; the previous call
    # passed the undefined name ``subnets`` and failed with NameError.
    checkLive(subnet)
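# NOTE: the ping-based version above pegs the CPU at 100% when it runs, and
# the scan ends up several times slower than scanning with no liveness check
# at all. The next version therefore uses ARP for host discovery instead.
# (Forum listing header: "[Python] view as plain text / copy code")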
import os
import sys
import time
from scapy.all import ARP, Ether, srp
import concurrent.futures
import socket
# Network to scan, in CIDR notation.
subnet = "192.168.118.0/24"
# Ports to probe on every host.
port_range = [135,445,3306,3389,6379,22]
# Maximum number of worker threads in the pool.
thread_pool_size = 200
def scan_port(ip, port):
    """Check whether a TCP port accepts connections.

    Attempts a TCP connect to ``(ip, port)`` with a 1-second timeout.

    Args:
        ip: Target host as a string.
        port: TCP port number.

    Returns:
        The string ``"<ip>:<port>"`` when the connection succeeds, otherwise
        ``None`` (connection refused or timed out, or the address/port could
        not be used at all).
    """
    try:
        # The context manager closes the socket on every path. The previous
        # ``finally: if sock:`` raised UnboundLocalError whenever
        # ``socket.socket()`` itself failed, because ``sock`` was never bound.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            # connect_ex returns an error code instead of raising for
            # connection failures; 0 means the connection was established.
            if sock.connect_ex((ip, port)) == 0:
                return "{}:{}".format(ip, port)
    except (OSError, OverflowError, TypeError, ValueError):
        # Best-effort probe: resolution errors, out-of-range ports and
        # malformed arguments all count as "not open". Unlike a bare
        # ``except:``, this lets KeyboardInterrupt/SystemExit propagate.
        return None
    return None
def scan_subnet(subnet):
    """Probe the configured ports on each of the given hosts.

    Submits one ``scan_port`` task per (host, port) pair, for the ports in
    the module-level ``port_range``, to a thread pool of
    ``thread_pool_size`` workers, and prints every non-``None`` result.

    Args:
        subnet: Despite the name, an iterable of IP address strings (the
            caller passes the list of hosts found by the ARP sweep), not a
            CIDR string.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [
            executor.submit(scan_port, ip, port)
            for ip in subnet
            for port in port_range
        ]
        # as_completed() blocks until each future finishes, so a preceding
        # wait() is redundant and only delays all output until the slowest
        # probe is done. Results are now printed as soon as they arrive.
        for future in concurrent.futures.as_completed(futures):
            endpoint = future.result()
            if endpoint is not None:
                print(endpoint)
def arpscan(subnet):
    """Discover hosts with an ARP broadcast and port-scan the responders.

    Builds an Ethernet broadcast frame carrying an ARP request whose
    ``pdst`` is ``subnet``, sends it with scapy's ``srp`` (1-second timeout,
    non-verbose), collects the ``psrc`` field of the second element of each
    answered pair, and passes that list to ``scan_subnet``.

    Args:
        subnet: Target of the ARP request, passed through as ``pdst``; the
            module-level value is a CIDR string.
    """
    broadcast_frame = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=subnet)
    # srp() returns a pair; only its first element (the answered
    # exchanges) is used here.
    answered = srp(broadcast_frame, timeout=1, verbose=False)[0]
    # Element [1] of each answered entry supplies the ``psrc`` address.
    live_hosts = [exchange[1].psrc for exchange in answered]
    scan_subnet(live_hosts)
# Script entry point: ARP-sweep the configured network, then port-scan the
# hosts that answered.
if __name__ == "__main__":
    arpscan(subnet)
v