# [Python] 纯文本查看 复制代码  (forum copy-paste header, not part of the program)
import hmac
import ipaddress
import os
import socket
import subprocess
import threading
import time

from flask import Flask, request, render_template_string, jsonify, make_response, session
'''
吾爱破解论坛 http://www.52pojie.cn
'''
app = Flask(__name__)
# Failed-login counter: key is the client IP, value is the number of failed attempts
failed_logins = {}
# Set of blacklisted client IPs
blacklist = set()
def get_local_ip():
    """Return the local IPv4 address used for outbound traffic, or None.

    A UDP socket is connected to 8.8.8.8:53 and the local address the OS
    assigned to that socket is returned.

    Returns:
        The address as a dotted string, or None when the socket cannot be
        created or connected.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            # Connect to a public DNS server address to learn the local IP.
            s.connect(("8.8.8.8", 53))
            return s.getsockname()[0]
    except OSError:
        # Only socket-level failures mean "unknown"; anything else should surface.
        return None
def load_server_ips():
    """Read the candidate bind addresses stored in ``server_ip.txt``.

    Returns:
        A list with one stripped entry per non-blank line, in file order.
        An empty list is returned when the file does not exist.
    """
    ip_file = 'server_ip.txt'
    try:
        with open(ip_file, 'r', encoding='utf-8') as f:
            stripped = (line.strip() for line in f)
            # Blank lines are not addresses; skipping them avoids pinging ''.
            return [ip for ip in stripped if ip]
    except FileNotFoundError:
        return []
def save_server_ips(ips):
    """Overwrite ``server_ip.txt`` with the given addresses, one per line.

    Args:
        ips: Iterable of address strings to store.
    """
    target = 'server_ip.txt'
    with open(target, 'w', encoding='utf-8') as out:
        out.writelines(address + '\n' for address in ips)
def ping_ip(ip):
    """Return True when a single IPv4 ping to ``ip`` appears to succeed.

    The command line is ``ping -4 -n 1 <ip>``.
    NOTE(review): ``-n 1`` is the Windows spelling of "send one echo
    request"; this matches the Windows-only commands used elsewhere in
    this file - confirm the target platform.

    Args:
        ip: Address to probe. Empty or None is reported as unreachable
            without spawning a process.

    Returns:
        False when the address is empty, the ping executable cannot be
        started, ping exits with a non-zero status, or its output contains
        "unreachable" (case-insensitive); True otherwise.
    """
    if not ip:
        return False
    try:
        output = subprocess.check_output(
            ['ping', '-4', '-n', '1', ip],
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # Non-zero exit status, or the ping executable could not be launched.
        return False
    # ping can exit 0 while still reporting the destination as unreachable.
    return "unreachable" not in output.lower()
def bind_server(host_ip, port=80):
    """Run the Flask app on ``host_ip`` and report any failure on stdout.

    Args:
        host_ip: Address to listen on.
        port: TCP port to listen on. Defaults to 80, the value that was
            previously hard-coded.

    Exceptions raised by ``app.run`` are caught and printed rather than
    propagated, so callers cannot distinguish success from failure.
    """
    try:
        print(f"尝试绑定到IP: {host_ip}")
        app.run(host=host_ip, port=port)
    except Exception as e:
        print(f"绑定到 {host_ip} 失败: {e}")
def process_server_ip_x():
    """Poll ``xun_ip.txt`` and bind the server to the address it names.

    File layout: line 1 holds one IP address, line 2 holds the enable flag
    (exactly ``1`` means enabled).

    The loop ends when the file has fewer than two lines, when the flag is
    not ``1``, or after ``bind_server`` returns for a reachable address.
    In every other case it waits 60 seconds and checks the file again.

    NOTE(review): the original indentation was lost; the 60-second wait
    after creating a missing file is assumed to apply whether or not the
    creation succeeded - confirm against the author's version.
    """
    watch_path = 'xun_ip.txt'
    while True:
        if not os.path.exists(watch_path):
            try:
                with open(watch_path, 'w', encoding='utf-8'):
                    pass  # create an empty file
                print(f"已创建文件: {watch_path}")
            except Exception as err:
                print(f"创建文件 {watch_path} 失败: {err}")
            time.sleep(60)
            continue

        try:
            with open(watch_path, 'r', encoding='utf-8') as handle:
                rows = [row.strip() for row in handle.readlines()]

            if len(rows) < 2:
                print("文件行数不足,跳出循环")
                break

            candidate, flag = rows[0], rows[1]
            if flag != '1':
                print("第二行不是数字1,跳出循环")
                break

            if not (candidate and is_valid_ip(candidate)):
                print("第一行IP无效或为空")
            else:
                print(f"检测到服务器IP: {candidate}")
                if ping_ip(candidate):
                    print(f"Ping测试成功,IP {candidate} 可用")
                    bind_server(candidate)
                    # Leave the loop once bind_server has returned.
                    break
                print(f"Ping测试失败,IP {candidate} 不可用")
        except Exception as err:
            print(f"读取文件 {watch_path} 失败: {err}")

        print("等待60秒后重试")
        time.sleep(60)
def is_valid_ip(ip):
    """Return True when ``ip`` is a full dotted-quad IPv4 address string.

    ``ipaddress.IPv4Address`` is used instead of ``socket.inet_aton``
    because the latter also accepts shorthand forms such as ``"1"`` or
    ``"127.1"``, which are not what a user means by "an IP address".

    Args:
        ip: Candidate value; anything that is not a string is invalid.
    """
    if not isinstance(ip, str):
        return False
    try:
        ipaddress.IPv4Address(ip)
    except ValueError:
        # AddressValueError is a ValueError subclass.
        return False
    return True
def load_root_credentials():
    """Load the login account, password and attempt limit from ``root.txt``.

    File layout: line 1 account, line 2 password, line 3 maximum number of
    failed logins (an integer >= 1).

    Behaviour:
      * Missing file: it is created with the defaults, which are returned.
      * Fewer than three lines: the file is rewritten with the defaults,
        which are returned.
      * Empty account/password or an invalid attempt limit: only the bad
        field falls back to its default, and the repaired values are
        written back to the file.
      * Unreadable file: the defaults are returned.

    Returns:
        Tuple ``(account, password, max_attempts)``.
    """
    default_account = "root"
    default_password = "52pj2025"
    default_max_attempts = 5
    root_file = 'root.txt'
    defaults = (default_account, default_password, default_max_attempts)

    def _write_defaults():
        # Best effort: a failed write is reported but never fatal.
        try:
            with open(root_file, 'w', encoding='utf-8') as f:
                f.write(f'{default_account}\n{default_password}\n{default_max_attempts}\n')
            print(f"已创建文件: {root_file},使用默认账户、密码和登录尝试次数")
        except OSError as e:
            print(f"创建文件 {root_file} 失败: {e}")

    if not os.path.exists(root_file):
        _write_defaults()
        return defaults

    try:
        with open(root_file, 'r', encoding='utf-8') as f:
            lines = [line.strip() for line in f]
    except (OSError, UnicodeError) as e:
        print(f"读取文件 {root_file} 失败: {e}")
        return defaults

    if len(lines) < 3:
        print("文件行数不足,使用默认值")
        _write_defaults()
        return defaults

    account, password, max_attempts_str = lines[:3]
    repaired = False

    # Check that account and password are not empty.
    if not account:
        print("账号为空,使用默认值")
        account = default_account
        repaired = True
    if not password:
        print("密码为空,使用默认值")
        password = default_password
        repaired = True

    # Check that the attempt limit is a positive integer. The conversion is
    # guarded so that a non-numeric value only resets this one field
    # instead of discarding the whole file.
    try:
        max_attempts = int(max_attempts_str) if max_attempts_str.isdigit() else 0
    except ValueError:
        max_attempts = 0
    if max_attempts < 1:
        print("登录尝试次数无效,使用默认值5")
        max_attempts = default_max_attempts
        repaired = True

    # Persist the repaired values when any default had to be substituted.
    if repaired:
        try:
            with open(root_file, 'w', encoding='utf-8') as f:
                f.write(f'{account}\n{password}\n{max_attempts}\n')
            print(f"已更新文件: {root_file}")
        except OSError as e:
            print(f"更新文件 {root_file} 失败: {e}")

    return account, password, max_attempts
def load_blacklist():
    """Merge the addresses stored in ``hei_ip.txt`` into ``blacklist``.

    The file is created empty when it does not exist. Blank lines are
    ignored so that an empty string never becomes a blacklist entry.
    Failures to create or read the file are printed and otherwise ignored.
    """
    blacklist_file = 'hei_ip.txt'
    if not os.path.exists(blacklist_file):
        try:
            with open(blacklist_file, 'w', encoding='utf-8'):
                pass  # create an empty file
            print(f"已创建文件: {blacklist_file}")
        except OSError as e:
            print(f"创建文件 {blacklist_file} 失败: {e}")
    try:
        with open(blacklist_file, 'r', encoding='utf-8') as f:
            # Read everything first so a mid-file error leaves the set untouched.
            ips = [line.strip() for line in f]
    except (OSError, UnicodeError) as e:
        print(f"读取文件 {blacklist_file} 失败: {e}")
        return
    blacklist.update(ip for ip in ips if ip)
def update_blacklist(ip):
    """Blacklist ``ip`` in memory and append it to ``hei_ip.txt``.

    The in-memory set is updated before the file is touched, so the ban
    takes effect even if the file cannot be written. An address that is
    already blacklisted is not appended a second time.

    Args:
        ip: Client address to ban.
    """
    blacklist_file = 'hei_ip.txt'
    if ip in blacklist:
        return
    blacklist.add(ip)
    try:
        with open(blacklist_file, 'a', encoding='utf-8') as f:
            f.write(ip + '\n')
        print(f"IP {ip} 已添加到黑名单")
    except (OSError, TypeError) as e:
        # Persistence is best effort; the in-memory ban above still applies.
        print(f"更新黑名单失败: {e}")
@app.before_request
def check_blacklist():
    """Reject the request with HTTP 403 when the client IP is blacklisted.

    Returns None for every other client so request handling continues.
    """
    if request.remote_addr not in blacklist:
        return None
    return make_response(jsonify({"message": "您的IP已被禁止访问"}), 403)
def _write_file_description(file_description='1文件说明.txt'):
    """Create the help file describing the data files, unless it already exists."""
    if os.path.exists(file_description):
        print(f"文件 {file_description} 已存在,跳过创建步骤")
        return
    content = (
        'hei_ip.txt 文件说明:\n'
        ' ip黑名单(拒绝该ip登录)\n\n'
        'root.txt 文件说明:\n'
        ' 第一行填写账号(默认:root)\n'
        ' 第二行填写密码(默认:52pj2025)\n'
        ' 第三行填写登录尝试次数,不得小于1(默认:5)\n\n'
        'server_ip.txt 文件说明:\n'
        ' 储存尝试绑定的ip地址,一行储存一个ip地址\n\n'
        'xun_ip.txt 文件说明:\n'
        ' 不断尝试绑定其中的ip地址\n'
        ' 只能储存一个ip地址,且储存在第一行\n'
        ' 第二行填写参数,1=启用,非1或空白=不启用\n\n\n'
        '--------吾爱破解论坛 http://www.52pojie.cn--------'
    )
    try:
        with open(file_description, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f"已创建文件: {file_description}")
    except OSError as e:
        print(f"创建文件 {file_description} 失败: {e}")


def main():
    """Prepare the data files and start the server on every reachable address.

    Steps:
      1. Write the help file ``1文件说明.txt`` if it is missing.
      2. Load the root credentials and the IP blacklist. This happens
         before any server starts, because the 127.0.0.1 fallback below
         blocks and nothing after it would run.
      3. Start the daemon thread that polls ``xun_ip.txt``.
      4. Read ``server_ip.txt`` (falling back to the detected local
         address, which is then saved), ping every entry, and run one
         server thread per reachable address. When none is reachable the
         server is bound to 127.0.0.1 in the calling thread.
    """
    _write_file_description()

    # Load the root credentials.
    account, password, _max_attempts = load_root_credentials()
    if not account or not password:
        print("无法加载 root 凭证,服务器将无法处理登录请求")
        return
    load_blacklist()

    # Start the thread that handles xun_ip.txt.
    watcher = threading.Thread(target=process_server_ip_x, daemon=True)
    watcher.start()

    # Handle server_ip.txt.
    server_ips = load_server_ips()
    if not server_ips:
        print("没有存储的服务器IP,尝试获取本机IP")
        server_ip = get_local_ip()
        if server_ip:
            print(f"获取到服务器IP: {server_ip}")
            server_ips.append(server_ip)
            save_server_ips(server_ips)
        else:
            print("无法获取本机IP")

    reachable_ips = []
    for ip in server_ips:
        if ping_ip(ip):
            print(f"Ping测试成功,IP {ip} 可用")
            reachable_ips.append(ip)
        else:
            print(f"Ping测试失败,IP {ip} 不可用")

    if not reachable_ips:
        print("所有存储的IP都不可用,尝试绑定到127.0.0.1")
        bind_server('127.0.0.1')
        return

    # Non-daemon threads: the process stays alive while any server is running.
    for ip in reachable_ips:
        threading.Thread(target=bind_server, args=(ip,)).start()
# Flask signs the session cookie with this key. A random per-process key is
# generated unless one was configured already, so logins do not survive a
# restart.
if not app.secret_key:
    app.secret_key = os.urandom(32)
# The power routes are plain GETs; a strict same-site cookie keeps other
# sites from triggering them with the operator's session.
app.config['SESSION_COOKIE_SAMESITE'] = 'Strict'

# Control panel shown after a successful login.
_PANEL_PAGE = '''
<!DOCTYPE html>
<html>
<head>
<title>登录成功</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
h1, h2 {
text-align: center;
}
/* 按钮样式 */
.btn {
padding: 10px 20px;
font-size: 16px;
margin: 5px;
border: none;
border-radius: 4px;
cursor: pointer;
background-color: #007bff;
color: white;
}
.btn:hover {
background-color: #0056b3;
}
/* 输入框样式 */
#command-input {
width: 100%;
height: 100px;
font-size: 16px;
padding: 10px;
resize: vertical;
border: 1px solid #ccc;
border-radius: 4px;
box-sizing: border-box;
}
/* 结果显示区域 */
#result {
margin-top: 20px;
padding: 10px;
border: 1px solid #ddd;
border-radius: 4px;
background-color: #f9f9f9;
white-space: pre-wrap; /* 保留换行 */
}
/* 执行按钮样式 */
#execute-button {
float: right;
margin-top: 10px;
}
/* 响应式设计 */
@media (max-width: 600px) {
.btn {
width: 100%;
font-size: 14px;
}
#command-input {
height: 100px;
font-size: 14px;
}
#execute-button {
width: 100%;
float: none;
}
#result {
font-size: 14px;
}
}
</style>
</head>
<body>
<h1>登录成功</h1>
<div style="text-align: center;">
<button class="btn" onclick="logout()">退出登录</button>
<button class="btn" onclick="shutdown()">关机</button>
<button class="btn" onclick="restart()">重启</button>
<button class="btn" onclick="sleep()">休眠</button>
<button class="btn" onclick="lock()">锁定</button>
</div>
<br>
<h2>执行命令</h2>
<textarea id="command-input" placeholder="请输入命令"></textarea>
<button id="execute-button" class="btn" onclick="executeCommand()">执行</button>
<h3>结果:</h3>
<div id="result"></div>
<script>
function logout() {
fetch('/logout', { method: 'GET' })
.then(response => response.json())
.then(data => {
alert(data.message);
window.location.href = '/';
});
}
function shutdown() {
fetch('/shutdown', { method: 'GET' })
.then(response => response.json())
.then(data => alert(data.message));
}
function restart() {
fetch('/restart', { method: 'GET' })
.then(response => response.json())
.then(data => alert(data.message));
}
function sleep() {
fetch('/sleep', { method: 'GET' })
.then(response => response.json())
.then(data => alert(data.message));
}
function lock() {
fetch('/lock', { method: 'GET' })
.then(response => response.json())
.then(data => alert(data.message));
}
function executeCommand() {
const command = document.getElementById('command-input').value;
fetch('/execute', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ command: command })
})
.then(response => response.json())
.then(data => {
document.getElementById('result').innerText = data.output !== undefined ? data.output : data.message;
})
.catch(error => {
document.getElementById('result').innerText = '执行失败';
});
}
</script>
</body>
</html>
'''

# Login form served for GET /.
_LOGIN_PAGE = '''
<!DOCTYPE html>
<html>
<head>
<title>登录</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
form {
max-width: 400px;
margin: 0 auto;
}
input[type="text"], input[type="password"] {
width: 100%;
padding: 10px;
margin: 10px 0;
border: 1px solid #ccc;
border-radius: 4px;
box-sizing: border-box;
}
input[type="submit"] {
width: 100%;
padding: 10px;
background-color: #28a745;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
input[type="submit"]:hover {
background-color: #218838;
}
@media (max-width: 600px) {
form {
max-width: 100%;
}
}
</style>
</head>
<body>
<h1>登录</h1>
<form method="post">
<input type="text" name="username" placeholder="用户名"><br>
<input type="password" name="password" placeholder="密码"><br>
<input type="submit" value="登录">
</form>
</body>
</html>
'''


def _require_login():
    """Return a 401 JSON response when the session is not logged in, else None."""
    if session.get('authenticated'):
        return None
    return jsonify({"message": "未登录或登录已失效"}), 401


def _same_text(expected, supplied):
    """Compare two strings in constant time (UTF-8 encoded, so non-ASCII works)."""
    return hmac.compare_digest(expected.encode('utf-8'), supplied.encode('utf-8'))


def _run_system_action(argv, success_message, failure_message):
    """Run a fixed system command for a logged-in client and report the outcome.

    Args:
        argv: Command and arguments, executed without a shell.
        success_message: ``message`` returned with HTTP 200.
        failure_message: ``message`` returned with HTTP 500 when the command
            exits non-zero or cannot be started.
    """
    denied = _require_login()
    if denied is not None:
        return denied
    try:
        subprocess.run(argv, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        return jsonify({"message": failure_message, "error": str(e)}), 500
    return jsonify({"message": success_message}), 200


@app.route('/', methods=['GET', 'POST'])
def login():
    """Serve the login form (GET) or check submitted credentials (POST).

    On success the session is marked authenticated, the failure counter
    for the client IP is cleared, and the control panel is returned. On
    failure the counter is incremented; once it reaches the configured
    limit the client IP is blacklisted and HTTP 403 is returned, otherwise
    HTTP 401. A POST without both form fields yields HTTP 400.
    """
    if request.method != 'POST':
        return render_template_string(_LOGIN_PAGE)

    data = request.form
    if 'username' not in data or 'password' not in data:
        return jsonify({"message": "缺少用户名或密码"}), 400

    username = data['username']
    password_m = data['password']
    # Log the account name only; the submitted password must never be printed.
    print(f"Received username: {username}")

    # Credentials are read per attempt so this view does not depend on
    # state captured at start-up.
    account, password, max_attempts = load_root_credentials()
    client_ip = request.remote_addr

    # Evaluate both comparisons before combining them so the response time
    # does not reveal which field was wrong.
    user_ok = _same_text(account, username)
    pass_ok = _same_text(password, password_m)
    if user_ok and pass_ok:
        # Successful login: clear the failure counter.
        failed_logins.pop(client_ip, None)
        session['authenticated'] = True
        return render_template_string(_PANEL_PAGE)

    attempts = failed_logins.get(client_ip, 0) + 1
    failed_logins[client_ip] = attempts
    if attempts >= max_attempts:
        update_blacklist(client_ip)
        return make_response(jsonify({"message": "您的IP已被禁止访问"}), 403)
    return jsonify({"message": "登录失败"}), 401


@app.route('/logout', methods=['GET'])
def logout():
    """End the current session."""
    session.clear()
    return jsonify({"message": "退出登录成功"}), 200


@app.route('/shutdown', methods=['GET'])
def shutdown():
    """Shut the machine down via ``shutdown /s /t 1`` (login required)."""
    return _run_system_action(["shutdown", "/s", "/t", "1"], "关机成功", "关机失败")


@app.route('/restart', methods=['GET'])
def restart():
    """Restart the machine via ``shutdown /r /t 1`` (login required)."""
    return _run_system_action(["shutdown", "/r", "/t", "1"], "重启成功", "重启失败")


@app.route('/sleep', methods=['GET'])
def sleep():
    """Suspend the machine via ``rundll32 powrprof.dll`` (login required)."""
    return _run_system_action(
        ["rundll32.exe", "powrprof.dll,SetSuspendState", "0,1,0"],
        "休眠成功",
        "休眠失败",
    )


@app.route('/lock', methods=['GET'])
def lock():
    """Lock the workstation via ``rundll32 user32.dll`` (login required)."""
    return _run_system_action(
        ["rundll32", "user32.dll,LockWorkStation"],
        "锁定成功",
        "锁定失败",
    )


@app.route('/execute', methods=['POST'])
def execute():
    """Run the shell command from the JSON body and return its output.

    Expects ``{"command": "<text>"}``. Responds with ``{"output": ...}``
    containing stdout followed by stderr. Requires a logged-in session;
    a body that is not a JSON object with a string ``command`` yields
    HTTP 400.
    """
    denied = _require_login()
    if denied is not None:
        return denied

    payload = request.get_json(silent=True)
    command = payload.get('command', '') if isinstance(payload, dict) else None
    if not isinstance(command, str):
        return jsonify({"output": "执行失败: 请求格式错误"}), 400

    try:
        # SECURITY: shell=True is deliberate - this endpoint exists to run
        # operator-typed shell commands. The login check above is the only
        # barrier, and there is no timeout on the command.
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        output = result.stdout + result.stderr
        return jsonify({"output": output}), 200
    except Exception as e:
        return jsonify({"output": f"执行失败: {str(e)}"}), 500
# Run the server only when executed as a script, not when imported.
if __name__ == '__main__':
    main()