口令传送箱
解决问题
很多时候,我们都想将一些文件或文本传送给别人,或者跨端传递一些信息,但是我们又不想为了分享,而去下载一些七里八里端软件,这时候,我们就可以使用口令传送箱。
主要功能
- [x] 轻量简洁,Fastapi+sqlite3
- [x] 拖拽,复制粘贴上传
- [x] 文件口令传输,生成二维码
- [x] 分享文件:多种上传方式供你选择
- [x] 分享文本:直接复制粘贴直接上传
- [x] 防爆破:错误五次拉黑十分钟
- [x] 完全匿名:不记录任何信息
- [x] 无需注册:无需注册,无需登录
- [x] Sqlite3数据库:无需安装数据库
- [x] 可以加get参数code,这样打开就会读取取件码如:http://xxx.com?code=12345
- [x] 管理面板:简单列表页删除违规文件
- [ ] 口令使用次数,口令有效期,二维码分享
更新记录
2022年12月10日 01:56:43
- 管理面板已新增,一如既往的极简,只有删除
- 二维码图片(调用的网络接口,如果离线环境将无法使用,一切为了极简)
系统截图
取件
寄件
管理面板
部署方式
服务端部署
- 安装Python3
- 拉取代码,解压缩
- 安装依赖包:
pip install -r requirements.txt
- 运行
uvicorn main:app --host 0.0.0.0 --port 12345
- 然后你自己看怎么进程守护吧
宝塔部署
- 安装宝塔Python Manager
- 然后你自己看着填吧
Docker部署
docker build --file Dockerfile --tag filecodebox .
docker run -d -p 12345:12345 --name filecodebox filecodebox
关键代码
import datetime
import os
import uuid
from fastapi import FastAPI, Depends, UploadFile, Form, File
from sqlalchemy.orm import Session
from starlette.requests import Request
from starlette.responses import HTMLResponse
import random
from starlette.staticfiles import StaticFiles
import database
from database import engine, SessionLocal, Base
Base.metadata.create_all(bind=engine)
app = FastAPI()
if not os.path.exists('./static'):
os.makedirs('./static')
app.mount("/static", StaticFiles(directory="static"), name="static")
index_html = open('templates/index.html', 'r').read()
# 过期时间
exp_hour = 24
# 允许错误次数
error_count = 5
# 禁止分钟数
error_minute = 60
error_ip_count = {}
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def get_code(db: Session = Depends(get_db)):
code = random.randint(10000, 99999)
while db.query(database.Codes).filter(database.Codes.code == code).first():
code = random.randint(10000, 99999)
return str(code)
def get_file_name(key, ext, file):
now = datetime.datetime.now()
path = f'./static/upload/{now.year}/{now.month}/{now.day}/'
name = f'{key}.{ext}'
if not os.path.exists(path):
os.makedirs(path)
file = file.file.read()
with open(f'{os.path.join(path, name)}', 'wb') as f:
f.write(file)
return key, len(file), path[1:] + name
@app.get('/')
async def index():
return HTMLResponse(index_html)
@app.post('/')
async def index(request: Request, code: str, db: Session = Depends(get_db)):
info = db.query(database.Codes).filter(database.Codes.code == code).first()
error = error_ip_count.get(request.client.host, {'count': 0, 'time': datetime.datetime.now()})
if error['count'] > error_count:
if datetime.datetime.now() - error['time'] < datetime.timedelta(minutes=error_minute):
return {'code': 404, 'msg': '请求过于频繁,请稍后再试'}
else:
error['count'] = 0
else:
if not info:
error['count'] += 1
error_ip_count[request.client.host] = error
return {'code': 404, 'msg': f'取件码错误,错误5次将被禁止10分钟'}
else:
return {'code': 200, 'msg': '取件成功,请点击库查看', 'data': info}
@app.post('/share')
async def share(text: str = Form(default=None), file: UploadFile = File(default=None), db: Session = Depends(get_db)):
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=exp_hour)
db.query(database.Codes).filter(database.Codes.use_time < cutoff_time).delete()
db.commit()
code = get_code(db)
if text:
info = database.Codes(
code=code,
text=text,
type='text/plain',
key=uuid.uuid4().hex,
size=len(text),
used=True,
name='分享文本'
)
db.add(info)
db.commit()
return {'code': 200, 'msg': '上传成功,请点击文件库查看',
'data': {'code': code, 'name': '分享文本', 'text': text}}
elif file:
key, size, full_path = get_file_name(uuid.uuid4().hex, file.filename.split('.')[-1], file)
info = database.Codes(
code=code,
text=full_path,
type=file.content_type,
key=key,
size=size,
used=True,
name=file.filename
)
db.add(info)
db.commit()
return {'code': 200, 'msg': '上传成功,请点击文件库查看',
'data': {'code': code, 'name': file.filename, 'text': full_path}}
else:
return {'code': 422, 'msg': '参数错误', 'data': []}
免责声明
本项目开源仅供学习使用,不得用于商业用途以及任何违法用途,否则后果自负,与本人无关。
如果可以给个star,谢谢