吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 3817|回复: 23
收起左侧

[Python 原创] Python内网终端开放端口扫描程序

  [复制链接]
xiaomingtt 发表于 2023-3-29 10:18
上次做了个端口扫描程序【新提醒】局域网开放高危端口自动扫描报警程序——语音+飞秋报警——附端口封闭工具 - 『原创发布区』 - 吾爱破解 - LCG - LSG |安卓破解|病毒分析|[url]www.52pojie.cn[/url]
其中的端口扫描程序用的是shadow1ng/fscan: 一款内网综合扫描工具,方便一键自动化、全方位漏扫扫描。 (github.com)
由于不是自己的扫描程序,使用总是有不方便的地方。 于是决定用最近正在学习的Python写了一段端口扫描程序。
[Python] 纯文本查看 复制代码
import socket
import concurrent.futures
import ipaddress

# 定义要扫描的网段
subnet = "192.168.112.0/20"
# 定义要扫描的端口范围
port_range = [135]
# 定义线程池大小
thread_pool_size = 200

def scan_port(ip, port):
    #print("正在扫描" + str(ip) + "的端口:" + str(port) +"\n")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((ip, port))
        if result == 0:
            return str(ip) + ":" + str(port)
    except:
        pass
    finally:
        if sock:
            sock.close()

def scan_subnet(subnet):
    ips = [str(ip) for ip in list(ipaddress.IPv4Network(subnet).hosts())]
    print(ips)
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [executor.submit(scan_port, ip, port) for ip in ips for port in port_range]
        concurrent.futures.wait(futures)
        for future in concurrent.futures.as_completed(futures):
            port = future.result()
            if port is not None:
                print(port)

if __name__ == "__main__":
    scan_subnet(subnet)

我们内网网段有4096个地址,实际使用不超过10%,上面代码却要扫描整个网段,感觉比较浪费资源,又做了下面这个,先用Ping做存活检测,然后对存活终端做端口扫描。
[Python] 纯文本查看 复制代码
import subprocess
import os
import sys
import re
import concurrent.futures
import ipaddress
import socket

# 定义要扫描的网段
subnet = "192.168.112.0/20"
# 定义线程池大小
thread_pool_size = 200
# 定义要扫描的端口范围
port_range = [135]

def scan_port(ip, port):
    #print("正在扫描" + str(ip) + "的端口:" + str(port) +"\n")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((ip, port))
        if result == 0:
            return str(ip) + ":" + str(port)
    except:
        pass
    finally:
        if sock:
            sock.close()


def scan_subnet(subnet):
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [executor.submit(scan_port, ip, port) for ip in subnet for port in port_range]
        concurrent.futures.wait(futures)

        for future in concurrent.futures.as_completed(futures):
            port = future.result()
            if port is not None:
                print(port)

def PingIP(ip):
    try:
        p = subprocess.Popen(['ping','-n','1','-w','20',ip],
                    stdout=subprocess.PIPE,
                    stdin = subprocess.PIPE,
                    stderr = subprocess.PIPE,
                    shell = True)
        output = p.stdout.read().decode("gbk").upper()
        if "TTL" in output:
            return(ip)
        else:
            pass
    except:
        pass

def checkLive(subnet):
    ips = [str(ip) for ip in list(ipaddress.IPv4Network(subnet).hosts())]
    iplist=[]
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [executor.submit(PingIP, ip) for ip in ips]
        concurrent.futures.wait(futures)

        for future in concurrent.futures.as_completed(futures):
            ip = future.result()
            if ip is not None:
                iplist.append(ip)
        print(iplist)
        scan_subnet(iplist)


if __name__ == "__main__":
    checkLive(subnets)

但这程序运行后CPU直接拉满,检测速度也比不做活检慢了好几倍。于是想到用ARP做活检。
[Python] 纯文本查看 复制代码
import os
import sys
import time
from scapy.all import ARP, Ether, srp
import concurrent.futures
import socket

# 定义要扫描的网段
subnet = "192.168.118.0/24"
# 定义要扫描的端口范围
port_range = [135,445,3306,3389,6379,22]
# 定义线程池大小
thread_pool_size = 200
	

def scan_port(ip, port):
    #print("正在扫描" + str(ip) + "的端口:" + str(port) +"\n")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((ip, port))
        if result == 0:
            return str(ip) + ":" + str(port)
    except:
        pass
    finally:
        if sock:
            sock.close()

