Source code for qfa.cli.migrate

"""Run ``alembic upgrade head`` under a Postgres advisory lock.

Invoked from ``entrypoint.sh`` before uvicorn binds the port, and from
``make migrate`` in dev. Multi-replica safe: the session-scoped advisory
lock serialises concurrent migrators so non-winners wait for the winner
to finish before proceeding.

Run from the project root: Alembic resolves ``./alembic.ini`` from the
current working directory.
"""

from __future__ import annotations

import asyncio
import logging
import sys

from alembic.config import Config
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

from alembic import command
from qfa.adapters.db import create_async_engine_from_settings
from qfa.settings import DatabaseSettings

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Advisory-lock key shared by every migrator: run_migrations binds it as
# ``:k`` in both the ``pg_advisory_lock`` and ``pg_advisory_unlock`` statements.
LOCK_KEY: int = 7424901234567890
"""Stable 64-bit integer key for the migration advisory lock."""


def _alembic_upgrade_head(sync_connection):  # noqa: ANN001
    """Upgrade the schema to ``head`` over an already-open sync connection.

    Written as a plain synchronous callable so it can be handed to
    ``AsyncConnection.run_sync``.

    Parameters
    ----------
    sync_connection
        The synchronous SQLAlchemy connection to migrate over. It is stored
        under ``attributes["connection"]`` on the Alembic config
        (presumably read back by the project's ``env.py`` - confirm there).
    """
    # Relative path on purpose: resolved against the current working
    # directory, so callers must run from the project root.
    alembic_cfg = Config("alembic.ini")
    alembic_cfg.attributes["connection"] = sync_connection
    command.upgrade(alembic_cfg, revision="head")


async def run_migrations(db: DatabaseSettings | str) -> None:
    """Apply ``alembic upgrade head`` while holding the migration advisory lock.

    The lock is session-scoped, so it disappears on its own once the
    connection that took it is closed; a migrator that crashes therefore
    cannot leave the key held forever.

    Parameters
    ----------
    db : DatabaseSettings | str
        Full database settings (preferred; supports Entra token auth), or a
        plain SQLAlchemy URL string (the path integration tests use).
    """
    engine = (
        create_async_engine_from_settings(db)
        if isinstance(db, DatabaseSettings)
        else create_async_engine(db)
    )

    acquire_lock = text("SELECT pg_advisory_lock(:k)")
    release_lock = text("SELECT pg_advisory_unlock(:k)")
    lock_params = {"k": LOCK_KEY}

    try:
        # Take the lock and run the upgrade on a connection opened with the
        # AUTOCOMMIT isolation level.
        locking_engine = engine.execution_options(isolation_level="AUTOCOMMIT")
        async with locking_engine.connect() as session:
            # Blocks here until no other session holds LOCK_KEY.
            await session.execute(acquire_lock, lock_params)
            try:
                logger.info("Running alembic upgrade head")
                # Alembic's command API is synchronous; run_sync hands it the
                # underlying sync connection.
                await session.run_sync(_alembic_upgrade_head)
            finally:
                # Explicit release, whether or not the upgrade succeeded.
                await session.execute(release_lock, lock_params)
    finally:
        # Always tear the engine down, even if locking or migrating failed.
        await engine.dispose()
def main() -> int:
    """CLI entry point.

    Loads ``DatabaseSettings`` and runs the migrations when usage tracking
    is enabled; otherwise logs that migrations are skipped.

    Returns
    -------
    int
        Always ``0``: both a completed migration run and the skipped no-op
        case count as success. Failures surface as exceptions instead.
    """
    db_settings = DatabaseSettings()
    if db_settings.track_usage:
        asyncio.run(run_migrations(db_settings))
    else:
        logger.info("DB_TRACK_USAGE is false; skipping migrations")
    return 0
if __name__ == "__main__":
    # Script entry: configure logging, run main(), and map any failure to a
    # non-zero exit status after logging the traceback.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
    )
    try:
        exit_status = main()
    except Exception:
        logger.exception("Migration run failed")
        exit_status = 1
    sys.exit(exit_status)