吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 584|回复: 2
上一主题 下一主题
收起左侧

[Web逆向] 某邮箱"XMTLS" 实现简要分析

[复制链接]
跳转到指定楼层
楼主
我是不会改名的 发表于 2025-3-15 14:38 回帖奖励
本帖最后由 我是不会改名的 于 2025-3-15 14:36 编辑

⚠️ 重要提示

本文档内容仅供学习交流使用,严禁用于任何商业用途或非法活动。在进行相关技术学习和实践过程中,请严格遵守法律法规及道德规范。

前言

地址 :aHR0cHM6Ly93eC5tYWlsLnFxLmNvbS8=
这周炒币损失惨重,闲着没事对 XMTLS 的实现进行了研究,并在此记录下相关成果。
此前一直直接调用 JavaScript 代码,此次尝试使用 Python 重新实现一遍。
XMTLS 类本质上是 MMTLS 的简化版本,通过对 XMTLS 的学习,也能为后续深入研究 MMTLS 奠定基础。
由于内容较多,本文省略了大量非关键部分的分析现。

直接调用 Wasm

在使用过程中,发现直接将整个 JavaScript 代码复制下来,无需进行任何修改即可正常运行。为了方便调用,我们可以简单编写一个 API。具体操作如下:

在命令行中运行node --inspect xmtls.js启动 Node.js 进程并开启调试模式。

在浏览器中访问edge://inspect/#devices(以 Edge 浏览器为例,其他支持调试的浏览器也有类似入口),通过浏览器调试工具可以方便地查看内存等信息,有助于对代码运行过程进行分析和调试。

JavaScript 代码示例

以下是一个完整的 JavaScript 服务端代码示例,用于提供 XMTLS 相关的加密和解密服务:

run();
window ={};
window.Module=Module;
const http = require('http');

class ClientCrypto {
    constructor() {
        this.module = window.Module;
        this.clientPtr = this.module?.cwrap("NewClient", "number")?.();
        this.sessionId = crypto.randomUUID();
        ClientCrypto.instances.push(this);
    }

    encrypt(data) {
        const strPtr = allocateUTF8(data);
        const resultPtr = this.module?.cwrap("ClientEncrypt", "number", ["number", "number"])(this.clientPtr, strPtr);
        const result = resultPtr ? UTF8ToString(resultPtr) : null;
        window.Module._free(strPtr);
        if (resultPtr) window.Module._free(resultPtr);
        return { original: data, encrypted: result, sessionId: this.sessionId };
    }

    decrypt(data, sessionId) {
        if (sessionId !== this.sessionId) {
            throw new Error("Invalid session ID");
        }
        const strPtr = allocateUTF8(data);
        const resultPtr = this.module?.cwrap("ClientDecrypt", "number", ["number", "number"])(this.clientPtr, strPtr);
        let result = resultPtr ? UTF8ToString(resultPtr) : null;
        window.Module._free(strPtr);
        if (resultPtr) window.Module._free(resultPtr);
        return typeof result === "string" ? JSON.parse(result) : result;
    }

    static handleApiRequest(type, data, sessionId = null) {
        if (type === "encrypt") {
            if (typeof data === 'object') {
            data = JSON.stringify(data);
        }
            const client = new ClientCrypto();
            return client.encrypt(data);
        } else if (type === "decrypt") {
            const client = ClientCrypto.instances.find(c => c.sessionId === sessionId);
            if (!client) throw new Error("Session ID not found");
            return client.decrypt(data, sessionId);
        } else {
            throw new Error("Invalid API request type");
        }
    }
}

ClientCrypto.instances = [];

const server = http.createServer((req, res) => {
    if (req.method === 'POST') {
        let body = '';
        req.on('data', chunk => {
            body += chunk.toString();
        });
        req.on('end', () => {
            try {
                const { type, data, sessionId } = JSON.parse(body);
                const response = ClientCrypto.handleApiRequest(type, data, sessionId);
                res.writeHead(200, { 'Content-Type': 'application/json' });
                res.end(JSON.stringify({ success: true, response }));
            } catch (error) {
                res.writeHead(400, { 'Content-Type': 'application/json' });
                res.end(JSON.stringify({ success: false, error: error.message }));
            }
        });
    } else {
        res.writeHead(405, { 'Content-Type': 'application/json' });
        res.end(JSON.stringify({ success: false, error: 'Method Not Allowed' }));
    }
});

server.listen(10010, () => {
    console.log('Server running on port 10010');
});

Python 还原

结构还原

在还原过程中,首先参考了 MMTLS 的相关介绍以及 Wasm 中的解包代码,以此来还原请求和响应的结构。例如,在 Wasm 代码中,我们可以看到以下函数调用:

ZN5xmtls11ClientHello11DeserializeERKNS\_6StringE
ZN5xmtls11ServerHello11DeserializeERKNS\_6StringE

这些函数用于反序列化客户端和服务器信息,并且只有在解析出现错误时才会打印错误信息,如:

