pysatl_tsp.core.data_providers.websocket_data_provider

Module Contents

Classes

WebSocketDataProvider

A data provider that streams time series data from a WebSocket connection.

API

class pysatl_tsp.core.data_providers.websocket_data_provider.WebSocketDataProvider(uri: str, subscribe_message: dict[str, Any] | None = None)[source]

Bases: pysatl_tsp.core.data_providers.abstract.DataProvider[str]

A data provider that streams time series data from a WebSocket connection.

This class establishes a WebSocket connection to a specified URI and streams received messages into the processing pipeline. It handles the WebSocket connection in a separate thread to avoid blocking the main processing flow and provides a clean streaming interface through the standard iterator protocol.

Parameters:
  • uri – WebSocket endpoint URI

  • subscribe_message – Optional message to send after connection to subscribe to specific data streams

Example:
# Example: Connecting to Bybit WebSocket API for Bitcoin price data

bybit_provider = WebSocketDataProvider(
    uri="wss://stream.bybit.com/v5/public/spot",
    subscribe_message={"op": "subscribe", "args": ["tickers.BTCUSDT"]},
)

try:
    for message in bybit_provider:
        data = json.loads(message)
        if "data" in data and data.get("topic") == "tickers.BTCUSDT":
            price_data = data["data"]
            print(f"BTC/USDT: {price_data['lastPrice']} (Time: {price_data['timestamp']})")
except KeyboardInterrupt:
    bybit_provider.close()

Initialization

Initialize a WebSocket data provider.

Parameters:
  • uri – WebSocket endpoint URI

  • subscribe_message – Optional message to send after connection to subscribe to specific data streams

__iter__() collections.abc.Iterator[str][source]

Create an iterator over the messages received from the WebSocket.

This method starts a background thread to handle the WebSocket connection if it’s not already running, and yields messages as they are received.

Returns:

An iterator yielding message strings from the WebSocket

async _receiver() None[source]

Asynchronous method to handle WebSocket communication.

This method establishes the WebSocket connection, sends the subscription message if provided, and places received messages into the queue for the iterator.

_thread_main() None[source]

Entry point for the background thread.

This method runs the asyncio event loop that handles the WebSocket connection.

close() None[source]

Close the WebSocket connection and stop the background thread.

This method should be called when the provider is no longer needed to release resources properly.