Source code for qfa.api.dependencies
"""FastAPI dependency functions for authentication and service injection."""
from fastapi import Depends, Request, Security
from fastapi.exceptions import HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from qfa.auth import validate_api_key
from qfa.domain.errors import AuthenticationError, AuthorizationError
from qfa.domain.models import TenantApiKey
from qfa.domain.ports import UsageRepositoryPort
from qfa.services.orchestrator import Orchestrator
[docs]
def get_orchestrator(request: Request) -> Orchestrator:
"""Return the orchestrator from app state.
Parameters
----------
request : Request
The incoming HTTP request.
Returns
-------
Orchestrator
The orchestrator service instance.
"""
return request.app.state.orchestrator
[docs]
def get_usage_repo(request: Request) -> UsageRepositoryPort:
"""Return the usage repository from app state, or raise 503 if disabled.
Parameters
----------
request : Request
The incoming HTTP request.
Returns
-------
UsageRepositoryPort
The usage repository instance.
Raises
------
HTTPException
503 if usage tracking is not enabled.
"""
repo = getattr(request.app.state, "usage_repo", None)
if repo is None:
raise HTTPException(
status_code=503,
detail={
"code": "usage_tracking_disabled",
"message": "Usage tracking is not enabled",
},
)
return repo
[docs]
async def authenticate_request(
request: Request,
credentials: HTTPAuthorizationCredentials = Security(HTTPBearer(auto_error=False)),
) -> TenantApiKey:
"""Validate a Bearer token from the Authorization header.
Parameters
----------
request : Request
The incoming HTTP request.
credentials : HTTPAuthorizationCredentials
The parsed Authorization header credentials.
Returns
-------
TenantApiKey
The authenticated tenant API key.
Raises
------
AuthenticationError
If the credentials are missing or invalid.
"""
error_message = (
"A valid API key is required. Provide it as: Authorization: Bearer <key>"
)
if credentials is None:
raise AuthenticationError(error_message)
try:
return validate_api_key(credentials.credentials, request.app.state.api_keys)
except AuthenticationError:
raise AuthenticationError(error_message)
[docs]
def require_superuser(
tenant: TenantApiKey = Depends(authenticate_request),
) -> TenantApiKey:
"""FastAPI dependency that authenticates and checks superuser status.
Parameters
----------
tenant : TenantApiKey
The authenticated tenant (injected by ``authenticate_request``).
Returns
-------
TenantApiKey
The authenticated superuser tenant.
Raises
------
AuthorizationError
If the tenant is not a superuser.
"""
if not tenant.is_superuser:
raise AuthorizationError("Superuser access required")
return tenant