# Source code for pysatl_tsp.core.processor.sampling_handler

from collections.abc import Iterator
from typing import Any, Callable

from pysatl_tsp.core import Handler, T
from pysatl_tsp.core.scrubber import OfflineSegmentationScrubber, OnlineSegmentationScrubber, ScrubberWindow

from .mapping_handler import MappingHandler


class OnlineSamplingHandler(Handler[T, T]):
    """Sample a time series on the fly, keeping one value per detected segment.

    The incoming stream is passed through an ``OnlineSegmentationScrubber``
    driven by ``sampling_rule``; the last item of every segment that the
    scrubber emits is yielded as a sample. Since the rule is evaluated on a
    window of recent data, this handler fits adaptive sampling strategies
    where the decision to sample depends on the recent history of the series.

    :param sampling_rule: Predicate over the current window that decides when
        a sample should be taken
    :param source: The handler providing input data, defaults to None

    Example:
        .. code-block:: python

            # A data source with steadily increasing values
            data = list(range(100))
            data_source = SimpleDataProvider(data)

            # Sample once the newest value differs from the oldest value in
            # the window by at least 5
            def significant_change(window: ScrubberWindow[int]) -> bool:
                if len(window) < 2:
                    return False
                oldest = window[0]
                newest = window[-1]
                return abs(newest - oldest) >= 5

            sampler = OnlineSamplingHandler(sampling_rule=significant_change, source=data_source)

            # Drain the handler and compare sizes
            sampled_points = list(sampler)
            print(f"Original data points: {len(data)}")
            print(f"Sampled data points: {len(sampled_points)}")
            print(f"Sampled values: {sampled_points[:10]}...")
    """

    def __init__(
        self,
        sampling_rule: Callable[[ScrubberWindow[T]], bool],
        source: Handler[Any, T] | None = None,
    ) -> None:
        """Store the sampling rule and register the upstream handler.

        :param sampling_rule: Predicate over the current window that decides
            when a sample should be taken
        :param source: The handler providing input data, defaults to None
        """
        super().__init__(source)
        self.sampling_rule = sampling_rule

    def __iter__(self) -> Iterator[T]:
        """Yield the last item of every segment produced by the sampling rule.

        A ``MappingHandler`` that picks ``window[-1]`` is chained after an
        ``OnlineSegmentationScrubber`` configured with ``self.sampling_rule``
        and ``self.source``; the resulting pipeline is then iterated.

        :return: Iterator yielding sampled values
        :raises ValueError: If no source has been set (propagated from
            segmentation scrubber)
        """

        def take_last(segment: ScrubberWindow[T]) -> T:
            # The sample for a segment is its final element.
            return segment[-1]

        last_item_mapper: MappingHandler[ScrubberWindow[T], T] = MappingHandler(map_func=take_last)
        segmenter = OnlineSegmentationScrubber(
            segmentation_rule=self.sampling_rule,
            source=self.source,
        )
        yield from segmenter | last_item_mapper
class OfflineSamplingHandler(Handler[T, T]):
    """Sample a time series in batch mode at indices chosen by a global rule.

    The whole dataset is handed to ``sampling_rule`` (via an
    ``OfflineSegmentationScrubber``) so that sampling points are identified
    before any sample is extracted. This suits global strategies that need
    the full context of the series, such as picking representative points or
    key points that preserve the overall shape of the data.

    :param sampling_rule: Function that analyzes the entire series and
        returns indices of points to sample
    :param source: The handler providing input data, defaults to None

    Example:
        .. code-block:: python

            import numpy as np

            # A data source holding a sinusoidal signal
            x = np.linspace(0, 4 * np.pi, 1000)
            y = np.sin(x)
            data_source = SimpleDataProvider(y)

            # Offline rule: first point, every local extremum, last point
            def find_extrema(window: ScrubberWindow[float]) -> list[int]:
                data = np.array(window.values)
                extrema_indices = [0]
                for i in range(1, len(data) - 1):
                    is_peak = data[i] > data[i - 1] and data[i] > data[i + 1]
                    is_valley = data[i] < data[i - 1] and data[i] < data[i + 1]
                    if is_peak or is_valley:
                        extrema_indices.append(i)
                extrema_indices.append(len(data) - 1)
                return extrema_indices

            sampler = OfflineSamplingHandler(
                sampling_rule=find_extrema,
                source=data_source,
            )

            # The handler yields sampled values only (no indices)
            sampled_values = list(sampler)
            print(f"Original data points: {len(y)}")
            print(f"Sampled data points: {len(sampled_values)}")
    """

    def __init__(
        self,
        sampling_rule: Callable[[ScrubberWindow[T]], list[int]],
        source: Handler[Any, T] | None = None,
    ) -> None:
        """Store the sampling rule and register the upstream handler.

        :param sampling_rule: Function that analyzes the entire series and
            returns indices of points to sample
        :param source: The handler providing input data, defaults to None
        """
        super().__init__(source)
        self.sampling_rule = sampling_rule

    def __iter__(self) -> Iterator[T]:
        """Yield the last item of every segment delimited by the rule's indices.

        A ``MappingHandler`` that picks ``window[-1]`` is chained after an
        ``OfflineSegmentationScrubber`` configured with ``self.sampling_rule``
        and ``self.source``; the resulting pipeline is then iterated.

        :return: Iterator yielding sampled values
        :raises ValueError: If no source has been set (propagated from
            segmentation scrubber)
        """

        def take_last(segment: ScrubberWindow[T]) -> T:
            # The sample for a segment is its final element.
            return segment[-1]

        last_item_mapper: MappingHandler[ScrubberWindow[T], T] = MappingHandler(map_func=take_last)
        segmenter = OfflineSegmentationScrubber(
            segmentation_rule=self.sampling_rule,
            source=self.source,
        )
        yield from segmenter | last_item_mapper