Source code for pysatl_tsp.core.scrubber.segmentation_scrubber

from collections import deque
from collections.abc import Iterator
from typing import Any, Callable

from pysatl_tsp.core import Handler, T

from .abstract import Scrubber, ScrubberWindow


class OfflineSegmentationScrubber(Scrubber[T]):
    """A scrubber that segments time series data based on changepoints in batch mode.

    This scrubber processes the entire input data in a batch (offline) mode and segments it according
    to a provided segmentation rule. The rule identifies changepoints in the data, which are then used
    to create non-overlapping segments that together cover the whole series exactly once.

    A changepoint ``i`` marks the first index of a new segment. Before slicing, the changepoints returned
    by the rule are normalised: they are sorted, duplicates are removed, and values outside the open
    interval ``(0, len(series))`` are discarded (``0`` and ``len(series)`` are always implicit
    boundaries). This guarantees that no segment is empty and no data point appears in more than one
    segment, even if the rule returns unsorted or redundant indices.

    This approach is suitable for scenarios where the entire dataset is available upfront and the
    segmentation logic requires global context or multiple passes over the data.

    :param segmentation_rule: Function that analyzes the complete series and returns a list of changepoint indices
    :param source: The handler providing input data, defaults to None

    Example:
        .. code-block:: python

            # Create a data source with synthetic pattern
            data = [1, 1, 2, 2, 5, 5, 5, 1, 1, 1, 6, 6, 6, 6]
            data_source = SimpleDataProvider(data)

            # Define a simple threshold rule on the difference between consecutive values
            def find_changepoints(window: ScrubberWindow[int]) -> list[int]:
                changepoints = []
                # Simple detection of value changes
                for i in range(1, len(window)):
                    if abs(window[i] - window[i - 1]) > 2:  # Threshold for change
                        changepoints.append(i)
                return changepoints

            # Create the segmentation scrubber
            segmenter = OfflineSegmentationScrubber(segmentation_rule=find_changepoints, source=data_source)

            # Process the segments
            for segment in segmenter:
                print(f"Segment values: {list(segment.values)}")

            # Output:
            # Segment values: [1, 1, 2, 2]
            # Segment values: [5, 5, 5]
            # Segment values: [1, 1, 1]
            # Segment values: [6, 6, 6, 6]
    """

    def __init__(
        self, segmentation_rule: Callable[[ScrubberWindow[T]], list[int]], source: Handler[Any, T] | None = None
    ):
        """Initialize an offline segmentation scrubber.

        :param segmentation_rule: Function that analyzes the complete series and returns a list of changepoint indices
        :param source: The handler providing input data, defaults to None
        """
        super().__init__(source)
        self.segmentation_rule = segmentation_rule

    def __iter__(self) -> Iterator[ScrubberWindow[T]]:
        """Create an iterator that yields segments based on detected changepoints.

        All data is collected from the source, the segmentation rule is applied once to the complete
        series, and one window is yielded for each span between consecutive (normalised) changepoints.
        An empty source yields no segments, and the rule is not called in that case.

        :return: Iterator yielding ScrubberWindow instances for each segment
        :raises ValueError: If no source has been set
        """
        if self.source is None:
            raise ValueError("Source is not set")

        # The rule needs global context, so the whole stream is materialised up front.
        full_series = deque(self.source)
        series_length = len(full_series)
        if series_length == 0:
            # Nothing to segment: yield no (empty) windows at all.
            return

        series_window = ScrubberWindow(full_series)
        change_points = self.segmentation_rule(series_window)

        # Sort and deduplicate so segments never overlap or come out empty. The series ends are always
        # boundaries, so 0, len(series) and anything out of range would only produce empty/invalid slices.
        inner_points = sorted({point for point in change_points if 0 < point < series_length})
        boundaries = [0, *inner_points, series_length]
        for start, end in zip(boundaries[:-1], boundaries[1:]):
            yield series_window[start:end]
class OnlineSegmentationScrubber(Scrubber[T]):
    """Split a stream of values into segments on the fly.

    Values are consumed one at a time from the source and accumulated in a buffer. After each value is
    added, the buffer is closed off as a segment when either the segmentation rule returns a truthy value
    for the current buffer, or the buffer has reached ``max_segment_size`` points. The value that
    triggered the cut is the last point of the closed segment. Whatever remains in the buffer once the
    source is exhausted is emitted as a final segment. Because no look-ahead is needed, this works for
    streaming data where the full series is never available at once.

    :param segmentation_rule: Callable receiving the current buffer; a truthy result ends the segment
    :param max_segment_size: Upper bound on points per segment before a split is forced, defaults to 2^64
    :param source: The handler providing input data, defaults to None

    Example:
        .. code-block:: python

            # Create a data source with streaming values
            data = [1, 1, 2, 3, 8, 9, 8, 2, 2, 3, 10, 10, 9, 9]
            data_source = SimpleDataProvider(data)

            # Define a threshold-based segmentation rule
            def detect_jump(window: ScrubberWindow[int]) -> bool:
                if len(window) < 2:
                    return False
                # Detect a large jump in values
                last_value = window[-1]
                prev_value = window[-2]
                return abs(last_value - prev_value) > 3

            # Create the online segmentation scrubber
            segmenter = OnlineSegmentationScrubber(
                segmentation_rule=detect_jump,
                max_segment_size=5,  # Force segmentation after 5 points if no jump detected
                source=data_source,
            )

            # Process the segments as they're detected
            for segment in segmenter:
                print(f"Segment values: {list(segment.values)}")

            # Output:
            # Segment values: [1, 1, 2, 3, 8]   # Split due to jump from 3 to 8 and max size
            # Segment values: [9, 8, 2]         # Split due to jump from 8 to 2
            # Segment values: [2, 3, 10]        # Split due to jump from 3 to 10
            # Segment values: [10, 9, 9]        # Remaining data
    """

    def __init__(
        self,
        segmentation_rule: Callable[[ScrubberWindow[T]], bool],
        max_segment_size: int = 2**64,
        source: Handler[Any, T] | None = None,
    ):
        """Store the cutting rule and the size limit, and attach the optional source.

        :param segmentation_rule: Callable receiving the current buffer; a truthy result ends the segment
        :param max_segment_size: Upper bound on points per segment before a split is forced, defaults to 2^64
        :param source: The handler providing input data, defaults to None
        """
        super().__init__(source)
        self.segmentation_rule = segmentation_rule
        self.max_segment_size = max_segment_size

    def __iter__(self) -> Iterator[ScrubberWindow[T]]:
        """Yield each segment as soon as it is closed.

        :return: Iterator over ScrubberWindow copies, one per segment, in stream order
        :raises ValueError: If no source has been set
        """
        source = self.source
        if source is None:
            raise ValueError("Source is not set")

        buffer: ScrubberWindow[T] = ScrubberWindow(deque())
        for position, value in enumerate(source):
            buffer.append(value, position)
            # The rule is consulted first; the size limit is checked only when the rule does not fire.
            if not self.segmentation_rule(buffer) and len(buffer) < self.max_segment_size:
                continue
            # Hand out a snapshot, then reuse the same buffer for the next segment.
            yield buffer.copy()
            buffer.clear()

        # Flush the trailing partial segment, if any.
        if buffer:
            yield buffer.copy()