Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions env-config.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
profile:
test:
AWS_PROFILE: 'critic-test'
# Devs need to set their own namespace via mise.local.toml. This prevents it from being cleared.
CRITIC_NAMESPACE: '{env.CRITIC_NAMESPACE}'
MU_DEFAULT_ENV: '{env.CRITIC_NAMESPACE}'
MU_CONFIG_PATH: '{env.CRITIC_SRC}/mu-test.toml'
ci:
AWS_PROFILE: 'critic-test'
CRITIC_NAMESPACE: 'ci'
MU_DEFAULT_ENV: 'ci'
MU_CONFIG_PATH: '{env.CRITIC_SRC}/mu-ci.toml'
dev:
AWS_PROFILE: 'critic-dev'
# Devs need to set their own namespace via mise.local.toml. This prevents it from being cleared.
CRITIC_NAMESPACE: '{env.CRITIC_NAMESPACE}'
MU_DEFAULT_ENV: '{env.CRITIC_NAMESPACE}'
MU_CONFIG_PATH: '{env.CRITIC_SRC}/mu-dev.toml'
qa:
Expand All @@ -18,5 +23,6 @@ profile:
MU_CONFIG_PATH: '{env.CRITIC_SRC}/mu-qa.toml'
prod:
AWS_PROFILE: 'critic-prod'
CRITIC_NAMESPACE: 'prod'
MU_DEFAULT_ENV: 'prod'
MU_CONFIG_PATH: '{env.CRITIC_SRC}/mu-prod.toml'
93 changes: 70 additions & 23 deletions src/critic/libs/ddb.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from boto3 import client
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from botocore.exceptions import ClientError
from pydantic import AwareDatetime, BaseModel, TypeAdapter

from critic.libs.dt import to_utc
Expand Down Expand Up @@ -133,45 +134,91 @@ def get(cls, partition_value: Any, sort_value: Any | None = None) -> BaseModel |
@classmethod
def query(cls, partition_value: Any) -> list[BaseModel]:
    """
    Query for all items with the given partition key.

    The partition key name and value are aliased via alias() so that a key name which is a
    DynamoDB reserved word cannot break the expression.

    A single Query call returns at most one page of results, so this follows
    LastEvaluatedKey until the result set is exhausted.

    Returns a list of cls.model instances built from the deserialized items.
    """
    names, values, clauses = cls.alias({cls.partition_key: partition_value})

    kwargs = {
        'TableName': cls.name(),
        # Only one key was aliased, so there is exactly one clause.
        'KeyConditionExpression': clauses[0],
        'ExpressionAttributeNames': names,
        'ExpressionAttributeValues': values,
    }

    ddb = get_client()
    items = []
    while True:
        response = ddb.query(**kwargs)
        items.extend(response.get('Items', []))

        # DynamoDB signals "more pages" by returning the key to resume from.
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
        kwargs['ExclusiveStartKey'] = last_key

    return [cls.model(**deserialize(item)) for item in items]

@staticmethod
def alias(data: dict, val_suffix: str = '') -> tuple[dict, dict, list]:
    """
    Serialize ``data`` and build the three aliased pieces of a DynamoDB expression.

    Returns a tuple of:
    1. Name placeholders mapped to real attribute names (for ExpressionAttributeNames)
    Ex. {'#key1': 'key1', '#key2': 'key2'}
    2. Value placeholders mapped to the serialized values (for ExpressionAttributeValues)
    Ex. {':key1': <value1>, ':key2': <value2>}
    3. One '#name = :value' clause per key, in the order of the dict (for *Expression)
    Ex. ['#key1 = :key1', '#key2 = :key2']

    ``val_suffix`` is appended to every value placeholder (never to name placeholders). Use it
    when the same key needs different values in different expressions of one request, e.g. a
    condition expression that requires the current value and an update expression that sets a
    new one. Results from calls with distinct suffixes can then be merged into a single
    ExpressionAttributeValues dict without collisions:
    {':key_0': <new value>, ':key_1': <expected value>}
    """
    attr_names: dict = {}
    attr_values: dict = {}
    expressions: list = []

    for attr, serialized_value in serialize(data).items():
        name_ref = f'#{attr}'
        value_ref = f':{attr}{val_suffix}'
        attr_names[name_ref] = attr
        attr_values[value_ref] = serialized_value
        expressions.append(f'{name_ref} = {value_ref}')

    return attr_names, attr_values, expressions

@staticmethod
def alias_all(*data: dict) -> tuple[dict, dict, list[list]]:
    """
    Run alias() over several dicts and merge the results for use in a single request.

    Each dict gets its own value suffix ('_0', '_1', ... by position) so the same key may
    appear in more than one dict with different values. Name and value placeholders are merged
    into one dict each; clauses are kept as one list per input dict, in input order.
    """
    merged_names: dict = {}
    merged_values: dict = {}
    clause_groups: list[list] = []

    for position, attrs in enumerate(data):
        group_names, group_values, group_clauses = Table.alias(attrs, f'_{position}')
        merged_names.update(group_names)
        merged_values.update(group_values)
        clause_groups.append(group_clauses)

    return merged_names, merged_values, clause_groups

