Skip to content

StarRocks

The starrocks/sqlalchemy adapter uses SQLAlchemy to integrate with StarRocks, a high-performance analytical database that is compatible with the MySQL protocol.

Session Managers

StarRocks-specific session manager handling connection management for the StarRocks analytical engine.

Classes:

Name Description
StarRocksSQlAlchemySessionManager

Synchronous SQLAlchemy session manager for StarRocks.

AsyncStarRocksSQlAlchemySessionManager

Asynchronous SQLAlchemy session manager for StarRocks.

StarRocksSQlAlchemySessionManager

Synchronous SQLAlchemy session manager for StarRocks.

Inherits from BaseSQLAlchemySessionManager to provide StarRocks-specific session management, including connection URL creation and engine configuration.

Parameters:

Name Type Description Default
orm_config StarRocksSQLAlchemyConfig | None

StarRocks-specific configuration. If None, uses global config.

None
Source code in archipy/adapters/starrocks/sqlalchemy/session_managers.py
class StarRocksSQlAlchemySessionManager(BaseSQLAlchemySessionManager[StarRocksSQLAlchemyConfig], metaclass=Singleton):
    """Blocking (synchronous) SQLAlchemy session manager for StarRocks.

    Builds on BaseSQLAlchemySessionManager and supplies the StarRocks-specific
    pieces: the expected config type, the database name, the driver connect
    arguments and the connection URL.

    Args:
        orm_config: StarRocks-specific configuration. When None, the
            STARROCKS_SQLALCHEMY section of the global config is used.
    """

    def __init__(self, orm_config: StarRocksSQLAlchemyConfig | None = None) -> None:
        """Set up the session manager with an explicit or global configuration.

        Args:
            orm_config: StarRocks-specific configuration. When None, the
                STARROCKS_SQLALCHEMY section of the global config is used.
        """
        # The global config is only consulted when no explicit config is given.
        if orm_config is None:
            effective_config = BaseConfig.global_config().STARROCKS_SQLALCHEMY
        else:
            effective_config = orm_config
        super().__init__(effective_config)

    @override
    def _expected_config_type(self) -> type[StarRocksSQLAlchemyConfig]:
        """Tell the base class which configuration class this manager expects.

        Returns:
            The StarRocksSQLAlchemyConfig class itself (not an instance).
        """
        return StarRocksSQLAlchemyConfig

    @override
    def _get_database_name(self) -> str:
        """Identify the database backend handled by this manager.

        Returns:
            The literal string 'starrocks'.
        """
        return "starrocks"

    @override
    def _get_connect_args(self) -> dict:
        """Build the driver-level connect arguments for StarRocks.

        StarRocks (using MySQL protocol) requires autocommit to be explicitly
        disabled to ensure transactions work properly with rollback support.

        Returns:
            A dictionary that always carries autocommit=False and, when the
            configuration defines a non-None CONNECT_TIMEOUT, connect_timeout.
        """
        # Either `_configs` or its CONNECT_TIMEOUT attribute may be absent;
        # both cases collapse to None, which means "no timeout entry".
        timeout = getattr(getattr(self, "_configs", None), "CONNECT_TIMEOUT", None)
        connect_args = {} if timeout is None else {"connect_timeout": timeout}

        # StarRocks-specific setting for transaction support.
        connect_args["autocommit"] = False
        return connect_args

    @override
    def _create_url(self, configs: StarRocksSQLAlchemyConfig) -> URL:
        """Assemble the SQLAlchemy URL used to reach StarRocks.

        Args:
            configs: StarRocks configuration providing driver name,
                credentials, host, port and database.

        Returns:
            A SQLAlchemy URL object for StarRocks.

        Raises:
            DatabaseConnectionError: If SQLAlchemy fails while creating the URL.
        """
        try:
            connection_url = URL.create(
                drivername=configs.DRIVER_NAME,
                username=configs.USERNAME,
                password=configs.PASSWORD,
                host=configs.HOST,
                port=configs.PORT,
                database=configs.DATABASE,
            )
        except SQLAlchemyError as exc:
            raise DatabaseConnectionError(database=self._get_database_name()) from exc
        return connection_url

AsyncStarRocksSQlAlchemySessionManager

Asynchronous SQLAlchemy session manager for StarRocks.

Inherits from AsyncBaseSQLAlchemySessionManager to provide async StarRocks-specific session management, including connection URL creation and async engine configuration.

Parameters:

Name Type Description Default
orm_config StarRocksSQLAlchemyConfig | None

StarRocks-specific configuration. If None, uses global config.

