⚠️ 重要提示
本文档内容仅供学习交流使用,严禁用于任何商业用途或非法活动。在进行相关技术学习和实践过程中,请严格遵守法律法规及道德规范。
前言
地址 :aHR0cHM6Ly93eC5tYWlsLnFxLmNvbS8=
这周炒币损失惨重,闲着没事对 XMTLS 的实现进行了研究,并在此记录下相关成果。
此前一直直接调用 JavaScript 代码,此次尝试使用 Python 重新实现一遍。
XMTLS 类本质上是 MMTLS 的简化版本,通过对 XMTLS 的学习,也能为后续深入研究 MMTLS 奠定基础。
由于内容较多,本文省略了大量非关键部分的分析现。
直接调用 Wasm
在使用过程中,发现直接将整个 JavaScript 代码复制下来,无需进行任何修改即可正常运行。为了方便调用,我们可以简单编写一个 API。具体操作如下:
在命令行中运行node --inspect xmtls.js
启动 Node.js 进程并开启调试模式。
在浏览器中访问edge://inspect/#devices
(以 Edge 浏览器为例,其他支持调试的浏览器也有类似入口),通过浏览器调试工具可以方便地查看内存等信息,有助于对代码运行过程进行分析和调试。
JavaScript 代码示例
以下是一个完整的 JavaScript 服务端代码示例,用于提供 XMTLS 相关的加密和解密服务:
run();
window ={};
window.Module=Module;
const http = require('http');
class ClientCrypto {
constructor() {
this.module = window.Module;
this.clientPtr = this.module?.cwrap("NewClient", "number")?.();
this.sessionId = crypto.randomUUID();
ClientCrypto.instances.push(this);
}
encrypt(data) {
const strPtr = allocateUTF8(data);
const resultPtr = this.module?.cwrap("ClientEncrypt", "number", ["number", "number"])(this.clientPtr, strPtr);
const result = resultPtr ? UTF8ToString(resultPtr) : null;
window.Module._free(strPtr);
if (resultPtr) window.Module._free(resultPtr);
return { original: data, encrypted: result, sessionId: this.sessionId };
}
decrypt(data, sessionId) {
if (sessionId !== this.sessionId) {
throw new Error("Invalid session ID");
}
const strPtr = allocateUTF8(data);
const resultPtr = this.module?.cwrap("ClientDecrypt", "number", ["number", "number"])(this.clientPtr, strPtr);
let result = resultPtr ? UTF8ToString(resultPtr) : null;
window.Module._free(strPtr);
if (resultPtr) window.Module._free(resultPtr);
return typeof result === "string" ? JSON.parse(result) : result;
}
static handleApiRequest(type, data, sessionId = null) {
if (type === "encrypt") {
if (typeof data === 'object') {
data = JSON.stringify(data);
}
const client = new ClientCrypto();
return client.encrypt(data);
} else if (type === "decrypt") {
const client = ClientCrypto.instances.find(c => c.sessionId === sessionId);
if (!client) throw new Error("Session ID not found");
return client.decrypt(data, sessionId);
} else {
throw new Error("Invalid API request type");
}
}
}
ClientCrypto.instances = [];
const server = http.createServer((req, res) => {
if (req.method === 'POST') {
let body = '';
req.on('data', chunk => {
body += chunk.toString();
});
req.on('end', () => {
try {
const { type, data, sessionId } = JSON.parse(body);
const response = ClientCrypto.handleApiRequest(type, data, sessionId);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ success: true, response }));
} catch (error) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ success: false, error: error.message }));
}
});
} else {
res.writeHead(405, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ success: false, error: 'Method Not Allowed' }));
}
});
server.listen(10010, () => {
console.log('Server running on port 10010');
});
Python 还原
结构还原
在还原过程中,首先参考了 MMTLS 的相关介绍以及 Wasm 中的解包代码,以此来还原请求和响应的结构。例如,在 Wasm 代码中,我们可以看到以下函数调用:
ZN5xmtls11ClientHello11DeserializeERKNS\_6StringE
ZN5xmtls11ServerHello11DeserializeERKNS\_6StringE
这些函数用于反序列化客户端和服务器信息,并且只有在解析出现错误时才会打印错误信息,如:
w2c__ZN5xmtls7ILogger3LogEiPKcS2_jS2_z(
w2c_g266,
4,
*Z_envZ___memory_baseZ_i + 52856,
*Z_envZ___memory_baseZ_i + 33538,
38,#长度
*Z_envZ___memory_baseZ_i + 122672,#可以通过这看出字段
0)
经过分析和还原,得到的结构大致如下(Python 代码示例):
Block = Struct(
"length" / Int16ub,
"data" / Bytes(this.length)
)
LongBlock = Struct(
"length" / Int32ub,
"data" / Bytes(this.length)
)
ECDH = Struct(
"length" / Int16ub,#整个长度
"nid" / Int16ub ,#默认415
"ecdh_pub_key_length" / Int16ub,
"ecdh_pub_key" / Bytes(this.ecdh_pub_key_length),
)
Request = Struct(
"file_length" / Int32ub, # 整体文件长度
"record_type" / Byte,#0:加密 1:解密
"data_length" / Int32ub, # 内容长度
"ecdh" / ECDH,
"encrypted_ticket" / LongBlock,
"info" / Block,
"Nonce" / Block,
"encrypted_data" / LongBlock,
)
Response = Struct(
"file_length" / Int32ub, # 4字节文件长度
"record_type" / Byte,#0:加密 1:解密
"remaining_length" / Int32ub,
"ecdh" / ECDH,
"info" / Block,
"Nonce" / Block,
"position" / Tell,
"encrypted_data" / If(this.position < this.file_length, LongBlock),
)
流程简单分析
XMTLS 数据结构定义
在 XMTLS 中,定义了xmtls::String
结构,其内存布局如下:
ptr + 8
:存储字符串内容。
ptr + 16
:存储字符串长度。
大部分数据都使用这个类来表示。我们可以通过$_ZN5xmtls6String8hex_dataEv
函数来查看其内容,该函数会返回转换为十六进制后的地址。
加密请求流程
生成密钥对:调用GenEcdhKeyPair
函数创建密钥对。
生成随机数据:使用GenerateRandom
函数随机生成 12 字节的info
和Nonce
。
生成加密密钥:通过HkdfExpand
函数,利用生成的info
和一个固定值生成一个 16 字节的key
。
数据加密:调用AesGcmEncrypt
函数,使用生成的key
和nonce
对数据进行加密。
生成请求包:通过PrepareDataPackage
函数,利用上述加密后的数据生成请求包,并添加一个固定的encrypted_ticket
。
计算请求包哈希:使用Sha256
函数计算请求包的哈希值。
发送请求:将生成的请求包发送出去。
接收响应流程
解包响应:调用xmtls7Channel9GetRecordERKNS_6StringERNS_6RecordE
函数对接收的响应进行解包。
计算共享密钥:使用N5xmtls17OpensslCryptoUtil9ComputeDhEiRKNS_6StringES3_RS1_
函数,利用本地的私钥和服务器的公钥计算出共享密钥。
生成解密密钥 1:通过HkdfExpand
函数,以上述共享密钥和请求最后计算的哈希值生成一个 16 字节的key
。
第一次解密:调用AesGcmDecrypt
函数,使用生成的key
和nonce
对响应数据进行第一次解密。
生成解密密钥 2:再次调用HkdfExpand
函数,利用请求最后计算的哈希值和一个固定值生成另一个 16 字节的key
。
第二次解密:再次使用AesGcmDecrypt
函数,用新生成的key
和nonce
对数据进行第二次解密。
返回结果:将最终解密后的响应返回。
完整代码如下:
base64
import hashlib
import json
import os
import requests
from construct import Struct, Int32ub, Byte, Int16ub, Bytes, this, If, Tell
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives._serialization import NoEncryption
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat
Block = Struct(
"length" / Int16ub,
"data" / Bytes(this.length)
)
LongBlock = Struct(
"length" / Int32ub,
"data" / Bytes(this.length)
)
ECDH = Struct(
"length" / Int16ub,
"nid" / Int16ub ,
"ecdh_pub_key_length" / Int16ub,
"ecdh_pub_key" / Bytes(this.ecdh_pub_key_length),
)
Request = Struct(
"file_length" / Int32ub,
"record_type" / Byte,
"data_length" / Int32ub,
"ecdh" / ECDH,
"encrypted_ticket" / LongBlock,
"info" / Block,
"Nonce" / Block,
"encrypted_data" / LongBlock,
)
Response = Struct(
"file_length" / Int32ub,
"record_type" / Byte,
"remaining_length" / Int32ub,
"ecdh" / ECDH,
"info" / Block,
"Nonce" / Block,
"position" / Tell,
"encrypted_data" / If(this.position < this.file_length, LongBlock),
)
def get_cl_key(info):
key = bytes.fromhex(
'4b 6e d3 41 50 52 ee 34 02 47 a7 e9 4b 17 a5 59 3e fb 80 95 94 c2 84 00 f8 c2 dc b3 ff fa 93 07')
hmac_obj = HMAC(key, hashes.SHA256())
hmac_obj.update(info)
hmac_obj.update(bytes([0x01]))
return hmac_obj.finalize()
def get_sv_key(key,encrypted_data_hash):
newkey = HMAC(key, hashes.SHA256())
newkey.update(encrypted_data_hash)
newkey.update(bytes([0x01]))
return newkey.finalize()
class Client:
def __init__(self):
curve = ec.SECP256R1()
private_key_local = ec.generate_private_key(curve)
public_key_local = private_key_local.public_key()
self.private_key = private_key_local
self.public_key = public_key_local
self.private_key_bytes = private_key_local.private_bytes(
encoding=Encoding.DER,
format=PrivateFormat.PKCS8,
encryption_algorithm=NoEncryption()
)
self.public_key_bytes = public_key_local.public_bytes(
encoding=Encoding.X962,
format=PublicFormat.UncompressedPoint
)
self.encrypted_data_hash = None
self.encrypted_ticket=bytes.fromhex('ad56dca5e01165cb79d5a87f177c71ef1a40cd33bbd31fbbf3db6d6b2850c8ebf403ad35dd88cf5b3b97b22853f5c1376098c049737e')
def exchange(self, public_key_bytes):
public_key = ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256R1(), public_key_bytes)
shared_secret = self.private_key.exchange(ec.ECDH(), public_key)
key = hashlib.sha256(shared_secret).digest()
return key
def get_public_key(self) -> dict:
data ={
"length":len(self.public_key_bytes)+6,
"nid":415,
"ecdh_pub_key_length":len(self.public_key_bytes),
"ecdh_pub_key":self.public_key_bytes
}
return data
def encrypt(self, data) -> str:
if isinstance(data, dict):
data = json.dumps(data)
if isinstance(data, str):
data = data.encode()
nonce = os.urandom(12)
info =os.urandom(12)
key = get_cl_key(info)[:16]
encrypted_data = AESGCM(key).encrypt(nonce, data, associated_data=None)
request ={
"file_length": 0,
"record_type": 0,
"data_length": 0,
"ecdh": self.get_public_key(),
"encrypted_ticket": {
"length": len(self.encrypted_ticket),
"data": self.encrypted_ticket
},
"info": {
"length": len(info),
"data": info
},
"Nonce": {
"length": len(nonce),
"data": nonce
},
"position": 0,
"encrypted_data": {
"length": len(encrypted_data),
"data": encrypted_data
},
}
r = Request.build(request)
file_length = len(r)
data_length = file_length-9
request["file_length"] = file_length
request["data_length"] = data_length
r = Request.build(request)
self.encrypted_data_hash = hashlib.sha256(r).digest()
return base64.b64encode(r).decode()
def cl_decrypt(self, data) -> str:
data = base64.b64decode(data)
rsp = Response.parse(data)
key = self.exchange(rsp.ecdh.ecdh_pub_key)
nonce = rsp.Nonce.data
svkey = get_sv_key(key,self.encrypted_data_hash)
encrypted_data = rsp.encrypted_data.data
cl_enc_data = AESGCM(svkey[0:16]).decrypt(nonce, encrypted_data, associated_data=None)
clkey = get_cl_key(self.encrypted_data_hash)
return AESGCM(clkey[:16]).decrypt(nonce, cl_enc_data, associated_data=None).decode()
def sv_decrypt(self, data) -> str:
data = base64.b64decode(data)
request = Request.parse(data)
if request.record_type == 0:
key = get_cl_key(request.info.data)
plaintext = AESGCM(key[0:16]).decrypt(request.Nonce.data, request.encrypted_data.data, associated_data=None).decode()
return plaintext
else:
return "解密失败"
后记
XMTLS 基本调用的都是 openssl,,所以这次对 XMTLS 里涉及 openssl 的部分,我就没做特别详细的分析。
要是后续想深入了解底层细节,直接去查 openssl 和tls 的相关资料,就能获得更全面的信息。