@classmethod
def update(
    cls,
    partition_value: Any,
    sort_value: Any | None = None,
    updates: dict | None = None,
    condition: dict | None = None,
) -> bool:
    """
    Set attributes on a single item, optionally guarded by an equality condition.

    ``updates`` maps attribute names to the values to SET. ``condition`` maps attribute names
    to the values they must currently have; multiple entries are combined with AND.

    Returns True if the item was updated, False if the condition did not hold.

    Raises ValueError if ``updates`` is empty or missing. Any ClientError other than a failed
    conditional check is re-raised.
    """
    if not updates:
        raise ValueError('No updates provided')

    if condition:
        # alias_all() suffixes the value placeholders so the same attribute can appear in
        # both the update and the condition with different values.
        names, values, (update_clauses, cond_clauses) = cls.alias_all(updates, condition)
    else:
        names, values, update_clauses = cls.alias(updates)
        cond_clauses = []

    kwargs = {
        'TableName': cls.name(),
        'Key': cls.key(partition_value, sort_value),
        'UpdateExpression': 'SET ' + ', '.join(update_clauses),
        'ExpressionAttributeNames': names,
        'ExpressionAttributeValues': values,
    }
    if cond_clauses:
        kwargs['ConditionExpression'] = ' AND '.join(cond_clauses)

    try:
        get_client().update_item(**kwargs)
    except ClientError as e:
        # A failed condition is an expected outcome, not an error.
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return False
        raise
    return True

@classmethod
def delete(cls, partition_value: Any, sort_value: Any | None = None):
Expand Down
11 changes: 10 additions & 1 deletion src/critic/libs/dt.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta


def is_aware(dt: datetime) -> bool:
Expand All @@ -9,3 +9,12 @@ def to_utc(dt: datetime) -> datetime:
if not is_aware(dt):
raise ValueError(f'datetime must be timezone aware, got {dt}')
return dt.astimezone(UTC)


def round_minute(dt: datetime, *, round_up_from_second: int = 55) -> datetime:
    """
    Round ``dt`` to a whole minute, dropping seconds and microseconds.

    Most of the time we want to round down (that usually means we're erring on the side of
    running a monitor too often vs. not enough). However, if we're very late in the minute, it's
    more likely that the task ran just a bit too early and we should round up.

    ``round_up_from_second`` is the first second-of-minute value that rounds up instead of
    down. Timezone info on ``dt`` is preserved.
    """
    floored = dt.replace(second=0, microsecond=0)
    if dt.second >= round_up_from_second:
        return floored + timedelta(minutes=1)
    return floored
150 changes: 150 additions & 0 deletions src/critic/libs/uptime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from datetime import UTC, datetime, timedelta
from functools import cached_property
import logging
import time

import httpx

from critic.libs.dt import round_minute
from critic.models import MonitorState, UptimeLog, UptimeMonitorModel
from critic.tables import UptimeLogTable, UptimeMonitorTable


log = logging.getLogger(__name__)


class MonitorNotFoundError(ValueError):
    """Raised when the requested project/monitor pair has no matching uptime monitor."""


