前言
此程序受到了某电子教室的网页屏蔽功能启发,在发现目前没有类似实现代码后独立编写,与其他例如hosts屏蔽方式存在很大的区别,如果进一步开发可实现http内容的全文解析及https的特征识别。
第三方依赖
pydivert(请通过pip安装)
代码
import threading
import pydivert
import msvcrt
import re
import socket
class HTTPSPayload(object):
def __init__(self, buffer):
self.buffer = buffer
self.buffer_len = len(self.buffer)
self.pos = 0
@property
def available(self):
left = self.buffer_len - self.pos - 1
if left > 0:
return left
return 0
def read_byte(self):
byte_data = self.buffer[self.pos]
self.pos += 1
return byte_data
def read_int16(self):
i1 = self.read_byte()
i2 = self.read_byte()
return (i1 << 8) + i2
def read_int24(self):
i1 = self.read_byte()
i2 = self.read_byte()
i3 = self.read_byte()
return (i1 << 16) + (i2 << 8) + i3
def read_bytes(self, length):
bytes_data = self.buffer[self.pos: self.pos + length]
self.pos += length
return bytes_data
def ensure_length(self, length):
return self.buffer_len - self.pos >= length
class DivertThread(threading.Thread):
def __init__(self):
super(DivertThread, self).__init__(daemon=True)
self.divert = None
self.block_list = ['www.4399.com'] # 此处可添加要屏蔽的网站
self.tcp_raw = {
'ipv4': b'E\x04\x00(Y\x89@\x00@\x06u/\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00P\x10\x00B\x00\x00\x00\x00',
'ipv6': b'`Nn\xb3\x00\x14\x062\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00P\x10\x00C\x00\x00\x00\x00'
}
@staticmethod
def parse_packet_http(packet):
match_result = re.search(
b'Host:\s?(?P<host>\S+?)\\r\\n', packet.payload)
if match_result is not None:
host = match_result.group('host').decode()
return host
def parse_packet_https(self, packet):
buffer = packet.payload
if len(buffer) < 2:
return
record_type = buffer[0]
stream = HTTPSPayload(buffer[1:])
if record_type == 0x16:
if not stream.ensure_length(43):
return
# SSL 3.0 or TLS 1.0, 1.1 and 1.2, SSL 2.0 is ignored
major_version = stream.read_byte()
minor_version = stream.read_byte()
record_length = stream.read_int16()
if stream.read_byte() != 0x01: # Should be client hello
return
length = stream.read_int24()
major_version = stream.read_byte()
minor_version = stream.read_byte()
stream.read_bytes(32) # random
length = stream.read_byte()
# sessionid + 2 ciphersData length
if not stream.ensure_length(length + 2):
return
stream.read_bytes(length) # sessionid
length = stream.read_int16()
# ciphersData + compressionData length
if not stream.ensure_length(length + 1):
return
stream.read_bytes(length) # ciphersData
length = stream.read_byte()
if length < 1:
return
# compressionData
if not stream.ensure_length(length):
return
stream.read_bytes(length) # compressionData
extenstions_start_position = stream.pos
if extenstions_start_position >= record_length + 5:
return
host = self.read_server_name(major_version, minor_version, stream)
if host:
return host.decode()
@staticmethod
def read_server_name(major_version, minor_version, stream):
if major_version > 3 or major_version == 3 and minor_version >= 1:
if stream.ensure_length(2):
extensions_length = stream.read_int16()
if stream.ensure_length(extensions_length):
while extensions_length > 3:
ext_id = stream.read_int16()
length = stream.read_int16()
data = stream.read_bytes(length)
if ext_id == 0:
idx = 2
while idx < len(data):
name_type = data[idx]
count = (data[idx + 1] << 8) + data[idx + 2]
string = data[idx + 3: idx + 3 + count]
if name_type == 0:
return string
idx += 3 + count
extensions_length -= 4 + length
def run(self):
self.divert = pydivert.WinDivert('outbound and tcp.PayloadLength > 0 and ' +
'(tcp.DstPort == 80 or tcp.DstPort == 443)')
self.divert.open()
while True:
try:
packet = self.divert.recv()
except OSError:
break
if packet.dst_port == 80:
result = self.parse_packet_http(packet)
else:
result = self.parse_packet_https(packet)
if result is None or result not in self.block_list:
self.divert.send(packet)
continue
'''
URL matched blacklist, reset connection
1. Send reset to server
'''
packet.tcp.rst = True
packet.payload = b''
packet.recalculate_checksums()
self.divert.send(packet)
'''
2. Send reset to client
'''
is_ipv4 = packet.ipv4 is not None
block_packet = pydivert.Packet(self.tcp_raw['ipv4'] if is_ipv4 else self.tcp_raw['ipv6'],
packet.interface,
pydivert.Direction.INBOUND)
try:
block_packet.src_addr, block_packet.src_port = packet.dst_addr, packet.dst_port
block_packet.dst_addr, block_packet.dst_port = packet.src_addr, packet.src_port
block_packet.tcp.rst = True
if is_ipv4:
block_packet.ipv4.ttl = packet.ipv4.ttl
payload_len = packet.ipv4.packet_len - packet.ipv4.header_len
else:
payload_len = packet.ipv6.payload_len
block_packet.tcp.seq_num = packet.tcp.ack_num
block_packet.tcp.ack_num = socket.htonl(socket.ntohl(packet.tcp.seq_num) + payload_len)
block_packet.recalculate_checksums()
self.divert.send(block_packet)
except OSError:
continue
def stop(self):
self.divert.close()
self.join()
def main():
divert_thread = DivertThread()
divert_thread.start()
print('Working...')
msvcrt.getch()
divert_thread.stop()
if __name__ == '__main__':
main()
写在后面
原创开发不易,网络流量识别更是一项困难的技术,如果对您有帮助还请多多支持,谢谢!
|