def scan_subnet(subnet):
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [executor.submit(scan_port, ip, port) for ip in subnet for port in port_range]
        concurrent.futures.wait(futures)

        for future in concurrent.futures.as_completed(futures):
            port = future.result()
            if port is not None:
                print(port)
                
def arpscan(subnet):
	arp_request = ARP(pdst=subnet)
	ether = Ether(dst="ff:ff:ff:ff:ff:ff")
	arp_request_broadcast = ether / arp_request
	answered_list = srp(arp_request_broadcast, timeout=1, verbose=False)[0]
	clients = []
	for packet in answered_list:
		ip = packet[1].psrc
		clients.append(ip)
	scan_subnet(clients)	
                
if __name__ == "__main__":
    arpscan(subnet)
v

ARP做本网段存活检测确实快,但研究半天才发现ARP不支持跨网段,活检还得想办法用Ping,直到找到了https://blog.csdn.net/Small_Teenager/article/details/122123299
[Python] 纯文本查看 复制代码
# encoding:utf-8
import time
import struct
import socket
import select
import concurrent.futures
import ipaddress

 
#Ping程序代码来自[url]https://blog.csdn.net/Small_Teenager/article/details/122123299[/url]
# 定义要扫描的网段
subnet = "192.168.112.0/20"
# 定义线程池大小
thread_pool_size = 200
# 定义要扫描的端口范围
port_range = [135,445,3306,3389,6379,22]

def scan_port(ip, port):
    #print("正在扫描" + str(ip) + "的端口:" + str(port) +"\n")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((ip, port))
        if result == 0:
            return str(ip) + ":" + str(port)
    except:
        pass
    finally:
        if sock:
            sock.close()

def scan_subnet(subnet):
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        futures = [executor.submit(scan_port, ip, port) for ip in subnet for port in port_range]
        concurrent.futures.wait(futures)
        
        for future in concurrent.futures.as_completed(futures):
            port = future.result()
            if port is not None:
                print(port)
 
def chesksum(data):
    n = len(data)
    m = n % 2
    sum = 0
    for i in range(0, n - m ,2):
        sum += (data[i]) + ((data[i+1]) << 8)#传入data以每两个字节(十六进制)通过ord转十进制,第一字节在低位,第二个字节在高位
    if m:
        sum += (data[-1])
    #将高于16位与低16位相加
    sum = (sum >> 16) + (sum & 0xffff)
    sum += (sum >> 16) #如果还有高于16位,将继续与低16位相加
    answer = ~sum & 0xffff
    #  主机字节序转网络字节序列(参考小端序转大端序)
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer
 
def request_ping(data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body):
    #  把字节打包成二进制数据
    icmp_packet = struct.pack('>BBHHH32s',data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body)
    icmp_chesksum = chesksum(icmp_packet)  #获取校验和
    #  把校验和传入,再次打包
    icmp_packet = struct.pack('>BBHHH32s',data_type,data_code,icmp_chesksum,data_ID,data_Sequence,payload_body)
    return icmp_packet
 
 
def raw_socket(dst_addr,icmp_packet):
    '''
       连接套接字,并将数据发送到套接字
    '''
    #实例化一个socket对象,ipv4,原套接字,分配协议端口
    rawsocket = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.getprotobyname("icmp"))
    #记录当前请求时间
    send_request_ping_time = time.time()
    #发送数据到网络
    rawsocket.sendto(icmp_packet,(dst_addr,80))
    #返回数据
    return send_request_ping_time,rawsocket,dst_addr
 
 
def reply_ping(send_request_ping_time,rawsocket,data_Sequence,timeout = 2):
    while True:
        #开始时间
        started_select = time.time()
        #实例化select对象,可读rawsocket,可写为空,可执行为空,超时时间
        what_ready = select.select([rawsocket], [], [], timeout)
        #等待时间
        wait_for_time = (time.time() - started_select)
        #没有返回可读的内容,判断超时
        if what_ready[0] == []:  # Timeout
            return -1
        #记录接收时间
        time_received = time.time()
        #设置接收的包的字节为1024
        received_packet, addr = rawsocket.recvfrom(1024)
        #获取接收包的icmp头
        #print(icmpHeader)
        icmpHeader = received_packet[20:28]
        #反转编码
        type, code, checksum, packet_id, sequence = struct.unpack(
            ">BBHHH", icmpHeader
        )
 
        if type == 0 and sequence == data_Sequence:
            return time_received - send_request_ping_time
 
        #数据包的超时时间判断
        timeout = timeout - wait_for_time
        if timeout <= 0:
            return -1