class UptimeCheck:
    """
    This class is responsible for running a single uptime check for a given monitor. It handles
    making the request, checking the response, updating the monitor, and saving a log.
    """

    def __init__(self, project_id: str, monitor_slug: str):
        """
        Load the monitor identified by ``project_id`` and ``monitor_slug``.

        Raises MonitorNotFoundError if the monitor table returns nothing for that pair.
        """
        # Captured once so the due-time calculation and the log share the same timestamp.
        self.now = datetime.now(UTC)
        self.project_id = project_id
        self.monitor_slug = monitor_slug

        self.monitor: UptimeMonitorModel = UptimeMonitorTable.get(
            self.project_id, self.monitor_slug
        )
        if not self.monitor:
            raise MonitorNotFoundError(f'Monitor {project_id}/{monitor_slug} not found')

        # Used internally to make sure we don't duplicate DB operations
        self._updated_monitor = False
        self._put_log = False

    @cached_property
    def new_next_due_at(self):
        """Returns what the monitor's next due time should be updated to."""
        frequency = timedelta(minutes=self.monitor.frequency_mins)

        # Normally we just add the frequency to the last due time. (Not using now helps prevent
        # drift from imprecise scheduling.)
        next_due_at = self.monitor.next_due_at + frequency

        # If we do that and the next due time is still less than right now, that indicates we're
        # way behind on checks on this monitor, and we need to "reset" relative to now.
        if next_due_at < self.now:
            next_due_at = round_minute(self.now) + frequency

        return next_due_at

    def update_monitor(self, updates: dict | None = None) -> bool:
        """
        Updates the monitor with the given updates and also updates the next due time. This method
        should only be called once per monitor check.

        The update is conditional on next_due_at still holding the value that was loaded in
        __init__, so a concurrent run that already advanced it makes this one a no-op.

        Returns True if the update was successful, False if it failed (due to a race condition).
        """
        if self._updated_monitor:
            raise Exception(
                'Monitor already updated! Do not call this method more than once in one run.'
            )
        if updates is None:
            updates = {}
        updated = UptimeMonitorTable.update(
            self.project_id,
            self.monitor_slug,
            updates={**updates, 'next_due_at': self.new_next_due_at},
            condition={'next_due_at': self.monitor.next_due_at},
        )
        self._updated_monitor = True
        return updated

    def make_req(self) -> tuple[httpx.Response | None, float]:
        """
        Makes the request and returns the response and the time it took to make the request.

        The response is None when the request could not be completed at all (timeout,
        connection failure, and any other httpx.RequestError).
        """
        start = time.perf_counter()
        with httpx.Client() as client:
            try:
                response = client.head(
                    str(self.monitor.url), timeout=float(self.monitor.timeout_secs)
                )
            except httpx.RequestError as e:
                # An unreachable site is exactly what this check exists to detect, so treat
                # every request-level failure as "no response" instead of crashing the run.
                log.info('Request to %s failed: %r', self.monitor.url, e)
                response = None
            # Stop the clock before the client is torn down.
            finished = time.perf_counter()
        return response, finished - start

    def alert(self):
        """TODO: alert self.monitor.alert_slack_channels and self.monitor.alert_emails."""

    def check_resp(self, response: httpx.Response | None) -> tuple[MonitorState, int]:
        """Checks the response and returns the new state and consecutive fails. Also alerts if
        needed.
        """
        # Any response at all currently counts as up; the status code is not inspected yet.
        state = MonitorState.up if response is not None else MonitorState.down
        # TODO: check assertions

        if state == MonitorState.up:
            consecutive_fails = 0
        else:
            consecutive_fails = self.monitor.consecutive_fails + 1

        if consecutive_fails >= self.monitor.failures_before_alerting:
            self.alert()
        return state, consecutive_fails

    def put_log(self, state: MonitorState, status_code: int, latency: float):
        """
        Puts a log for the check. This method should only be called once per monitor check.
        """
        if self._put_log:
            raise Exception('Log already put! Do not call this method more than once in one run.')
        uptime_log = UptimeLog(
            monitor_id=f'{self.monitor.project_id}/{self.monitor.slug}',
            timestamp=self.now,
            status=state,
            resp_code=status_code,
            latency_secs=latency,
        )
        UptimeLogTable.put(uptime_log)
        self._put_log = True

    def run(self):
        """
        1. Make the request
        2. Check the response
        3. Update the monitor
        4. Save a log
        """
        log.info('Starting check for monitor: %s', self.monitor)

        # Handle paused by just updating the next due time
        if self.monitor.state == MonitorState.paused:
            self.update_monitor()
            return

        # Make the request
        resp, latency = self.make_req()

        # Check the response (also kicks off alerts if needed)
        state, consecutive_fails = self.check_resp(resp)

        # Update the monitor
        updated = self.update_monitor({'state': state, 'consecutive_fails': consecutive_fails})

        # Save a log only if this run won the conditional update. 0 stands in for "no response".
        if updated:
            self.put_log(state, resp.status_code if resp is not None else 0, latency)
6 changes: 5 additions & 1 deletion src/critic/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ class UptimeMonitorModel(BaseModel):
state: MonitorState = Field(default=MonitorState.new)
frequency_mins: int = Field(ge=1, default=1)
consecutive_fails: int = Field(ge=0, default=0)
next_due_at: AwareDatetime = Field(default_factory=lambda: datetime.now(UTC))
next_due_at: AwareDatetime = Field(
default_factory=lambda: datetime.now(UTC).replace(second=0, microsecond=0)
)
timeout_secs: float = Field(ge=0, default=5)
assertions: dict[str, Any] = Field(default_factory=dict)
failures_before_alerting: int = Field(ge=1, default=1)
Expand All @@ -44,6 +46,8 @@ class UptimeMonitorModel(BaseModel):
@classmethod
def validate_next_due_at(cls, v: datetime) -> datetime:
"""Reject sub-minute precision, then normalize to UTC.

Raises ValueError if ``v`` has non-zero seconds or microseconds. The returned value is
whatever to_utc() produces for ``v``.
"""
# Sub-minute precision is rejected outright rather than silently truncated.
if v.second or v.microsecond:
raise ValueError('next_due_at must be no more precise than minutes')
return to_utc(v)


Expand Down
Loading