None
Source code in archipy/adapters/starrocks/sqlalchemy/session_managers.py
class AsyncStarRocksSQlAlchemySessionManager(
    AsyncBaseSQLAlchemySessionManager[StarRocksSQLAlchemyConfig],
    metaclass=Singleton,
):
    """Non-blocking (asynchronous) SQLAlchemy session manager for StarRocks.

    Builds on AsyncBaseSQLAlchemySessionManager and supplies the
    StarRocks-specific pieces: the expected config type, the database name,
    the async connection URL and the driver connect arguments.

    Args:
        orm_config: StarRocks-specific configuration. When None, the
            STARROCKS_SQLALCHEMY section of the global config is used.
    """

    def __init__(self, orm_config: StarRocksSQLAlchemyConfig | None = None) -> None:
        """Set up the async session manager with an explicit or global configuration.

        Args:
            orm_config: StarRocks-specific configuration. When None, the
                STARROCKS_SQLALCHEMY section of the global config is used.
        """
        # The global config is only consulted when no explicit config is given.
        if orm_config is None:
            effective_config = BaseConfig.global_config().STARROCKS_SQLALCHEMY
        else:
            effective_config = orm_config
        super().__init__(effective_config)

    @override
    def _expected_config_type(self) -> type[StarRocksSQLAlchemyConfig]:
        """Tell the base class which configuration class this manager expects.

        Returns:
            The StarRocksSQLAlchemyConfig class itself (not an instance).
        """
        return StarRocksSQLAlchemyConfig

    @override
    def _get_database_name(self) -> str:
        """Identify the database backend handled by this manager.

        Returns:
            The literal string 'starrocks'.
        """
        return "starrocks"

    @override
    def _create_url(self, configs: StarRocksSQLAlchemyConfig) -> URL:
        """Assemble the SQLAlchemy URL used for async access to StarRocks.

        The driver name is fixed to starrocks+asyncmy rather than read from
        the configuration: async operation relies on the asyncmy library for
        MySQL-protocol support while keeping the StarRocks dialect features
        (type mapping, compiler patches).

        Args:
            configs: StarRocks configuration providing credentials, host,
                port and database.

        Returns:
            A SQLAlchemy URL object for StarRocks with the async driver.

        Raises:
            DatabaseConnectionError: If SQLAlchemy fails while creating the URL.
        """
        try:
            connection_url = URL.create(
                drivername="starrocks+asyncmy",
                username=configs.USERNAME,
                password=configs.PASSWORD,
                host=configs.HOST,
                port=configs.PORT,
                database=configs.DATABASE,
            )
        except SQLAlchemyError as exc:
            raise DatabaseConnectionError(database=self._get_database_name()) from exc
        return connection_url

    @override
    def _get_connect_args(self) -> dict:
        """Build the driver-level connect arguments for async StarRocks.

        StarRocks (using MySQL protocol via asyncmy) requires autocommit to be
        explicitly disabled to ensure transactions work properly with rollback
        support.

        Note: asyncmy driver only supports connect_timeout, not
        read_timeout/write_timeout, so no read/write timeouts are emitted here.

        Returns:
            A dictionary that always carries autocommit=False and, when the
            configuration defines a non-None CONNECT_TIMEOUT, connect_timeout.
        """
        # Either `_configs` or its CONNECT_TIMEOUT attribute may be absent;
        # both cases collapse to None, which means "no timeout entry".
        timeout = getattr(getattr(self, "_configs", None), "CONNECT_TIMEOUT", None)
        connect_args = {} if timeout is None else {"connect_timeout": timeout}

        # StarRocks async-specific setting for transaction support.
        connect_args["autocommit"] = False
        return connect_args

options: show_root_toc_entry: false heading_level: 3

Session Manager Registry

Registry for StarRocks session manager instances.

Classes:

Name Description
StarRocksSessionManagerRegistry

Registry for StarRocks SQLAlchemy session managers.

StarRocksSessionManagerRegistry

Registry for StarRocks SQLAlchemy session managers.

This registry provides a centralized access point for both synchronous and asynchronous StarRocks session managers, implementing the Service Locator pattern. It lazily initializes the appropriate session manager when first requested.

The registry maintains singleton instances of:

- A synchronous session manager (StarRocksSQlAlchemySessionManager)
- An asynchronous session manager (AsyncStarRocksSQlAlchemySessionManager)

Methods:

Name Description
get_sync_manager

Get the synchronous StarRocks session manager instance.

set_sync_manager

Set a custom synchronous session manager.

get_async_manager

Get the asynchronous StarRocks session manager instance.

set_async_manager

Set a custom asynchronous session manager.

reset

Reset the registry to its initial state.

