Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions src/critic/libs/assertions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
from collections.abc import Callable
from enum import Enum
import operator
import re
import shlex
from typing import Any, ClassVar

import httpx
from pydantic import BaseModel, model_serializer, model_validator


class AssertionSubject(str, Enum):
    """Response attribute that an assertion can be written against.

    Each member's value is the token that appears as the first part of an assertion
    string, e.g. ``status_code`` in ``status_code < 400`` or ``body`` in
    ``body contains 'foo'``.
    """

    STATUS_CODE = 'status_code'
    BODY = 'body'
    RESPONSE_TIME = 'response_time'

    def cast(self, value: str) -> Any:
        """Convert the raw expected-value token to the type this subject is compared as.

        Args:
            value: The expected value exactly as it appeared in the assertion string.

        Returns:
            An ``int`` for ``STATUS_CODE``, a ``float`` for ``RESPONSE_TIME`` and the
            unchanged string for ``BODY``.

        Raises:
            ValueError: If ``value`` cannot be converted, or if this member has no
                cast rule defined.
        """
        if self is AssertionSubject.STATUS_CODE:
            return int(value)
        if self is AssertionSubject.RESPONSE_TIME:
            return float(value)
        if self is AssertionSubject.BODY:
            return value
        # Only reachable when a member is added above without a matching rule here;
        # name the member so the failure is easy to trace.
        raise ValueError(f'No cast rule defined for assertion subject {self.value!r}')


class Assertion(BaseModel):
    """A single check against an HTTP response, e.g. ``status_code < 400``.

    An assertion is written as a three-part string ``<subject> <operator> <expected>``.
    The string is split with ``shlex`` so a quoted expected value such as
    ``body contains "foo bar"`` stays one part. The model can be built from the bare
    string or from a dict containing ``assertion_string``; it serializes back to that
    string.
    """

    assertion_string: str
    assertion_object: AssertionSubject
    assertion_operator: str
    assertion_expected_value: str | int | float

    # Operator token -> callable(actual, expected). Shared by all instances.
    _OPS: ClassVar[dict[str, Callable]] = {
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '>': operator.gt,
        '<=': operator.le,
        '>=': operator.ge,
        'contains': lambda a, b: b in a,
        'not_contains': lambda a, b: b not in a,
        'matches': lambda a, b: bool(re.search(b, a)),
    }

    @model_validator(mode='before')
    @classmethod
    def _parse_assertion(cls, data: Any) -> Any:
        """Split ``assertion_string`` into subject, operator and expected value.

        Args:
            data: The raw assertion string, a dict containing ``assertion_string``,
                or an already-built ``Assertion``.

        Returns:
            A new dict with ``assertion_object``, ``assertion_operator`` and
            ``assertion_expected_value`` filled in, or ``data`` itself when it is
            already an ``Assertion``.

        Raises:
            ValueError: If the input has the wrong type, the string does not split
                into exactly three parts, the subject or operator is unknown, or the
                expected value cannot be cast for the subject.
        """
        # A "before" validator can be handed an already-validated instance (for
        # example when a parent model is built with Assertion objects); pass it on.
        if isinstance(data, cls):
            return data

        if isinstance(data, str):
            data = {'assertion_string': data}
        elif isinstance(data, dict):
            # Work on a copy so the caller's dict is not mutated.
            data = dict(data)
        else:
            raise ValueError(
                'Assertion must be initialized with a string or a dict containing assertion_string'
            )

        if 'assertion_string' not in data:
            return data

        raw_string = data['assertion_string']
        if not isinstance(raw_string, str):
            # shlex.split only makes sense for text; fail with a validation error
            # instead of whatever shlex raises for other types.
            raise ValueError('Invalid assertion format: assertion_string must be a string')

        # Quoted values (a body substring or regex) come back from shlex as one part,
        # which is what lets every assertion keep the three-part format.
        try:
            parts = shlex.split(raw_string)
        except ValueError as e:
            raise ValueError(
                f'Invalid assertion format: unable to parse quotes in {raw_string}'
            ) from e

        if len(parts) != 3:
            raise ValueError(
                f'Invalid assertion format: {raw_string} has more or less than 3 parts'
            )
        subject_token, operator_token, expected_token = parts

        try:
            subject = AssertionSubject(subject_token)
        except ValueError as e:
            raise ValueError(
                f'Invalid assertion format: {subject_token} is not a valid Assertion Subject'
            ) from e

        if operator_token not in cls._OPS:
            raise ValueError(
                f'Invalid assertion format: {operator_token} is not a valid operator'
            )

        try:
            converted_value = subject.cast(expected_token)
        except ValueError as e:
            raise ValueError(
                f"Value '{expected_token}' is not valid for {subject.value}"
            ) from e

        data['assertion_object'] = subject_token
        data['assertion_operator'] = operator_token
        data['assertion_expected_value'] = converted_value
        return data

    @model_serializer(mode='plain')
    def serialize_model(self) -> str:
        """Serialize the assertion as its original string."""
        return self.assertion_string

    def evaluate(self, response: httpx.Response) -> tuple[bool, str | None]:
        """Evaluate the assertion against ``response``.

        Args:
            response: The response to check.

        Returns:
            ``(True, None)`` when the assertion holds, otherwise ``(False, message)``
            where ``message`` explains what failed.
        """
        op_func = self._OPS[self.assertion_operator]
        expected = self.assertion_expected_value
        subject = self.assertion_object

        try:
            if subject == AssertionSubject.STATUS_CODE:
                actual = response.status_code
            elif subject == AssertionSubject.BODY:
                actual = response.text
            elif subject == AssertionSubject.RESPONSE_TIME:
                # elapsed is a timedelta; the comparison is made in milliseconds.
                actual = response.elapsed.total_seconds() * 1000
            else:
                # Guards against a subject being added to AssertionSubject without a
                # matching branch here.
                return False, f'Error evaluating assertion: unhandled subject {subject!r}'
            success = op_func(actual, expected)
        except Exception as e:
            # Deliberately broad: any failure to read the response or apply the
            # operator is reported as a failed assertion rather than raised.
            return False, f'Error evaluating assertion: {e}'

        if success:
            return True, None
        # Use the enum's value so the message reads the same on every Python version.
        subject_name = AssertionSubject(subject).value
        return (
            False,
            f'Expected {subject_name} {self.assertion_operator} {expected}, but got {actual}',
        )
