[Python] 纯文本查看 复制代码
# -*- coding: utf-8 -*-
from flask import Flask, send_from_directory
import os
import sys
import netifaces
import random
import pyperclip
import winreg
import socket
import psutil
import time
import threading
import tkinter as tk
from tkinter import filedialog
from PIL import ImageTk, Image #
import qrcode
from colorama import init, Fore, Back, Style
from datetime import datetime
import shutil
import logging
def start_flask_server(p,fol,cip):
app = Flask(__name__)
logging.getLogger("werkzeug").disabled = True
@app.route('/download/<filename>', methods=['GET'])
def download_file(filename):
file_path = os.path.join(fol, filename)
if not os.path.exists(file_path):
return "文件不存在", 404
return send_from_directory(fol, filename, as_attachment=True, conditional=True)
app.run(host=cip, port=p, debug=False, threaded=True)
def get_all_ips(): #获取并返回本机所有IP地址
interfaces = netifaces.interfaces()
ip_addresses = []
for interface in interfaces:
addresses = netifaces.ifaddresses(interface)
if netifaces.AF_INET in addresses:
for address in addresses[netifaces.AF_INET]:
ip_address = address['addr']
if ip_address != '127.0.0.1':
ip_addresses.append(ip_address)
return ip_addresses
def choose_ip(): #选择要使用的IP
all_ips = get_all_ips()
ip_count = len(all_ips)
for i, ip in enumerate(all_ips):
print(f"{i+1}、{ip}")
while True:
selected_index = input("选择要使用的IP,并输入编号,按回车确认:")
try:
selected_index = int(selected_index)
if selected_index >= 1 and selected_index <= ip_count:
selected_ip = all_ips[selected_index - 1]
print("已选择IP:" + selected_ip)
return selected_ip
else:
print(Back.RED + Fore.WHITE + "无效的IP编号,请重新输入!" + Style.RESET_ALL)
except ValueError:
print(Back.RED + Fore.WHITE + "无效的IP编号,请重新输入!" + Style.RESET_ALL)
def write_registry_entry(epath, ico,cip): #写入注册表
reg_path = r"*\shell\Share File"
key = winreg.CreateKey(winreg.HKEY_CLASSES_ROOT, reg_path)
winreg.SetValue(key, None, winreg.REG_SZ, "分享此文件")
winreg.SetValueEx(key, 'ICON', 0, winreg.REG_SZ, ico)
winreg.SetValueEx(key, 'ip', 0, winreg.REG_SZ, cip)
winreg.SetValueEx(key, 'autoexit', 0, winreg.REG_SZ, "1")
winreg.SetValueEx(key, 'showqr', 0, winreg.REG_SZ, "1")
winreg.CloseKey(key)
command_path = r"*\shell\Share File\command"
command_key = winreg.CreateKey(winreg.HKEY_CLASSES_ROOT, command_path)
winreg.SetValue(command_key, None, winreg.REG_SZ, epath + ' "%1"')
winreg.CloseKey(command_key)
def read_registry_value(): #读取注册表
try:
key = winreg.OpenKey(winreg.HKEY_CLASSES_ROOT, r"Python.File\Shell\open\command")
value, _ = winreg.QueryValueEx(key, "")
winreg.CloseKey(key)
value = value.split('"')
return value[1]
except OSError:
print("注册表项不存在")
except Exception as e:
print(f"读取注册表值时出现错误: {e}")
def check_reg_key(keyname): #读取注册表指定项
key_path = r'*\shell\Share File'
try:
key = winreg.OpenKey(winreg.HKEY_CLASSES_ROOT, key_path)
value, reg_type = winreg.QueryValueEx(key, keyname)
return value
except FileNotFoundError:
return None
def CheckPortUse(port): #检测端口是否被占用
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('127.0.0.1', port))
if result == 0:
sock.close()
return False
else:
connections = psutil.net_connections()
for conn in connections:
if conn.laddr.port == port:
return False
return True
def get_port_connections(port): #获取本机端口的连接状态
global sflj
global clients
current_time = datetime.now()
formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
connections = psutil.net_connections()
stat= ""
ips = []
for conn in connections:
if conn.laddr.port == port:
stat= conn.status
if conn.status == 'ESTABLISHED':
ips.append(conn.raddr.ip)
diff1 = set(ips) - set(clients)
if diff1:
print(Back.WHITE + Fore.GREEN + "有新用户接入:{}".format(list(diff1)) + Style.RESET_ALL, formatted_time)
diff2= set(clients) - set(ips)
if diff2:
print(Back.RED + Fore.WHITE + "用户:{}已断开连接。".format(list(diff2)) + Style.RESET_ALL, formatted_time)
clients = ips
ipnum = len(list(set(ips)))
if ipnum > 0:
if sflj == 0:
sflj += 1
print(Fore.GREEN + "当前所有在线用户:{}".format(list(set(ips))) + Style.RESET_ALL, formatted_time)
return False
else:
if sflj == 0:
#print("暂无用户连接")
return False
else:
#print("所有用户都断开连接")
return True
def save_qr_code(img):
filename = filedialog.asksaveasfilename(defaultextension=".png",
filetypes=[("PNG Image", "*.png")],
initialfile="文件分享二维码.png")
if filename:
img.save(filename)
def show_qr_code(data):
window = tk.Tk()
window.title("二维码分享")
window.resizable(False, False)
window.attributes('-toolwindow', True)
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_H,
box_size=10,
border=4,
)
qr.add_data(data)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
width, height = img.size
max_width = 200
max_height = 200
if width > max_width or height > max_height:
scale = min(max_width/width, max_height/height)
new_width = int(width * scale)
new_height = int(height * scale)
img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
photo = ImageTk.PhotoImage(img)
label = tk.Label(window, image=photo)
label.pack()
save_button = tk.Button(window, text=" 保 存 ", command=lambda: save_qr_code(img))
save_button.pack()
window.update_idletasks()
screen_width = window.winfo_screenwidth()
screen_height = window.winfo_screenheight()
window_width = window.winfo_width()
window_height = window.winfo_height()
x = int((screen_width - window_width) / 2)
y = int((screen_height - window_height) / 2)
window.geometry(f"{window_width}x{window_height}+{x}+{y}")
window.mainloop()
def sys_init(arg):
ip = cip = choose_ip()
executable_path = os.path.abspath(sys.executable)
if getattr(sys, 'frozen', False):
exe = icon = '"' + executable_path + '"'
#print("脚本已编译成exe文件")
else:
pypath = read_registry_value()
exe = '"' + pypath + '" "' + arg[0] + '"'
icon = '"' + pypath + '"'
#print("脚本未被编译成exe文件")
write_registry_entry(exe,icon,cip)
return ip
def qr_code_thread(url):
show_qr_code(url)
if __name__ == '__main__':
init(autoreset=True)
#下面两行为了清除打包时启用“输出引导进度信息”而在启动时打印的日志信息
os.system("cls" if os.name in ("nt", "dos") else "clear")
#print("\033c", end="")
args = sys.argv
ip = check_reg_key("ip")
if ip == None:
ip = sys_init(args)
if len(args) - 1 == 0:
print(Back.RED + Fore.WHITE + "\n**请通过右键菜单运行此程序**\n" + Style.RESET_ALL)
print(Back.BLUE + Fore.WHITE + "Tips:如不需自动退出和二维码功能,请在注册表HKEY_CLASSES_ROOT\\*\\shell\\Share File中自行修改" + Style.RESET_ALL)
hc = input("\n直接按回车键退出程序,输入其他任意内容后回车重新设置IP:")
if hc != "":
sys_init(args)
sys.exit(0)
file = args[1]
path = os.path.dirname(file)
filename = os.path.basename(file)
while True:
port = random.randint(10000, 65500)
if CheckPortUse(port):
break
url = "http://" + ip + ":" + str(port) + "/download/" + filename
pyperclip.copy(url)
print(Back.BLUE + Fore.WHITE + "\n↓↓文件下载地址已复制到剪贴板↓↓\n" + Style.RESET_ALL)
print(Back.GREEN + Fore.WHITE + url + Style.RESET_ALL)
print(Back.BLUE + Fore.WHITE + "\n↑↑文件下载地址已复制到剪贴板↑↑\n" + Style.RESET_ALL)
print("Tips:如无法重新复制上方链接,可在此窗口标题栏点击右键选择属性,勾选“快速编辑模式”,点击确定保存。")
print(Fore.GREEN + "\n52PoJie\n" + Style.RESET_ALL)
qr = check_reg_key("showqr")
if str(qr) == "1":
print(Back.CYAN + Fore.WHITE + "显示二维码:" + Style.RESET_ALL, Back.GREEN + Fore.WHITE + " 开 " + Style.RESET_ALL)
print(Fore.YELLOW + "如不需二维码功能请在注册表HKEY_CLASSES_ROOT\\*\\shell\\Share File\\showqr中自行修改值为0"+ Style.RESET_ALL)
qr_thread = threading.Thread(target=qr_code_thread, args=(url,))
qr_thread.start()
else:
print(Back.CYAN + Fore.WHITE + "显示二维码:" + Style.RESET_ALL, Back.RED + Fore.WHITE + " 关 " + Style.RESET_ALL)
print(Fore.YELLOW + "如需开启二维码功能请在注册表HKEY_CLASSES_ROOT\\*\\shell\\Share File\\showqr中自行修改值为1"+ Style.RESET_ALL)
autoexit = check_reg_key("autoexit")
if str(autoexit) == "1":
print(Back.CYAN + Fore.WHITE + "所有连接关闭将自动退出功能:" + Style.RESET_ALL, Back.GREEN + Fore.WHITE + " 开 " + Style.RESET_ALL)
print(Fore.YELLOW + "如不需自动退出功能请在注册表HKEY_CLASSES_ROOT\\*\\shell\\Share File\\autoexit中自行修改值为0"+ Style.RESET_ALL)
else:
print(Back.CYAN + Fore.WHITE + "所有连接关闭将自动退出功能:" + Style.RESET_ALL, Back.RED + Fore.WHITE + " 关 " + Style.RESET_ALL)
print(Fore.YELLOW + "如需开启自动退出功能请在注册表HKEY_CLASSES_ROOT\\*\\shell\\Share File\\autoexit中自行修改值为1"+ Style.RESET_ALL)
columns = shutil.get_terminal_size().columns
print('~' * columns)
server_thread = threading.Thread(target=start_flask_server, args=(port, path, ip))
server_thread.start()
sflj = 0
clients = []
while True:
zt = get_port_connections(port)
if zt and str(autoexit) == "1":
print("结束了,可以退出了")
time.sleep(3)
os._exit(0)
time.sleep(3)