Source code in archipy/adapters/starrocks/sqlalchemy/session_manager_registry.py
class StarRocksSessionManagerRegistry(SessionManagerRegistry, metaclass=Singleton):
    """Central lookup point for the StarRocks SQLAlchemy session managers.

    Implements the Service Locator pattern for both the synchronous and the
    asynchronous StarRocks session manager. A default manager is created
    lazily the first time it is requested, unless a custom one was registered.

    The registry holds at most one of each:
    - A synchronous session manager (StarRocksSQlAlchemySessionManager)
    - An asynchronous session manager (AsyncStarRocksSQlAlchemySessionManager)
    """

    @classmethod
    def get_sync_manager(cls) -> SessionManagerPort:
        """Return the registered synchronous StarRocks session manager.

        When nothing is registered yet, a default
        StarRocksSQlAlchemySessionManager is created and stored first.

        Returns:
            SessionManagerPort: The registered synchronous session manager

        Raises:
            DatabaseConnectionError: If importing or constructing the default
                session manager fails for any reason
        """
        if cls._sync_instance is not None:
            return cls._sync_instance

        try:
            # Imported lazily, only when a default manager actually has to be built.
            from archipy.adapters.starrocks.sqlalchemy.session_managers import StarRocksSQlAlchemySessionManager

            cls._sync_instance = StarRocksSQlAlchemySessionManager()
        except Exception as exc:
            raise DatabaseConnectionError(database="starrocks") from exc
        return cls._sync_instance

    @classmethod
    def set_sync_manager(cls, manager: SessionManagerPort) -> None:
        """Register a custom synchronous session manager.

        Args:
            manager: An instance implementing SessionManagerPort

        Raises:
            InvalidArgumentError: If the manager is None or doesn't implement SessionManagerPort
        """
        if manager is None:
            raise InvalidArgumentError("StarRocks session manager cannot be None")
        from archipy.adapters.base.sqlalchemy.session_manager_ports import SessionManagerPort

        if isinstance(manager, SessionManagerPort):
            cls._sync_instance = manager
            return
        raise InvalidArgumentError(f"Manager must implement SessionManagerPort, got {type(manager).__name__}")

    @classmethod
    def get_async_manager(cls) -> AsyncSessionManagerPort:
        """Return the registered asynchronous StarRocks session manager.

        When nothing is registered yet, a default
        AsyncStarRocksSQlAlchemySessionManager is created and stored first.

        Returns:
            AsyncSessionManagerPort: The registered asynchronous session manager

        Raises:
            DatabaseConnectionError: If importing or constructing the default
                session manager fails for any reason
        """
        if cls._async_instance is not None:
            return cls._async_instance

        try:
            # Imported lazily, only when a default manager actually has to be built.
            from archipy.adapters.starrocks.sqlalchemy.session_managers import (
                AsyncStarRocksSQlAlchemySessionManager,
            )

            cls._async_instance = AsyncStarRocksSQlAlchemySessionManager()
        except Exception as exc:
            raise DatabaseConnectionError(database="starrocks") from exc
        return cls._async_instance

    @classmethod
    def set_async_manager(cls, manager: AsyncSessionManagerPort) -> None:
        """Register a custom asynchronous session manager.

        Args:
            manager: An instance implementing AsyncSessionManagerPort

        Raises:
            InvalidArgumentError: If the manager is None or doesn't implement AsyncSessionManagerPort
        """
        if manager is None:
            raise InvalidArgumentError("StarRocks async session manager cannot be None")
        from archipy.adapters.base.sqlalchemy.session_manager_ports import AsyncSessionManagerPort

        if isinstance(manager, AsyncSessionManagerPort):
            cls._async_instance = manager
            return
        raise InvalidArgumentError(f"Manager must implement AsyncSessionManagerPort, got {type(manager).__name__}")

    @classmethod
    def reset(cls) -> None:
        """Forget both registered managers, returning the registry to its initial state.

        Mainly useful in tests; the next getter call builds a default manager again.
        """
        cls._sync_instance = cls._async_instance = None

get_sync_manager classmethod

get_sync_manager() -> SessionManagerPort

Get the synchronous StarRocks session manager instance.

Lazily initializes a default StarRocksSQlAlchemySessionManager if none has been set.

Returns:

Name Type Description
SessionManagerPort SessionManagerPort

The registered synchronous session manager

Raises:

Type Description
DatabaseConnectionError

If there's an error initializing the session manager

