pysatl_tsp.core.scrubber.segmentation_scrubber

Module Contents

Classes

OfflineSegmentationScrubber

A scrubber that segments time series data based on changepoints in batch mode.

OnlineSegmentationScrubber

A scrubber that segments time series data in real-time based on a condition.

API

class pysatl_tsp.core.scrubber.segmentation_scrubber.OfflineSegmentationScrubber(segmentation_rule: Callable[[pysatl_tsp.core.scrubber.abstract.ScrubberWindow[pysatl_tsp.core.T]], list[int]], source: pysatl_tsp.core.Handler[Any, pysatl_tsp.core.T] | None = None)[source]

Bases: pysatl_tsp.core.scrubber.abstract.Scrubber[pysatl_tsp.core.T]

A scrubber that segments time series data based on changepoints in batch mode.

This scrubber processes the entire input data in a batch (offline) mode and segments it according to a provided segmentation rule. The rule identifies changepoints in the data, which are then used to create non-overlapping segments.

This approach is suitable for scenarios where the entire dataset is available upfront and the segmentation logic requires global context or multiple passes over the data.

Parameters:
  • segmentation_rule – Function that analyzes the complete series and returns a list of changepoint indices

  • source – The handler providing input data, defaults to None

Example:
# Create a data source with synthetic pattern
data = [1, 1, 2, 2, 5, 5, 5, 1, 1, 1, 6, 6, 6, 6]
data_source = SimpleDataProvider(data)


# Define a simple variance-based segmentation rule
def find_changepoints(window: ScrubberWindow[int]) -> list[int]:
    changepoints = []
    # Simple detection of value changes
    for i in range(1, len(window)):
        if abs(window[i] - window[i - 1]) > 2:  # Threshold for change
            changepoints.append(i)
    return changepoints


# Create the segmentation scrubber
segmenter = OfflineSegmentationScrubber(segmentation_rule=find_changepoints, source=data_source)

# Process the segments
for segment in segmenter:
    print(f"Segment values: {list(segment.values)}")

# Output:
# Segment values: [1, 1, 2, 2]
# Segment values: [5, 5, 5]
# Segment values: [1, 1, 1]
# Segment values: [6, 6, 6, 6]

Initialization

Initialize an offline segmentation scrubber.

Parameters:
  • segmentation_rule – Function that analyzes the complete series and returns a list of changepoint indices

  • source – The handler providing input data, defaults to None

__iter__() collections.abc.Iterator[pysatl_tsp.core.scrubber.abstract.ScrubberWindow[pysatl_tsp.core.T]][source]

Create an iterator that yields segments based on detected changepoints.

This method collects all data from the source, applies the segmentation rule to identify changepoints, and then yields segments between the detected changepoints.

Returns:

Iterator yielding ScrubberWindow instances for each segment

Raises:

ValueError – If no source has been set

class pysatl_tsp.core.scrubber.segmentation_scrubber.OnlineSegmentationScrubber(segmentation_rule: Callable[[pysatl_tsp.core.scrubber.abstract.ScrubberWindow[pysatl_tsp.core.T]], bool], max_segment_size: int = 2**64, source: pysatl_tsp.core.Handler[Any, pysatl_tsp.core.T] | None = None)[source]

Bases: pysatl_tsp.core.scrubber.abstract.Scrubber[pysatl_tsp.core.T]

A scrubber that segments time series data in real-time based on a condition.

This scrubber processes data points sequentially (online mode) and segments the time series whenever a specified condition is met or a maximum segment size is reached. It’s designed for streaming data where segments need to be identified in real-time without waiting for the complete dataset.

Parameters:
  • segmentation_rule – Function that evaluates the current window and returns True when a segment should end

  • max_segment_size – Maximum number of points in a segment before forcing a split, defaults to 2^64

  • source – The handler providing input data, defaults to None

Example:
# Create a data source with streaming values
data = [1, 1, 2, 3, 8, 9, 8, 2, 2, 3, 10, 10, 9, 9]
data_source = SimpleDataProvider(data)


# Define a threshold-based segmentation rule
def detect_jump(window: ScrubberWindow[int]) -> bool:
    if len(window) < 2:
        return False

    # Detect a large jump in values
    last_value = window[-1]
    prev_value = window[-2]
    return abs(last_value - prev_value) > 3


# Create the online segmentation scrubber
segmenter = OnlineSegmentationScrubber(
    segmentation_rule=detect_jump,
    max_segment_size=5,  # Force segmentation after 5 points if no jump detected
    source=data_source,
)

# Process the segments as they're detected
for segment in segmenter:
    print(f"Segment values: {list(segment.values)}")

# Output:
# Segment values: [1, 1, 2, 3, 8]  # Split due to jump from 3 to 8 and max size
# Segment values: [9, 8, 2]        # Split due to jump from 8 to 2
# Segment values: [2, 3, 10]       # Split due to jump from 3 to 10
# Segment values: [10, 9, 9]       # Remaining data

Initialization

Initialize an online segmentation scrubber.

Parameters:
  • segmentation_rule – Function that evaluates the current window and returns True when a segment should end

  • max_segment_size – Maximum number of points in a segment before forcing a split, defaults to 2^64

  • source – The handler providing input data, defaults to None

__iter__() collections.abc.Iterator[pysatl_tsp.core.scrubber.abstract.ScrubberWindow[pysatl_tsp.core.T]][source]

Create an iterator that yields segments as they’re detected in real-time.

This method processes data points one by one, accumulating them in a buffer and checking after each addition whether the segmentation condition is met or the maximum segment size is reached.

Returns:

Iterator yielding ScrubberWindow instances for each detected segment

Raises:

ValueError – If no source has been set