from datetime import datetime, timedelta, timezone
from functools import wraps

import jwt
from flask import current_app, request, jsonify
from werkzeug.security import generate_password_hash, check_password_hash
def hash_password(password):
    """Return the storable form of a plaintext *password*.

    The actual algorithm, salt and work factor are whatever werkzeug's
    ``generate_password_hash`` uses by default; this module does not
    pin them.
    """
    stored_form = generate_password_hash(password)
    return stored_form
def verify_password(password, password_hash):
    """Check plaintext *password* against a stored *password_hash*.

    Parameter order here is (plaintext, hash) to mirror the call sites;
    werkzeug's ``check_password_hash`` takes (hash, plaintext), so the two
    are swapped when delegating.
    """
    is_match = check_password_hash(password_hash, password)
    return is_match
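def create_jwt_token(user):
    """Mint an HS256-signed JWT carrying *user*'s id and role.

    Args:
        user: object exposing ``id`` and ``role`` attributes; both values
            are placed in the payload as-is.

    Returns:
        The encoded token exactly as ``jwt.encode`` returns it.

    The token is signed with the app's ``SECRET_KEY`` and expires one day
    after issue. Timestamps are built from a timezone-aware UTC clock:
    ``datetime.utcnow()`` is deprecated (Python 3.12+) and its naive result
    is easy to misinterpret downstream.
    """
    now = datetime.now(timezone.utc)
    payload = {
        'user_id': user.id,
        'role': user.role,
        # Standard issued-at claim; decode_jwt_token does not act on it
        # today, but it makes the token's age inspectable.
        'iat': now,
        'exp': now + timedelta(days=1),
    }
    return jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')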
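def decode_jwt_token(token):
    """Verify *token* and return its identity claims, or ``None`` if unusable.

    Args:
        token: the compact JWT string (without the ``Bearer `` prefix).

    Returns:
        ``{'user_id': ..., 'role': ...}`` when the signature verifies with
        the app's ``SECRET_KEY``; ``None`` when the token is malformed,
        expired, has no ``exp`` claim, fails signature verification, or
        lacks the ``user_id``/``role`` claims this app relies on.
    """
    try:
        payload = jwt.decode(
            token,
            current_app.config['SECRET_KEY'],
            algorithms=['HS256'],
            # Tokens minted by create_jwt_token always carry ``exp``; refuse
            # anything signed with the same key that omits it, so a token
            # without an expiry can never be accepted as valid here.
            options={'require': ['exp']},
        )
        return {'user_id': payload['user_id'], 'role': payload['role']}
    # ExpiredSignatureError is a subclass of InvalidTokenError, so a single
    # handler covers both. KeyError covers a correctly signed token that is
    # missing the identity claims, which previously escaped as a 500.
    except (jwt.InvalidTokenError, KeyError):
        return None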
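def admin_required(f):
    """Decorate a Flask view so only bearers of a verified admin JWT reach it.

    The wrapper reads ``Authorization: Bearer <token>``, verifies the token
    through ``decode_jwt_token`` and checks the ``role`` claim.

    Short-circuit responses are JSON ``{'msg': ...}`` bodies:
        401 when the header is absent, not the Bearer scheme, carries an
            empty token, or the token fails verification (authentication
            failure: the client needs fresh credentials);
        403 when the token verifies but ``role`` is not ``'admin'``
            (authorization failure).
    """
    # functools.wraps copies __name__, __doc__, __module__, __wrapped__ etc.;
    # the old manual __name__ copy kept Flask endpoint names unique but lost
    # the rest.
    @wraps(f)
    def wrapper(*args, **kwargs):
        auth = request.headers.get('Authorization', None)
        if not auth or not auth.startswith('Bearer '):
            return jsonify({'msg': 'Missing or invalid token'}), 401
        # Take everything after the scheme. A plain split(' ')[1] turned
        # doubled whitespace into an empty token and misreported it as 403.
        token = auth[len('Bearer '):].strip()
        if not token:
            return jsonify({'msg': 'Missing or invalid token'}), 401
        user_data = decode_jwt_token(token)
        if user_data is None:
            # Unverifiable or expired token is an authentication problem,
            # not an authorization one, so answer 401 rather than 403.
            return jsonify({'msg': 'Missing or invalid token'}), 401
        if user_data.get('role') != 'admin':
            return jsonify({'msg': 'Admin access required'}), 403
        return f(*args, **kwargs)
    return wrapper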