Source code in archipy/adapters/starrocks/sqlalchemy/session_manager_registry.py
@classmethod
def get_sync_manager(cls) -> SessionManagerPort:
    """Return the registered synchronous StarRocks session manager.

    When nothing is registered yet, a default StarRocksSQlAlchemySessionManager
    is created and stored first.

    Returns:
        SessionManagerPort: The registered synchronous session manager

    Raises:
        DatabaseConnectionError: If importing or constructing the default
            session manager fails for any reason
    """
    if cls._sync_instance is not None:
        return cls._sync_instance

    try:
        # Imported lazily, only when a default manager actually has to be built.
        from archipy.adapters.starrocks.sqlalchemy.session_managers import StarRocksSQlAlchemySessionManager

        cls._sync_instance = StarRocksSQlAlchemySessionManager()
    except Exception as exc:
        raise DatabaseConnectionError(database="starrocks") from exc
    return cls._sync_instance

set_sync_manager classmethod

set_sync_manager(manager: SessionManagerPort) -> None

Set a custom synchronous session manager.

Parameters:

Name Type Description Default
manager SessionManagerPort

An instance implementing SessionManagerPort

required

Raises:

Type Description
InvalidArgumentError

If the manager is None or doesn't implement SessionManagerPort

Source code in archipy/adapters/starrocks/sqlalchemy/session_manager_registry.py
@classmethod
def set_sync_manager(cls, manager: SessionManagerPort) -> None:
    """Register a custom synchronous session manager.

    Args:
        manager: An instance implementing SessionManagerPort

    Raises:
        InvalidArgumentError: If the manager is None or doesn't implement SessionManagerPort
    """
    if manager is None:
        raise InvalidArgumentError("StarRocks session manager cannot be None")
    from archipy.adapters.base.sqlalchemy.session_manager_ports import SessionManagerPort

    if isinstance(manager, SessionManagerPort):
        cls._sync_instance = manager
        return
    raise InvalidArgumentError(f"Manager must implement SessionManagerPort, got {type(manager).__name__}")

get_async_manager classmethod

get_async_manager() -> AsyncSessionManagerPort

Get the asynchronous StarRocks session manager instance.

Lazily initializes a default AsyncStarRocksSQlAlchemySessionManager if none has been set.

Returns:

Name Type Description
AsyncSessionManagerPort AsyncSessionManagerPort

The registered asynchronous session manager

Raises:

Type Description
DatabaseConnectionError

If there's an error initializing the session manager

Source code in archipy/adapters/starrocks/sqlalchemy/session_manager_registry.py
@classmethod
def get_async_manager(cls) -> AsyncSessionManagerPort:
    """Return the registered asynchronous StarRocks session manager.

    When nothing is registered yet, a default
    AsyncStarRocksSQlAlchemySessionManager is created and stored first.

    Returns:
        AsyncSessionManagerPort: The registered asynchronous session manager

    Raises:
        DatabaseConnectionError: If importing or constructing the default
            session manager fails for any reason
    """
    if cls._async_instance is not None:
        return cls._async_instance

    try:
        # Imported lazily, only when a default manager actually has to be built.
        from archipy.adapters.starrocks.sqlalchemy.session_managers import (
            AsyncStarRocksSQlAlchemySessionManager,
        )

        cls._async_instance = AsyncStarRocksSQlAlchemySessionManager()
    except Exception as exc:
        raise DatabaseConnectionError(database="starrocks") from exc
    return cls._async_instance

set_async_manager classmethod

set_async_manager(manager: AsyncSessionManagerPort) -> None

Set a custom asynchronous session manager.

Parameters:

Name Type Description Default
manager AsyncSessionManagerPort

An instance implementing AsyncSessionManagerPort

required

Raises:

Type Description
InvalidArgumentError

If the manager is None or doesn't implement AsyncSessionManagerPort

Source code in archipy/adapters/starrocks/sqlalchemy/session_manager_registry.py
@classmethod
def set_async_manager(cls, manager: AsyncSessionManagerPort) -> None:
    """Register a custom asynchronous session manager.

    Args:
        manager: An instance implementing AsyncSessionManagerPort

    Raises:
        InvalidArgumentError: If the manager is None or doesn't implement AsyncSessionManagerPort
    """
    if manager is None:
        raise InvalidArgumentError("StarRocks async session manager cannot be None")
    from archipy.adapters.base.sqlalchemy.session_manager_ports import AsyncSessionManagerPort

    if isinstance(manager, AsyncSessionManagerPort):
        cls._async_instance = manager
        return
    raise InvalidArgumentError(f"Manager must implement AsyncSessionManagerPort, got {type(manager).__name__}")

reset classmethod

reset() -> None

Reset the registry to its initial state.

This method clears both registered managers, useful for testing.

