pysatl_tsp.core.data_providers.database_data_provider
Module Contents
Classes
| DataBaseDataProvider | A data provider that sources time series data from a database. |
| DatabaseAdapter | Abstract base class for database adapter implementations. |
API
- class pysatl_tsp.core.data_providers.database_data_provider.DataBaseDataProvider(connection_params: dict[str, Any], query: str, adapter: pysatl_tsp.core.data_providers.database_data_provider.DatabaseAdapter[pysatl_tsp.core.data_providers.abstract.T], params: tuple[Any, ...] = ())
Bases:
pysatl_tsp.core.data_providers.abstract.DataProvider[pysatl_tsp.core.data_providers.abstract.T], typing.Generic[pysatl_tsp.core.data_providers.abstract.T]
A data provider that sources time series data from a database.
This class queries time series data from any database system for which an adapter implementing the DatabaseAdapter interface is available. It manages the connection lifecycle and streams rows from the query results.
- Parameters:
connection_params – Dictionary containing connection parameters
query – SQL query string to execute
adapter – Database adapter implementation
params – Query parameters, defaults to ()
- Example:
    import sqlite3
    from typing import Any

    from pysatl_tsp.core.data_providers.database_data_provider import (
        DatabaseAdapter,
        DataBaseDataProvider,
    )

    # SQLiteAdapter is the adapter defined in the DatabaseAdapter example below
    class SQLiteAdapter(DatabaseAdapter[dict[str, Any]]):
        ...  # connect, execute_query, fetch_data, close_cursor, close_connection

    # Create a data provider for an SQLite database
    provider = DataBaseDataProvider(
        connection_params={"database": "sensors.db"},
        query="SELECT timestamp, value FROM temperature WHERE location_id = ? AND timestamp > ?",
        adapter=SQLiteAdapter(),
        params=("zone-1", "2023-01-01"),
    )

    # Iterate over the provider directly
    for record in provider:
        print(f"Time: {record['timestamp']}, Value: {record['value']}")

    # Or connect it to a processing pipeline (handler imports omitted)
    pipeline = provider | WindowHandler(60) | AverageHandler()
Initialization
Initialize a database data provider.
- __iter__() -> collections.abc.Iterator[pysatl_tsp.core.data_providers.abstract.T]
Create an iterator over the query results from the database.
This method executes the query and yields data items by delegating to the adapter’s fetch_data method.
- Returns:
An iterator yielding data items from the database query
- Raises:
Exception – If database operations fail
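A minimal sketch of how `__iter__` could combine the connection context with the adapter's `fetch_data`, assuming `_connection_context` yields a cursor and closes the cursor and connection afterwards. `MiniProvider` is a hypothetical stand-in for `DataBaseDataProvider`, and `SQLiteAdapter` is a duck-typed copy of the adapter example on this page, so the snippet runs without importing pysatl_tsp; the table and values are made up.

```python
import os
import sqlite3
import tempfile
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any


class SQLiteAdapter:
    """Duck-typed copy of the SQLiteAdapter example (no pysatl_tsp import)."""

    def connect(self, connection_params: dict[str, Any]) -> sqlite3.Connection:
        connection = sqlite3.connect(connection_params["database"])
        connection.row_factory = sqlite3.Row
        return connection

    def execute_query(self, connection: sqlite3.Connection, query: str, params: tuple[Any, ...] = ()) -> sqlite3.Cursor:
        cursor = connection.cursor()
        cursor.execute(query, params)
        return cursor

    def fetch_data(self, cursor: sqlite3.Cursor) -> Iterator[dict[str, Any]]:
        for row in cursor:
            yield dict(row)

    def close_cursor(self, cursor: sqlite3.Cursor) -> None:
        cursor.close()

    def close_connection(self, connection: sqlite3.Connection) -> None:
        connection.close()


class MiniProvider:
    """Hypothetical stand-in for DataBaseDataProvider; its behavior is assumed."""

    def __init__(self, connection_params: dict[str, Any], query: str, adapter: SQLiteAdapter, params: tuple[Any, ...] = ()) -> None:
        self.connection_params = connection_params
        self.query = query
        self.adapter = adapter
        self.params = params

    @contextmanager
    def _connection_context(self) -> Iterator[Any]:
        connection = self.adapter.connect(self.connection_params)
        try:
            cursor = self.adapter.execute_query(connection, self.query, self.params)
            try:
                yield cursor
            finally:
                self.adapter.close_cursor(cursor)
        finally:
            self.adapter.close_connection(connection)

    def __iter__(self) -> Iterator[dict[str, Any]]:
        # A generator: the query runs only when the first item is requested.
        with self._connection_context() as cursor:
            yield from self.adapter.fetch_data(cursor)


with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "time_series.db")
    seed = sqlite3.connect(db_path)
    seed.execute("CREATE TABLE measurements (sensor_id INTEGER, timestamp TEXT, value REAL)")
    seed.executemany(
        "INSERT INTO measurements VALUES (?, ?, ?)",
        [(42, "2023-01-01T00:00", 20.5), (42, "2023-01-01T00:01", 21.0), (7, "2023-01-01T00:00", 3.0)],
    )
    seed.commit()
    seed.close()

    provider = MiniProvider(
        connection_params={"database": db_path},
        query="SELECT timestamp, value FROM measurements WHERE sensor_id = ? ORDER BY timestamp",
        adapter=SQLiteAdapter(),
        params=(42,),
    )
    records = list(provider)
```

Because `__iter__` is a generator, consuming it fully (as `list(provider)` does) lets the `with` block exit and release the cursor and connection.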
- _connection_context() -> Any
Context manager for database connection lifecycle.
This method sets up and tears down the database connection, ensuring that resources are released even if an error occurs.
- Returns:
Database cursor or similar query result object
- Raises:
Exception – If database operations fail
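Releasing resources even when an error occurs is the behavior a `try`/`finally` pair inside a `contextlib.contextmanager` generator provides. The sketch below assumes the provider connects, runs the query, yields the cursor, and then closes the cursor and connection in that order; `connection_context` and `RecordingAdapter` are hypothetical names used only for illustration. The adapter records each call and can fail during `execute_query`, which shows that the connection is still closed on that path.

```python
from contextlib import contextmanager
from typing import Any, Iterator


class RecordingAdapter:
    """Hypothetical adapter that records calls and can fail on demand."""

    def __init__(self, fail_on_execute: bool = False) -> None:
        self.calls: list[str] = []
        self.fail_on_execute = fail_on_execute

    def connect(self, connection_params: dict[str, Any]) -> object:
        self.calls.append("connect")
        return object()  # placeholder connection

    def execute_query(self, connection: object, query: str, params: tuple[Any, ...] = ()) -> list[int]:
        self.calls.append("execute_query")
        if self.fail_on_execute:
            raise RuntimeError("query failed")
        return [1, 2, 3]  # placeholder "cursor"

    def close_cursor(self, cursor: Any) -> None:
        self.calls.append("close_cursor")

    def close_connection(self, connection: Any) -> None:
        self.calls.append("close_connection")


@contextmanager
def connection_context(adapter: RecordingAdapter, query: str, params: tuple[Any, ...] = ()) -> Iterator[Any]:
    """Assumed shape of _connection_context: open, query, then always clean up."""
    connection = adapter.connect({})
    try:
        cursor = adapter.execute_query(connection, query, params)
        try:
            yield cursor
        finally:
            adapter.close_cursor(cursor)  # runs even if the consumer raises
    finally:
        adapter.close_connection(connection)  # runs even if execute_query raised


ok = RecordingAdapter()
with connection_context(ok, "SELECT 1") as cursor:
    rows = list(cursor)

failing = RecordingAdapter(fail_on_execute=True)
try:
    with connection_context(failing, "SELECT 1"):
        pass
except RuntimeError:
    pass  # the error still propagates to the caller after cleanup
```

Nesting the cursor's `finally` inside the connection's `finally` means the cursor is only closed if it was created, while the connection is closed on every path.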
- class pysatl_tsp.core.data_providers.database_data_provider.DatabaseAdapter
Bases:
abc.ABC, typing.Generic[pysatl_tsp.core.data_providers.abstract.T]
Abstract base class for database adapter implementations.
This class defines the interface for adapters that connect to different database systems. Concrete implementations of this class handle the specific details of connecting to databases, executing queries, and transforming results into a consistent format for the data processing pipeline.
- Example:
    import sqlite3
    from collections.abc import Iterator
    from typing import Any

    from pysatl_tsp.core.data_providers.database_data_provider import (
        DatabaseAdapter,
        DataBaseDataProvider,
    )

    class SQLiteAdapter(DatabaseAdapter[dict[str, Any]]):
        def connect(self, connection_params: dict[str, Any]) -> sqlite3.Connection:
            connection = sqlite3.connect(connection_params["database"])
            connection.row_factory = sqlite3.Row  # lets rows be converted with dict(row)
            return connection

        def execute_query(
            self, connection: sqlite3.Connection, query: str, params: tuple[Any, ...] = ()
        ) -> sqlite3.Cursor:
            cursor = connection.cursor()
            cursor.execute(query, params)
            return cursor

        def fetch_data(self, cursor: sqlite3.Cursor) -> Iterator[dict[str, Any]]:
            for row in cursor:
                yield dict(row)

        def close_cursor(self, cursor: sqlite3.Cursor) -> None:
            cursor.close()

        def close_connection(self, connection: sqlite3.Connection) -> None:
            connection.close()

    # Usage:
    adapter = SQLiteAdapter()
    provider = DataBaseDataProvider(
        connection_params={"database": "time_series.db"},
        query="SELECT timestamp, value FROM measurements WHERE sensor_id = ?",
        adapter=adapter,
        params=(42,),
    )
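The type parameter of DatabaseAdapter fixes what `fetch_data` yields, so an adapter can normalize rows into whatever shape downstream handlers expect. The sketch below assumes a stand-in `DatabaseAdapter` that mirrors the documented methods (it is not imported from pysatl_tsp, and treating `fetch_data` as abstract is an assumption); `TupleSQLiteAdapter` is a hypothetical adapter that yields `(timestamp, value)` tuples with values coerced to `float`. `IncompleteAdapter` shows that `abc.ABC` refuses to instantiate a subclass that leaves abstract methods unimplemented.

```python
import sqlite3
from abc import ABC, abstractmethod
from collections.abc import Iterator
from typing import Any, Generic, TypeVar

T = TypeVar("T")


class DatabaseAdapter(ABC, Generic[T]):
    """Stand-in mirroring the documented interface; not imported from pysatl_tsp."""

    @abstractmethod
    def connect(self, connection_params: dict[str, Any]) -> Any: ...

    @abstractmethod
    def execute_query(self, connection: Any, query: str, params: tuple[Any, ...] = ()) -> Any: ...

    @abstractmethod
    def fetch_data(self, cursor: Any) -> Iterator[T]: ...  # assumed abstract

    @abstractmethod
    def close_cursor(self, cursor: Any) -> None: ...

    @abstractmethod
    def close_connection(self, connection: Any) -> None: ...


class TupleSQLiteAdapter(DatabaseAdapter[tuple[str, float]]):
    """Hypothetical adapter that emits (timestamp, value) tuples instead of dicts."""

    def connect(self, connection_params: dict[str, Any]) -> sqlite3.Connection:
        return sqlite3.connect(connection_params["database"])

    def execute_query(self, connection: sqlite3.Connection, query: str, params: tuple[Any, ...] = ()) -> sqlite3.Cursor:
        return connection.execute(query, params)

    def fetch_data(self, cursor: sqlite3.Cursor) -> Iterator[tuple[str, float]]:
        for timestamp, value in cursor:
            yield str(timestamp), float(value)  # normalize types for downstream handlers

    def close_cursor(self, cursor: sqlite3.Cursor) -> None:
        cursor.close()

    def close_connection(self, connection: sqlite3.Connection) -> None:
        connection.close()


class IncompleteAdapter(DatabaseAdapter[int]):
    def connect(self, connection_params: dict[str, Any]) -> Any:
        return None  # the other four abstract methods are missing


adapter = TupleSQLiteAdapter()
conn = adapter.connect({"database": ":memory:"})
conn.execute("CREATE TABLE measurements (timestamp TEXT, value INTEGER)")
conn.execute("INSERT INTO measurements VALUES ('2023-01-01T00:00', 20)")
cursor = adapter.execute_query(conn, "SELECT timestamp, value FROM measurements")
rows = list(adapter.fetch_data(cursor))
adapter.close_cursor(cursor)
adapter.close_connection(conn)

try:
    IncompleteAdapter()
    incomplete_error = None
except TypeError as exc:
    incomplete_error = exc
```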
- abstractmethod close_connection(connection: Any) -> None
Close the database connection.
- Parameters:
connection – Database connection object
- abstractmethod close_cursor(cursor: Any) -> None
Close the query cursor.
- Parameters:
cursor – Query result cursor
- abstractmethod connect(connection_params: dict[str, Any]) -> Any
Establish a connection to the database.
- Parameters:
connection_params – Dictionary containing connection parameters (e.g., host, port, username, password)
- Returns:
Database connection object
- Raises:
Exception – If connection fails
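One way to implement `connect` for a file-based database is to forward `connection_params` as keyword arguments to the driver and reject keys it does not understand, so a typo fails loudly instead of being silently ignored. This is a sketch assuming an SQLite backend; `ALLOWED_KEYS` and `connect_sqlite` are hypothetical names, and the accepted keys are a subset of the keyword arguments of `sqlite3.connect`.

```python
import sqlite3
from typing import Any

# Hypothetical whitelist: keyword arguments of sqlite3.connect that this adapter exposes.
ALLOWED_KEYS = {"database", "timeout", "uri"}


def connect_sqlite(connection_params: dict[str, Any]) -> sqlite3.Connection:
    """Sketch of an adapter's connect(): validate, then forward to the driver."""
    if "database" not in connection_params:
        raise ValueError("connection_params must include 'database'")
    unknown = set(connection_params) - ALLOWED_KEYS
    if unknown:
        raise ValueError(f"unsupported connection parameters: {sorted(unknown)}")
    connection = sqlite3.connect(**connection_params)
    connection.row_factory = sqlite3.Row  # rows can then be converted with dict(row)
    return connection


conn = connect_sqlite({"database": ":memory:", "timeout": 1.0})
first_row = dict(conn.execute("SELECT 1 AS one").fetchone())
conn.close()

errors = []
for bad in ({"timeout": 1.0}, {"database": ":memory:", "host": "db.example"}):
    try:
        connect_sqlite(bad)
    except ValueError as exc:
        errors.append(str(exc))
```

A server-backed adapter would instead forward keys such as host, port, username, and password to its own driver, with the same validate-then-forward structure.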
- abstractmethod execute_query(connection: Any, query: str, params: tuple[Any, ...] = ()) -> Any
Execute a query on the given database connection.
- Parameters:
connection – Database connection object
query – SQL query string
params – Query parameters, defaults to ()
- Returns:
Query result cursor or similar object
- Raises:
Exception – If query execution fails
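Passing values through `params` rather than formatting them into `query` lets the driver bind them, so they are treated as data and never parsed as SQL. The sketch below uses `sqlite3`, whose DB-API placeholder style is qmark (`?`); other drivers use different placeholders, for example `%s` in psycopg2. The `execute_query` function mirrors the SQLiteAdapter example, and the table and values are made up for illustration.

```python
import sqlite3
from typing import Any


def execute_query(connection: sqlite3.Connection, query: str, params: tuple[Any, ...] = ()) -> sqlite3.Cursor:
    cursor = connection.cursor()
    cursor.execute(query, params)  # the driver binds params; they are not spliced into the SQL text
    return cursor


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE measurements (sensor_id TEXT, value REAL)")
conn.executemany("INSERT INTO measurements VALUES (?, ?)", [("s1", 1.0), ("s2", 2.0)])

query = "SELECT sensor_id FROM measurements WHERE sensor_id = ?"
hostile = "s1' OR '1'='1"

normal_rows = execute_query(conn, query, ("s1",)).fetchall()
bound_rows = execute_query(conn, query, (hostile,)).fetchall()  # matched literally: no rows

# Anti-pattern, shown only for contrast: string formatting lets the value rewrite the WHERE clause.
spliced_rows = conn.execute(f"SELECT sensor_id FROM measurements WHERE sensor_id = '{hostile}'").fetchall()
conn.close()
```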