RichardYangZT 发表于 2022-1-23 11:10

Windows本地HTTP(S)流量屏蔽

## 前言
此程序受到了某电子教室的网页屏蔽功能启发,在发现目前没有类似实现代码后**独立编写**,与其他例如hosts屏蔽方式存在很大的区别,如果进一步开发可实现http内容的全文解析及https的特征识别。

## 第三方依赖
pydivert(请通过pip安装)

## 代码
```python
import threading
import pydivert
import msvcrt
import re
import socket


class HTTPSPayload(object):
    def __init__(self, buffer):
      self.buffer = buffer
      self.buffer_len = len(self.buffer)
      self.pos = 0

    @property
    def available(self):
      left = self.buffer_len - self.pos - 1
      if left > 0:
            return left
      return 0

    def read_byte(self):
      byte_data = self.buffer
      self.pos += 1
      return byte_data

    def read_int16(self):
      i1 = self.read_byte()
      i2 = self.read_byte()
      return (i1 << 8) + i2

    def read_int24(self):
      i1 = self.read_byte()
      i2 = self.read_byte()
      i3 = self.read_byte()
      return (i1 << 16) + (i2 << 8) + i3

    def read_bytes(self, length):
      bytes_data = self.buffer
      self.pos += length
      return bytes_data

    def ensure_length(self, length):
      return self.buffer_len - self.pos >= length


class DivertThread(threading.Thread):
    def __init__(self):
      super(DivertThread, self).__init__(daemon=True)
      self.divert = None
      self.block_list = ['www.4399.com'] # 此处可添加要屏蔽的网站
      self.tcp_raw = {
            'ipv4': b'E\x04\x00(Y\x89@\x00@\x06u/\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00P\x10\x00B\x00\x00\x00\x00',
            'ipv6': b'`Nn\xb3\x00\x14\x062\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00P\x10\x00C\x00\x00\x00\x00'
      }

    @staticmethod
    def parse_packet_http(packet):
      match_result = re.search(
            b'Host:\s?(?P<host>\S+?)\\r\\n', packet.payload)
      if match_result is not None:
            host = match_result.group('host').decode()
            return host

    def parse_packet_https(self, packet):
      buffer = packet.payload
      if len(buffer) < 2:
            return
      record_type = buffer
      stream = HTTPSPayload(buffer)
      if record_type == 0x16:
            if not stream.ensure_length(43):
                return
            # SSL 3.0 or TLS 1.0, 1.1 and 1.2, SSL 2.0 is ignored
            major_version = stream.read_byte()
            minor_version = stream.read_byte()

            record_length = stream.read_int16()

            if stream.read_byte() != 0x01:# Should be client hello
                return

            length = stream.read_int24()

            major_version = stream.read_byte()
            minor_version = stream.read_byte()

            stream.read_bytes(32)# random
            length = stream.read_byte()

            # sessionid + 2 ciphersData length
            if not stream.ensure_length(length + 2):
                return

            stream.read_bytes(length)# sessionid
            length = stream.read_int16()

            # ciphersData + compressionData length
            if not stream.ensure_length(length + 1):
                return

            stream.read_bytes(length)# ciphersData
            length = stream.read_byte()

            if length < 1:
                return

            # compressionData
            if not stream.ensure_length(length):
                return

            stream.read_bytes(length)# compressionData
            extenstions_start_position = stream.pos

            if extenstions_start_position >= record_length + 5:
                return

            host = self.read_server_name(major_version, minor_version, stream)
            if host:
                return host.decode()

    @staticmethod
    def read_server_name(major_version, minor_version, stream):
      if major_version > 3 or major_version == 3 and minor_version >= 1:
            if stream.ensure_length(2):
                extensions_length = stream.read_int16()
                if stream.ensure_length(extensions_length):
                  while extensions_length > 3:
                        ext_id = stream.read_int16()
                        length = stream.read_int16()
                        data = stream.read_bytes(length)
                        if ext_id == 0:
                            idx = 2
                            while idx < len(data):
                              name_type = data
                              count = (data << 8) + data
                              string = data
                              if name_type == 0:
                                    return string
                              idx += 3 + count
                        extensions_length -= 4 + length

    def run(self):
      self.divert = pydivert.WinDivert('outbound and tcp.PayloadLength > 0 and ' +
                                       '(tcp.DstPort == 80 or tcp.DstPort == 443)')
      self.divert.open()
      while True:
            try:
                packet = self.divert.recv()
            except OSError:
                break
            if packet.dst_port == 80:
                result = self.parse_packet_http(packet)
            else:
                result = self.parse_packet_https(packet)
            if result is None or result not in self.block_list:
                self.divert.send(packet)
                continue
            '''
                URL matched blacklist, reset connection
                1. Send reset to server
            '''
            packet.tcp.rst = True
            packet.payload = b''
            packet.recalculate_checksums()
            self.divert.send(packet)
            '''
                2. Send reset to client
            '''
            is_ipv4 = packet.ipv4 is not None
            block_packet = pydivert.Packet(self.tcp_raw['ipv4'] if is_ipv4 else self.tcp_raw['ipv6'],
                                           packet.interface,
                                           pydivert.Direction.INBOUND)
            try:
                block_packet.src_addr, block_packet.src_port = packet.dst_addr, packet.dst_port
                block_packet.dst_addr, block_packet.dst_port = packet.src_addr, packet.src_port
                block_packet.tcp.rst = True
                if is_ipv4:
                  block_packet.ipv4.ttl = packet.ipv4.ttl
                  payload_len = packet.ipv4.packet_len - packet.ipv4.header_len
                else:
                  payload_len = packet.ipv6.payload_len
                block_packet.tcp.seq_num = packet.tcp.ack_num
                block_packet.tcp.ack_num = socket.htonl(socket.ntohl(packet.tcp.seq_num) + payload_len)
                block_packet.recalculate_checksums()
                self.divert.send(block_packet)
            except OSError:
                continue

    def stop(self):
      self.divert.close()
      self.join()