Source code in archipy/adapters/starrocks/sqlalchemy/session_manager_registry.py
@classmethod
def reset(cls) -> None:
    """Forget both registered managers, returning the registry to its initial state.

    Mainly useful in tests: afterwards neither a synchronous nor an
    asynchronous manager is registered.
    """
    cls._sync_instance = cls._async_instance = None

options: show_root_toc_entry: false heading_level: 3

Adapters

Concrete StarRocks adapter built on the base SQLAlchemy adapter with StarRocks-specific dialect configuration.

Classes:

Name Description
StarrocksSQLAlchemyAdapter

Synchronous SQLAlchemy adapter for Starrocks.

AsyncStarrocksSQLAlchemyAdapter

Asynchronous SQLAlchemy adapter for Starrocks.

StarrocksSQLAlchemyAdapter

Synchronous SQLAlchemy adapter for Starrocks.

Inherits from BaseSQLAlchemyAdapter to provide StarRocks-specific session management and database operations.

Parameters:

Name Type Description Default
orm_config StarRocksSQLAlchemyConfig | None

Starrocks-specific configuration. If None, uses global config.

None
Source code in archipy/adapters/starrocks/sqlalchemy/adapters.py
class StarrocksSQLAlchemyAdapter(BaseSQLAlchemyAdapter[StarRocksSQLAlchemyConfig]):
    """Blocking (synchronous) SQLAlchemy adapter for StarRocks.

    Builds on BaseSQLAlchemyAdapter and plugs in the StarRocks session manager
    so the inherited database operations run against StarRocks.

    Args:
        orm_config: StarRocks-specific configuration. When None, the
            STARROCKS_SQLALCHEMY section of the global config is used.
    """

    def __init__(self, orm_config: StarRocksSQLAlchemyConfig | None = None) -> None:
        """Set up the adapter with an explicit or global configuration.

        Args:
            orm_config: StarRocks-specific configuration. When None, the
                STARROCKS_SQLALCHEMY section of the global config is used.
        """
        # The global config is only consulted when no explicit config is given.
        if orm_config is None:
            effective_config = BaseConfig.global_config().STARROCKS_SQLALCHEMY
        else:
            effective_config = orm_config
        super().__init__(effective_config)

    @override
    def _create_session_manager(self, configs: StarRocksSQLAlchemyConfig) -> StarRocksSQlAlchemySessionManager:
        """Build the session manager this adapter works through.

        Args:
            configs: StarRocks configuration handed to the session manager.

        Returns:
            A StarRocksSQlAlchemySessionManager constructed from ``configs``.
        """
        session_manager = StarRocksSQlAlchemySessionManager(configs)
        return session_manager

AsyncStarrocksSQLAlchemyAdapter

Asynchronous SQLAlchemy adapter for Starrocks.

Inherits from AsyncBaseSQLAlchemyAdapter to provide async StarRocks-specific session management and database operations.

Parameters:

Name Type Description Default
orm_config StarRocksSQLAlchemyConfig | None

Starrocks-specific configuration. If None, uses global config.

None
Source code in archipy/adapters/starrocks/sqlalchemy/adapters.py
class AsyncStarrocksSQLAlchemyAdapter(AsyncBaseSQLAlchemyAdapter[StarRocksSQLAlchemyConfig]):
    """Non-blocking (asynchronous) SQLAlchemy adapter for StarRocks.

    Builds on AsyncBaseSQLAlchemyAdapter and plugs in the async StarRocks
    session manager so the inherited database operations run against StarRocks.

    Args:
        orm_config: StarRocks-specific configuration. When None, the
            STARROCKS_SQLALCHEMY section of the global config is used.
    """

    def __init__(self, orm_config: StarRocksSQLAlchemyConfig | None = None) -> None:
        """Set up the async adapter with an explicit or global configuration.

        Args:
            orm_config: StarRocks-specific configuration. When None, the
                STARROCKS_SQLALCHEMY section of the global config is used.
        """
        # The global config is only consulted when no explicit config is given.
        if orm_config is None:
            effective_config = BaseConfig.global_config().STARROCKS_SQLALCHEMY
        else:
            effective_config = orm_config
        super().__init__(effective_config)

    @override
    def _create_async_session_manager(
        self,
        configs: StarRocksSQLAlchemyConfig,
    ) -> AsyncStarRocksSQlAlchemySessionManager:
        """Build the async session manager this adapter works through.

        Args:
            configs: StarRocks configuration handed to the session manager.

        Returns:
            An AsyncStarRocksSQlAlchemySessionManager constructed from ``configs``.
        """
        session_manager = AsyncStarRocksSQlAlchemySessionManager(configs)
        return session_manager

options: show_root_toc_entry: false heading_level: 3