Skip to content

papi.core.db

get_redis_client() async

Lazily initialize and return a singleton Redis client.

Uses the Redis URI configured in the application settings.

Returns:

Name Type Description
Redis Redis | None

An asyncio-compatible Redis client instance, or None if no Redis URI is configured.

Source code in papi/core/db/redis.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
async def get_redis_client() -> Redis | None:
    """
    Return the shared Redis client, creating it on first use.

    The client is built from ``config.database.redis_uri`` with
    ``decode_responses=True`` and stored in the module-level ``_redis``
    variable so later calls reuse the same instance.

    Returns:
        Redis | None: The shared client, or ``None`` when no client has been
        created because the database settings or the Redis URI are missing.
    """
    global _redis
    settings = get_config()

    # Already initialised: hand back the cached instance.
    if _redis is not None:
        return _redis

    # Only build a client when a Redis URI is actually configured.
    if settings.database and settings.database.redis_uri:
        _redis = from_url(settings.database.redis_uri, decode_responses=True)

    return _redis

get_redis_uri_with_db(base_uri, db_index)

Construct a Redis URI with a specific database index.

Parameters:

Name Type Description Default
base_uri str

The base Redis URI, without the database path.

required
db_index int

The database index to use (e.g., 0, 1, 2...).

required

Returns:

Name Type Description
str str

The Redis URI with the database index set as the path.

Source code in papi/core/db/redis.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def get_redis_uri_with_db(base_uri: str, db_index: int) -> str:
    """
    Build a Redis URI that targets a specific database index.

    The path component of ``base_uri`` is replaced with ``/<db_index>``; any
    path already present is overwritten, while the scheme, host, credentials,
    query string and fragment are kept as parsed.

    Args:
        base_uri (str): The base Redis URI, without the database path.
        db_index (int): The database index to use (e.g., 0, 1, 2...).

    Returns:
        str: The Redis URI with the database index set as the path.
    """
    components = urlparse(base_uri)
    db_path = "/{}".format(db_index)
    with_db = components._replace(path=db_path)
    return urlunparse(with_db)

get_sql_session() async

Asynchronous context manager for database sessions with production-grade features.

Features:

- Connection pooling with configurable size
- Automatic transaction management
- Error handling with rollback safety
- Connection recycling
- Timeout configurations
- Comprehensive logging

Usage

async with get_sql_session() as session:
    result = await session.execute(select(User))
    users = result.scalars().all()

Raises:

Type Description
RuntimeError

For configuration errors

SQLAlchemyError

For database operation errors

Yields:

Name Type Description
AsyncSession AsyncGenerator[AsyncSession, None]

Database session instance

Source code in papi/core/db/sql_session.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@asynccontextmanager
async def get_sql_session() -> AsyncGenerator[AsyncSession, None]:
    """
    Asynchronous context manager that yields a transactional database session.

    A new async engine (using ``AsyncAdaptedQueuePool``) and a new session are
    created on every call. When the ``async with`` body finishes cleanly the
    transaction is committed; when it raises, the transaction is rolled back.
    The session is always closed and the engine is always disposed afterwards,
    so the connections opened for this call are released instead of
    accumulating in an abandoned pool.

    Usage:
        async with get_sql_session() as session:
            result = await session.execute(select(User))
            users = result.scalars().all()

    Raises:
        RuntimeError: If the database settings or the SQL URI are not
            configured, or if a non-SQLAlchemy exception escapes the
            ``async with`` body (the original exception is chained as the
            cause).
        SQLAlchemyError: For database operation errors, re-raised after the
            rollback.

    Yields:
        AsyncSession: Database session instance
    """
    config = get_config()
    # ``config.database`` may be unset; treat that the same as a missing URI
    # so the caller gets the documented RuntimeError, not an AttributeError.
    database_config = config.database
    sql_uri = database_config.sql_uri if database_config else None

    # Validate configuration
    if not sql_uri:
        log.critical("Database SQL_URI not configured")
        raise RuntimeError("Database configuration missing: SQL_URI not set")

    # NOTE: pool sizing, recycling, echo and timeout options are not wired to
    # the configuration yet; the engine runs with the pool's defaults.
    engine = create_async_engine(
        sql_uri,
        future=True,
        poolclass=AsyncAdaptedQueuePool,
        # pool_size=config.database.pool_size,
        # max_overflow=config.database.max_overflow,
        # pool_timeout=config.database.pool_timeout,
        # pool_recycle=config.database.pool_recycle,
        # echo=config.database.echo_sql,
        # connect_args={"timeout": config.database.connect_timeout},
    )

    # Configure session factory
    session_factory = async_sessionmaker(
        bind=engine, expire_on_commit=False, autoflush=False, class_=AsyncSession
    )

    # Session management
    session = session_factory()
    try:
        log.debug("Database session opened")
        yield session
        await session.commit()
        log.debug("Transaction committed successfully")
    except SQLAlchemyError as e:
        log.error(f"Database operation failed: {e!s}")
        await session.rollback()
        log.warning("Transaction rolled back due to error")
        raise
    except Exception as e:
        log.critical(f"Unexpected error in session: {e!s}")
        await session.rollback()
        raise RuntimeError("Database session aborted") from e
    finally:
        try:
            await session.close()
            log.debug("Database session closed")
        finally:
            # The engine is private to this call; dispose it even if closing
            # the session failed, otherwise its connection pool is leaked.
            await engine.dispose()

create_database_if_not_exists(db_url) async

Asynchronously check and create the database if it does not exist.

Internally runs create_database_if_not_exists_sync in a thread executor to remain non-blocking in async applications.

Parameters:

Name Type Description Default
db_url str

The database URI.

required
Example
await create_database_if_not_exists(
    "postgresql+asyncpg://user:pass@localhost/dbname"
)
Source code in papi/core/db/db_creation.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
async def create_database_if_not_exists(db_url: str) -> None:
    """
    Create the database for ``db_url`` if it is missing, without blocking.

    Delegates to `create_database_if_not_exists_sync`, which is run in the
    running event loop's default executor so the coroutine only awaits the
    result instead of doing the work on the event loop thread.

    Args:
        db_url (str): The database URI.

    Example:
        ```python
        await create_database_if_not_exists(
            "postgresql+asyncpg://user:pass@localhost/dbname"
        )
        ```
    """
    # Passing ``None`` selects the loop's default executor.
    pending = asyncio.get_running_loop().run_in_executor(
        None,
        create_database_if_not_exists_sync,
        db_url,
    )
    await pending