def main():
    divert_thread = DivertThread()
    divert_thread.start()
    print('Working...')
    msvcrt.getch()
    divert_thread.stop()


if __name__ == '__main__':
    main()
```

## 写在后面
原创开发不易,网络流量识别更是一项困难的技术,如果对您有帮助还请多多支持,谢谢!

kexue8 发表于 2022-1-23 15:52

没弄明白,这个是控制上网的吗?比如老师让学生完成某个不需要联网的作业,用这个就不能上网了?

bwuaich 发表于 2022-1-23 18:44

来看下; 了解; 谢谢分享!!

RichardYangZT 发表于 2022-1-30 17:42

kexue8 发表于 2022-1-23 15:52
没弄明白,这个是控制上网的吗?比如老师让学生完成某个不需要联网的作业,用这个就不能上网了?

可以这么理解

kexue8 发表于 2022-1-31 11:08

RichardYangZT 发表于 2022-1-30 17:42
可以这么理解

感谢您的热心回复!顺祝新年快乐{:1_919:}

iptver 发表于 2022-1-31 11:45

这个是从底层看tcp流量里是不是有这个网址? 利用packet sniffer? 问题是如何部署呢,设置成开机启动?

RichardYangZT 发表于 2022-2-1 23:09

iptver 发表于 2022-1-31 11:45
这个是从底层看tcp流量里是不是有这个网址? 利用packet sniffer? 问题是如何部署呢,设置成开机启动?

对的,准确说不是抓包是直接拦截了Windows的出站数据

小旭. 发表于 2022-2-23 08:33

楼主那个PYCM的GitHub打不开了

RichardYangZT 发表于 2024-1-3 20:51

小旭. 发表于 2022-2-23 08:33
楼主那个PYCM的GitHub打不开了

换地址了:https://github.com/yang-zhongtian/PYCM
页: [1]
查看完整版本: Windows本地HTTP(S)流量屏蔽