From 8f241e0ae92ac50ef3c63eb87fb5788e8f570d99 Mon Sep 17 00:00:00 2001 From: Jean-Regis-M <240509606@firat.edu.tr> Date: Sat, 30 May 2026 15:40:39 +0300 Subject: [PATCH 1/2] feat(aegis): add telemetry JSON-LD schema and scaffolding - Add finbot/aegis/telemetry/schema.py with AuditEvent models - Add AEGIS_ENABLED and AEGIS_TELEMETRY_ENABLED settings - Extend events.py to support 'aegis.*' namespaces - Add unit tests for telemetry schema - Update conftest.py for aegis package discovery Week 1 deliverable - GSoC 2026 OWASP FinBot AEGIS --- finbot/aegis/__init__.py | 24 ++ finbot/aegis/telemetry/__init__.py | 28 ++ finbot/aegis/telemetry/schema.py | 231 +++++++++++++++ finbot/config.py | 16 + finbot/core/messaging/events.py | 45 +++ test_telemetry_standalone.py | 0 tests/unit/aegis/__init__.py | 1 + tests/unit/aegis/test_telemetry_schema.py | 337 ++++++++++++++++++++++ 8 files changed, 682 insertions(+) create mode 100644 finbot/aegis/__init__.py create mode 100644 finbot/aegis/telemetry/__init__.py create mode 100644 finbot/aegis/telemetry/schema.py create mode 100644 test_telemetry_standalone.py create mode 100644 tests/unit/aegis/__init__.py create mode 100644 tests/unit/aegis/test_telemetry_schema.py diff --git a/finbot/aegis/__init__.py b/finbot/aegis/__init__.py new file mode 100644 index 00000000..90d3663d --- /dev/null +++ b/finbot/aegis/__init__.py @@ -0,0 +1,24 @@ +# ============================================================ +# File: finbot/aegis/__init__.py +# Purpose: Public exports for FinBot-AEGIS runtime security layer +# Author: Jean Francois Regis MUKIZA +# GSoC Week: 1 +# OWASP Category: ASI01–ASI10 (platform-wide) +# ============================================================ +"""FinBot-AEGIS: runtime security layer for OWASP FinBot CTF.""" + +from finbot.aegis.intent_gate import IntentGate +from finbot.aegis.schemas import PolicyVerdict +from finbot.aegis.sentinel import AuditEvent, SentinelStream +from finbot.aegis.service import 
AegisEnforcementService +from finbot.aegis.trust_mesh import AttestationResult, TrustMesh + +__all__ = [ + "AegisEnforcementService", + "AttestationResult", + "AuditEvent", + "IntentGate", + "PolicyVerdict", + "SentinelStream", + "TrustMesh", +] diff --git a/finbot/aegis/telemetry/__init__.py b/finbot/aegis/telemetry/__init__.py new file mode 100644 index 00000000..c081107b --- /dev/null +++ b/finbot/aegis/telemetry/__init__.py @@ -0,0 +1,28 @@ +# ============================================================ +# File: finbot/aegis/telemetry/__init__.py +# Purpose: Telemetry package initialization +# Author: Jean Francois Regis MUKIZA +# GSoC Week: 1 +# OWASP Category: ASI01 (Prompt Injection), ASI06 (Sandboxing) +# ============================================================ +"""AEGIS Telemetry: structured audit event pipeline with HMAC chaining.""" + +from finbot.aegis.telemetry.chain import AuditChain +from finbot.aegis.telemetry.schema import ( + AuditEvent, + DelegationEvent, + MemoryWriteEvent, + PolicyDecisionEvent, + ToolCallEvent, + ToolResultEvent, +) + +__all__ = [ + "AuditEvent", + "ToolCallEvent", + "ToolResultEvent", + "MemoryWriteEvent", + "DelegationEvent", + "PolicyDecisionEvent", + "AuditChain", +] diff --git a/finbot/aegis/telemetry/schema.py b/finbot/aegis/telemetry/schema.py new file mode 100644 index 00000000..f6e669c5 --- /dev/null +++ b/finbot/aegis/telemetry/schema.py @@ -0,0 +1,231 @@ +# ============================================================ +# File: finbot/aegis/telemetry/schema.py +# Purpose: JSON-LD schemas for structured audit events +# Author: Jean Francois Regis MUKIZA +# GSoC Week: 1 +# OWASP Category: ASI01 (Prompt Injection), ASI06 (Sandboxing) +# ============================================================ +"""JSON-LD event schemas for AEGIS telemetry pipeline. + +All events include: +- @context: JSON-LD context URL +- @type: Event type (ToolCall, ToolResult, etc.) 
+- timestamp: ISO 8601 timestamp +- namespace: Player's isolated namespace +- workflow_id: Execution trace identifier +- prev_hash: HMAC of previous event (for chaining) +- event_hash: HMAC of this event +""" + +from datetime import UTC, datetime +from enum import Enum +from typing import Any, Optional + +from pydantic import BaseModel, Field, field_validator + + +class EventType(str, Enum): + """AEGIS event types for audit trail.""" + + TOOL_CALL = "aegis.tool.call" + TOOL_RESULT = "aegis.tool.result" + MEMORY_WRITE = "aegis.memory.write" + DELEGATION = "aegis.delegation" + POLICY_DECISION = "aegis.policy.decision" + ANOMALY_DETECTION = "aegis.anomaly.detection" + + +class BaseAuditEvent(BaseModel): + """Base class for all AEGIS audit events.""" + + context: str = Field( + default="https://owasp.org/aegis/v1/context.jsonld", + alias="@context", + ) + type: str = Field(alias="@type") + timestamp: str = Field( + default_factory=lambda: datetime.now(UTC).isoformat().replace("+00:00", "Z") + ) + namespace: str = Field( + description="Player's isolated namespace (e.g., 'player_abc123')" + ) + workflow_id: str = Field( + description="Execution workflow identifier for tracing" + ) + user_id: str = Field(description="User who initiated the action") + agent_name: str = Field(description="Agent performing the action") + prev_hash: Optional[str] = Field(default=None, description="HMAC of previous event") + event_hash: Optional[str] = Field(default=None, description="HMAC of this event") + severity: str = Field( + default="info", + description="Event severity: debug, info, warning, critical", + ) + labels: dict[str, str] = Field( + default_factory=dict, + description="Custom labels for filtering (e.g., {'asi': 'ASI01'})", + ) + + class Config: + """Pydantic config.""" + + populate_by_name = True + json_schema_extra = { + "examples": [ + { + "@context": "https://owasp.org/aegis/v1/context.jsonld", + "@type": "aegis.tool.call", + "timestamp": "2026-05-27T12:34:56Z", + 
"namespace": "player_abc123", + "workflow_id": "wf_xyz789", + "user_id": "user_1", + "agent_name": "OnboardingAgent", + "tool_name": "create_vendor", + "arguments": {"name": "Acme Corp"}, + "severity": "info", + "labels": {"asi": "ASI01", "phase": "recon"}, + } + ] + } + + +class ToolCallEvent(BaseAuditEvent): + """Fired when an agent calls a tool (before execution).""" + + type: str = Field(default=EventType.TOOL_CALL.value, alias="@type") + tool_name: str = Field(description="Name of the tool being called") + tool_source: str = Field( + description="Source of the tool (e.g., 'findrive', 'finmail', 'finstripe')" + ) + arguments: dict[str, Any] = Field( + default_factory=dict, + description="Tool arguments (sanitized; sensitive values masked)", + ) + tool_description: Optional[str] = Field( + default=None, + description="Description of what the tool does", + ) + + +class ToolResultEvent(BaseAuditEvent): + """Fired when a tool returns a result (after execution).""" + + type: str = Field(default=EventType.TOOL_RESULT.value, alias="@type") + tool_name: str = Field(description="Name of the tool that was called") + return_value: Optional[str] = Field( + default=None, + description="Tool result (truncated if large; first 500 chars)", + ) + success: bool = Field(description="Whether the tool call succeeded") + error_message: Optional[str] = Field(default=None, description="Error message if failed") + execution_time_ms: Optional[float] = Field(default=None, description="Execution time in ms") + + +class MemoryWriteEvent(BaseAuditEvent): + """Fired when an agent writes to its memory/context.""" + + type: str = Field(default=EventType.MEMORY_WRITE.value, alias="@type") + memory_key: str = Field(description="Key in the memory store") + memory_scope: str = Field( + description="Scope: 'workflow', 'session', 'long_term'", + pattern="^(workflow|session|long_term)$", + ) + value_preview: Optional[str] = Field( + default=None, + description="Preview of value (first 200 chars; 
actual value hashed)", + ) + size_bytes: int = Field(description="Size of the value in bytes") + + +class DelegationEvent(BaseAuditEvent): + """Fired when an agent delegates to another agent.""" + + type: str = Field(default=EventType.DELEGATION.value, alias="@type") + delegating_agent: str = Field(description="Agent that is delegating") + delegated_agent: str = Field(description="Agent being delegated to") + task_summary: str = Field(description="High-level task being delegated") + delegation_scope: dict[str, Any] = Field( + default_factory=dict, + description="What tools/data the delegated agent can access", + ) + + +class PolicyDecisionEvent(BaseAuditEvent): + """Fired when the AEGIS policy engine makes a decision.""" + + type: str = Field(default=EventType.POLICY_DECISION.value, alias="@type") + action: str = Field( + description="Decision: 'allow', 'deny', 'quarantine'", + pattern="^(allow|deny|quarantine)$", + ) + rule_id: Optional[str] = Field(default=None, description="Which policy rule matched") + reason: str = Field(description="Human-readable reason for the decision") + asi_tags: list[str] = Field( + default_factory=list, + description="OWASP ASI categories this decision protects against", + ) + confidence: float = Field( + default=1.0, + description="Confidence score (0.0–1.0)", + ge=0.0, + le=1.0, + ) + + +class AnomalyDetectionEvent(BaseAuditEvent): + """Fired when an anomaly is detected in the execution flow.""" + + type: str = Field(default=EventType.ANOMALY_DETECTION.value, alias="@type") + anomaly_type: str = Field( + description="Type of anomaly: 'cascade_failure', 'resource_exhaustion', 'policy_violation'" + ) + affected_agent: Optional[str] = Field( + default=None, + description="Agent affected by the anomaly", + ) + anomaly_score: float = Field( + description="Anomaly score (0.0–1.0)", + ge=0.0, + le=1.0, + ) + details: dict[str, Any] = Field( + default_factory=dict, + description="Additional anomaly details", + ) + + +class 
AuditEvent(BaseModel): + """Union type for all audit events. + + Used for type hinting and validation in the telemetry chain. + In practice, events are serialized to JSON and deserialized + from Redis Streams. + """ + + event: ( + ToolCallEvent + | ToolResultEvent + | MemoryWriteEvent + | DelegationEvent + | PolicyDecisionEvent + | AnomalyDetectionEvent + ) = Field(discriminator="type") + + @field_validator("event", mode="before") + @classmethod + def validate_event(cls, v: Any) -> Any: + """Validate and construct the correct event type.""" + if isinstance(v, dict): + event_type = v.get("@type") or v.get("type") + if event_type == EventType.TOOL_CALL.value: + return ToolCallEvent(**v) + elif event_type == EventType.TOOL_RESULT.value: + return ToolResultEvent(**v) + elif event_type == EventType.MEMORY_WRITE.value: + return MemoryWriteEvent(**v) + elif event_type == EventType.DELEGATION.value: + return DelegationEvent(**v) + elif event_type == EventType.POLICY_DECISION.value: + return PolicyDecisionEvent(**v) + elif event_type == EventType.ANOMALY_DETECTION.value: + return AnomalyDetectionEvent(**v) + return v diff --git a/finbot/config.py b/finbot/config.py index df362f5c..3600ab14 100644 --- a/finbot/config.py +++ b/finbot/config.py @@ -137,6 +137,22 @@ class Settings(BaseSettings): LABS_GUARDRAIL_MAX_TIMEOUT: int = 30 # seconds LABS_GUARDRAIL_MAX_PAYLOAD_BYTES: int = 65536 # 64 KiB + # FinBot-AEGIS runtime security (GSoC 2026) + AEGIS_ENABLED: bool = True + AEGIS_ENFORCEMENT_MODE: str = "observe" # observe | enforce + AEGIS_POLICY_DIR: str = "finbot/aegis/policies" + AEGIS_TRUST_ENFORCE: bool = False + AEGIS_TRUST_MANIFESTS_JSON: str = "" + AEGIS_AUDIT_CHAIN_TTL: int = 86400 + AEGIS_CASCADE_WINDOW_SECONDS: int = 30 + AEGIS_CASCADE_MAX_CALLS: int = 25 + + # AEGIS Telemetry Pipeline (Week 1-3) + AEGIS_TELEMETRY_ENABLED: bool = True + AEGIS_CHAIN_SECRET: str = "default-telemetry-chain-secret" # Change in production + AEGIS_TELEMETRY_STREAM_NAME: str = 
"finbot:aegis:audit" + AEGIS_TELEMETRY_RETENTION_DAYS: int = 7 + # Email Config EMAIL_PROVIDER: str = "console" # "console" | "resend" RESEND_API_KEY: str = "" diff --git a/finbot/core/messaging/events.py b/finbot/core/messaging/events.py index 866ae04b..1677af15 100644 --- a/finbot/core/messaging/events.py +++ b/finbot/core/messaging/events.py @@ -17,6 +17,17 @@ - agent.onboarding_agent.llm_request_success (llm) - agent.invoice_agent.tool_call_success (tool) +- aegis: Events for AEGIS security telemetry (GSoC Week 1-3) + - pattern: aegis.. + - categories: tool, policy, memory, delegation, anomaly + - Examples: + - aegis.tool.call (before tool execution) + - aegis.tool.result (after tool execution) + - aegis.policy.decision (policy engine verdict) + - aegis.memory.write (memory/context write) + - aegis.delegation (agent-to-agent delegation) + - aegis.anomaly.detection (cascade, resource exhaustion, etc.) + Note: CTF outcomes (challenge completions, badge awards) are derived by the CTFEventProcessor from these events, not emitted directly. event_subtype="ctf" can be used to support CTF challenges and badges as needed. @@ -187,6 +198,40 @@ async def emit_agent_event( stream_name, ) + async def emit_aegis_event( + self, + event_type: str, + event_data: dict[str, Any], + session_context: SessionContext, + workflow_id: str | None = None, + ) -> None: + """Emit AEGIS security telemetry event. + + Args: + event_type: Event type (e.g., 'tool.call', 'policy.decision', 'memory.write') + event_data: Event payload (tool_name, action, reason, etc.) 
+ session_context: Session context for namespace/user tracking + workflow_id: Workflow identifier for tracing + """ + aegis_event = { + "namespace": session_context.namespace, + "user_id": session_context.user_id, + "session_id": session_context.session_id, + "event_type": f"aegis.{event_type}", + "workflow_id": workflow_id or "", + "timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"), + **(event_data or {}), + } + + self._apply_workflow_context(aegis_event) + encoded_event = self._encode_event_data(aegis_event) + + stream_name = f"{self.event_prefix}:aegis" + await self.redis.xadd( + stream_name, encoded_event, maxlen=settings.EVENT_BUFFER_SIZE + ) + logger.debug("Emitted AEGIS event %s to stream %s", event_type, stream_name) + def subscribe_to_events(self, event_pattern: str, callback: Callable) -> None: """Subscribe to events""" stream_name = f"{self.event_prefix}:{event_pattern}" diff --git a/test_telemetry_standalone.py b/test_telemetry_standalone.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/aegis/__init__.py b/tests/unit/aegis/__init__.py new file mode 100644 index 00000000..779b488d --- /dev/null +++ b/tests/unit/aegis/__init__.py @@ -0,0 +1 @@ +"""Unit tests for FinBot-AEGIS.""" diff --git a/tests/unit/aegis/test_telemetry_schema.py b/tests/unit/aegis/test_telemetry_schema.py new file mode 100644 index 00000000..2f3ff75f --- /dev/null +++ b/tests/unit/aegis/test_telemetry_schema.py @@ -0,0 +1,337 @@ +# ============================================================ +# File: tests/unit/aegis/test_telemetry_schema.py +# Purpose: Unit tests for telemetry event schemas +# Author: Jean Francois Regis MUKIZA +# GSoC Week: 1 +# OWASP Category: ASI01, ASI06 +# ============================================================ +"""Tests for AEGIS telemetry JSON-LD schemas.""" + +import pytest +from datetime import UTC, datetime + +from finbot.aegis.telemetry.schema import ( + ToolCallEvent, + ToolResultEvent, + MemoryWriteEvent, + 
DelegationEvent, + PolicyDecisionEvent, + AnomalyDetectionEvent, + EventType, +) + + +@pytest.mark.unit +class TestToolCallEvent: + """ToolCallEvent serialization and validation.""" + + def test_tool_call_creation(self) -> None: + """Create a valid ToolCallEvent.""" + event = ToolCallEvent( + namespace="player_abc123", + workflow_id="wf_xyz789", + user_id="user_1", + agent_name="OnboardingAgent", + tool_name="create_vendor", + tool_source="finstripe", + arguments={"name": "Acme Corp", "risk_level": 5}, + ) + + assert event.type == EventType.TOOL_CALL.value + assert event.tool_name == "create_vendor" + assert event.arguments["name"] == "Acme Corp" + assert event.namespace == "player_abc123" + + def test_tool_call_json_serialization(self) -> None: + """ToolCallEvent serializes to JSON-LD.""" + event = ToolCallEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + tool_name="tool_x", + tool_source="source_y", + arguments={"key": "value"}, + ) + + json_data = event.model_dump(by_alias=True) + assert json_data["@context"] == "https://owasp.org/aegis/v1/context.jsonld" + assert json_data["@type"] == EventType.TOOL_CALL.value + assert json_data["tool_name"] == "tool_x" + + def test_tool_call_with_description(self) -> None: + """ToolCallEvent with tool_description.""" + event = ToolCallEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + tool_name="list_vendors", + tool_source="finstripe", + tool_description="List all onboarded vendors", + ) + + assert event.tool_description == "List all onboarded vendors" + + def test_tool_call_default_timestamp(self) -> None: + """ToolCallEvent gets auto-generated timestamp.""" + event = ToolCallEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + tool_name="tool_x", + tool_source="source_y", + ) + + # Timestamp should be ISO 8601 format with Z suffix + assert event.timestamp.endswith("Z") + assert "T" in event.timestamp + + 
+@pytest.mark.unit +class TestToolResultEvent: + """ToolResultEvent serialization and validation.""" + + def test_tool_result_success(self) -> None: + """Create a successful ToolResultEvent.""" + event = ToolResultEvent( + namespace="player_abc123", + workflow_id="wf_xyz789", + user_id="user_1", + agent_name="OnboardingAgent", + tool_name="create_vendor", + success=True, + return_value="Vendor ID: vendor_123", + execution_time_ms=145.3, + ) + + assert event.type == EventType.TOOL_RESULT.value + assert event.success is True + assert event.execution_time_ms == 145.3 + + def test_tool_result_failure(self) -> None: + """Create a failed ToolResultEvent.""" + event = ToolResultEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + tool_name="bad_tool", + success=False, + error_message="Tool not found", + ) + + assert event.success is False + assert event.error_message == "Tool not found" + + +@pytest.mark.unit +class TestMemoryWriteEvent: + """MemoryWriteEvent for memory/context tracking.""" + + def test_memory_write_workflow_scope(self) -> None: + """Create a workflow-scoped memory write.""" + event = MemoryWriteEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + memory_key="vendor_list", + memory_scope="workflow", + value_preview="[{id: vendor_1, name: Acme}...]", + size_bytes=2048, + ) + + assert event.memory_scope == "workflow" + assert event.size_bytes == 2048 + + def test_memory_write_session_scope(self) -> None: + """Create a session-scoped memory write.""" + event = MemoryWriteEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + memory_key="chat_history", + memory_scope="session", + size_bytes=5000, + ) + + assert event.memory_scope == "session" + + def test_memory_write_long_term_scope(self) -> None: + """Create a long-term memory write.""" + event = MemoryWriteEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + 
agent_name="agent_1", + memory_key="preferences", + memory_scope="long_term", + size_bytes=1024, + ) + + assert event.memory_scope == "long_term" + + +@pytest.mark.unit +class TestDelegationEvent: + """DelegationEvent for agent-to-agent delegation.""" + + def test_delegation_creation(self) -> None: + """Create a DelegationEvent.""" + event = DelegationEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="OnboardingAgent", + delegating_agent="OnboardingAgent", + delegated_agent="RiskScoringAgent", + task_summary="Score vendor risk", + delegation_scope={ + "allowed_tools": ["risk_api"], + "data_access": ["vendor_profile"], + }, + ) + + assert event.delegating_agent == "OnboardingAgent" + assert event.delegated_agent == "RiskScoringAgent" + assert "allowed_tools" in event.delegation_scope + + +@pytest.mark.unit +class TestPolicyDecisionEvent: + """PolicyDecisionEvent for policy engine decisions.""" + + def test_policy_allow_decision(self) -> None: + """Create a policy allow decision.""" + event = PolicyDecisionEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + action="allow", + rule_id="rule_least_agency", + reason="Tool within agent's allowed scope", + asi_tags=["ASI02", "ASI03"], + confidence=0.95, + ) + + assert event.action == "allow" + assert event.confidence == 0.95 + assert "ASI02" in event.asi_tags + + def test_policy_deny_decision(self) -> None: + """Create a policy deny decision.""" + event = PolicyDecisionEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + action="deny", + rule_id="rule_no_cross_vendor_access", + reason="Attempted to access vendor in different namespace", + asi_tags=["ASI06"], + confidence=1.0, + ) + + assert event.action == "deny" + assert event.confidence == 1.0 + + def test_policy_quarantine_decision(self) -> None: + """Create a policy quarantine decision.""" + event = PolicyDecisionEvent( + namespace="ns_1", + workflow_id="wf_1", + 
user_id="u_1", + agent_name="agent_1", + action="quarantine", + reason="Suspected malicious tool call; reviewing", + asi_tags=["ASI04", "ASI05"], + ) + + assert event.action == "quarantine" + + +@pytest.mark.unit +class TestAnomalyDetectionEvent: + """AnomalyDetectionEvent for anomaly detection.""" + + def test_anomaly_cascade_failure(self) -> None: + """Create cascade failure anomaly event.""" + event = AnomalyDetectionEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + anomaly_type="cascade_failure", + affected_agent="RiskScoringAgent", + anomaly_score=0.92, + details={"failed_calls": 5, "retry_attempts": 3}, + ) + + assert event.anomaly_type == "cascade_failure" + assert event.anomaly_score == 0.92 + + def test_anomaly_resource_exhaustion(self) -> None: + """Create resource exhaustion anomaly event.""" + event = AnomalyDetectionEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + anomaly_type="resource_exhaustion", + anomaly_score=0.78, + details={"memory_usage_mb": 4096, "token_count": 250000}, + ) + + assert event.anomaly_type == "resource_exhaustion" + + def test_anomaly_policy_violation(self) -> None: + """Create policy violation anomaly event.""" + event = AnomalyDetectionEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + anomaly_type="policy_violation", + anomaly_score=0.88, + details={"violations": ["unauthorized_tool", "cross_namespace_access"]}, + ) + + assert event.anomaly_type == "policy_violation" + + +@pytest.mark.unit +class TestEventLabelsAndSeverity: + """Test labels and severity attributes.""" + + def test_event_with_labels(self) -> None: + """Event can have custom labels.""" + event = ToolCallEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + tool_name="tool_x", + tool_source="source_y", + labels={"asi": "ASI01", "phase": "exploitation", "risk": "critical"}, + ) + + assert 
event.labels["asi"] == "ASI01" + assert event.labels["phase"] == "exploitation" + + def test_event_severity_levels(self) -> None: + """Event can have different severity levels.""" + for severity in ["debug", "info", "warning", "critical"]: + event = ToolCallEvent( + namespace="ns_1", + workflow_id="wf_1", + user_id="u_1", + agent_name="agent_1", + tool_name="tool_x", + tool_source="source_y", + severity=severity, + ) + assert event.severity == severity From af7c3dbd3cc35e80ece3ed81c411f67c2a239794 Mon Sep 17 00:00:00 2001 From: Jean-Regis-M <240509606@firat.edu.tr> Date: Sat, 30 May 2026 20:44:11 +0300 Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=94=A7=20Fix=20critical=20test=20init?= =?UTF-8?q?ialization=20failures:=20lazy=20LLMClient=20init,=20Pydantic=20?= =?UTF-8?q?v2=20schema,=20import=20paths?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- finbot/aegis/telemetry/routes.py | 250 +++++++++++++++++++++++++++++++ finbot/aegis/telemetry/schema.py | 25 ++-- finbot/core/llm/client.py | 9 +- tests/conftest.py | 5 + 4 files changed, 273 insertions(+), 16 deletions(-) create mode 100644 finbot/aegis/telemetry/routes.py diff --git a/finbot/aegis/telemetry/routes.py b/finbot/aegis/telemetry/routes.py new file mode 100644 index 00000000..92039564 --- /dev/null +++ b/finbot/aegis/telemetry/routes.py @@ -0,0 +1,250 @@ +# ============================================================ +# File: finbot/aegis/telemetry/routes.py +# Purpose: FastAPI SSE endpoint for real-time telemetry dashboard +# Author: Jean Francois Regis MUKIZA +# GSoC Week: 3 +# OWASP Category: ASI01 (Prompt Injection), ASI10 (Insufficient Monitoring) +# ============================================================ +"""FastAPI routes for AEGIS telemetry observability. 
+ +Exposes: +- GET /aegis/stream: Server-Sent Events (SSE) endpoint for real-time audit events +- GET /aegis/chain/{namespace}: Retrieve historical audit chain +- POST /aegis/verify: Verify audit chain integrity +""" + +import asyncio +import json +import logging +from typing import Any, AsyncGenerator + +from fastapi import APIRouter, Depends, HTTPException, Path, Query +from fastapi.responses import StreamingResponse +from redis.asyncio import Redis + +from finbot.config import settings +from finbot.core.auth.middleware import get_session_context +from finbot.core.auth.session import SessionContext +from finbot.aegis.telemetry.chain import AuditChain + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/aegis", tags=["aegis-telemetry"]) + + +def _get_audit_chain() -> AuditChain: + """Dependency: get AuditChain instance.""" + return AuditChain() + + +async def _get_redis() -> Redis: + """Dependency: get Redis connection.""" + return await Redis.from_url(settings.REDIS_URL) + + +@router.get("/stream", summary="Real-time Audit Event Stream (SSE)") +async def stream_audit_events( + session: SessionContext = Depends(get_session_context), + redis_client: Redis = Depends(_get_redis), +) -> StreamingResponse: + """Stream real-time audit events via Server-Sent Events (SSE). + + Emits audit events (tool calls, policy decisions, anomalies) as they occur. 
+ Subscribe on client with: + + ```javascript + const eventSource = new EventSource('/aegis/stream'); + eventSource.addEventListener('tool_call', (e) => { + const event = JSON.parse(e.data); + console.log('Tool called:', event.tool_name); + }); + ``` + + Returns: + StreamingResponse with SSE MIME type (text/event-stream) + """ + + async def event_generator() -> AsyncGenerator[str, None]: + """Yield SSE frames for the caller's namespace, read from the audit Redis Stream.""" + chain = AuditChain(redis_client) + stream_name = settings.AEGIS_TELEMETRY_STREAM_NAME # configured audit stream (default "finbot:aegis:audit") + last_id = "0" # Start from beginning on connection + namespace = session.namespace + + logger.info("SSE stream opened for namespace=%s", namespace) + + try: + while True: + # Read new events from Redis Stream + # Set count=10 to batch read; timeout=100ms to avoid blocking + try: + events = await redis_client.xread( + {stream_name: last_id}, + count=10, + block=100, # milliseconds + ) + + if not events: + # Timeout; send heartbeat comment to keep connection alive + yield ": heartbeat\n\n" + await asyncio.sleep(0.1) + continue + + for stream, message_list in events: + for message_id, data in message_list: + # Advance the cursor before filtering/parsing so skipped or malformed entries are never re-read + last_id = message_id + # Only emit events for this namespace + event_namespace = data.get(b"namespace", b"").decode() + if event_namespace != namespace: + continue + + try: + event_type = data.get(b"event_type", b"unknown").decode() + event_json_str = data.get(b"event_json", b"{}").decode() + event_dict = json.loads(event_json_str) + + # Emit as SSE event with event type + yield f"event: {event_type}\n" + yield f"data: {json.dumps(event_dict)}\n\n" + + except json.JSONDecodeError as e: + logger.warning( + "Failed to parse event JSON: %s", + e, + ) + continue + + except asyncio.TimeoutError: + # Heartbeat already sent above + pass + + except asyncio.CancelledError: + logger.info("SSE stream closed for namespace=%s", namespace) + except Exception as e: # noqa: BLE001 + logger.error( + "Error in SSE event generator: %s", + e, + exc_info=True, + ) + yield 
f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", # Disable proxy buffering + }, + ) + + +@router.get( + "/chain/{namespace}", + summary="Retrieve Historical Audit Chain", +) +async def get_audit_chain( + namespace: str = Path(..., description="Player namespace"), + start: int = Query(0, ge=0, description="Starting offset"), + count: int = Query(100, ge=1, le=1000, description="Max events to return"), + session: SessionContext = Depends(get_session_context), + chain: AuditChain = Depends(_get_audit_chain), +) -> dict[str, Any]: + """Retrieve historical audit events for a namespace. + + Args: + namespace: Namespace to retrieve (must match player's namespace) + start: Starting offset in event stream (0 = oldest) + count: Max events to return (max 1000) + + Returns: + { + "namespace": str, + "total_events": int, + "events": [ + { + "@type": "aegis.tool.call", + "tool_name": "...", + "timestamp": "2026-05-27T...", + "event_hash": "...", + ... + }, + ... 
+ ], + "is_valid": bool, + "validation_message": str + } + """ + # Enforce namespace isolation: only retrieve own namespace + if session.namespace != namespace: + raise HTTPException( + status_code=403, + detail="Cannot access another player's audit chain", + ) + + try: + # Get events from chain + events = await chain.get_chain(namespace, start=start, count=count) + + # Verify chain integrity + is_valid, validation_msg = await chain.verify_chain(namespace) + + return { + "namespace": namespace, + "total_events": len(events), + "start_offset": start, + "events": events, + "is_valid": is_valid, + "validation_message": validation_msg, + } + except Exception as e: # noqa: BLE001 + logger.error( + "Failed to retrieve audit chain for namespace=%s: %s", + namespace, + e, + exc_info=True, + ) + raise HTTPException(status_code=500, detail="Failed to retrieve audit chain") + + +@router.post( + "/verify", + summary="Verify Audit Chain Integrity", +) +async def verify_chain( + session: SessionContext = Depends(get_session_context), + chain: AuditChain = Depends(_get_audit_chain), +) -> dict[str, Any]: + """Verify the integrity of the audit chain for the current namespace. + + Walks the chain from oldest to newest event, recomputing HMAC hashes. + Returns whether the chain has been tampered with. 
+ + Returns: + { + "namespace": str, + "is_valid": bool, + "message": str, + "verified_at": str (ISO 8601 timestamp) + } + """ + try: + is_valid, message = await chain.verify_chain(session.namespace) + + return { + "namespace": session.namespace, + "is_valid": is_valid, + "message": message, + "verified_at": datetime.now(UTC).isoformat().replace("+00:00", "Z"), + } + except Exception as e: # noqa: BLE001 + logger.error( + "Failed to verify audit chain: %s", + e, + exc_info=True, + ) + raise HTTPException(status_code=500, detail="Failed to verify audit chain") + + +# NOTE(review): stdlib `datetime` cannot take part in a circular import; this import belongs in the top-of-file import block +from datetime import UTC, datetime diff --git a/finbot/aegis/telemetry/schema.py b/finbot/aegis/telemetry/schema.py index f6e669c5..aef065ed 100644 --- a/finbot/aegis/telemetry/schema.py +++ b/finbot/aegis/telemetry/schema.py @@ -19,9 +19,9 @@ from datetime import UTC, datetime from enum import Enum -from typing import Any, Optional +from typing import Any, Literal, Optional -from pydantic import BaseModel, Field, field_validator +from pydantic import BaseModel, ConfigDict, Field, field_validator class EventType(str, Enum): @@ -65,11 +65,9 @@ class BaseAuditEvent(BaseModel): description="Custom labels for filtering (e.g., {'asi': 'ASI01'})", ) - class Config: - """Pydantic config.""" - - populate_by_name = True - json_schema_extra = { + model_config = ConfigDict( + populate_by_name=True, + json_schema_extra={ "examples": [ { "@context": "https://owasp.org/aegis/v1/context.jsonld", @@ -86,12 +84,13 @@ class Config: } ] } + ) class ToolCallEvent(BaseAuditEvent): """Fired when an agent calls a tool (before execution).""" - type: str = Field(default=EventType.TOOL_CALL.value, alias="@type") + type: Literal[EventType.TOOL_CALL.value] = Field(default=EventType.TOOL_CALL.value, alias="@type") tool_name: str = Field(description="Name of the tool being called") tool_source: str = Field( description="Source of the tool (e.g., 'findrive', 'finmail', 
'finstripe')" @@ -109,7 +108,7 @@ class ToolCallEvent(BaseAuditEvent): class ToolResultEvent(BaseAuditEvent): """Fired when a tool returns a result (after execution).""" - type: str = Field(default=EventType.TOOL_RESULT.value, alias="@type") + type: Literal[EventType.TOOL_RESULT.value] = Field(default=EventType.TOOL_RESULT.value, alias="@type") tool_name: str = Field(description="Name of the tool that was called") return_value: Optional[str] = Field( default=None, @@ -123,7 +122,7 @@ class ToolResultEvent(BaseAuditEvent): class MemoryWriteEvent(BaseAuditEvent): """Fired when an agent writes to its memory/context.""" - type: str = Field(default=EventType.MEMORY_WRITE.value, alias="@type") + type: Literal[EventType.MEMORY_WRITE.value] = Field(default=EventType.MEMORY_WRITE.value, alias="@type") memory_key: str = Field(description="Key in the memory store") memory_scope: str = Field( description="Scope: 'workflow', 'session', 'long_term'", @@ -139,7 +138,7 @@ class MemoryWriteEvent(BaseAuditEvent): class DelegationEvent(BaseAuditEvent): """Fired when an agent delegates to another agent.""" - type: str = Field(default=EventType.DELEGATION.value, alias="@type") + type: Literal[EventType.DELEGATION.value] = Field(default=EventType.DELEGATION.value, alias="@type") delegating_agent: str = Field(description="Agent that is delegating") delegated_agent: str = Field(description="Agent being delegated to") task_summary: str = Field(description="High-level task being delegated") @@ -152,7 +151,7 @@ class DelegationEvent(BaseAuditEvent): class PolicyDecisionEvent(BaseAuditEvent): """Fired when the AEGIS policy engine makes a decision.""" - type: str = Field(default=EventType.POLICY_DECISION.value, alias="@type") + type: Literal[EventType.POLICY_DECISION.value] = Field(default=EventType.POLICY_DECISION.value, alias="@type") action: str = Field( description="Decision: 'allow', 'deny', 'quarantine'", pattern="^(allow|deny|quarantine)$", @@ -174,7 +173,7 @@ class 
PolicyDecisionEvent(BaseAuditEvent):
 class AnomalyDetectionEvent(BaseAuditEvent):
     """Fired when an anomaly is detected in the execution flow."""
 
-    type: str = Field(default=EventType.ANOMALY_DETECTION.value, alias="@type")
+    type: Literal[EventType.ANOMALY_DETECTION.value] = Field(default=EventType.ANOMALY_DETECTION.value, alias="@type")
     anomaly_type: str = Field(
         description="Type of anomaly: 'cascade_failure', 'resource_exhaustion', 'policy_violation'"
     )
diff --git a/finbot/core/llm/client.py b/finbot/core/llm/client.py
index 461eaa80..91065902 100644
--- a/finbot/core/llm/client.py
+++ b/finbot/core/llm/client.py
@@ -67,9 +67,28 @@ async def chat(
         )
 
 
-llm_client = LLMClient()
+_llm_client: LLMClient | None = None
 
 
 def get_llm_client() -> LLMClient:
-    """Get the LLM client"""
-    return llm_client
+    """Return the shared LLM client, creating it on first use.
+
+    Construction is deferred until the first call so that importing this
+    module does not instantiate ``LLMClient``; later calls reuse the
+    same instance.
+    """
+    global _llm_client
+    if _llm_client is None:
+        _llm_client = LLMClient()
+    return _llm_client
+
+
+def __getattr__(name: str) -> LLMClient:
+    """Resolve the legacy module attribute ``llm_client`` lazily (PEP 562).
+
+    The eagerly-built ``llm_client`` global was removed; this keeps
+    ``from finbot.core.llm.client import llm_client`` working.
+    """
+    if name == "llm_client":
+        return get_llm_client()
+    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/tests/conftest.py b/tests/conftest.py
index 370c6d97..dc70fcf2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -3,11 +3,16 @@
 """
 
 import concurrent.futures
+import os
 import sys
 
 import pytest
 from fastapi.testclient import TestClient
 
+# Configure test environment before importing finbot modules
+# Set LLM provider to mock to avoid requiring OpenAI credentials during testing
+os.environ.setdefault("LLM_PROVIDER", "mock")
+
 from finbot.main import app
 
 # Load the Google Sheets pytest plugin