pysatl_tsp.core.data_providers.database_data_provider

Module Contents

Classes

DataBaseDataProvider

A data provider that sources time series data from a database.

DatabaseAdapter

Abstract base class for database adapter implementations.

API

class pysatl_tsp.core.data_providers.database_data_provider.DataBaseDataProvider(connection_params: dict[str, Any], query: str, adapter: pysatl_tsp.core.data_providers.database_data_provider.DatabaseAdapter[pysatl_tsp.core.data_providers.abstract.T], params: tuple[Any, ...] = ())

Bases: pysatl_tsp.core.data_providers.abstract.DataProvider[pysatl_tsp.core.data_providers.abstract.T], typing.Generic[pysatl_tsp.core.data_providers.abstract.T]

A data provider that sources time series data from a database.

This class queries time series data from any database system for which an adapter implementing the DatabaseAdapter interface is available. It manages the connection lifecycle and streams the query results item by item.

Parameters:
  • connection_params – Dictionary containing connection parameters

  • query – SQL query string to execute

  • adapter – Database adapter implementation

  • params – Query parameters, defaults to ()

Example:
# Assumes SQLiteAdapter is defined as in the DatabaseAdapter example below
from pysatl_tsp.core.data_providers.database_data_provider import DataBaseDataProvider

# Create a data provider for a SQLite database
provider = DataBaseDataProvider(
    connection_params={"database": "sensors.db"},
    query="SELECT timestamp, value FROM temperature WHERE location_id = ? AND timestamp > ?",
    adapter=SQLiteAdapter(),
    params=("zone-1", "2023-01-01"),
)

# Iterate over the provider directly; each record is a dict produced by the adapter
for record in provider:
    print(f"Time: {record['timestamp']}, Value: {record['value']}")

# Or connect it to a processing pipeline
# (WindowHandler and AverageHandler are handler classes not defined in this module)
pipeline = provider | WindowHandler(60) | AverageHandler()

Initialization

Initialize a database data provider.

Parameters:
  • connection_params – Dictionary containing connection parameters

  • query – SQL query string to execute

  • adapter – Database adapter implementation

  • params – Query parameters, defaults to ()

__iter__() -> collections.abc.Iterator[pysatl_tsp.core.data_providers.abstract.T]

Create an iterator over the query results from the database.

This method executes the query and yields data items by delegating to the adapter’s fetch_data method.

Returns:

An iterator yielding data items from the database query

Raises:

Exception – If database operations fail
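The delegation described above can be illustrated with a small, self-contained sketch. It assumes the adapter calls happen in the order connect → execute_query → fetch_data → close_cursor → close_connection, which is what the docstrings in this module suggest but not a copy of the library's code. RecordingAdapter and ProviderSketch are hypothetical stand-ins that use no database at all; the adapter only logs which method was called.

```python
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any


class RecordingAdapter:
    """Hypothetical in-memory adapter that records the order of its calls."""

    def __init__(self, rows: list[dict[str, Any]]) -> None:
        self.rows = rows
        self.calls: list[str] = []

    def connect(self, connection_params: dict[str, Any]) -> object:
        self.calls.append("connect")
        return object()  # stands in for a real connection

    def execute_query(self, connection: Any, query: str, params: tuple[Any, ...] = ()) -> list[dict[str, Any]]:
        self.calls.append("execute_query")
        return list(self.rows)  # stands in for a cursor

    def fetch_data(self, cursor: list[dict[str, Any]]) -> Iterator[dict[str, Any]]:
        self.calls.append("fetch_data")
        yield from cursor

    def close_cursor(self, cursor: Any) -> None:
        self.calls.append("close_cursor")

    def close_connection(self, connection: Any) -> None:
        self.calls.append("close_connection")


class ProviderSketch:
    """Hypothetical stand-in mirroring the behaviour documented for DataBaseDataProvider."""

    def __init__(self, connection_params: dict[str, Any], query: str, adapter: Any, params: tuple[Any, ...] = ()) -> None:
        self.connection_params = connection_params
        self.query = query
        self.adapter = adapter
        self.params = params

    @contextmanager
    def _connection_context(self) -> Iterator[Any]:
        # Open the connection, run the query, and hand the cursor to the caller.
        connection = self.adapter.connect(self.connection_params)
        try:
            cursor = self.adapter.execute_query(connection, self.query, self.params)
            try:
                yield cursor
            finally:
                self.adapter.close_cursor(cursor)
        finally:
            self.adapter.close_connection(connection)

    def __iter__(self) -> Iterator[Any]:
        # Because this is a generator, nothing is called until iteration starts.
        with self._connection_context() as cursor:
            yield from self.adapter.fetch_data(cursor)


adapter = RecordingAdapter([{"timestamp": 1, "value": 20.5}, {"timestamp": 2, "value": 20.7}])
provider = ProviderSketch({"database": ":memory:"}, "SELECT timestamp, value FROM t", adapter)
iterator = iter(provider)
calls_before = list(adapter.calls)  # still empty: the generator has not started
records = list(iterator)
```

In this sketch the cursor is closed before the connection, and both are released only after the last item has been consumed. Whether the real `__iter__` also defers connecting until the first item is requested is not stated in the source.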

_connection_context() -> Any

Context manager for the database connection lifecycle.

This method handles the setup and teardown of database connections, ensuring that resources are released even if an error occurs.

Returns:

Database cursor or similar query result object

Raises:

Exception – If database operations fail
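The guarantee that resources are released on errors can be sketched with `contextlib.contextmanager` and nested `try`/`finally` blocks. This is an assumption about how such a context manager can be built, not the library's implementation; FailingAdapter and connection_context are hypothetical names. The adapter's fetch step raises after the first item, and the sketch checks that the cursor and the connection are still closed while the exception propagates to the caller.

```python
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any


class FailingAdapter:
    """Hypothetical adapter whose fetch step fails partway through."""

    def __init__(self) -> None:
        self.closed: list[str] = []

    def connect(self, connection_params: dict[str, Any]) -> object:
        return object()

    def execute_query(self, connection: Any, query: str, params: tuple[Any, ...] = ()) -> list[int]:
        return [1, 2, 3]

    def fetch_data(self, cursor: list[int]) -> Iterator[int]:
        yield cursor[0]
        raise RuntimeError("lost connection mid-read")

    def close_cursor(self, cursor: Any) -> None:
        self.closed.append("cursor")

    def close_connection(self, connection: Any) -> None:
        self.closed.append("connection")


@contextmanager
def connection_context(adapter: Any, connection_params: dict[str, Any], query: str, params: tuple[Any, ...] = ()) -> Iterator[Any]:
    """Sketch: open and query, then always release resources in reverse order."""
    connection = adapter.connect(connection_params)
    try:
        cursor = adapter.execute_query(connection, query, params)
        try:
            yield cursor
        finally:
            # Runs even when the body of the with-statement raises.
            adapter.close_cursor(cursor)
    finally:
        adapter.close_connection(connection)


adapter = FailingAdapter()
received: list[int] = []
error = ""
try:
    with connection_context(adapter, {}, "SELECT 1") as cursor:
        for item in adapter.fetch_data(cursor):
            received.append(item)
except RuntimeError as exc:
    error = str(exc)
```

The nesting matters: if `execute_query` itself raised, only the outer `finally` would run, so the connection would be closed without trying to close a cursor that was never created.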

class pysatl_tsp.core.data_providers.database_data_provider.DatabaseAdapter

Bases: abc.ABC, typing.Generic[pysatl_tsp.core.data_providers.abstract.T]

Abstract base class for database adapter implementations.

This class defines the interface for adapters that connect to different database systems. Concrete implementations of this class handle the specific details of connecting to databases, executing queries, and transforming results into a consistent format for the data processing pipeline.

Example:
import sqlite3
from collections.abc import Iterator
from typing import Any

from pysatl_tsp.core.data_providers.database_data_provider import (
    DataBaseDataProvider,
    DatabaseAdapter,
)


class SQLiteAdapter(DatabaseAdapter[dict[str, Any]]):
    def connect(self, connection_params: dict[str, Any]) -> sqlite3.Connection:
        connection = sqlite3.connect(connection_params["database"])
        # sqlite3.Row lets fetch_data convert each row into a dict keyed by column name
        connection.row_factory = sqlite3.Row
        return connection

    def execute_query(
        self, connection: sqlite3.Connection, query: str, params: tuple[Any, ...] = ()
    ) -> sqlite3.Cursor:
        cursor = connection.cursor()
        cursor.execute(query, params)
        return cursor

    def fetch_data(self, cursor: sqlite3.Cursor) -> Iterator[dict[str, Any]]:
        for row in cursor:
            yield dict(row)

    def close_cursor(self, cursor: sqlite3.Cursor) -> None:
        cursor.close()

    def close_connection(self, connection: sqlite3.Connection) -> None:
        connection.close()


# Usage:
adapter = SQLiteAdapter()
provider = DataBaseDataProvider(
    connection_params={"database": "time_series.db"},
    query="SELECT timestamp, value FROM measurements WHERE sensor_id = ?",
    adapter=adapter,
    params=(42,),
)

abstractmethod close_connection(connection: Any) -> None

Close the database connection.

Parameters:

connection – Database connection object

abstractmethod close_cursor(cursor: Any) -> None

Close the query cursor.

Parameters:

cursor – Query result cursor

abstractmethod connect(connection_params: dict[str, Any]) -> Any

Establish a connection to the database.

Parameters:

connection_params – Dictionary containing connection parameters (e.g., host, port, username, password)

Returns:

Database connection object

Raises:

Exception – If connection fails
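The SQLiteAdapter example above reads `connection_params["database"]` explicitly. Another possible design, sketched here as an assumption rather than a requirement of the interface, is to forward the whole dictionary to the driver as keyword arguments, so any option the driver accepts can be set from `connection_params`. The keys must then match the driver's keyword parameters; for `sqlite3.connect` these include `database` and `timeout`.

```python
import sqlite3
from typing import Any


def connect(connection_params: dict[str, Any]) -> sqlite3.Connection:
    """Sketch of a connect() body that forwards the dict as keyword arguments."""
    # Every key must be a keyword parameter of sqlite3.connect (database, timeout, ...).
    connection = sqlite3.connect(**connection_params)
    # Rows come back as sqlite3.Row, which supports both index and column-name access.
    connection.row_factory = sqlite3.Row
    return connection


conn = connect({"database": ":memory:", "timeout": 5.0})
value = conn.execute("SELECT 1 + 1").fetchone()[0]
conn.close()
```

The trade-off is that an unexpected key raises `TypeError` from the driver instead of being silently ignored.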

abstractmethod execute_query(connection: Any, query: str, params: tuple[Any, ...] = ()) -> Any

Execute a query on the given database connection.

Parameters:
  • connection – Database connection object

  • query – SQL query string

  • params – Query parameters, defaults to ()

Returns:

Query result cursor or similar object

Raises:

Exception – If query execution fails
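The `params` tuple is meant to be bound by the database driver rather than formatted into the SQL string. The sketch below shows this with the standard-library `sqlite3` module and an in-memory database; the table and values are made up for illustration. Placeholder syntax is driver-specific: `sqlite3` uses `?`, while other DB-API drivers may expect `%s` or named placeholders.

```python
import sqlite3
from typing import Any


def execute_query(connection: sqlite3.Connection, query: str, params: tuple[Any, ...] = ()) -> sqlite3.Cursor:
    cursor = connection.cursor()
    # The driver binds params to the ? placeholders; values are never spliced into the SQL text.
    cursor.execute(query, params)
    return cursor


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE measurements (sensor_id INTEGER, timestamp TEXT, value REAL)")
connection.executemany(
    "INSERT INTO measurements VALUES (?, ?, ?)",
    [(42, "2023-01-01T00:00", 20.5), (42, "2023-01-01T00:01", 20.7), (7, "2023-01-01T00:00", 18.0)],
)
cursor = execute_query(
    connection,
    "SELECT timestamp, value FROM measurements WHERE sensor_id = ? ORDER BY timestamp",
    (42,),
)
rows = cursor.fetchall()
cursor.close()
connection.close()
```

Note that a single parameter still has to be passed as a one-element tuple, `(42,)`, as in the usage example for SQLiteAdapter.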

abstractmethod fetch_data(cursor: Any) -> collections.abc.Iterator[pysatl_tsp.core.data_providers.abstract.T]

Extract and transform data from the query result.

Parameters:

cursor – Query result cursor

Returns:

Iterator yielding data items of type T
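The SQLiteAdapter example relies on `sqlite3.Row` to build dicts. For drivers without such a row factory, a fetch_data body can instead read column names from the DB-API 2.0 `cursor.description` attribute and stream rows in batches with `fetchmany`. The `batch_size` argument below is a hypothetical addition for this sketch and is not part of the documented signature.

```python
import sqlite3
from collections.abc import Iterator
from typing import Any


def fetch_data(cursor: Any, batch_size: int = 500) -> Iterator[dict[str, Any]]:
    """Sketch: turn rows from a DB-API 2.0 cursor into dicts keyed by column name."""
    # cursor.description has one sequence per result column; item 0 is the column name.
    columns = [column[0] for column in cursor.description]
    while True:
        # fetchmany() reads in batches, so large results are not loaded all at once.
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        for row in batch:
            yield dict(zip(columns, row))


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE temperature (timestamp TEXT, value REAL)")
connection.executemany("INSERT INTO temperature VALUES (?, ?)", [("t1", 1.5), ("t2", 2.5), ("t3", 3.5)])
cursor = connection.execute("SELECT timestamp, value FROM temperature ORDER BY timestamp")
records = list(fetch_data(cursor, batch_size=2))
connection.close()
```

Because the function is a generator, rows are only pulled from the cursor as the consumer iterates, which matches the streaming behaviour described for DataBaseDataProvider.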