Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,12 @@ RETRY_ATTEMPTS = 3

# number of editors
NUM_EDITORS = 3

# Telemetry (Logfire) — disabled by default for local/tests
TELEMETRY_ENABLED=false
TELEMETRY_SERVICE_NAME=akd
TELEMETRY_ENVIRONMENT=local
TELEMETRY_SAMPLE_RATE=1.0
TELEMETRY_REDACT_PARAMS=true
TELEMETRY_SEND_TO_CLOUD=false
LOGFIRE_TOKEN=pylf_
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
.envrc.ci
.env
.worktrees/
.logfire/
.deepeval/
searxng-docker/
data/cmr_collections_umm.json
Expand Down
47 changes: 27 additions & 20 deletions akd/_base/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
create_model,
)

from akd.observability import span_agent_run
from akd.utils import get_model_fields, to_snake_case

from .errors import HumanInputRequired, SchemaValidationError
Expand Down Expand Up @@ -458,16 +459,19 @@ async def arun(
logger.debug(
f"Running {self.__class__.__name__} with params: {params}",
)
output = None
try:
output = await self._arun(params, **kwargs)
output = self._validate_output(output)
except HumanInputRequired:
logger.warning(f"{self.__class__.__name__}: HumanInputRequired (flow control)")
raise
except Exception as e:
logger.error(f"Error running {self.__class__.__name__}: {e}")
raise
run_context = kwargs.get("run_context")
run_id = getattr(run_context, "run_id", None) if run_context is not None else None
with span_agent_run(self.__class__.__name__, run_id=run_id):
output = None
try:
output = await self._arun(params, **kwargs)
output = self._validate_output(output)
except HumanInputRequired:
logger.warning(f"{self.__class__.__name__}: HumanInputRequired (flow control)")
raise
except Exception as e:
logger.error(f"Error running {self.__class__.__name__}: {e}")
raise
return output

@abstractmethod
Expand Down Expand Up @@ -590,16 +594,19 @@ async def arun(
logger.debug(
f"Running {self.__class__.__name__} with params: {params}",
)
output = None
try:
output = await self._arun(params, **kwargs)
output = self._validate_output(output)
except HumanInputRequired:
logger.warning(f"{self.__class__.__name__}: HumanInputRequired (flow control)")
raise
except Exception as e:
logger.error(f"Error running {self.__class__.__name__}: {e}")
raise
run_context = kwargs.get("run_context")
run_id = getattr(run_context, "run_id", None) if run_context is not None else None
with span_agent_run(self.__class__.__name__, run_id=run_id):
output = None
try:
output = await self._arun(params, **kwargs)
output = self._validate_output(output)
except HumanInputRequired:
logger.warning(f"{self.__class__.__name__}: HumanInputRequired (flow control)")
raise
except Exception as e:
logger.error(f"Error running {self.__class__.__name__}: {e}")
raise
return output

@abstractmethod
Expand Down
15 changes: 15 additions & 0 deletions akd/_base/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

from pydantic import BaseModel, ConfigDict, Field

from akd.observability import log_stream_event

from .structures import RunContext
from .tool_calling import ToolCall, ToolResult

Expand Down Expand Up @@ -390,6 +392,11 @@ async def astream(
"""
# Input validation (before any events are yielded)
params = self._validate_input(params)
# Ensure run_context has run_id for trace correlation
if run_context is None:
run_context = RunContext()
if run_context.run_id is None:
run_context.run_id = uuid.uuid4().hex[:8]

# Stream from internal implementation
async for event in self._astream(params, run_context, **kwargs):
Expand All @@ -406,6 +413,14 @@ async def astream(
)
continue
yield event
# Log lifecycle events only (skip STREAMING to avoid token volume)
if event.event_type != StreamEventType.STREAMING:
log_stream_event(
str(event.event_type),
run_id=run_context.run_id if run_context else None,
source=getattr(event, "source", None),
is_lifecycle=True,
)

async def _astream(
self,
Expand Down
42 changes: 24 additions & 18 deletions akd/_base/tool_calling.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from loguru import logger
from pydantic import BaseModel, Field

from akd.observability import span_tool_execution

if TYPE_CHECKING:
from akd.tools._base import BaseTool

Expand Down Expand Up @@ -101,24 +103,28 @@ async def _execute_tool(
error=f"Unknown tool: {tool_call.tool_name}",
)

try:
input_obj = tool.input_schema(**tool_call.arguments)
result = await tool.arun(input_obj)
# Use mode='json' to ensure JSON-serializable types (HttpUrl → str, datetime → ISO string)
content = result.model_dump(mode="json") if hasattr(result, "model_dump") else result
return ToolResult(
tool_call_id=tool_call.tool_call_id,
tool_name=tool_call.tool_name,
content=content,
)
except Exception as e:
logger.exception(f"Tool '{tool_call.tool_name}' failed with args {tool_call.arguments}")
return ToolResult(
tool_call_id=tool_call.tool_call_id,
tool_name=tool_call.tool_name,
content=None,
error=str(e),
)
with span_tool_execution(
tool_call.tool_name,
tool_call.tool_call_id,
):
try:
input_obj = tool.input_schema(**tool_call.arguments)
result = await tool.arun(input_obj)
# Use mode='json' to ensure JSON-serializable types (HttpUrl → str, datetime → ISO string)
content = result.model_dump(mode="json") if hasattr(result, "model_dump") else result
return ToolResult(
tool_call_id=tool_call.tool_call_id,
tool_name=tool_call.tool_name,
content=content,
)
except Exception as e:
logger.exception(f"Tool '{tool_call.tool_name}' failed with args {tool_call.arguments}")
return ToolResult(
tool_call_id=tool_call.tool_call_id,
tool_name=tool_call.tool_name,
content=None,
error=str(e),
)

async def _execute_tools_parallel(
self,
Expand Down
38 changes: 38 additions & 0 deletions akd/configs/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,43 @@ class ModelConfigSettings(BaseSettings):
default_no_answer: str = "Answer not found"


class TelemetrySettings(BaseSettings):
"""Telemetry/observability settings (Logfire). Disabled by default for local and tests.

Env vars: TELEMETRY_ENABLED, TELEMETRY_SERVICE_NAME, TELEMETRY_ENVIRONMENT,
TELEMETRY_SAMPLE_RATE, TELEMETRY_REDACT_PARAMS, TELEMETRY_SEND_TO_CLOUD.
"""

enabled: bool = Field(
default=False,
description="Enable Logfire telemetry (spans, logs). Off by default for local/tests.",
)
service_name: str = Field(
default="akd",
description="Service name sent to Logfire.",
)
environment: str = Field(
default="local",
description="Environment label (local, dev, staging, production).",
)
sample_rate: float = Field(
default=1.0,
ge=0.0,
le=1.0,
description="Fraction of traces to sample (0.0–1.0). Use <1.0 in high-volume.",
)
redact_params: bool = Field(
default=True,
description="Redact input/params and large payloads in telemetry to avoid PII.",
)
send_to_cloud: bool = Field(
default=False,
description="Send data to Logfire cloud. If False, console/file only.",
)

model_config = SettingsConfigDict(env_prefix="TELEMETRY_", extra="forbid")


class GuardrailSettings(BaseModel):
"""Project-level guardrail defaults for the @guardrail decorator.

Expand Down Expand Up @@ -81,6 +118,7 @@ class ProjectSettings(BaseSettings):
env: Environment = Environment.LOCAL
model_config_settings: ModelConfigSettings = ModelConfigSettings()
guardrails: GuardrailSettings = GuardrailSettings()
telemetry: TelemetrySettings = Field(default_factory=TelemetrySettings)

model_config = SettingsConfigDict(
env_file=(".env", ".env.prod"),
Expand Down
21 changes: 21 additions & 0 deletions akd/observability/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Observability (Logfire) bootstrap. Call init_observability() from script entrypoints."""

from __future__ import annotations

from akd.observability._logfire import (
init_observability,
is_observability_initialized,
log_stream_event,
redact_for_telemetry,
span_agent_run,
span_tool_execution,
)

__all__ = [
"init_observability",
"is_observability_initialized",
"log_stream_event",
"redact_for_telemetry",
"span_agent_run",
"span_tool_execution",
]
Loading
Loading