def ping(host):
    #TODO icmp数据包的构建
    data_type = 8 # ICMP Echo Request
    data_code = 0 # must be zero
    data_checksum = 0 # "...with value 0 substituted for this field..."
    data_ID = 0 #Identifier
    data_Sequence = 1 #Sequence number
    payload_body = b'abcdefghijklmnopqrstuvwabcdefghi' #data
 
    # 将主机名转ipv4地址格式,返回以ipv4地址格式的字符串,如果主机名称是ipv4地址,则它将保持不变
    #dst_addr = socket.gethostbyname(host)
    #print("正在 Ping {0} [{1}] 具有 32 字节的数据:".format(host,dst_addr))
    #请求ping数据包的二进制转换
    icmp_packet = request_ping(data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body)
    #连接套接字,并将数据发送到套接字
    send_request_ping_time,rawsocket,addr = raw_socket(host,icmp_packet)
    #数据包传输时间
    times = reply_ping(send_request_ping_time,rawsocket,data_Sequence)
    if times > 0:
        #print("来自 {0} 的回复: 字节=32 时间={1}ms".format(addr,int(times*1000)))
        return host
    else:
        #print("请求超时。")
        pass

def StartPing(subnet):
    # 将网段转换为IP地址列表
    ips = [str(ip) for ip in list(ipaddress.IPv4Network(subnet).hosts())]
    print(ips)
    # 创建线程池
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
        # 对于每个IP地址和端口,提交扫描任务到线程池
        futures = [executor.submit(ping, ip) for ip in ips]
        # 等待所有扫描任务完成
        concurrent.futures.wait(futures)

        # 打印开放的端口号
        iplist = []
        for future in concurrent.futures.as_completed(futures):
            ip = future.result()
            if ip is not None:
                iplist.append(ip)
        scan_subnet(iplist)
                
if __name__ == "__main__":
    StartPing(subnet)

利用Python实现Ping后,资源占用明显降低,扫描一个端口不如不做活检直接扫描,但扫描多个端口速度优势就体现出来了。

免费评分

参与人数 10吾爱币 +18 热心值 +9 收起 理由
jonepengcn + 1 + 1 我很赞同!
jekooma + 1 + 1 谢谢@Thanks!
刘甲乙 + 2 + 1 热心回复!
skywalker0123 + 2 + 1 我很赞同!
苏紫方璇 + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
jihedian + 1 鼓励转贴优秀软件安全工具和文档!
aka1Andrew + 1 + 1 热心回复!
嘚瑟挨顿揍 + 1 + 1 谢谢@Thanks!
jlzoe + 1 + 1 谢谢@Thanks!
vnightray + 1 + 1 谢谢@Thanks!

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

dcsyzx123 发表于 2023-3-29 10:41
感谢分享,努力学习python中……
 楼主| xiaomingtt 发表于 2023-3-29 12:14
wukequdai 发表于 2023-3-29 11:13
nmap 现成的  不要重复造轮子

现成程序确实很多,但我正在学习pyrhon,总要拿一些东西来练手。而且现成的东西不一定完全满足我自己的需求,比如我以前用的fscan,必须把扫描结果保存到文件,再读取文件中的扫描结果,我希望扫描出来一条就马上自动处理一条,而不是全部结果出来以后再去处理。你认为这是轮子,但这是我进步的阶梯。
jkl5322203 发表于 2023-3-29 10:34
其实还可以升级成扫描外网IP的端口。内网外网都可以扫,指定IP开放的端口也可以扫就完美了。我有这种工具,但不是PY写的。我感觉PY写会更好
anson1599 发表于 2023-3-29 10:38
那如果机器做了防PING,能检测吗?
whw0623 发表于 2023-3-29 10:44
可以学习nmap的源码
Easonll 发表于 2023-3-29 10:51
能否升级端口,内外网都能扫呢
liulaolao 发表于 2023-3-29 11:01
感谢分享。
wukequdai 发表于 2023-3-29 11:13
nmap 现成的  不要重复造轮子
zheng8542 发表于 2023-3-29 11:13
防火墙如果防PING是不是就不能扫描端口了?
wangL0 发表于 2023-3-29 11:59
感谢分享
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-11-24 16:36

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表