pysatl_tsp.core.data_providers.websocket_data_provider
Module Contents
Classes
A data provider that streams time series data from a WebSocket connection. |
API
- class pysatl_tsp.core.data_providers.websocket_data_provider.WebSocketDataProvider(uri: str, subscribe_message: dict[str, Any] | None = None)[source]
Bases:
pysatl_tsp.core.data_providers.abstract.DataProvider[str]A data provider that streams time series data from a WebSocket connection.
This class establishes a WebSocket connection to a specified URI and streams received messages into the processing pipeline. It handles the WebSocket connection in a separate thread to avoid blocking the main processing flow and provides a clean streaming interface through the standard iterator protocol.
- Parameters:
uri – WebSocket endpoint URI
subscribe_message – Optional message to send after connection to subscribe to specific data streams
- Example:
# Example: Connecting to Bybit WebSocket API for Bitcoin price data bybit_provider = WebSocketDataProvider( uri="wss://stream.bybit.com/v5/public/spot", subscribe_message={"op": "subscribe", "args": ["tickers.BTCUSDT"]}, ) try: for message in bybit_provider: data = json.loads(message) if "data" in data and data.get("topic") == "tickers.BTCUSDT": price_data = data["data"] print(f"BTC/USDT: {price_data['lastPrice']} (Time: {price_data['timestamp']})") except KeyboardInterrupt: bybit_provider.close()
Initialization
Initialize a WebSocket data provider.
- Parameters:
uri – WebSocket endpoint URI
subscribe_message – Optional message to send after connection to subscribe to specific data streams
- __iter__() collections.abc.Iterator[str][source]
Create an iterator over the messages received from the WebSocket.
This method starts a background thread to handle the WebSocket connection if it’s not already running, and yields messages as they are received.
- Returns:
An iterator yielding message strings from the WebSocket
- async _receiver() None[source]
Asynchronous method to handle WebSocket communication.
This method establishes the WebSocket connection, sends the subscription message if provided, and places received messages into the queue for the iterator.