"""Authentication and response-caching helpers for the Flask application.

Provides password hashing wrappers, JWT creation/validation backed by a
Redis allow-list, an ``admin_required`` view decorator, and a small
Redis-based JSON response cache.
"""

import json
from datetime import datetime, timedelta, timezone
from functools import wraps

import jwt
from flask import current_app, jsonify, make_response, request
from werkzeug.security import check_password_hash, generate_password_hash

from app.db import get_redis

# Lifetime written into the JWT ``exp`` claim.
_TOKEN_LIFETIME = timedelta(days=7)

# Lifetime of the ``token:<jwt>`` entry in Redis.
# NOTE(review): decode_jwt_token() rejects any token whose Redis entry is
# gone, so tokens effectively stop working after 1 day even though ``exp``
# says 7 days. Kept as-is to preserve behavior -- confirm which value is
# intended and align the two.
_TOKEN_CACHE_TTL = timedelta(days=1)

_BEARER_PREFIX = 'Bearer '

# Number of keys deleted per DEL call while invalidating a cache prefix.
_INVALIDATE_BATCH_SIZE = 500


def hash_password(password) -> str:
    """Return a salted hash of *password* (werkzeug ``generate_password_hash``)."""
    return generate_password_hash(password)


def verify_password(password, password_hash) -> bool:
    """Return True if *password* matches the stored *password_hash*."""
    return check_password_hash(password_hash, password)


def create_jwt_token(user):
    """Create a signed HS256 JWT for *user* and register it in Redis.

    The payload carries ``user_id`` (from ``user.id``), ``role`` (from
    ``user.role``) and an ``exp`` claim. The token is signed with the
    application's ``SECRET_KEY`` and stored under ``token:<jwt>`` in Redis
    so that ``decode_jwt_token`` can verify it has been issued by us.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    payload = {
        'user_id': user.id,
        'role': user.role,
        # Timezone-aware "now"; datetime.utcnow() is deprecated since 3.12.
        'exp': datetime.now(timezone.utc) + _TOKEN_LIFETIME,
    }
    token = jwt.encode(
        payload, current_app.config['SECRET_KEY'], algorithm='HS256'
    )

    # Register the token in Redis; decode_jwt_token() requires this entry.
    r = get_redis()
    r.setex(f"token:{token}", int(_TOKEN_CACHE_TTL.total_seconds()), "1")
    return token


def decode_jwt_token(token):
    """Validate *token* and return its identity claims.

    Returns:
        ``{'user_id': ..., 'role': ...}`` when the token is present in
        Redis, has a valid signature, is not expired, and carries both
        claims; ``None`` otherwise.
    """
    # Check the Redis entry first: tokens that are not (or no longer)
    # registered are rejected without attempting to decode them.
    r = get_redis()
    if not r.exists(f"token:{token}"):
        return None

    try:
        payload = jwt.decode(
            token, current_app.config['SECRET_KEY'], algorithms=['HS256']
        )
    except jwt.InvalidTokenError:
        # Also covers jwt.ExpiredSignatureError, which is a subclass.
        return None

    # A correctly signed token without the expected claims is treated as
    # invalid instead of surfacing a KeyError to the caller.
    if 'user_id' not in payload or 'role' not in payload:
        return None
    return {'user_id': payload['user_id'], 'role': payload['role']}


def admin_required(f):
    """View decorator that only lets requests with an admin token through.

    Responses:
        401 ``{'msg': 'Missing or invalid token'}`` when the
        ``Authorization: Bearer <token>`` header is absent, malformed, or
        the token fails validation.
        403 ``{'msg': 'Admin access required'}`` when the token is valid
        but its ``role`` claim is not ``'admin'``.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        auth = request.headers.get('Authorization', None)
        if not auth or not auth.startswith(_BEARER_PREFIX):
            return jsonify({'msg': 'Missing or invalid token'}), 401

        # split() without arguments tolerates repeated whitespace between
        # the scheme and the token ("Bearer   <token>").
        parts = auth.split()
        token = parts[1] if len(parts) > 1 else ''
        if not token:
            return jsonify({'msg': 'Missing or invalid token'}), 401

        user_data = decode_jwt_token(token)
        if not user_data:
            # Unauthenticated (bad/expired/unknown token) is 401, not 403,
            # so clients know they have to log in again.
            return jsonify({'msg': 'Missing or invalid token'}), 401
        if user_data.get('role') != 'admin':
            return jsonify({'msg': 'Admin access required'}), 403
        return f(*args, **kwargs)
    return wrapper


def cache_response(ttl=30, key_prefix="cache"):
    """Decorator that caches a view's JSON response in Redis.

    Args:
        ttl: Time-to-live of a cache entry, in seconds.
        key_prefix: Prefix of the Redis key; the full key is
            ``<key_prefix>:<path>[?<query string>]``.

    Only responses with status 200 and a JSON body are cached. The key
    does not include the request method or the caller's identity, so this
    should only wrap views whose output is the same for every caller.
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            r = get_redis()

            # Include the query string so that e.g. ?page=1 and ?page=2
            # do not share one cache entry.
            cache_key = f"{key_prefix}:{request.path}"
            query = request.query_string.decode('utf-8', 'replace')
            if query:
                cache_key = f"{cache_key}?{query}"

            # Serve from cache when possible.
            cached_data = r.get(cache_key)
            if cached_data:
                return jsonify(json.loads(cached_data))

            # Run the view. make_response() normalises tuple / dict / str
            # return values into a Response so status_code is always there.
            response = make_response(f(*args, **kwargs))
            if response.status_code == 200:
                body = response.get_json(silent=True)
                # Skip non-JSON bodies rather than caching "null".
                if body is not None:
                    r.setex(cache_key, ttl, json.dumps(body))
            return response
        return wrapper
    return decorator


def invalidate_cache(key_prefix):
    """Delete every Redis key that matches ``<key_prefix>:*``.

    Uses incremental SCAN instead of KEYS so that a large keyspace does
    not block the Redis server, and deletes the matches in batches.
    """
    r = get_redis()
    batch = []
    for key in r.scan_iter(match=f"{key_prefix}:*"):
        batch.append(key)
        if len(batch) >= _INVALIDATE_BATCH_SIZE:
            r.delete(*batch)
            batch = []
    if batch:
        r.delete(*batch)