"""Decorators for Flask route authentication and authorization.
This module provides decorators for protecting Flask routes with JWT-based authentication
and authorization. Supports scope-based, role-based, permission-based, and object-level
access control with configurable claim names for different authorization servers.
For complete configuration documentation, see the Configuration section in the API reference.
"""
from functools import wraps
from typing import Callable, Optional
from axioms_core import (
AxiomsError,
check_permissions,
check_roles,
check_scopes,
check_token_validity,
get_claim_from_token,
get_expected_issuer,
get_key_from_jwks_json,
validate_token_header,
)
from axioms_core.config import get_config_value
from flask import current_app as app
from flask import g, request
from .config import get_config
[docs]
def has_required_scopes(*required_scopes, safe_methods=None):
"""Decorator to enforce scope-based authorization.
Checks if the authenticated user's token contains any of the required scopes.
Uses OR logic: the token must have at least ONE of the specified scopes.
To require ALL scopes (AND logic), chain multiple decorators.
Args:
*required_scopes: Variable length list of required scope strings.
safe_methods: List of HTTP methods that bypass authorization
(default: from config AXIOMS_SAFE_METHODS or ['OPTIONS']).
Returns:
Callable: Decorated function that enforces scope check.
Raises:
AxiomsError: If token is missing or doesn't contain required scopes.
Example:
OR logic - requires EITHER scope::
@app.route('/api/resource')
@has_required_scopes('read:resource', 'write:resource')
def protected_route():
return {'data': 'protected'}
Example:
AND logic - requires BOTH scopes via chaining::
@app.route('/api/strict')
@has_required_scopes('read:resource')
@has_required_scopes('write:resource')
def strict_route():
return {'data': 'requires both scopes'}
"""
def decorator(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
# Get config
config = get_config()
# Skip authorization for safe methods (e.g., OPTIONS for CORS)
default_safe_methods = get_config_value(
config, "AXIOMS_SAFE_METHODS", ["OPTIONS"]
)
methods = safe_methods if safe_methods is not None else default_safe_methods
if request.method in methods:
return fn(*args, **kwargs)
payload = getattr(g, "auth_jwt", None)
if payload is None or payload is False:
raise AxiomsError(
{
"error": "invalid_token",
"error_description": "Invalid Authorization Token",
},
401,
)
# Get scope from configured claim names
token_scope = get_claim_from_token(payload, "SCOPE", config) or ""
# Convert required_scopes to list if needed
required = (
list(required_scopes[0])
if isinstance(required_scopes[0], (list, tuple))
else list(required_scopes)
)
if check_scopes(token_scope, required):
return fn(*args, **kwargs)
raise AxiomsError(
{
"error": "insufficient_scope",
"error_description": "Insufficient role, scope or permission",
},
403,
)
return wrapper
return decorator
[docs]
def has_required_roles(*view_roles, safe_methods=None):
"""Decorator to enforce role-based authorization.
Checks if the authenticated user's token contains any of the required roles.
Uses OR logic: the token must have at least ONE of the specified roles.
To require ALL roles (AND logic), chain multiple decorators.
Args:
*view_roles: Variable length list of required role strings.
safe_methods: List of HTTP methods that bypass authorization
(default: from config AXIOMS_SAFE_METHODS or ['OPTIONS']).
Returns:
Callable: Decorated function that enforces role check.
Raises:
AxiomsError: If token is missing or doesn't contain required roles.
Example:
OR logic - requires EITHER role::
@app.route('/admin/users')
@has_required_roles('admin', 'superuser')
def admin_route():
return {'users': []}
Example:
AND logic - requires BOTH roles via chaining::
@app.route('/admin/critical')
@has_required_roles('admin')
@has_required_roles('superuser')
def critical_route():
return {'message': 'requires both roles'}
"""
def decorator(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
# Get config
config = get_config()
# Skip authorization for safe methods (e.g., OPTIONS for CORS)
default_safe_methods = get_config_value(
config, "AXIOMS_SAFE_METHODS", ["OPTIONS"]
)
methods = safe_methods if safe_methods is not None else default_safe_methods
if request.method in methods:
return fn(*args, **kwargs)
payload = getattr(g, "auth_jwt", None)
if payload is None or payload is False:
raise AxiomsError(
{
"error": "invalid_token",
"error_description": "Invalid Authorization Token",
},
401,
)
# Get roles from configured claim names
token_roles = get_claim_from_token(payload, "ROLES", config) or []
# Convert view_roles to list if needed
required = (
list(view_roles[0])
if isinstance(view_roles[0], (list, tuple))
else list(view_roles)
)
if check_roles(token_roles, required):
return fn(*args, **kwargs)
raise AxiomsError(
{
"error": "insufficient_scope",
"error_description": "Insufficient role, scope or permission",
},
403,
)
return wrapper
return decorator
[docs]
def has_required_permissions(*view_permissions, safe_methods=None):
"""Decorator to enforce permission-based authorization.
Checks if the authenticated user's token contains any of the required permissions.
Uses OR logic: the token must have at least ONE of the specified permissions.
To require ALL permissions (AND logic), chain multiple decorators.
Args:
*view_permissions: Variable length list of required permission strings.
safe_methods: List of HTTP methods that bypass authorization
(default: from config AXIOMS_SAFE_METHODS or ['OPTIONS']).
Returns:
Callable: Decorated function that enforces permission check.
Raises:
AxiomsError: If token is missing or doesn't contain required permissions.
Example:
OR logic - requires EITHER permission::
@app.route('/api/resource')
@has_required_permissions('resource:read', 'resource:write')
def resource_route():
return {'data': 'success'}
Example:
AND logic - requires BOTH permissions via chaining::
@app.route('/api/critical')
@has_required_permissions('resource:read')
@has_required_permissions('resource:admin')
def critical_route():
return {'message': 'requires both permissions'}
"""
def decorator(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
# Get config
config = get_config()
# Skip authorization for safe methods (e.g., OPTIONS for CORS)
default_safe_methods = get_config_value(
config, "AXIOMS_SAFE_METHODS", ["OPTIONS"]
)
methods = safe_methods if safe_methods is not None else default_safe_methods
if request.method in methods:
return fn(*args, **kwargs)
payload = getattr(g, "auth_jwt", None)
if payload is None or payload is False:
raise AxiomsError(
{
"error": "invalid_token",
"error_description": "Invalid Authorization Token",
},
401,
)
# Get permissions from configured claim names
token_permissions = (
get_claim_from_token(payload, "PERMISSIONS", config) or []
)
# Convert view_permissions to list if needed
required = (
list(view_permissions[0])
if isinstance(view_permissions[0], (list, tuple))
else list(view_permissions)
)
if check_permissions(token_permissions, required):
return fn(*args, **kwargs)
raise AxiomsError(
{
"error": "insufficient_scope",
"error_description": "Insufficient role, scope or permission",
},
403,
)
return wrapper
return decorator
def _has_bearer_token(request_obj):
"""Extract and validate bearer token from request Authorization header.
Flask-specific wrapper for token extraction from request object.
Args:
request_obj: Flask request object containing HTTP headers.
Returns:
str: The extracted bearer token.
Raises:
AxiomsError: If Authorization header is missing, invalid, or malformed.
"""
header_name = "Authorization"
token_prefix = "bearer"
auth_header = request_obj.headers.get(header_name, None)
if auth_header is None:
raise AxiomsError(
{
"error": "invalid_token",
"error_description": "Missing Authorization Header",
},
401,
)
try:
bearer, _, token = auth_header.partition(" ")
if bearer.lower() == token_prefix and token != "":
return token
else:
raise AxiomsError(
{
"error": "invalid_token",
"error_description": "Invalid Authorization Bearer",
},
401,
)
except (ValueError, AttributeError):
raise AxiomsError(
{
"error": "invalid_token",
"error_description": "Invalid Authorization Header",
},
401,
)
def _has_valid_token(token):
"""Validate JWT token and verify audience and issuer claims.
Flask-specific wrapper that validates token using axioms-core-py functions
and stores the payload in g.auth_jwt.
Args:
token: JWT token string to validate.
Returns:
bool: True if token is valid and audience matches.
Raises:
AxiomsError: If token is invalid, audience doesn't match, or issuer doesn't match.
"""
# Get configuration
config = get_config()
# Validate token header (alg, kid, typ) - raises AxiomsError on failure
header = validate_token_header(token)
# Get key from JWKS (uses fallback if manager not initialized)
kid = header.get("kid")
key = get_key_from_jwks_json(kid, config)
# Get expected audience and issuer
audience = (
config.get("AXIOMS_AUDIENCE")
if hasattr(config, "get")
else getattr(config, "AXIOMS_AUDIENCE", None)
)
expected_issuer = get_expected_issuer(config)
# Validate token using core function (raises AxiomsError on failure)
payload = check_token_validity(
token=token,
key=key,
alg=header.get("alg"),
audience=audience,
issuer=expected_issuer,
)
# Store payload in Flask g object for route access
g.auth_jwt = payload
return True
[docs]
def has_valid_access_token(fn=None, *, safe_methods=None):
"""Decorator to enforce JWT token authentication.
Validates the JWT access token in the Authorization header and sets
the token payload in g.auth_jwt for use in the route handler.
Required config:
- AXIOMS_AUDIENCE: The expected audience claim
- AXIOMS_JWKS_URL (or AXIOMS_DOMAIN): JWKS endpoint URL or domain
Args:
fn: The Flask route function to decorate.
safe_methods: List of HTTP methods that bypass authorization
(default: from config AXIOMS_SAFE_METHODS or ['OPTIONS']).
Returns:
Callable: Decorated function that enforces token validation.
Raises:
AxiomsError: If token is missing or invalid.
Exception: If required config is not set.
Example:
Has valid access token::
@app.route('/api/protected')
@has_valid_access_token
def protected_route():
user_id = g.auth_jwt.sub
return {'user_id': user_id}
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Get config
config = get_config()
# Skip authorization for safe methods (e.g., OPTIONS for CORS)
default_safe_methods = get_config_value(
config, "AXIOMS_SAFE_METHODS", ["OPTIONS"]
)
methods = safe_methods if safe_methods is not None else default_safe_methods
if request.method in methods:
return func(*args, **kwargs)
# Check AXIOMS_AUDIENCE
if "AXIOMS_AUDIENCE" not in app.config:
raise Exception(
"Please set AXIOMS_AUDIENCE in your config. "
"For more details review axioms-flask-py docs."
)
# Check for JWKS URL or domain
if (
"AXIOMS_JWKS_URL" not in app.config
and "AXIOMS_DOMAIN" not in app.config
):
raise Exception(
"Please set either AXIOMS_JWKS_URL or AXIOMS_DOMAIN in your config. "
"For more details review axioms-flask-py docs."
)
# Extract and validate token (raises AxiomsError on failure)
token = _has_bearer_token(request)
_has_valid_token(token)
return func(*args, **kwargs)
return wrapper
# Support both @has_valid_access_token and @has_valid_access_token()
if fn is None:
# Called with arguments: @has_valid_access_token()
return decorator
else:
# Called without arguments: @has_valid_access_token
return decorator(fn)
[docs]
def check_object_ownership(
get_object: Callable,
owner_field: str = "user",
claim_field: str = "sub",
inject_as: Optional[str] = None,
safe_methods: Optional[list] = None,
) -> Callable:
"""Decorator to enforce object-level permissions based on ownership.
Verifies that the authenticated user owns the requested object by comparing
a field on the object with a claim in the JWT token. Useful for implementing
row-level security where users can only access their own data.
Args:
get_object: Callable that fetches the object. Receives same arguments as decorated function.
Should raise 404 (abort(404)) if object not found.
owner_field: Attribute/key name on the object containing owner identifier (default: "user").
claim_field: Claim name in JWT token containing user identifier (default: "sub").
inject_as: Optional parameter name to inject the fetched object into route handler.
If None, object is not injected.
safe_methods: List of HTTP methods that bypass authorization
(default: from config AXIOMS_SAFE_METHODS or ['OPTIONS']).
Returns:
Decorator function that enforces ownership checking.
Raises:
AxiomsError (401): If user is not authenticated.
AxiomsError (400): If object is missing the owner_field.
AxiomsError (403): If JWT is missing claim_field OR ownership check fails.
404: If object not found (raised by get_object).
Example:
Basic usage with default fields::
def get_article(article_id):
article = Article.query.get(article_id)
if not article:
abort(404)
return article
@app.patch('/articles/<int:article_id>')
@check_object_ownership(get_article)
def update_article(article_id):
# Only owner can access
# article.user must match token.sub
article = get_article(article_id)
article.title = request.json['title']
db.session.commit()
return {'id': article.id}
With object injection::
@app.patch('/articles/<int:article_id>')
@check_object_ownership(get_article, inject_as='article')
def update_article(article_id, article):
# article is injected, no need to call get_article again
article.title = request.json['title']
db.session.commit()
return {'id': article.id}
Custom owner field::
def get_comment(comment_id):
comment = Comment.query.get(comment_id)
if not comment:
abort(404)
return comment
@app.patch('/comments/<int:comment_id>')
@check_object_ownership(get_comment, owner_field='created_by')
def update_comment(comment_id):
# comment.created_by must match token.sub
pass
Custom claim field (match by email)::
@app.patch('/projects/<int:project_id>')
@check_object_ownership(
get_project,
owner_field='owner_email',
claim_field='email'
)
def update_project(project_id):
# project.owner_email must match token.email
pass
Note:
- Works with SQLAlchemy/SQLModel objects (uses getattr)
- Works with dictionaries (uses .get())
- Requires middleware or @has_valid_access_token decorator to set g.auth_jwt
"""
def decorator(f: Callable) -> Callable:
@wraps(f)
def decorated_function(*args, **kwargs):
# Get config
config = get_config()
# Skip authorization for safe methods (e.g., OPTIONS for CORS)
default_safe_methods = get_config_value(
config, "AXIOMS_SAFE_METHODS", ["OPTIONS"]
)
methods = safe_methods if safe_methods is not None else default_safe_methods
if request.method in methods:
return f(*args, **kwargs)
# Check authentication
if not hasattr(g, "auth_jwt") or not g.auth_jwt:
raise AxiomsError(
{
"error": "invalid_token",
"error_description": "Authentication required",
},
401,
)
# Fetch the object
obj = get_object(*args, **kwargs)
# Extract owner value from object
if hasattr(obj, owner_field):
# SQLAlchemy/SQLModel object
owner_value = getattr(obj, owner_field)
elif isinstance(obj, dict) and owner_field in obj:
# Dictionary
owner_value = obj[owner_field]
else:
# Object missing owner field
raise AxiomsError(
{
"error": "invalid_request",
"error_description": f"Object does not have '{owner_field}' field",
},
400,
)
# Extract claim value from JWT
payload = g.auth_jwt
if hasattr(payload, claim_field):
# Box object (attribute access)
claim_value = getattr(payload, claim_field)
elif isinstance(payload, dict) and claim_field in payload:
# Dictionary
claim_value = payload[claim_field]
else:
# JWT missing claim field
raise AxiomsError(
{
"error": "insufficient_scope",
"error_description": f"Token does not have '{claim_field}' claim",
},
403,
)
# Check ownership
if str(owner_value) != str(claim_value):
raise AxiomsError(
{
"error": "insufficient_scope",
"error_description": "You do not have permission to access this resource",
},
403,
)
# Inject object into kwargs if requested
if inject_as:
kwargs[inject_as] = obj
return f(*args, **kwargs)
return decorated_function
return decorator
[docs]
def require_ownership(
owner_field: str = "user",
claim_field: str = "sub",
safe_methods: Optional[list] = None,
) -> Callable:
"""Decorator to check ownership of an object passed as function argument.
Simpler alternative to check_object_ownership when the object is already
fetched by the route handler. Checks ownership of the object passed as
the first positional argument.
Args:
owner_field: Attribute/key name on object containing owner ID (default: "user").
claim_field: Claim name in JWT containing user ID (default: "sub").
safe_methods: List of HTTP methods that bypass authorization
(default: from config AXIOMS_SAFE_METHODS or ['OPTIONS']).
Returns:
Decorator function.
Example:
Basic usage::
@app.patch('/articles/<int:article_id>')
@require_ownership(owner_field='user')
def update_article(article_id):
article = Article.query.get_or_404(article_id)
# Ownership is checked against article
article.title = request.json['title']
db.session.commit()
return {'id': article.id}
"""
def decorator(f: Callable) -> Callable:
@wraps(f)
def decorated_function(*args, **kwargs):
# Get config
config = get_config()
# Skip authorization for safe methods (e.g., OPTIONS for CORS)
default_safe_methods = get_config_value(
config, "AXIOMS_SAFE_METHODS", ["OPTIONS"]
)
methods = safe_methods if safe_methods is not None else default_safe_methods
if request.method in methods:
return f(*args, **kwargs)
# Check authentication
if not hasattr(g, "auth_jwt") or not g.auth_jwt:
raise AxiomsError(
{
"error": "invalid_token",
"error_description": "Authentication required",
},
401,
)
# Object should be first positional argument
if not args:
raise AxiomsError(
{
"error": "invalid_request",
"error_description": "No object provided to check ownership",
},
400,
)
obj = args[0]
# Extract owner value
if hasattr(obj, owner_field):
owner_value = getattr(obj, owner_field)
elif isinstance(obj, dict) and owner_field in obj:
owner_value = obj[owner_field]
else:
raise AxiomsError(
{
"error": "invalid_request",
"error_description": f"Object does not have '{owner_field}' field",
},
400,
)
# Extract claim value
payload = g.auth_jwt
if hasattr(payload, claim_field):
claim_value = getattr(payload, claim_field)
elif isinstance(payload, dict) and claim_field in payload:
claim_value = payload[claim_field]
else:
raise AxiomsError(
{
"error": "insufficient_scope",
"error_description": f"Token does not have '{claim_field}' claim",
},
403,
)
# Check ownership
if str(owner_value) != str(claim_value):
raise AxiomsError(
{
"error": "insufficient_scope",
"error_description": "You do not have permission to access this resource",
},
403,
)
return f(*args, **kwargs)
return decorated_function
return decorator