w2c__ZN5xmtls7ILogger3LogEiPKcS2_jS2_z(
      w2c_g266,
      4,
      *Z_envZ___memory_baseZ_i + 52856,
      *Z_envZ___memory_baseZ_i + 33538,
      38,#长度
      *Z_envZ___memory_baseZ_i + 122672,#可以通过这看出字段
      0);

经过分析和还原,得到的结构大致如下(Python 代码示例):

Block = Struct(
    "length" / Int16ub,
    "data" / Bytes(this.length)
)
LongBlock = Struct(
    "length" / Int32ub,
    "data" / Bytes(this.length)
)
ECDH = Struct(
    "length" / Int16ub,#整个长度
    "nid" / Int16ub ,#默认415
    "ecdh_pub_key_length" / Int16ub,
    "ecdh_pub_key" / Bytes(this.ecdh_pub_key_length),
)
Request = Struct(
    "file_length" / Int32ub,  # 整体文件长度
    "record_type" / Byte,#0:加密 1:解密
    "data_length" / Int32ub,  # 内容长度
    "ecdh" / ECDH,
    "encrypted_ticket" / LongBlock,
    "info" / Block,
    "Nonce" / Block,
    "encrypted_data" / LongBlock,
)
Response = Struct(
    "file_length" / Int32ub,  # 4字节文件长度
    "record_type" / Byte,#0:加密 1:解密
    "remaining_length" / Int32ub,
    "ecdh" / ECDH,
    "info" / Block,
    "Nonce" / Block,
    "position" / Tell,
    "encrypted_data" / If(this.position < this.file_length, LongBlock),
)

流程简单分析

XMTLS 数据结构定义

在 XMTLS 中,定义了xmtls::String结构,其内存布局如下:

ptr + 8:存储字符串内容。

ptr + 16:存储字符串长度。

大部分数据都使用这个类来表示。我们可以通过$_ZN5xmtls6String8hex_dataEv函数来查看其内容,该函数会返回转换为十六进制后的地址。

加密请求流程

生成密钥对:调用GenEcdhKeyPair函数创建密钥对。

生成随机数据:使用GenerateRandom函数随机生成 12 字节的infoNonce

生成加密密钥:通过HkdfExpand函数,利用生成的info和一个固定值生成一个 16 字节的key

数据加密:调用AesGcmEncrypt函数,使用生成的keynonce对数据进行加密。

生成请求包:通过PrepareDataPackage函数,利用上述加密后的数据生成请求包,并添加一个固定的encrypted_ticket

计算请求包哈希:使用Sha256函数计算请求包的哈希值。

发送请求:将生成的请求包发送出去。

接收响应流程

解包响应:调用xmtls7Channel9GetRecordERKNS_6StringERNS_6RecordE函数对接收的响应进行解包。

计算共享密钥:使用N5xmtls17OpensslCryptoUtil9ComputeDhEiRKNS_6StringES3_RS1_函数,利用本地的私钥和服务器的公钥计算出共享密钥。

生成解密密钥 1:通过HkdfExpand函数,以上述共享密钥和请求最后计算的哈希值生成一个 16 字节的key

第一次解密:调用AesGcmDecrypt函数,使用生成的keynonce对响应数据进行第一次解密。

生成解密密钥 2:再次调用HkdfExpand函数,利用请求最后计算的哈希值和一个固定值生成另一个 16 字节的key

第二次解密:再次使用AesGcmDecrypt函数,用新生成的keynonce对数据进行第二次解密。

返回结果:将最终解密后的响应返回。
完整代码如下:

base64
import hashlib
import json
import os

import requests
from construct import Struct, Int32ub, Byte, Int16ub, Bytes, this, If, Tell
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives._serialization import NoEncryption
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat

Block = Struct(
    "length" / Int16ub,
    "data" / Bytes(this.length)
)
LongBlock = Struct(
    "length" / Int32ub,
    "data" / Bytes(this.length)
)
ECDH = Struct(
    "length" / Int16ub,#整个长度
    "nid" / Int16ub ,
    "ecdh_pub_key_length" / Int16ub,
    "ecdh_pub_key" / Bytes(this.ecdh_pub_key_length),
)
Request = Struct(
    "file_length" / Int32ub,  # 整体文件长度
    "record_type" / Byte,#0:加密 1:解密
    "data_length" / Int32ub,  # 内容长度
    "ecdh" / ECDH,
    "encrypted_ticket" / LongBlock,
    "info" / Block,
    "Nonce" / Block,
    "encrypted_data" / LongBlock,
)
Response = Struct(
    "file_length" / Int32ub,  # 4字节文件长度
    "record_type" / Byte,#0:加密 1:解密
    "remaining_length" / Int32ub,
    "ecdh" / ECDH,
    "info" / Block,
    "Nonce" / Block,
    "position" / Tell,
    "encrypted_data" / If(this.position < this.file_length, LongBlock),
)
#简化版
def get_cl_key(info):
    key = bytes.fromhex(
        '4b 6e d3 41 50 52 ee 34 02 47 a7 e9 4b 17 a5 59 3e fb 80 95 94 c2 84 00 f8 c2 dc b3 ff fa 93 07')
    hmac_obj = HMAC(key, hashes.SHA256())
    hmac_obj.update(info)
    hmac_obj.update(bytes([0x01]))
    return hmac_obj.finalize()