45 changes: 33 additions & 12 deletions src/critic/libs/uptime.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, project_id: str, monitor_slug: str):
self.project_id = project_id
self.monitor_slug = monitor_slug

self.monitor: UptimeMonitorModel = UptimeMonitorTable.get(
self.monitor: UptimeMonitorModel | None = UptimeMonitorTable.get(
self.project_id, self.monitor_slug
)
if not self.monitor:
Expand Down Expand Up @@ -81,32 +81,47 @@ def make_req(self) -> tuple[httpx.Response | None, float]:
start = time.perf_counter()
with httpx.Client() as client:
try:
response = client.head(
response: httpx.Response = client.head(
str(self.monitor.url), timeout=float(self.monitor.timeout_secs)
)
except httpx.TimeoutException:
response = None
finished = time.perf_counter()
latency = finished - start
latency = response.elapsed.total_seconds() * 1000 if response else (finished - start)
return response, latency

def alert(self):
"""TODO: alert self.monitor.alert_slack_channels and self.monitor.alert_emails."""

def check_resp(self, response: httpx.Response | None) -> tuple[MonitorState, int, list[str]]:
    """Work out the monitor's new state from the result of a check.

    Args:
        response: The response to the check request, or ``None`` when the request
            timed out.

    Returns:
        A tuple ``(state, consecutive_fails, error_messages)``. ``state`` is up only
        when a response was received and every assertion on the monitor passed;
        ``error_messages`` holds one entry per failure and is empty when up.

    Calls ``self.alert()`` once ``consecutive_fails`` reaches the monitor's
    ``failures_before_alerting``.
    """
    if response is None:
        # No response means the request timed out.
        error_messages = ['Connection Timeout']
    else:
        # With no assertions configured this stays empty, so the monitor is up.
        results = (assertion.evaluate(response) for assertion in self.monitor.assertions)
        error_messages = [message for passed, message in results if not passed]

    state = MonitorState.down if error_messages else MonitorState.up

    consecutive_fails = 0 if state == MonitorState.up else self.monitor.consecutive_fails + 1
    if consecutive_fails >= self.monitor.failures_before_alerting:
        self.alert()
    return state, consecutive_fails, error_messages

def put_log(self, state: MonitorState, status_code: int, latency: float):
def put_log(
self, state: MonitorState, status_code: int, latency: float, error_message: str | None
):
"""
Puts a log for the check. This method should only be called once per monitor check.
"""
Expand All @@ -118,6 +133,7 @@ def put_log(self, state: MonitorState, status_code: int, latency: float):
status=state,
resp_code=status_code,
latency_secs=latency,
error_message=error_message,
)
UptimeLogTable.put(uptime_log)
self._put_log = True
Expand All @@ -140,11 +156,16 @@ def run(self):
resp, latency = self.make_req()

# Check the response (also kicks off alerts if needed)
state, consecutive_fails = self.check_resp(resp)
state, consecutive_fails, error_messages = self.check_resp(resp)

# Update the monitor
updated = self.update_monitor({'state': state, 'consecutive_fails': consecutive_fails})

# Save a log
if updated:
self.put_log(state, resp.status_code if resp else 0, latency)
self.put_log(
state,
resp.status_code if resp else 0,
latency,
error_messages if error_messages else None,
)
9 changes: 5 additions & 4 deletions src/critic/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

from datetime import UTC, datetime
from enum import Enum
from typing import Any
from uuid import UUID

from pydantic import AwareDatetime, BaseModel, Field, HttpUrl, field_validator

from critic.libs.assertions import Assertion
from critic.libs.ddb import CONSTANT_GSI_PK
from critic.libs.dt import to_utc

Expand Down Expand Up @@ -35,7 +35,7 @@ class UptimeMonitorModel(BaseModel):
default_factory=lambda: datetime.now(UTC).replace(second=0, microsecond=0)
)
timeout_secs: float = Field(ge=0, default=5)
assertions: dict[str, Any] = Field(default_factory=dict)
assertions: list[Assertion] = Field(default_factory=list)
Comment thread
calebsyring marked this conversation as resolved.
failures_before_alerting: int = Field(ge=1, default=1)
alert_slack_channels: list[str] = Field(default_factory=list)
alert_emails: list[str] = Field(default_factory=list)
Expand Down Expand Up @@ -63,8 +63,9 @@ class UptimeLogModel(BaseModel):
)
timestamp: AwareDatetime
status: MonitorState
resp_code: int
latency_secs: float
resp_code: int | None = None
latency_secs: float | None = None
error_message: list[str] | None = None

