Source code for pysatl_tsp.core.scrubber.linear_scrubber

from collections.abc import Iterator
from typing import Any, Callable

from pysatl_tsp.core import Handler

from .abstract import Scrubber, ScrubberWindow, T


[docs] class SlidingScrubber(Scrubber[T]): """A flexible scrubber that creates sliding windows of time series data based on custom conditions. This scrubber allows defining custom conditions for when to emit a window and how far to slide the window after each emission. It accumulates data points in a buffer and yields the current window whenever the take condition evaluates to True. :param take_condition: Function that determines when to emit the current window :param shift: Number of points to shift the window after each emission :param source: The handler providing input data, defaults to None Example: .. code-block:: python # Create a data source data_source = SimpleDataProvider([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) # Emit windows when they contain exactly 3 elements, shift by 2 condition = lambda window: len(window) == 3 scrubber = SlidingScrubber(take_condition=condition, shift=2, source=data_source) # Process the windows for window in scrubber: print(f"Window values: {list(window.values)}") # Output: # Window values: [1, 2, 3] # Window values: [3, 4, 5] # Window values: [5, 6, 7] # Window values: [7, 8, 9] # Window values: [9, 10] # Create a scrubber that emits windows based on their sum sum_condition = lambda window: sum(window.values) >= 10 sum_scrubber = SlidingScrubber(take_condition=sum_condition, shift=1, source=data_source) for window in sum_scrubber: print(f"Window with sum >= 10: {list(window.values)}, sum: {sum(window.values)}") """ def __init__( self, take_condition: Callable[[ScrubberWindow[T]], bool], shift: int, source: Handler[Any, T] | None = None ) -> None: """Initialize a sliding scrubber with custom condition and shift. :param take_condition: Function that determines when to emit the current window :param shift: Number of points to shift the window after each emission :param source: The handler providing input data, defaults to None """ super().__init__(source) self._shift = shift self._buffer: ScrubberWindow[T] = ScrubberWindow() self._take_condition = take_condition
[docs] def __iter__(self) -> Iterator[ScrubberWindow[T]]: """Create an iterator that yields windows based on the take condition. This method accumulates data points in a buffer and yields the current window whenever the take condition evaluates to True. After yielding, it shifts the window by the specified number of points. :return: Iterator yielding ScrubberWindow instances :raises ValueError: If no source has been set """ if self.source is None: raise ValueError("Source is not set") for i, item in enumerate(self.source): self._buffer.append(item, i) if self._take_condition(self._buffer): yield self._buffer[:] for _ in range(self._shift): if self._buffer: self._buffer.popleft()
[docs] class LinearScrubber(SlidingScrubber[T]): """A scrubber that creates fixed-size sliding windows with configurable overlap. This is a specialized sliding scrubber that emits windows of a fixed size and allows controlling the overlap between consecutive windows through a shift factor. :param window_length: Number of points in each window, defaults to 100 :param shift_factor: Fraction of window to shift after each emission, defaults to 1/3 (e.g., 0.5 means 50% overlap between consecutive windows) :param source: The handler providing input data, defaults to None Example: .. code-block:: python # Create a data source with a sequence of numbers data_source = SimpleDataProvider(range(10)) # Create a linear scrubber with window size 4 and 50% overlap scrubber = LinearScrubber(window_length=4, shift_factor=0.5, source=data_source) # Process the windows for window in scrubber: print(f"Window values: {list(window.values)}") # Output: # Window values: [0, 1, 2, 3] # Window values: [2, 3, 4, 5] # Window values: [4, 5, 6, 7] # Window values: [6, 7, 8, 9] """ def __init__( self, window_length: int = 100, shift_factor: float = 1.0 / 3.0, source: Handler[Any, T] | None = None ) -> None: """Initialize a linear scrubber with fixed window size and overlap. :param window_length: Number of points in each window, defaults to 100 :param shift_factor: Fraction of window to shift after each emission, defaults to 1/3 :param source: The handler providing input data, defaults to None """ shift = max(1, int(shift_factor * window_length)) super().__init__(take_condition=lambda buffer: len(buffer) >= window_length, shift=shift, source=source)