#简化版
def get_sv_key(key,encrypted_data_hash):
    newkey = HMAC(key, hashes.SHA256())
    newkey.update(encrypted_data_hash)
    newkey.update(bytes([0x01]))
    return newkey.finalize()
class Client:
    def __init__(self):
        curve = ec.SECP256R1()
        private_key_local = ec.generate_private_key(curve)
        public_key_local = private_key_local.public_key()
        self.private_key = private_key_local
        self.public_key = public_key_local
        self.private_key_bytes = private_key_local.private_bytes(
            encoding=Encoding.DER,
            format=PrivateFormat.PKCS8,
            encryption_algorithm=NoEncryption()
        )
        self.public_key_bytes = public_key_local.public_bytes(
            encoding=Encoding.X962,
            format=PublicFormat.UncompressedPoint
        )
        self.encrypted_data_hash = None
        self.encrypted_ticket=bytes.fromhex('ad56dca5e01165cb79d5a87f177c71ef1a40cd33bbd31fbbf3db6d6b2850c8ebf403ad35dd88cf5b3b97b22853f5c1376098c049737e')
    #密钥交换
    def exchange(self, public_key_bytes):
        public_key = ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256R1(), public_key_bytes)
        shared_secret = self.private_key.exchange(ec.ECDH(), public_key)
        key = hashlib.sha256(shared_secret).digest()
        return key
    def get_public_key(self) -> dict:
        data ={
            "length":len(self.public_key_bytes)+6,
            "nid":415,
            "ecdh_pub_key_length":len(self.public_key_bytes),
            "ecdh_pub_key":self.public_key_bytes
        }
        return data

    def encrypt(self, data) -> str:
        if isinstance(data, dict):
            data = json.dumps(data)
        if isinstance(data, str):
            data = data.encode()
        nonce = os.urandom(12)
        info =os.urandom(12)
        key = get_cl_key(info)[:16]
        encrypted_data = AESGCM(key).encrypt(nonce, data, associated_data=None)
        request ={
                "file_length": 0,
                "record_type": 0,
                "data_length": 0,
                "ecdh": self.get_public_key(),
                "encrypted_ticket": {
                    "length": len(self.encrypted_ticket),
                    "data": self.encrypted_ticket
                },
                "info": {
                    "length": len(info),
                    "data": info
                },
                "Nonce": {
                    "length": len(nonce),
                    "data": nonce
                },
                "position": 0,
                "encrypted_data": {
                    "length": len(encrypted_data),
                    "data": encrypted_data
                },
            }
        r = Request.build(request)
        file_length = len(r)
        data_length = file_length-9
        request["file_length"] = file_length
        request["data_length"] = data_length
        r = Request.build(request)
        self.encrypted_data_hash = hashlib.sha256(r).digest()
        return base64.b64encode(r).decode()
    def cl_decrypt(self, data) -> str:
        data = base64.b64decode(data)
        rsp = Response.parse(data)
        key = self.exchange(rsp.ecdh.ecdh_pub_key)
        nonce = rsp.Nonce.data
        svkey = get_sv_key(key,self.encrypted_data_hash)
        encrypted_data = rsp.encrypted_data.data
        cl_enc_data = AESGCM(svkey[0:16]).decrypt(nonce, encrypted_data, associated_data=None)
        clkey = get_cl_key(self.encrypted_data_hash)
        return AESGCM(clkey[:16]).decrypt(nonce, cl_enc_data, associated_data=None).decode()
   #服务端解密
    def sv_decrypt(self, data) -> str:
        data = base64.b64decode(data)
        request = Request.parse(data)
        if request.record_type == 0:
            key = get_cl_key(request.info.data)
            plaintext = AESGCM(key[0:16]).decrypt(request.Nonce.data, request.encrypted_data.data, associated_data=None).decode()
            return plaintext
        else:
            return "解密失败"

后记

XMTLS 基本调用的都是 openssl,,所以这次对 XMTLS 里涉及 openssl 的部分,我就没做特别详细的分析。
要是后续想深入了解底层细节,直接去查 openssl 和tls 的相关资料,就能获得更全面的信息。

免费评分

参与人数 2威望 +1 吾爱币 +20 热心值 +2 收起 理由
涛之雨 + 1 + 20 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!
iYoloPPD + 1 谢谢@Thanks!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

沙发
aqin5014 发表于 2025-3-16 14:29
学习一下
3#
wdj500 发表于 2025-3-16 19:08
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-3-17 09:45

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表