@staticmethod
def monitor_id_from_parts(project_id: UUID | str, slug: str) -> str:
Expand Down
81 changes: 81 additions & 0 deletions tests/critic_tests/test_assertions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from datetime import timedelta

import httpx
import pytest

from critic.libs.assertions import Assertion


# Keyword arguments for Assertion(...) shared by the tests below: three well-formed
# assertion strings, followed by malformed ones the validator is expected to reject.
RESPONSE_TIME_ASSERTION = {'assertion_string': 'response_time < 20.2'}
STATUS_CODE_ASSERTION = {'assertion_string': 'status_code == 200'}
BODY_ASSERTION = {'assertion_string': 'body contains "foo bar"'}
BAD_OP_ASSERTION = {'assertion_string': 'status_code foo 200'}
BAD_SUBJECT_ASSERTION = {'assertion_string': 'bad_subject > 200'}
BAD_EV_ASSERTION = {'assertion_string': 'status_code > "foo bar"'}
TOO_LONG_ASSERTION = {'assertion_string': 'status_code > 200 hello'}
TOO_SHORT_ASSERTION = {'assertion_string': 'status_code'}


def custom_response(request: httpx.Request):
    """Build a canned 200 response with body ``foo bar`` and a fixed elapsed time.

    ``request`` is accepted but not used.
    """
    resp = httpx.Response(status_code=200, text='foo bar')
    # Manually set the elapsed time to 20.1ms
    resp.elapsed = timedelta(milliseconds=20.1)
    return resp


