Source code for qfa.api.routes

"""API route handlers for the feedback analysis backend."""

from datetime import UTC, datetime, timedelta

from fastapi import APIRouter, Depends, Request

import qfa
from qfa.api.dependencies import authenticate_request, get_orchestrator
from qfa.api.schemas import (
    ApiAggregateSummary,
    ApiAnalyzeRequest,
    ApiAnalyzeResponse,
    ApiAssignCodesRequest,
    ApiAssignCodesResponse,
    ApiAssignedCode,
    ApiCodedFeedbackRecord,
    ApiFeedbackRecordSummary,
    ApiHealthResponse,
    ApiSummarizeAggregateResponse,
    ApiSummarizeFeedbackMetadata,
    ApiSummarizeRequest,
    ApiSummarizeResponse,
)
from qfa.domain.models import (
    AnalysisRequestModel,
    CodingAssignmentRequestModel,
    FeedbackRecordModel,
    TenantApiKey,
)
from qfa.domain.models import (
    SummaryRequestModel as DomainSummaryRequest,
)
from qfa.services.orchestrator import Orchestrator

router = APIRouter()


def _summarize_metadata_to_domain(
    meta: ApiSummarizeFeedbackMetadata,
) -> dict[str, str | int | float | bool]:
    """Flatten summarize metadata into the domain feedback metadata dict."""
    return {
        "created": meta.model_dump(mode="json")["created"],
        "feedback_record_id": meta.feedback_record_id,
        "coding_level_1": meta.coding_level_1,
        "coding_level_2": meta.coding_level_2,
        "coding_level_3": meta.coding_level_3,
    }


[docs] @router.post("/v1/analyze", response_model=ApiAnalyzeResponse, status_code=200) async def analyze( body: ApiAnalyzeRequest, request: Request, tenant: TenantApiKey = Depends(authenticate_request), orchestrator: Orchestrator = Depends(get_orchestrator), ) -> ApiAnalyzeResponse: """Analyze a batch of feedback records. Parameters ---------- body : AnalyzeRequest The request body containing feedback records and prompt. request : Request The incoming HTTP request. tenant : TenantApiKey The authenticated tenant, injected via dependency. orchestrator : Orchestrator The orchestrator service, injected via dependency. Returns ------- AnalyzeResponse The analysis result with feedback record count and request ID. """ deadline = datetime.now(UTC) + timedelta(seconds=120) domain_feedback_records = tuple( FeedbackRecordModel(id=doc.id, text=doc.text, metadata=doc.metadata) for doc in body.feedback_records ) domain_request = AnalysisRequestModel( feedback_records=domain_feedback_records, prompt=body.prompt, tenant_id=tenant.tenant_id, ) result = await orchestrator.analyze( domain_request, deadline, anonymize=body.anonymize ) return ApiAnalyzeResponse( analysis=result.result, feedback_record_count=len(body.feedback_records), request_id=request.state.request_id, used_anonymization=body.anonymize, )
[docs] @router.post("/v1/summarize", response_model=ApiSummarizeResponse, status_code=200) async def summarize( body: ApiSummarizeRequest, request: Request, tenant: TenantApiKey = Depends(authenticate_request), orchestrator: Orchestrator = Depends(get_orchestrator), ) -> ApiSummarizeResponse: """Summarize each submitted feedback record individually. Parameters ---------- body : SummarizeRequest The request body containing feedback records and summarization options. request : Request The incoming HTTP request. tenant : TenantApiKey The authenticated tenant, injected via dependency. orchestrator : Orchestrator The orchestrator service, injected via dependency. Returns ------- SummarizeResponse The per-feedback-record titles and summaries. """ deadline = datetime.now(UTC) + timedelta(seconds=120) feedback_records = tuple( FeedbackRecordModel( id=record.id, text=record.content, metadata=_summarize_metadata_to_domain(record.metadata), ) for record in body.feedback_records ) domain_request = DomainSummaryRequest( feedback_records=feedback_records, output_language=body.output_language, prompt=body.prompt, tenant_id=tenant.tenant_id, ) result = await orchestrator.summarize( domain_request, deadline, anonymize=body.anonymize, ) return ApiSummarizeResponse( summaries=[ ApiFeedbackRecordSummary( id=summary.id, title=summary.title, summary=summary.summary, quality_score=summary.quality_score, ) for summary in result.feedback_record_summaries ], used_anonymization=body.anonymize, )
[docs] @router.post("/v1/assign_codes", response_model=ApiAssignCodesResponse, status_code=200) async def assign_codes( body: ApiAssignCodesRequest, tenant: TenantApiKey = Depends(authenticate_request), orchestrator: Orchestrator = Depends(get_orchestrator), ) -> ApiAssignCodesResponse: """Assign codes via iterative LLM picks at each level of the framework.""" deadline = datetime.now(UTC) + timedelta(seconds=120) domain_feedback_records = tuple( FeedbackRecordModel(id=record.id, text=record.content, metadata={}) for record in body.feedback_records ) domain_request = CodingAssignmentRequestModel( feedback_records=domain_feedback_records, coding_framework=body.coding_framework, max_codes=body.max_codes, confidence_threshold=body.confidence_threshold, tenant_id=tenant.tenant_id, ) result = await orchestrator.assign_codes( domain_request, deadline, anonymize=body.anonymize ) return ApiAssignCodesResponse( coded_feedback_records=[ ApiCodedFeedbackRecord( feedback_record_id=coded.feedback_record_id, assigned_codes=[ ApiAssignedCode( code_id=assigned.code_id, code_label=assigned.code_label, confidence_type=assigned.confidence_type, confidence_category=assigned.confidence_category, confidence_code=assigned.confidence_code, confidence_aggregate=assigned.confidence_aggregate, explanation=assigned.explanation, ) for assigned in coded.assigned_codes ], ) for coded in result.coded_feedback_records ], )
[docs] @router.post( "/v1/summarize-aggregate", response_model=ApiSummarizeAggregateResponse, status_code=200, ) async def summarize_aggregate( body: ApiSummarizeRequest, request: Request, tenant: TenantApiKey = Depends(authenticate_request), orchestrator: Orchestrator = Depends(get_orchestrator), ) -> ApiSummarizeAggregateResponse: """Summarize all submitted feedback records as a single aggregate summary. Parameters ---------- body : SummarizeRequest The request body containing feedback records and summarization options. request : Request The incoming HTTP request. tenant : TenantApiKey The authenticated tenant, injected via dependency. orchestrator : Orchestrator The orchestrator service, injected via dependency. Returns ------- SummarizeAggregateResponse A single summary with themes ordered by frequency across all feedback records. """ deadline = datetime.now(UTC) + timedelta(seconds=120) feedback_records = tuple( FeedbackRecordModel( id=record.id, text=record.content, metadata=_summarize_metadata_to_domain(record.metadata), ) for record in body.feedback_records ) domain_request = DomainSummaryRequest( feedback_records=feedback_records, output_language=body.output_language, prompt=body.prompt, tenant_id=tenant.tenant_id, ) result = await orchestrator.summarize_aggregate( domain_request, deadline, anonymize=body.anonymize ) return ApiSummarizeAggregateResponse( summary=ApiAggregateSummary( ids=list(result.ids), title=result.title, summary=result.summary, quality_score=result.quality_score, ) )
[docs] @router.get("/v1/health", response_model=ApiHealthResponse, status_code=200) async def health() -> ApiHealthResponse: """Return service health status. Returns ------- HealthResponse Health status and package version. """ return ApiHealthResponse( status="ok", version=qfa.__version__, )