Source code for mdvtools.auth.register_auth_routes
import os
from mdvtools.logging_config import get_logger
[docs]
# Module-level logger, named after this module and obtained through the
# project's logging configuration helper.
logger = get_logger(__name__)
[docs]
def register_auth_routes(app):
    """Register the authentication routes on a Flask application.

    Adds the ``/login``, ``/callback``, ``/logout``, ``/login_sso`` and
    ``/profile`` routes to ``app``. Each view catches its own exceptions,
    logs them and answers with a JSON error payload; the registration step
    as a whole is additionally wrapped so that a failure is logged before
    being re-raised.

    Args:
        app: The Flask application the auth routes are attached to.

    Raises:
        Exception: Any error raised while registering the routes is logged
            and then re-raised unchanged.
    """
    # Kept as function-local imports, exactly as in the original module.
    from flask import jsonify, request, redirect, url_for, session
    from mdvtools.auth.authutils import get_auth_provider

    logger.info("Registering AUTH routes...")
    try:
        # Route for login (redirects to Auth0 for authentication)
        @app.route('/login')
        def login():
            """Handles standard login (e.g., Auth0)."""
            try:
                logger.info("Login initiated")
                # Start from a clean session, then record which auth method
                # this login uses before handing over to the provider.
                session.clear()
                session["auth_method"] = app.config.get("DEFAULT_AUTH_METHOD", "dummy").lower()
                session.modified = True
                return get_auth_provider().login()
            except Exception as e:
                session.clear()
                logger.exception(f"In register_auth_routes : Error during login: {e}")
                return jsonify({"error": "Failed to start login process."}), 500

        # Route for the callback after login (handles the callback from Auth0)
        @app.route('/callback')
        def callback():
            """Complete the login after the provider redirects back here."""
            try:
                logger.info("Callback route hit")
                code = request.args.get('code')

                # If this is a code-based provider like Auth0, validate presence of 'code'
                if not code and session.get("auth_method", "").lower() == "auth0":
                    logger.error("Missing 'code' parameter in the callback URL for Auth0.")
                    session.clear()
                    return jsonify({"error": "Authorization code not provided."}), 400

                auth_provider = get_auth_provider()
                access_token = auth_provider.handle_callback()
                if not access_token:
                    logger.error("Authentication failed: No valid token received.")
                    session.clear()
                    return jsonify({"error": "Authentication failed."}), 401

                logger.info("Authentication successful. Redirecting based on configuration.")

                # MDV_API_ROOT, when set, takes precedence over the "index" endpoint.
                mdv_api_root = os.getenv('MDV_API_ROOT')
                if mdv_api_root:
                    logger.info(f"Redirecting to MDV_API_ROOT: {mdv_api_root}")
                    return redirect(mdv_api_root)

                logger.info("MDV_API_ROOT not set, redirecting to index")
                return redirect(url_for("index"))
            except Exception as e:
                logger.exception(f"In register_auth_routes : Error during callback: {e}")
                session.clear()  # Clear session on error
                return jsonify({"error": "Failed to complete authentication process."}), 500

        # Route for logout (clears the session and redirects to home)
        @app.route('/logout')
        def logout():
            """Clear the session and delegate logout to the auth provider."""
            try:
                auth_method = session.get("auth_method")
                logger.info(f"Logout initiated for auth method: {auth_method}")

                # Resolve the provider BEFORE wiping the session. The login
                # routes store session["auth_method"] right before calling
                # get_auth_provider(), which suggests the provider is chosen
                # from session state; clearing first would then pick the
                # wrong provider.
                # NOTE(review): confirm against get_auth_provider().
                auth_provider = get_auth_provider() if auth_method else None

                # The session is cleared before the provider's logout runs,
                # so it is gone no matter what the provider does next.
                session.clear()
                session.modified = True

                if auth_provider is not None:
                    return auth_provider.logout()

                # Fallback for unknown or missing auth method
                logger.warning("No auth method found in session during logout.")
                return jsonify({"message": "Session cleared, but auth method unknown."}), 200
            except Exception as e:
                logger.exception(f"In register_auth_routes: Error during logout: {e}")
                # Also covers a failure while resolving the provider above.
                session.clear()
                return jsonify({"error": "Failed to log out."}), 500

        @app.route('/login_sso')
        def login_sso():
            """Redirect user to Shibboleth-protected login page on Apache."""
            try:
                logger.info("Shibboleth SSO login initiated")
                session.clear()  # Clear previous session
                session["auth_method"] = "shibboleth"
                session.modified = True
                return get_auth_provider().login()
            except Exception as e:
                # In case of error, clear the session and handle the error
                session.clear()  # Ensure session is cleared in case of failure
                logger.exception(f"In register_auth_routes: Error during login SSO: {e}")
                return jsonify({"error": "Failed to start login process using SSO."}), 500

        # Sample route to check the user's profile or token
        @app.route('/profile')
        def profile():
            """Return the current user's profile as JSON."""
            try:
                provider = get_auth_provider()

                # A token stored in the session is forwarded to the provider;
                # without one (e.g., using Shibboleth) get_user() is called
                # with no arguments.
                token = session.get("token")
                if token:
                    user_info = provider.get_user(token=token)
                else:
                    user_info = provider.get_user()

                if not user_info:
                    return jsonify({"error": "User profile could not be retrieved."}), 404

                # Return the user profile in a JSON response
                return jsonify(user_info), 200
            except Exception as e:
                logger.exception(f"Error during profile retrieval: {e}")
                return jsonify({"error": "Failed to retrieve user profile."}), 500

        logger.info("Auth routes registered successfully!")
    except Exception as e:
        logger.exception(f"Error registering AUTH routes: {e}")
        raise