class TestAssertions:
    """Validation, serialization and evaluation tests for ``Assertion``."""

    def test_assertion_validation_fails_with_bad_operator(self):
        with pytest.raises(ValueError, match='is not a valid operator'):
            Assertion(**BAD_OP_ASSERTION)

    def test_assertion_validation_fails_with_bad_Assertion_subject(self):
        with pytest.raises(ValueError, match='is not a valid Assertion Subject'):
            Assertion(**BAD_SUBJECT_ASSERTION)

    def test_assertion_validation_fails_with_bad_expected_value_casting(self):
        with pytest.raises(ValueError, match='is not valid for status_code'):
            Assertion(**BAD_EV_ASSERTION)

    def test_assertion_validation_fails_as_too_long_and_short(self):
        with pytest.raises(ValueError, match='has more or less than 3 parts'):
            Assertion(**TOO_LONG_ASSERTION)
        with pytest.raises(ValueError, match='has more or less than 3 parts'):
            Assertion(**TOO_SHORT_ASSERTION)

    def test_assertion_validation_correct(self):
        assertion = Assertion(**STATUS_CODE_ASSERTION)
        assert assertion.assertion_string == 'status_code == 200'
        assert assertion.assertion_operator == '=='
        assert assertion.assertion_expected_value == 200

        assertion = Assertion(**RESPONSE_TIME_ASSERTION)
        assert assertion.assertion_operator == '<'
        assert assertion.assertion_expected_value == 20.2

        assertion = Assertion(**BODY_ASSERTION)
        assert assertion.assertion_operator == 'contains'
        assert assertion.assertion_expected_value == 'foo bar'

    def test_assertion_serialize_correctly(self):
        assertion = Assertion(assertion_string='status_code > 200')
        assert assertion.serialize_model() == 'status_code > 200'

    def test_assertion_evaluates_correctly(self):
        assertion_status_code = Assertion(**STATUS_CODE_ASSERTION)
        assertion_resp_time = Assertion(**RESPONSE_TIME_ASSERTION)
        assertion_body = Assertion(**BODY_ASSERTION)

        resp = httpx.Response(status_code=200, text='foo bar')
        resp.elapsed = timedelta(milliseconds=20.1)

        status_code_eval: tuple[bool, str | None] = assertion_status_code.evaluate(response=resp)
        assert status_code_eval[0]
        assert status_code_eval[1] is None

        resp_time_eval: tuple[bool, str | None] = assertion_resp_time.evaluate(response=resp)
        assert resp_time_eval[0]
        assert resp_time_eval[1] is None

        # Keep the body result in its own name and assert on it; reusing
        # resp_time_eval here would leave the body assertion unchecked.
        body_eval: tuple[bool, str | None] = assertion_body.evaluate(response=resp)
        assert body_eval[0]
        assert body_eval[1] is None

    def test_assertion_evaluates_failure_with_message(self):
        resp = httpx.Response(status_code=200, text='foo bar')
        resp.elapsed = timedelta(milliseconds=20.1)

        passed, message = Assertion(assertion_string='status_code == 404').evaluate(response=resp)
        assert not passed
        assert message
16 changes: 15 additions & 1 deletion tests/critic_tests/test_libs/test_ddb.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from botocore.exceptions import ClientError
import pytest

from critic.libs.assertions import Assertion
from critic.libs.testing import ProjectFactory, UptimeLogFactory, UptimeMonitorFactory
from critic.models import ProjectModel, UptimeLogModel, UptimeMonitorModel
from critic.tables import ProjectTable, UptimeLogTable, UptimeMonitorTable
Expand All @@ -12,10 +13,15 @@
class TestTable:
@pytest.mark.integration
def test_integration(self):
# Pretend we've received data via the API
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you decide to stop using the factories here?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dumb mistake 🤷‍♂️

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method still needs to be switched back to the factory I think.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oop, switched a different one

UptimeMonitorFactory.put(
project_id='6033aa47-a9f7-4d7f-b7ff-a11ba9b34474',
slug='my-monitor',
url='https://example.com/health',
assertions=[
Assertion(assertion_string='status_code == 200'),
Assertion(assertion_string="body contains 'OK'"),
],
)
out_data = UptimeMonitorTable.get('6033aa47-a9f7-4d7f-b7ff-a11ba9b34474', 'my-monitor')

Expand All @@ -33,7 +39,10 @@ def test_unit(self, input_as_model):
'consecutive_fails': 0,
'next_due_at': '2025-11-10T20:35:00Z',
'timeout_secs': 30,
'assertions': {'status_code': 200, 'body_contains': 'OK'},
'assertions': [
Assertion(assertion_string='status_code == 200'),
Assertion(assertion_string="body contains 'OK'"),
],
'failures_before_alerting': 2,
'alert_slack_channels': ['#ops'],
'alert_emails': ['alerts@example.com'],
Expand Down Expand Up @@ -63,7 +72,12 @@ def test_query_from_monitor_table(self):
project_id='6033aa47-a9f7-4d7f-b7ff-a11ba9b34474',
slug='my-monitor',
url='https://example.com/health',
assertions=[
Assertion(assertion_string='status_code == 200'),
Assertion(assertion_string="body contains 'OK'"),
],
)

out_data = UptimeMonitorTable.query('6033aa47-a9f7-4d7f-b7ff-a11ba9b34474')
assert len(out_data) == 1
assert str(out_data[0].url) == 'https://example.com/health'
Expand Down
Loading
Loading