局域网端对端加密通讯,socket传输
**简介**使用SOCKET技术实现数据交换,全部加解密操作都在客户端完成,服务端不参与数据处理,仅供数据交换。
**使用方法**
安装服务端与客户端依赖的支持库。运行一个服务端与两个客户端。
**语言**
Python
**效果图**
上面为服务端,下面两个为客户端
**代码**
server.py
```
import socket
import threading
from datetime import datetime
from cryptography.fernet import Fernet
# 生成服务端的随机密钥
server_key = Fernet.generate_key()
clients = {}
lock = threading.Lock()# 用于多线程访问的锁
def handle_client(client_socket, client_address, client_id):
print(f"客户端 {client_address} 已连接,ID: {client_id}")
# 发送服务端密钥给客户端
client_socket.sendall(server_key)
# 接收客户端密钥混合数据
client_key_mixed = client_socket.recv(1024)
with lock:
clients = {"socket": client_socket, "key_mixed": client_key_mixed}
# 如果两个客户端都已连接,进行密钥交换
if len(clients) == 2:
other_client_id = 2 if client_id == 1 else 1
other_client = clients
# 将 client_id 的混合密钥发送给 other_client
other_client["socket"].sendall(client_key_mixed)
# 将 other_client 的混合密钥发送给 client_id
client_socket.sendall(other_client["key_mixed"])
# 处理加密消息的交换
while True:
try:
# 接收客户端加密消息并转发
encrypted_message = client_socket.recv(1024)
if encrypted_message:
# 获取当前时间
exchange_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 输出接收到的密文及时间
print(f"[{exchange_time}] 从客户端 {client_id} 接收到的密文: {encrypted_message}")
# 转发消息给另一个客户端
other_client_socket = clients["socket"]
other_client_socket.sendall(encrypted_message)
# 输出转发操作
print(f"[{exchange_time}] 密文转发给另一个客户端")
else:
break
except Exception as e:
print(f"处理客户端 {client_id} 消息时出错: {e}")
break
print(f"客户端 {client_address} 已断开")
client_socket.close()
def start_server():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 65432))
server.listen(2)
print("服务器启动,等待客户端连接...")
client_id = 1
while True:
client_socket, client_address = server.accept()
# 为每个客户端启动新线程
client_handler = threading.Thread(target=handle_client, args=(client_socket, client_address, client_id))
client_handler.start()
client_id += 1
start_server()
```
client.py
```
import socket
import threading
import base64
from cryptography.fernet import Fernet
import os
# 生成客户端的随机密钥
client_key = Fernet.generate_key()
# 全局变量存储聊天记录
chat_history = []
# 全局变量存储混合后的密钥
final_key_base64 = None
def xor_bytes(byte1, byte2):
# 使用 XOR 混合两个字节串,并确保两者的长度一致
return bytes(a ^ b for a, b in zip(byte1, byte2))
def clear_screen():
"""清空屏幕并重新绘制聊天界面"""
os.system('cls')# 对于 Windows,使用 `cls` 清屏
# 显示混合后的密钥
if final_key_base64:
print(f"共享密钥: {final_key_base64.decode()}\n")
# 只显示最新的10条聊天记录
for message in chat_history[-10:]:
print(message)
# 重新显示输入提示
print("请输入要发送的消息:", end='', flush=True)
def receive_messages(client_socket, fernet):
"""用于接收消息的线程"""
while True:
try:
encrypted_response = client_socket.recv(1024)
if encrypted_response:
# 解密消息
decrypted_message = fernet.decrypt(encrypted_response).decode('utf-8')
# 将收到的消息添加到聊天记录
chat_history.append(f"收到消息: {decrypted_message}")
# 清空屏幕并重新显示聊天记录和输入提示
clear_screen()
except Exception as e:
print(f"接收消息时出错: {e}")
break
def send_messages(client_socket, fernet):
"""用于发送消息的主线程"""
while True:
message = input()
if message:
# 加密消息
encrypted_message = fernet.encrypt(message.encode('utf-8'))
# 发送加密消息
client_socket.sendall(encrypted_message)
# 将发送的消息添加到聊天记录
chat_history.append(f"我: {message}")
# 清空屏幕并重新显示聊天记录和输入提示
clear_screen()
def start_client():
global final_key_base64
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 65432)
client_socket.connect(server_address)
# 接收服务端的密钥
server_key = client_socket.recv(1024)
# 生成客户端的密钥并与服务器的密钥混合
client_key_mixed = xor_bytes(client_key, server_key)
print(f"混合后的客户端密钥发送给服务器")
# 发送混合后的密钥给服务器
client_socket.sendall(client_key_mixed)
# 接收对方客户端的混合密钥
other_client_key_mixed = client_socket.recv(1024)
# 将对方的密钥与自己的密钥混合,得到共享密钥
shared_key = xor_bytes(other_client_key_mixed, client_key)
# 取共享密钥的最后32个字节作为Fernet密钥
final_key = shared_key[-32:]
final_key_base64 = base64.urlsafe_b64encode(final_key)
print(f"共享密钥生成: {final_key_base64.decode()}")
# 使用最终的共享密钥进行加密与解密
fernet = Fernet(final_key_base64)
# 清空屏幕,准备显示聊天界面
clear_screen()
# 创建一个线程来接收消息
receive_thread = threading.Thread(target=receive_messages, args=(client_socket, fernet))
receive_thread.daemon = True# 让线程在主程序退出时结束
receive_thread.start()
# 主线程负责发送消息
send_messages(client_socket, fernet)
start_client()
```
---
**PS**
本来是想做一个多人的端对端,但是不太行。我的思路就是通过混合多个客户端密钥获得公共广播密钥用来加解密,but太难了。 更新加了个哈希值校验防止消息内容篡改
哈希值没加密,我觉得没啥必要
import socket
import threading
import base64
import hashlib
from cryptography.fernet import Fernet
import os
# 生成客户端的随机密钥
client_key = Fernet.generate_key()
# 全局变量存储聊天记录
chat_history = []
# 全局变量存储混合后的密钥
final_key_base64 = None
def xor_bytes(byte1, byte2):
# 使用 XOR 混合两个字节串,并确保两者的长度一致
return bytes(a ^ b for a, b in zip(byte1, byte2))
def clear_screen():
"""清空屏幕并重新绘制聊天界面"""
os.system('cls')# 对于 Windows,使用 `cls` 清屏
# 显示混合后的密钥
if final_key_base64:
print(f"共享密钥: {final_key_base64.decode()}\n")
# 只显示最新的10条聊天记录
for message in chat_history[-10:]:
print(message)
# 重新显示输入提示
print("请输入要发送的消息:", end='', flush=True)
# 计算消息的SHA-256哈希值
def calculate_sha256(message):
sha256_hash = hashlib.sha256(message.encode('utf-8')).digest()
return sha256_hash
def receive_messages(client_socket, fernet):
"""用于接收消息的线程"""
while True:
try:
# 接收包含加密消息和SHA-256哈希值的数据
encrypted_response_with_hash = client_socket.recv(1056)# 1024字节密文 + 32字节的SHA-256哈希值
if encrypted_response_with_hash:
# 分离加密消息和SHA-256哈希值
encrypted_message = encrypted_response_with_hash[:-32]
received_sha256 = encrypted_response_with_hash[-32:]
# 解密消息
decrypted_message = fernet.decrypt(encrypted_message).decode('utf-8')
# 计算解密后消息的SHA-256哈希值
calculated_sha256 = calculate_sha256(decrypted_message)
# 校验哈希值是否一致
if calculated_sha256 == received_sha256:
# 校验通过,将消息添加到聊天记录
chat_history.append(f"收到消息: {decrypted_message}")
else:
# 校验失败,记录篡改警告
chat_history.append(f"警告: 收到的消息可能被篡改")
# 清空屏幕并重新显示聊天记录和输入提示
clear_screen()
except Exception as e:
print(f"接收消息时出错: {e}")
break
def send_messages(client_socket, fernet):
"""用于发送消息的主线程"""
while True:
message = input()
if message:
# 计算消息的SHA-256哈希值
sha256_hash = calculate_sha256(message)
# 加密消息
encrypted_message = fernet.encrypt(message.encode('utf-8'))
# 将加密消息和SHA-256哈希值组合在一起
message_to_send = encrypted_message + sha256_hash
# 发送组合后的消息
client_socket.sendall(message_to_send)
# 将发送的消息添加到聊天记录
chat_history.append(f"我: {message}")
# 清空屏幕并重新显示聊天记录和输入提示
clear_screen()
def start_client():
global final_key_base64
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', 65432)
client_socket.connect(server_address)
# 接收服务端的密钥
server_key = client_socket.recv(1024)
# 生成客户端的密钥并与服务器的密钥混合
client_key_mixed = xor_bytes(client_key, server_key)
print(f"混合后的客户端密钥发送给服务器")
# 发送混合后的密钥给服务器
client_socket.sendall(client_key_mixed)
# 接收对方客户端的混合密钥
other_client_key_mixed = client_socket.recv(1024)
# 将对方的密钥与自己的密钥混合,得到共享密钥
shared_key = xor_bytes(other_client_key_mixed, client_key)
# 取共享密钥的最后32个字节作为Fernet密钥
final_key = shared_key[-32:]
final_key_base64 = base64.urlsafe_b64encode(final_key)
print(f"共享密钥生成: {final_key_base64.decode()}")
# 使用最终的共享密钥进行加密与解密
fernet = Fernet(final_key_base64)
# 清空屏幕,准备显示聊天界面
clear_screen()
# 创建一个线程来接收消息
receive_thread = threading.Thread(target=receive_messages, args=(client_socket, fernet))
receive_thread.daemon = True# 让线程在主程序退出时结束
receive_thread.start()
# 主线程负责发送消息
send_messages(client_socket, fernet)
start_client()
Activityc 发表于 2024-10-18 16:04
是的,确实没有,这只是个技术演示的demo,主要还是端对端的思路
端到端重要的就是服务端无密钥,无法解密数据,侦听流量无法拿到密钥,并且有方式对抗中间人攻击。你这个代码服务端就可以计算密钥已经不算端到端了,监听流量一样可以拿到密钥,中间人伪装服务端也可以获取密钥,满足不了端到端要求。要么用非对称加密体系,要么使用密匙协商算法,服务端使用证书机制作为权威信息源提供密钥派发服务 {:301_992:}学习学习了 厉害了我的哥,多多学习{:301_993:} 厉害了 word哥 感谢分享,局域网通讯软件之前用过几个,比如飞秋或者内网通什么的 两个同事,有秘密。 我以前写过一个基于UDP的加密通信,失败了{:1_918:} mmcc1984 发表于 2024-10-17 16:12
感谢分享,局域网通讯软件之前用过几个,比如飞秋或者内网通什么的
可惜飞秋已经不更了,现在公司还在用老版的,虽然用的企业微信,但是内网还是飞秋方便 其实主要重点还是想搞个端对端加密的通讯工具,用socket局域网通讯是因为没钱买服务器调试,虚拟机或者本地搭建又太麻烦。如果有大佬能改成服务器外网端对端也欢迎。 KirchoffNZ 发表于 2024-10-17 16:46
我以前写过一个基于UDP的加密通信,失败了
UDP不稳定啊,聊着聊着就断了