Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def short_data():
def test_init_default_parameters():
"""Test initialization with default parameters."""
detector = RobustDailyPeriodDetector()
assert detector.correlation_threshold == 0.75
assert detector.correlation_threshold == 0.3
assert detector.min_data_points_per_day == 720
assert detector.min_common_points == 720

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,98 +526,6 @@ def test_check_and_consolidate_threshold_groups_single_group(threshold_recommend
assert result is None


def test_recommend_threshold_with_consolidation_up(threshold_recommender, sample_timestamps):
"""Test full threshold recommendation with consolidation for up direction."""
# Create values that result in similar thresholds across time periods
values = []
for i in range(len(sample_timestamps)):
# Create values that vary slightly but result in similar thresholds
hour = (i // 60) % 24
if hour < 6:
values.append(45) # Night: around 45
elif hour < 12:
values.append(48) # Morning: around 48
elif hour < 18:
values.append(52) # Afternoon: around 52
else:
values.append(47) # Evening: around 47

with patch.object(threshold_recommender.period_detector, "detect") as mock_detect:
mock_detect.return_value = True # Daily periodicity detected

# Mock sliding window to return similar thresholds
with patch.object(threshold_recommender, "threshold_recommendation_with_sliding_window") as mock_sliding_window:
# Return very similar thresholds for all time periods (within 10%)
mock_sliding_window.side_effect = [(49.0, 5), (50.0, 5), (51.0, 5), (50.5, 5)] * 2

result = threshold_recommender.recommend_threshold(
timestamp_list=sample_timestamps[:1000], # Use subset
value_list=values[:1000],
default_window_size=5,
time_split=True,
auto_window_adjust=False,
min_value=0.0,
max_value=100.0,
normal_threshold=None,
min_ts_length=50,
sensitivity=0.5,
direction="up",
)

# Should consolidate to single group since thresholds are close (49-51, diff=2, 2/51=3.9% < 10%)
assert len(result) == 1
assert result[0]["start_hour"] == 0
assert result[0]["end_hour"] == 24
assert result[0]["upper_bound"] == 50.5 # Max value from the sorted thresholds
assert result[0]["lower_bound"] is None
assert result[0]["window_size"] == 5


def test_recommend_threshold_with_consolidation_down(threshold_recommender, sample_timestamps):
"""Test full threshold recommendation with consolidation for down direction."""
# Create values that result in similar thresholds across time periods
values = []
for i in range(len(sample_timestamps)):
hour = (i // 60) % 24
if hour < 6:
values.append(25) # Night: around 25
elif hour < 12:
values.append(23) # Morning: around 23
elif hour < 18:
values.append(27) # Afternoon: around 27
else:
values.append(24) # Evening: around 24

with patch.object(threshold_recommender.period_detector, "detect") as mock_detect:
mock_detect.return_value = True # Daily periodicity detected

with patch.object(threshold_recommender, "threshold_recommendation_with_sliding_window") as mock_sliding_window:
# Return very similar thresholds for all time periods (within 10%)
mock_sliding_window.side_effect = [(24.5, 3), (23.5, 3), (25.5, 3), (24.0, 3)] * 2

result = threshold_recommender.recommend_threshold(
timestamp_list=sample_timestamps[:1000],
value_list=values[:1000],
default_window_size=5,
time_split=True,
auto_window_adjust=False,
min_value=0.0,
max_value=100.0,
normal_threshold=None,
min_ts_length=50,
sensitivity=0.5,
direction="down",
)

# Should consolidate to single group since thresholds are close (23.5-25.5, diff=2, 2/25.5=7.8% < 10%)
assert len(result) == 1
assert result[0]["start_hour"] == 0
assert result[0]["end_hour"] == 24
assert result[0]["upper_bound"] is None
assert result[0]["lower_bound"] == 23.5 # Min value for down direction
assert result[0]["window_size"] == 3


def test_recommend_threshold_no_consolidation_different_thresholds(
threshold_recommender, sample_timestamps, sample_values_periodic
):
Expand Down
24 changes: 13 additions & 11 deletions tests/algorithm/intelligent_threshold/test_threshold_recommender.py
Original file line number Diff line number Diff line change
Expand Up @@ -1428,21 +1428,21 @@ def test_merge_threshold_results_consolidation_mismatch_down_consolidated_up_not
IntelligentThresholdConfig(
start_hour=6,
end_hour=12,
upper_bound=75.0,
upper_bound=72.0,
lower_bound=None,
window_size=5,
),
IntelligentThresholdConfig(
start_hour=12,
end_hour=18,
upper_bound=80.0,
upper_bound=35.0,
lower_bound=None,
window_size=5,
),
IntelligentThresholdConfig(
start_hour=18,
end_hour=24,
upper_bound=85.0,
upper_bound=36.0,
lower_bound=None,
window_size=5,
),
Expand Down Expand Up @@ -1478,14 +1478,16 @@ def test_merge_threshold_results_consolidation_mismatch_down_consolidated_up_not
assert len(merged_results) == 1
result = merged_results[0]
assert result.status == "Success"
assert len(result.thresholds) == 4 # Should match the non-consolidated direction (up)

# Check that down threshold (25.0) is distributed across all up periods
for i, threshold in enumerate(result.thresholds):
assert threshold.upper_bound == up_results[0].thresholds[i].upper_bound # Keep original up bounds
assert threshold.lower_bound == 25.0 # All should have the consolidated down threshold
assert threshold.start_hour == up_results[0].thresholds[i].start_hour
assert threshold.end_hour == up_results[0].thresholds[i].end_hour
assert len(result.thresholds) == 2 # Should match the non-consolidated direction (up)

assert result.thresholds[0].upper_bound == up_results[0].thresholds[1].upper_bound # Keep original up bounds
assert result.thresholds[0].lower_bound == 25.0 # All should have the consolidated down threshold
assert result.thresholds[0].start_hour == up_results[0].thresholds[0].start_hour
assert result.thresholds[0].end_hour == up_results[0].thresholds[1].end_hour
assert result.thresholds[1].upper_bound == up_results[0].thresholds[3].upper_bound # Keep original up bounds
assert result.thresholds[1].lower_bound == 25.0 # All should have the consolidated down threshold
assert result.thresholds[1].start_hour == up_results[0].thresholds[2].start_hour
assert result.thresholds[1].end_hour == up_results[0].thresholds[3].end_hour


def test_merge_threshold_results_both_consolidated(threshold_recommender):
Expand Down
9 changes: 6 additions & 3 deletions veaiops/algorithm/intelligent_threshold/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@
# Default timezone for time-based operations
DEFAULT_TIMEZONE = tzlocal.get_localzone_name()

# Default time split ranges for daily analysis (hours)
DEFAULT_TIME_SPLIT_RANGES = [[0, 6], [6, 12], [12, 18], [18, 24]]
# Default number of time split (default = 4)
DEFAULT_NUMBER_OF_TIME_SPLIT = int(os.getenv("DEFAULT_NUMBER_OF_TIME_SPLIT", 4))

# Maximum number of threshold blocks after merging (default = 8)
DEFAULT_MAXIMUM_THRESHOLD_BLOCKS = int(os.getenv("DEFAULT_MAXIMUM_THRESHOLD_BLOCKS", 8))

# =============================================================================
# Algorithm Parameters
Expand Down Expand Up @@ -81,7 +84,7 @@
# =============================================================================

# Default correlation threshold for period detection
DEFAULT_CORRELATION_THRESHOLD = 0.75
DEFAULT_CORRELATION_THRESHOLD = float(os.getenv("DEFAULT_CORRELATION_THRESHOLD", 0.3))

# Minimum data points per day for analysis
DEFAULT_MIN_DATA_POINTS_PER_DAY = 720
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from dbscan1d.core import DBSCAN1D

from veaiops.algorithm.intelligent_threshold.configs import (
DEFAULT_TIME_SPLIT_RANGES,
DEFAULT_NUMBER_OF_TIME_SPLIT,
DEFAULT_TIMEZONE,
MICROSECOND_THRESHOLD,
MILLISECOND_THRESHOLD,
Expand All @@ -51,19 +51,28 @@ class ThresholdRecommendAlgorithm:

Attributes:
timezone (str): Timezone for timestamp processing.
time_split_ranges (List[List[int]]): Time ranges for splitting analysis.
time_split_ranges (List[List[float]]): Time ranges for splitting analysis.
period_detector (RobustDailyPeriodDetector): Daily period detection instance.
"""

def __init__(self, timezone: str = DEFAULT_TIMEZONE, time_split_ranges: Optional[List[List[int]]] = None) -> None:
def __init__(self, timezone: str = DEFAULT_TIMEZONE, time_split_ranges: Optional[List[List[float]]] = None) -> None:
"""Initialize the ThresholdRecommendAlgorithm.

Args:
timezone (str): Timezone for timestamp processing.
time_split_ranges (Optional[List[List[int]]]): Custom time split ranges.
time_split_ranges (Optional[List[List[float]]]): Custom time split ranges.
"""
self.timezone = timezone
self.time_split_ranges = time_split_ranges or DEFAULT_TIME_SPLIT_RANGES

if time_split_ranges is None:
self.time_split_ranges = []
start_time = 0.0
for _ in range(DEFAULT_NUMBER_OF_TIME_SPLIT):
end_time = start_time + 24 / DEFAULT_NUMBER_OF_TIME_SPLIT
self.time_split_ranges.append([start_time, end_time])
start_time = end_time
else:
self.time_split_ranges = time_split_ranges
self.period_detector = RobustDailyPeriodDetector()

logger.debug(f"Initialized ThresholdRecommendAlgorithm with timezone={timezone}")
Expand Down Expand Up @@ -363,11 +372,6 @@ def _process_time_split_periods(

threshold_groups.append(threshold_group)

# Check if consolidation is needed
consolidated_group = self._check_and_consolidate_threshold_groups(threshold_groups, direction)
if consolidated_group:
return [consolidated_group]

return threshold_groups

def _check_and_consolidate_threshold_groups(
Expand Down
Loading