Files
weidian/server/app.py
Jeason c293f3d4ac feat: 完善登录认证系统
- 首次使用强制设置密码(至少6位)
- 密码存数据库(SHA256哈希),前端可修改
- 登录失败5次锁定5分钟,防爆破
- 导航栏添加修改密码入口
- 移除硬编码默认密码
2026-04-02 12:11:09 +08:00

263 lines
8.9 KiB
Python

import os
import sys
import hashlib
import time
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from flask import (Flask, redirect, url_for, request, session,
render_template, jsonify)
from flask_socketio import SocketIO
from server.database import init_db, get_db
socketio = SocketIO(cors_allowed_origins="*")
# 防爆破:最大失败次数 & 锁定时间(秒)
MAX_FAIL = 5
LOCK_SECONDS = 300 # 5分钟
def _hash_pwd(pwd):
return hashlib.sha256(pwd.encode()).hexdigest()
def _get_admin():
db = get_db()
row = db.execute('SELECT * FROM admin WHERE id = 1').fetchone()
db.close()
return row
def _is_locked():
admin = _get_admin()
if not admin:
return False
if admin['fail_count'] >= MAX_FAIL and admin['locked_until']:
try:
from datetime import datetime
locked = datetime.strptime(admin['locked_until'],
'%Y-%m-%d %H:%M:%S')
if datetime.now() < locked:
return True
# 锁定已过期,重置
db = get_db()
db.execute("UPDATE admin SET fail_count=0, locked_until='' "
"WHERE id=1")
db.commit()
db.close()
except Exception:
pass
return False
def _record_fail():
from datetime import datetime, timedelta
db = get_db()
admin = db.execute('SELECT fail_count FROM admin WHERE id=1').fetchone()
if admin:
new_count = (admin['fail_count'] or 0) + 1
locked_until = ''
if new_count >= MAX_FAIL:
locked_until = (datetime.now() + timedelta(
seconds=LOCK_SECONDS)).strftime('%Y-%m-%d %H:%M:%S')
db.execute("UPDATE admin SET fail_count=?, locked_until=?, "
"updated_at=datetime('now','localtime') WHERE id=1",
(new_count, locked_until))
db.commit()
db.close()
def _reset_fail():
db = get_db()
db.execute("UPDATE admin SET fail_count=0, locked_until='', "
"updated_at=datetime('now','localtime') WHERE id=1")
db.commit()
db.close()
def create_app():
app = Flask(
__name__,
template_folder=os.path.join(os.path.dirname(__file__), '..',
'templates'),
static_folder=os.path.join(os.path.dirname(__file__), '..',
'static'),
)
app.secret_key = os.environ.get(
'SECRET_KEY', 'snatcher-' + _hash_pwd('change-me')[:16])
init_db()
# ── 登录 & 初始设置密码 ──
@app.route('/login', methods=['GET', 'POST'])
def login():
admin = _get_admin()
# 首次使用:跳转设置密码
if not admin:
return redirect(url_for('setup'))
if request.method == 'POST':
if _is_locked():
remain = ''
try:
from datetime import datetime
locked = datetime.strptime(
_get_admin()['locked_until'], '%Y-%m-%d %H:%M:%S')
remain = f'{int((locked - datetime.now()).total_seconds())}'
except Exception:
remain = f'{LOCK_SECONDS}'
return render_template(
'login.html',
error=f'登录失败次数过多,请{remain}后再试')
pwd = request.form.get('password', '')
if _hash_pwd(pwd) == admin['password_hash']:
_reset_fail()
session['authenticated'] = True
session.permanent = True
app.permanent_session_lifetime = __import__(
'datetime').timedelta(days=7)
return redirect(request.args.get('next', '/'))
else:
_record_fail()
admin = _get_admin()
remaining = MAX_FAIL - (admin['fail_count'] or 0)
if remaining <= 0:
return render_template(
'login.html',
error=f'账号已锁定{LOCK_SECONDS // 60}分钟')
return render_template(
'login.html',
error=f'密码错误,还剩{remaining}次机会')
return render_template('login.html')
@app.route('/setup', methods=['GET', 'POST'])
def setup():
admin = _get_admin()
if admin:
return redirect(url_for('login'))
if request.method == 'POST':
pwd = request.form.get('password', '')
pwd2 = request.form.get('password2', '')
if len(pwd) < 6:
return render_template('setup.html',
error='密码至少6位')
if pwd != pwd2:
return render_template('setup.html',
error='两次密码不一致')
db = get_db()
db.execute(
"INSERT INTO admin (id, password_hash) VALUES (1, ?)",
(_hash_pwd(pwd),))
db.commit()
db.close()
session['authenticated'] = True
session.permanent = True
app.permanent_session_lifetime = __import__(
'datetime').timedelta(days=7)
return redirect('/')
return render_template('setup.html')
@app.route('/logout')
def logout():
session.clear()
return redirect('/login')
@app.route('/change_password', methods=['GET', 'POST'])
def change_password():
if not session.get('authenticated'):
return redirect(url_for('login'))
if request.method == 'POST':
old = request.form.get('old_password', '')
new = request.form.get('new_password', '')
new2 = request.form.get('new_password2', '')
admin = _get_admin()
if _hash_pwd(old) != admin['password_hash']:
return render_template('change_password.html',
error='原密码错误')
if len(new) < 6:
return render_template('change_password.html',
error='新密码至少6位')
if new != new2:
return render_template('change_password.html',
error='两次密码不一致')
db = get_db()
db.execute(
"UPDATE admin SET password_hash=?, "
"updated_at=datetime('now','localtime') WHERE id=1",
(_hash_pwd(new),))
db.commit()
db.close()
return render_template('change_password.html',
success='密码修改成功')
return render_template('change_password.html')
@app.before_request
def require_auth():
allowed = ('/login', '/setup')
if request.path in allowed or request.path.startswith('/static'):
return
if request.path.startswith('/socket.io'):
return
admin = _get_admin()
if not admin:
return redirect(url_for('setup'))
if not session.get('authenticated'):
return redirect(url_for('login', next=request.path))
from server.routers import accounts, tasks, orders
app.register_blueprint(accounts.bp)
app.register_blueprint(tasks.bp)
app.register_blueprint(orders.bp)
@app.route('/')
def index():
return redirect(url_for('tasks.list_tasks'))
socketio.init_app(app)
_register_socketio_events()
return app
def _register_socketio_events():
import asyncio
from server.services.remote_browser import get_session
@socketio.on('rb_mouse', namespace='/rb')
def handle_mouse(data):
s = get_session(data.get('session_id', ''))
if s and s.loop and s.loop.is_running():
asyncio.run_coroutine_threadsafe(
s.handle_mouse(
data.get('action', 'click'),
data.get('x', 0), data.get('y', 0)),
s.loop)
@socketio.on('rb_keyboard', namespace='/rb')
def handle_keyboard(data):
s = get_session(data.get('session_id', ''))
if s and s.loop and s.loop.is_running():
asyncio.run_coroutine_threadsafe(
s.handle_keyboard(data.get('text', '')), s.loop)
@socketio.on('rb_key', namespace='/rb')
def handle_key(data):
s = get_session(data.get('session_id', ''))
if s and s.loop and s.loop.is_running():
asyncio.run_coroutine_threadsafe(
s.handle_key(data.get('key', '')), s.loop)
@socketio.on('rb_close', namespace='/rb')
def handle_close(data):
from server.services.remote_browser import close_session
close_session(data.get('session_id', ''))
if __name__ == '__main__':
app = create_app()
socketio.run(app, host='0.0.0.0', port=9000, debug=True)