92 lines
2.9 KiB
Python
92 lines
2.9 KiB
Python
"""
|
|
User authentication API: login, register, JWT refresh, OAuth 2.0 placeholder.
|
|
"""
|
|
from flask import Blueprint, request, jsonify
|
|
from flask_jwt_extended import (
|
|
create_access_token,
|
|
create_refresh_token,
|
|
jwt_required,
|
|
get_jwt_identity,
|
|
get_jwt,
|
|
)
|
|
from app import db
|
|
from app.models.user import User, Role
|
|
from app.utils.auth import hash_password, check_password
|
|
|
|
auth_bp = Blueprint("auth", __name__)
|
|
|
|
|
|
@auth_bp.route("/register", methods=["POST"])
|
|
def register():
|
|
data = request.get_json() or {}
|
|
email = (data.get("email") or "").strip()
|
|
username = (data.get("username") or "").strip()
|
|
password = data.get("password")
|
|
if not email or not username or not password:
|
|
return jsonify({"error": "email, username and password are required"}), 400
|
|
if User.query.filter_by(email=email).first():
|
|
return jsonify({"error": "Email already registered"}), 409
|
|
if User.query.filter_by(username=username).first():
|
|
return jsonify({"error": "Username already taken"}), 409
|
|
user = User(
|
|
email=email,
|
|
username=username,
|
|
password_hash=hash_password(password),
|
|
)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
access = create_access_token(identity=user.id)
|
|
refresh = create_refresh_token(identity=user.id)
|
|
return jsonify({
|
|
"user": user.to_dict(),
|
|
"access_token": access,
|
|
"refresh_token": refresh,
|
|
"token_type": "bearer",
|
|
}), 201
|
|
|
|
|
|
@auth_bp.route("/login", methods=["POST"])
|
|
def login():
|
|
data = request.get_json() or {}
|
|
email = (data.get("email") or "").strip()
|
|
username = (data.get("username") or "").strip()
|
|
password = data.get("password")
|
|
if not password:
|
|
return jsonify({"error": "password is required"}), 400
|
|
if email:
|
|
user = User.query.filter_by(email=email).first()
|
|
elif username:
|
|
user = User.query.filter_by(username=username).first()
|
|
else:
|
|
return jsonify({"error": "email or username is required"}), 400
|
|
if not user or not check_password(password, user.password_hash):
|
|
return jsonify({"error": "Invalid credentials"}), 401
|
|
if not user.is_active:
|
|
return jsonify({"error": "Account disabled"}), 403
|
|
access = create_access_token(identity=user.id)
|
|
refresh = create_refresh_token(identity=user.id)
|
|
return jsonify({
|
|
"user": user.to_dict(),
|
|
"access_token": access,
|
|
"refresh_token": refresh,
|
|
"token_type": "bearer",
|
|
})
|
|
|
|
|
|
@auth_bp.route("/refresh", methods=["POST"])
|
|
@jwt_required(refresh=True)
|
|
def refresh():
|
|
identity = get_jwt_identity()
|
|
access = create_access_token(identity=identity)
|
|
return jsonify({"access_token": access, "token_type": "bearer"})
|
|
|
|
|
|
@auth_bp.route("/me", methods=["GET"])
|
|
@jwt_required()
|
|
def me():
|
|
uid = get_jwt_identity()
|
|
user = User.query.get(uid)
|
|
if not user:
|
|
return jsonify({"error": "User not found"}), 404
|
|
return jsonify(user.to_dict())
|