From 93f0e8434a82b6973a3b67a0afb09bf47a8810a7 Mon Sep 17 00:00:00 2001 From: "jiyao.hu" Date: Wed, 17 Dec 2025 14:58:55 -0800 Subject: [PATCH 1/2] Support flexible time split and merge policy for intelligent threshold --- .../test_robust_daily_period_detector.py | 2 +- ...test_threshold_recommendation_algorithm.py | 92 ---------- .../test_threshold_recommender.py | 24 +-- .../intelligent_threshold/configs.py | 6 +- .../robust_daily_period_detector.py | 4 + .../threshold_recommendation_algorithm.py | 24 ++- .../threshold_recommender.py | 173 +++++++++++++++++- 7 files changed, 205 insertions(+), 120 deletions(-) diff --git a/tests/algorithm/intelligent_threshold/test_robust_daily_period_detector.py b/tests/algorithm/intelligent_threshold/test_robust_daily_period_detector.py index e2bee696..67124db6 100644 --- a/tests/algorithm/intelligent_threshold/test_robust_daily_period_detector.py +++ b/tests/algorithm/intelligent_threshold/test_robust_daily_period_detector.py @@ -113,7 +113,7 @@ def short_data(): def test_init_default_parameters(): """Test initialization with default parameters.""" detector = RobustDailyPeriodDetector() - assert detector.correlation_threshold == 0.75 + assert detector.correlation_threshold == 0.3 assert detector.min_data_points_per_day == 720 assert detector.min_common_points == 720 diff --git a/tests/algorithm/intelligent_threshold/test_threshold_recommendation_algorithm.py b/tests/algorithm/intelligent_threshold/test_threshold_recommendation_algorithm.py index 1cc50cca..36ff3b55 100644 --- a/tests/algorithm/intelligent_threshold/test_threshold_recommendation_algorithm.py +++ b/tests/algorithm/intelligent_threshold/test_threshold_recommendation_algorithm.py @@ -526,98 +526,6 @@ def test_check_and_consolidate_threshold_groups_single_group(threshold_recommend assert result is None -def test_recommend_threshold_with_consolidation_up(threshold_recommender, sample_timestamps): - """Test full threshold recommendation with consolidation for up direction.""" - # Create values that result in similar thresholds across time periods - values = [] - for i in range(len(sample_timestamps)): - # Create values that vary slightly but result in similar thresholds - hour = (i // 60) % 24 - if hour < 6: - values.append(45) # Night: around 45 - elif hour < 12: - values.append(48) # Morning: around 48 - elif hour < 18: - values.append(52) # Afternoon: around 52 - else: - values.append(47) # Evening: around 47 - - with patch.object(threshold_recommender.period_detector, "detect") as mock_detect: - mock_detect.return_value = True # Daily periodicity detected - - # Mock sliding window to return similar thresholds - with patch.object(threshold_recommender, "threshold_recommendation_with_sliding_window") as mock_sliding_window: - # Return very similar thresholds for all time periods (within 10%) - mock_sliding_window.side_effect = [(49.0, 5), (50.0, 5), (51.0, 5), (50.5, 5)] * 2 - - result = threshold_recommender.recommend_threshold( - timestamp_list=sample_timestamps[:1000], # Use subset - value_list=values[:1000], - default_window_size=5, - time_split=True, - auto_window_adjust=False, - min_value=0.0, - max_value=100.0, - normal_threshold=None, - min_ts_length=50, - sensitivity=0.5, - direction="up", - ) - - # Should consolidate to single group since thresholds are close (49-51, diff=2, 2/51=3.9% < 10%) - assert len(result) == 1 - assert result[0]["start_hour"] == 0 - assert result[0]["end_hour"] == 24 - assert result[0]["upper_bound"] == 50.5 # Max value from the sorted thresholds - assert result[0]["lower_bound"] is None - assert result[0]["window_size"] == 5 - - -def test_recommend_threshold_with_consolidation_down(threshold_recommender, sample_timestamps): - """Test full threshold recommendation with consolidation for down direction.""" - # Create values that result in similar thresholds across time periods - values = [] - for i in range(len(sample_timestamps)): - hour = (i // 60) % 24 - if hour < 6: - values.append(25) # Night: around 25 - elif hour < 12: - values.append(23) # Morning: around 23 - elif hour < 18: - values.append(27) # Afternoon: around 27 - else: - values.append(24) # Evening: around 24 - - with patch.object(threshold_recommender.period_detector, "detect") as mock_detect: - mock_detect.return_value = True # Daily periodicity detected - - with patch.object(threshold_recommender, "threshold_recommendation_with_sliding_window") as mock_sliding_window: - # Return very similar thresholds for all time periods (within 10%) - mock_sliding_window.side_effect = [(24.5, 3), (23.5, 3), (25.5, 3), (24.0, 3)] * 2 - - result = threshold_recommender.recommend_threshold( - timestamp_list=sample_timestamps[:1000], - value_list=values[:1000], - default_window_size=5, - time_split=True, - auto_window_adjust=False, - min_value=0.0, - max_value=100.0, - normal_threshold=None, - min_ts_length=50, - sensitivity=0.5, - direction="down", - ) - - # Should consolidate to single group since thresholds are close (23.5-25.5, diff=2, 2/25.5=7.8% < 10%) - assert len(result) == 1 - assert result[0]["start_hour"] == 0 - assert result[0]["end_hour"] == 24 - assert result[0]["upper_bound"] is None - assert result[0]["lower_bound"] == 23.5 # Min value for down direction - assert result[0]["window_size"] == 3 - - def test_recommend_threshold_no_consolidation_different_thresholds( threshold_recommender, sample_timestamps, sample_values_periodic ): diff --git a/tests/algorithm/intelligent_threshold/test_threshold_recommender.py b/tests/algorithm/intelligent_threshold/test_threshold_recommender.py index f9ded0b7..4242b079 100644 --- a/tests/algorithm/intelligent_threshold/test_threshold_recommender.py +++ b/tests/algorithm/intelligent_threshold/test_threshold_recommender.py @@ -1428,21 +1428,21 @@ def test_merge_threshold_results_consolidation_mismatch_down_consolidated_up_not IntelligentThresholdConfig( start_hour=6, end_hour=12, - upper_bound=75.0, + upper_bound=72.0, lower_bound=None, window_size=5, ), IntelligentThresholdConfig( start_hour=12, end_hour=18, - upper_bound=80.0, + upper_bound=35.0, lower_bound=None, window_size=5, ), IntelligentThresholdConfig( start_hour=18, end_hour=24, - upper_bound=85.0, + upper_bound=36.0, lower_bound=None, window_size=5, ), @@ -1478,14 +1478,16 @@ def test_merge_threshold_results_consolidation_mismatch_down_consolidated_up_not assert len(merged_results) == 1 result = merged_results[0] assert result.status == "Success" - assert len(result.thresholds) == 4 # Should match the non-consolidated direction (up) - - # Check that down threshold (25.0) is distributed across all up periods - for i, threshold in enumerate(result.thresholds): - assert threshold.upper_bound == up_results[0].thresholds[i].upper_bound # Keep original up bounds - assert threshold.lower_bound == 25.0 # All should have the consolidated down threshold - assert threshold.start_hour == up_results[0].thresholds[i].start_hour - assert threshold.end_hour == up_results[0].thresholds[i].end_hour + assert len(result.thresholds) == 2 # Should match the non-consolidated direction (up) + + assert result.thresholds[0].upper_bound == up_results[0].thresholds[1].upper_bound # Keep original up bounds + assert result.thresholds[0].lower_bound == 25.0 # All should have the consolidated down threshold + assert result.thresholds[0].start_hour == up_results[0].thresholds[0].start_hour + assert result.thresholds[0].end_hour == up_results[0].thresholds[1].end_hour + assert result.thresholds[1].upper_bound == up_results[0].thresholds[3].upper_bound # Keep original up bounds + assert result.thresholds[1].lower_bound == 25.0 # All should have the consolidated down threshold + assert result.thresholds[1].start_hour == up_results[0].thresholds[2].start_hour + assert result.thresholds[1].end_hour == up_results[0].thresholds[3].end_hour def test_merge_threshold_results_both_consolidated(threshold_recommender): diff --git a/veaiops/algorithm/intelligent_threshold/configs.py b/veaiops/algorithm/intelligent_threshold/configs.py index 95f2f4f8..110ec56f 100644 --- a/veaiops/algorithm/intelligent_threshold/configs.py +++ b/veaiops/algorithm/intelligent_threshold/configs.py @@ -47,8 +47,8 @@ # Default timezone for time-based operations DEFAULT_TIMEZONE = tzlocal.get_localzone_name() -# Default time split ranges for daily analysis (hours) -DEFAULT_TIME_SPLIT_RANGES = [[0, 6], [6, 12], [12, 18], [18, 24]] +# Default number of time split (default = 4) +DEFAULT_NUMBER_OF_TIME_SPLIT = int(os.getenv("DEFAULT_NUMBER_OF_TIME_SPLIT", 4)) # ============================================================================= # Algorithm Parameters @@ -81,7 +81,7 @@ # ============================================================================= # Default correlation threshold for period detection -DEFAULT_CORRELATION_THRESHOLD = 0.75 +DEFAULT_CORRELATION_THRESHOLD = float(os.getenv("DEFAULT_CORRELATION_THRESHOLD", 0.3)) # Minimum data points per day for analysis DEFAULT_MIN_DATA_POINTS_PER_DAY = 720 diff --git a/veaiops/algorithm/intelligent_threshold/robust_daily_period_detector.py b/veaiops/algorithm/intelligent_threshold/robust_daily_period_detector.py index 7de429da..aea81229 100644 --- a/veaiops/algorithm/intelligent_threshold/robust_daily_period_detector.py +++ b/veaiops/algorithm/intelligent_threshold/robust_daily_period_detector.py @@ -78,10 +78,13 @@ def __init__( ValueError: If any parameter is outside valid range. """ if not 0 <= correlation_threshold <= 1: + print("111") raise ValueError("correlation_threshold must be between 0 and 1") if min_data_points_per_day <= 0: + print("222") raise ValueError("min_data_points_per_day must be positive") if min_common_points <= 0: + print("333") raise ValueError("min_common_points must be positive") self.correlation_threshold = correlation_threshold @@ -373,6 +376,7 @@ def _correlate_common_timepoints(self, daily_data: Dict, common_time_points: Set return False avg_correlation = sum(correlations) / len(correlations) + print(avg_correlation) logger.debug(f"Average correlation: {avg_correlation:.3f} (threshold: {self.correlation_threshold})") return avg_correlation >= self.correlation_threshold diff --git a/veaiops/algorithm/intelligent_threshold/threshold_recommendation_algorithm.py b/veaiops/algorithm/intelligent_threshold/threshold_recommendation_algorithm.py index f57481f2..664ee730 100644 --- a/veaiops/algorithm/intelligent_threshold/threshold_recommendation_algorithm.py +++ b/veaiops/algorithm/intelligent_threshold/threshold_recommendation_algorithm.py @@ -26,7 +26,7 @@ from dbscan1d.core import DBSCAN1D from veaiops.algorithm.intelligent_threshold.configs import ( - DEFAULT_TIME_SPLIT_RANGES, + DEFAULT_NUMBER_OF_TIME_SPLIT, DEFAULT_TIMEZONE, MICROSECOND_THRESHOLD, MILLISECOND_THRESHOLD, @@ -51,19 +51,28 @@ class ThresholdRecommendAlgorithm: Attributes: timezone (str): Timezone for timestamp processing. - time_split_ranges (List[List[int]]): Time ranges for splitting analysis. + time_split_ranges (List[List[float]]): Time ranges for splitting analysis. period_detector (RobustDailyPeriodDetector): Daily period detection instance. """ - def __init__(self, timezone: str = DEFAULT_TIMEZONE, time_split_ranges: Optional[List[List[int]]] = None) -> None: + def __init__(self, timezone: str = DEFAULT_TIMEZONE, time_split_ranges: Optional[List[List[float]]] = None) -> None: """Initialize the ThresholdRecommendAlgorithm. Args: timezone (str): Timezone for timestamp processing. - time_split_ranges (Optional[List[List[int]]]): Custom time split ranges. + time_split_ranges (Optional[List[List[float]]]): Custom time split ranges. """ self.timezone = timezone - self.time_split_ranges = time_split_ranges or DEFAULT_TIME_SPLIT_RANGES + + if time_split_ranges is None: + self.time_split_ranges = [] + start_time = 0.0 + for _ in range(DEFAULT_NUMBER_OF_TIME_SPLIT): + end_time = start_time + 24 / DEFAULT_NUMBER_OF_TIME_SPLIT + self.time_split_ranges.append([start_time, end_time]) + start_time = end_time + else: + self.time_split_ranges = time_split_ranges self.period_detector = RobustDailyPeriodDetector() logger.debug(f"Initialized ThresholdRecommendAlgorithm with timezone={timezone}") @@ -363,11 +372,6 @@ def _process_time_split_periods( threshold_groups.append(threshold_group) - # Check if consolidation is needed - consolidated_group = self._check_and_consolidate_threshold_groups(threshold_groups, direction) - if consolidated_group: - return [consolidated_group] - return threshold_groups def _check_and_consolidate_threshold_groups( diff --git a/veaiops/algorithm/intelligent_threshold/threshold_recommender.py b/veaiops/algorithm/intelligent_threshold/threshold_recommender.py index dd0d73cd..f2788209 100644 --- a/veaiops/algorithm/intelligent_threshold/threshold_recommender.py +++ b/veaiops/algorithm/intelligent_threshold/threshold_recommender.py @@ -327,6 +327,174 @@ def _get_normal_threshold( return normal_range_end if direction == "up" else normal_range_start + @staticmethod + def can_merge_threshold_configs(configs: List[IntelligentThresholdConfig]) -> bool: + """Check if a list of threshold configurations can be merged. + + Merge conditions: + 1. window_size must be the same + 2. upper_bound max-min difference <= 10% + 3. lower_bound max-min difference <= 10% + + Args: + configs: List of threshold configurations + + Returns: + bool: Whether the configs can be merged + """ + if len(configs) <= 1: + return True + + # Check if all window_sizes are the same + window_sizes = [c.window_size for c in configs] + if len(set(window_sizes)) != 1: + return False + + # Extract upper_bound and lower_bound + upper_bounds = [c.upper_bound for c in configs if c.upper_bound is not None] + lower_bounds = [c.lower_bound for c in configs if c.lower_bound is not None] + + # Check upper_bound merge condition + if upper_bounds: + max_upper = max(upper_bounds) + min_upper = min(upper_bounds) + + if max_upper == 0: + if max_upper != min_upper: + return False + else: + upper_diff_ratio = (max_upper - min_upper) / max_upper + if upper_diff_ratio > 0.1: # 10% threshold + return False + + # Check lower_bound merge condition + if lower_bounds: + max_lower = max(lower_bounds) + min_lower = min(lower_bounds) + + if max_lower == 0: + if max_lower != min_lower: + return False + else: + lower_diff_ratio = (max_lower - min_lower) / max_lower + if lower_diff_ratio > 0.1: # 10% threshold + return False + + return True + + @staticmethod + def merge_threshold_configs(configs: List[IntelligentThresholdConfig]) -> IntelligentThresholdConfig: + """Merge multiple threshold configurations into a single configuration. + + Args: + configs: List of threshold configurations to merge + + Returns: + IntelligentThresholdConfig: Merged threshold configuration + """ + if len(configs) == 1: + return configs[0] + + # Extract upper_bound and lower_bound + upper_bounds = [c.upper_bound for c in configs if c.upper_bound is not None] + lower_bounds = [c.lower_bound for c in configs if c.lower_bound is not None] + + # For upper bound, use max (conservative strategy) + merged_upper = max(upper_bounds) if upper_bounds else None + + # For lower bound, use min (conservative strategy) + merged_lower = min(lower_bounds) if lower_bounds else None + + return IntelligentThresholdConfig( + start_hour=configs[0].start_hour, + end_hour=configs[-1].end_hour, + upper_bound=merged_upper, + lower_bound=merged_lower, + window_size=configs[0].window_size, + ) + + def merge_continuous_thresholds( + self, thresholds: List[IntelligentThresholdConfig] + ) -> List[IntelligentThresholdConfig]: + """Merge continuous threshold configurations. + + Uses greedy algorithm to merge consecutive time periods that satisfy: + - Both upper and lower bounds variance within 10% + - Same window_size + + Args: + thresholds: Original list of threshold configurations + + Returns: + List[IntelligentThresholdConfig]: List of merged threshold configurations + """ + # Filter out configs that have neither upper_bound nor lower_bound + valid_thresholds = [t for t in thresholds if t.upper_bound is not None or t.lower_bound is not None] + + if len(valid_thresholds) <= 1: + return thresholds + + # Sort by start_hour + sorted_thresholds = sorted(valid_thresholds, key=lambda x: x.start_hour) + + merged_result = [] + current_group = [sorted_thresholds[0]] + + for i in range(1, len(sorted_thresholds)): + # Check if continuous + if current_group[-1].end_hour == sorted_thresholds[i].start_hour: + # Try to add current threshold to merge group + test_group = current_group + [sorted_thresholds[i]] + + if self.can_merge_threshold_configs(test_group): + # Can merge, add to current group + current_group.append(sorted_thresholds[i]) + else: + # Cannot merge, finalize current group and start new group + merged_result.append(self.merge_threshold_configs(current_group)) + current_group = [sorted_thresholds[i]] + else: + # Not continuous, finalize current group and start new group + merged_result.append(self.merge_threshold_configs(current_group)) + current_group = [sorted_thresholds[i]] + + # Process the last group + merged_result.append(self.merge_threshold_configs(current_group)) + + return merged_result + + def merge_metric_threshold_results(self, results: List[MetricThresholdResult]) -> List[MetricThresholdResult]: + """Merge threshold configurations in MetricThresholdResult list. + + Merges the thresholds for each metric (MetricThresholdResult), + keeping other fields unchanged. + + Args: + results: List containing multiple metric threshold results + + Returns: + List[MetricThresholdResult]: List of merged results + """ + merged_results = [] + + for result in results: + # Merge thresholds for each metric + merged_thresholds = self.merge_continuous_thresholds(result.thresholds) + + # Create new MetricThresholdResult, keeping other fields unchanged + merged_result = MetricThresholdResult( + name=result.name, + thresholds=merged_thresholds, + labels=result.labels, + unique_key=result.unique_key, + status=result.status, + error_message=result.error_message, + ) + + merged_results.append(merged_result) + + return merged_results + def _merge_threshold_results( self, up_results: list[MetricThresholdResult], down_results: list[MetricThresholdResult] ) -> list[MetricThresholdResult]: @@ -351,7 +519,6 @@ def _merge_threshold_results( for up_result in up_results: down_result = down_results_map.get(up_result.unique_key) - if down_result: # Check if either direction failed if up_result.status != "Success" or down_result.status != "Success": @@ -497,8 +664,8 @@ def _merge_threshold_results( else: # Only down bound available and it succeeded merged_results.append(down_result) - - return merged_results + print(merged_results) + return self.merge_metric_threshold_results(merged_results) async def _fetch_and_validate_data( self, datasource_id: str From 42249acf5995b7db05aed957be9be31f6279c711 Mon Sep 17 00:00:00 2001 From: "jiyao.hu" Date: Thu, 18 Dec 2025 18:40:05 -0800 Subject: [PATCH 2/2] support DEFAULT_MAXIMUM_THRESHOLD_BLOCKS setting --- .../intelligent_threshold/configs.py | 3 + .../robust_daily_period_detector.py | 4 - .../threshold_recommender.py | 145 +++++++++++++++++- 3 files changed, 141 insertions(+), 11 deletions(-) diff --git a/veaiops/algorithm/intelligent_threshold/configs.py b/veaiops/algorithm/intelligent_threshold/configs.py index 110ec56f..4a79abe3 100644 --- a/veaiops/algorithm/intelligent_threshold/configs.py +++ b/veaiops/algorithm/intelligent_threshold/configs.py @@ -50,6 +50,9 @@ # Default number of time split (default = 4) DEFAULT_NUMBER_OF_TIME_SPLIT = int(os.getenv("DEFAULT_NUMBER_OF_TIME_SPLIT", 4)) +# Maximum number of threshold blocks after merging (default = 8) +DEFAULT_MAXIMUM_THRESHOLD_BLOCKS = int(os.getenv("DEFAULT_MAXIMUM_THRESHOLD_BLOCKS", 8)) + # ============================================================================= # Algorithm Parameters # ============================================================================= diff --git a/veaiops/algorithm/intelligent_threshold/robust_daily_period_detector.py b/veaiops/algorithm/intelligent_threshold/robust_daily_period_detector.py index aea81229..7de429da 100644 --- a/veaiops/algorithm/intelligent_threshold/robust_daily_period_detector.py +++ b/veaiops/algorithm/intelligent_threshold/robust_daily_period_detector.py @@ -78,13 +78,10 @@ def __init__( ValueError: If any parameter is outside valid range. """ if not 0 <= correlation_threshold <= 1: - print("111") raise ValueError("correlation_threshold must be between 0 and 1") if min_data_points_per_day <= 0: - print("222") raise ValueError("min_data_points_per_day must be positive") if min_common_points <= 0: - print("333") raise ValueError("min_common_points must be positive") self.correlation_threshold = correlation_threshold @@ -376,7 +373,6 @@ def _correlate_common_timepoints(self, daily_data: Dict, common_time_points: Set return False avg_correlation = sum(correlations) / len(correlations) - print(avg_correlation) logger.debug(f"Average correlation: {avg_correlation:.3f} (threshold: {self.correlation_threshold})") return avg_correlation >= self.correlation_threshold diff --git a/veaiops/algorithm/intelligent_threshold/threshold_recommender.py b/veaiops/algorithm/intelligent_threshold/threshold_recommender.py index f2788209..62c71cdc 100644 --- a/veaiops/algorithm/intelligent_threshold/threshold_recommender.py +++ b/veaiops/algorithm/intelligent_threshold/threshold_recommender.py @@ -31,6 +31,7 @@ from tenacity import retry, stop_after_attempt, wait_exponential from veaiops.algorithm.intelligent_threshold.configs import ( + DEFAULT_MAXIMUM_THRESHOLD_BLOCKS, EXTREME_VALUE_THRESHOLD, FETCH_DATA_TIMEOUT, HISTORICAL_DAYS, @@ -84,27 +85,36 @@ class ThresholdRecommender: Attributes: threshold_algorithm (ThresholdRecommendAlgorithm): Threshold recommendation algorithm instance. max_concurrent_tasks (int): Maximum number of concurrent tasks (default: 5). + maximum_threshold_blocks (int): Maximum number of threshold blocks after merging (default: 8). task_queue (List[TaskRequest]): Priority queue for pending tasks. running_tasks (Dict[str, asyncio.Task]): Currently running tasks. queue_lock (asyncio.Lock): Lock for thread-safe queue operations. """ def __init__( - self, threshold_algorithm: Optional[ThresholdRecommendAlgorithm] = None, max_concurrent_tasks: int = 5 + self, + threshold_algorithm: Optional[ThresholdRecommendAlgorithm] = None, + max_concurrent_tasks: int = 5, + maximum_threshold_blocks: int = DEFAULT_MAXIMUM_THRESHOLD_BLOCKS, ) -> None: """Initialize the ThresholdRecommender. Args: threshold_algorithm (Optional[ThresholdRecommendAlgorithm]): Threshold recommender algorithm instance. max_concurrent_tasks (int): Maximum number of concurrent tasks (default: 5). + maximum_threshold_blocks (int): Maximum number of threshold blocks after merging (default: 8). """ self.threshold_algorithm = threshold_algorithm or ThresholdRecommendAlgorithm() self.max_concurrent_tasks = max_concurrent_tasks + self.maximum_threshold_blocks = maximum_threshold_blocks self.task_queue: List[TaskRequest] = [] self.running_tasks: Dict[str, asyncio.Task] = {} self.queue_lock = asyncio.Lock() - logger.debug(f"Initialized ThresholdRecommender with max_concurrent_tasks={max_concurrent_tasks}") + logger.debug( + f"Initialized ThresholdRecommender with max_concurrent_tasks={max_concurrent_tasks}, " + f"maximum_threshold_blocks={maximum_threshold_blocks}" + ) async def _process_queue(self) -> None: """Process the task queue and start new tasks if capacity allows.""" @@ -413,14 +423,122 @@ def merge_threshold_configs(configs: List[IntelligentThresholdConfig]) -> Intell window_size=configs[0].window_size, ) + @staticmethod + def _calculate_merge_difference(config1: IntelligentThresholdConfig, config2: IntelligentThresholdConfig) -> float: + """Calculate the difference between two adjacent threshold configurations for merging. + + The difference is calculated based on the relative change in both upper_bound and lower_bound. + Returns a normalized difference value that can be used to determine merge priority. + + Args: + config1: First threshold configuration + config2: Second threshold configuration + + Returns: + float: Normalized difference value (lower is more similar, higher is more different) + """ + differences = [] + + # Calculate upper_bound difference + if config1.upper_bound is not None and config2.upper_bound is not None: + upper1 = config1.upper_bound + upper2 = config2.upper_bound + max_upper = max(abs(upper1), abs(upper2)) + if max_upper > 0: + upper_diff = abs(upper1 - upper2) / max_upper + differences.append(upper_diff) + + # Calculate lower_bound difference + if config1.lower_bound is not None and config2.lower_bound is not None: + lower1 = config1.lower_bound + lower2 = config2.lower_bound + max_lower = max(abs(lower1), abs(lower2)) + if max_lower > 0: + lower_diff = abs(lower1 - lower2) / max_lower + differences.append(lower_diff) + + # Return average difference, or 0 if no valid differences + return sum(differences) / len(differences) if differences else 0.0 + + def _hierarchical_merge_thresholds( + self, thresholds: List[IntelligentThresholdConfig], max_blocks: int + ) -> List[IntelligentThresholdConfig]: + """Merge threshold configurations using hierarchical clustering approach. + + This method uses a bottom-up hierarchical clustering approach: + 1. Start with all individual threshold blocks + 2. Iteratively merge the two adjacent blocks with smallest difference + 3. Continue until the number of blocks <= max_blocks + + Args: + thresholds: Original list of threshold configurations (must be sorted by start_hour) + max_blocks: Maximum number of blocks after merging + + Returns: + List[IntelligentThresholdConfig]: List of merged threshold configurations + """ + if len(thresholds) <= max_blocks: + return thresholds + + # Create a mutable list of threshold blocks (as lists to track merged ranges) + blocks = [[threshold] for threshold in thresholds] + + logger.debug(f"Starting hierarchical merge: {len(blocks)} blocks -> target {max_blocks} blocks") + + # Iteratively merge until we reach the target number of blocks + while len(blocks) > max_blocks: + # Calculate differences between all adjacent blocks + min_diff = float("inf") + min_diff_idx = -1 + + for i in range(len(blocks) - 1): + # Get the last config of current block and first config of next block + current_last = blocks[i][-1] + next_first = blocks[i + 1][0] + + # Only merge if blocks are continuous + if current_last.end_hour == next_first.start_hour: + # Calculate difference between the two blocks + diff = self._calculate_merge_difference(current_last, next_first) + + if diff < min_diff: + min_diff = diff + min_diff_idx = i + + # If no mergeable adjacent blocks found, break + if min_diff_idx == -1: + logger.warning(f"Cannot merge further: no continuous blocks found. Current blocks: {len(blocks)}") + break + + # Merge the two blocks with minimum difference + merged_block = blocks[min_diff_idx] + blocks[min_diff_idx + 1] + blocks[min_diff_idx : min_diff_idx + 2] = [merged_block] + + logger.debug( + f"Merged blocks at index {min_diff_idx} (diff={min_diff:.4f}), remaining blocks: {len(blocks)}" + ) + + # Convert blocks back to merged threshold configurations + result = [] + for block in blocks: + merged_config = self.merge_threshold_configs(block) + result.append(merged_config) + + logger.info( + f"Hierarchical merge completed: {len(thresholds)} -> {len(result)} blocks " + f"(target: {max_blocks}, maximum_threshold_blocks={self.maximum_threshold_blocks})" + ) + + return result + def merge_continuous_thresholds( self, thresholds: List[IntelligentThresholdConfig] ) -> List[IntelligentThresholdConfig]: - """Merge continuous threshold configurations. + """Merge continuous threshold configurations with adaptive strategy. - Uses greedy algorithm to merge consecutive time periods that satisfy: - - Both upper and lower bounds variance within 10% - - Same window_size + Strategy: + 1. First apply greedy algorithm to merge consecutive time periods with variance within 10% + 2. If result exceeds maximum_threshold_blocks, apply hierarchical clustering to further merge Args: thresholds: Original list of threshold configurations @@ -437,6 +555,7 @@ def merge_continuous_thresholds( # Sort by start_hour sorted_thresholds = sorted(valid_thresholds, key=lambda x: x.start_hour) + # Step 1: Apply greedy merge (10% threshold) merged_result = [] current_group = [sorted_thresholds[0]] @@ -461,6 +580,19 @@ def merge_continuous_thresholds( # Process the last group merged_result.append(self.merge_threshold_configs(current_group)) + logger.debug( + f"Greedy merge completed: {len(valid_thresholds)} -> {len(merged_result)} blocks " + f"(maximum_threshold_blocks={self.maximum_threshold_blocks})" + ) + + # Step 2: If still exceeds maximum_threshold_blocks, apply hierarchical clustering + if len(merged_result) > self.maximum_threshold_blocks: + logger.info( + f"Greedy merge result ({len(merged_result)} blocks) exceeds maximum_threshold_blocks " + f"({self.maximum_threshold_blocks}), applying hierarchical clustering" + ) + merged_result = self._hierarchical_merge_thresholds(merged_result, self.maximum_threshold_blocks) + return merged_result def merge_metric_threshold_results(self, results: List[MetricThresholdResult]) -> List[MetricThresholdResult]: @@ -664,7 +796,6 @@ def _merge_threshold_results( else: # Only down bound available and it succeeded merged_results.append(down_result) - print(merged_results) return self.merge_metric_threshold_results(merged_results) async def _fetch_and_validate_data(