Files
aiagent/saars/backend/app/api/auth.py
2026-03-07 09:01:00 +08:00

86 lines
2.7 KiB
Python

"""
User authentication API: login, register, JWT refresh, OAuth 2.0 placeholder.
"""
from flask import Blueprint, request, jsonify
from flask_jwt_extended import (
create_access_token,
create_refresh_token,
jwt_required,
get_jwt_identity,
get_jwt,
)
from app import db
from app.models.user import User, Role
from app.utils.auth import hash_password, check_password
auth_bp = Blueprint("auth", __name__)
@auth_bp.route("/register", methods=["POST"])
def register():
data = request.get_json() or {}
email = (data.get("email") or "").strip()
username = (data.get("username") or "").strip()
password = data.get("password")
if not email or not username or not password:
return jsonify({"error": "email, username and password are required"}), 400
if User.query.filter_by(email=email).first():
return jsonify({"error": "Email already registered"}), 409
if User.query.filter_by(username=username).first():
return jsonify({"error": "Username already taken"}), 409
user = User(
email=email,
username=username,
password_hash=hash_password(password),
)
db.session.add(user)
db.session.commit()
access = create_access_token(identity=user.id)
refresh = create_refresh_token(identity=user.id)
return jsonify({
"user": user.to_dict(),
"access_token": access,
"refresh_token": refresh,
"token_type": "bearer",
}), 201
@auth_bp.route("/login", methods=["POST"])
def login():
data = request.get_json() or {}
email = (data.get("email") or "").strip()
password = data.get("password")
if not email or not password:
return jsonify({"error": "email and password are required"}), 400
user = User.query.filter_by(email=email).first()
if not user or not check_password(password, user.password_hash):
return jsonify({"error": "Invalid credentials"}), 401
if not user.is_active:
return jsonify({"error": "Account disabled"}), 403
access = create_access_token(identity=user.id)
refresh = create_refresh_token(identity=user.id)
return jsonify({
"user": user.to_dict(),
"access_token": access,
"refresh_token": refresh,
"token_type": "bearer",
})
@auth_bp.route("/refresh", methods=["POST"])
@jwt_required(refresh=True)
def refresh():
identity = get_jwt_identity()
access = create_access_token(identity=identity)
return jsonify({"access_token": access, "token_type": "bearer"})
@auth_bp.route("/me", methods=["GET"])
@jwt_required()
def me():
uid = get_jwt_identity()
user = User.query.get(uid)
if not user:
return jsonify({"error": "User not found"}), 404
return jsonify(user.to_dict())