Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ dependencies = [
"pydantic>=2.11.0",
"pydantic-settings>=2.9.0",
"httpx>=0.28.1",
"opentelemetry-api>=1.33.0",
"opentelemetry-sdk>=1.33.0",
"opentelemetry-exporter-otlp-proto-grpc>=1.33.0",
"opentelemetry-instrumentation-fastapi>=0.62b0",
"opentelemetry-instrumentation-httpx>=0.54b0",
"opentelemetry-instrumentation-logging>=0.54b0",
]

[project.optional-dependencies]
Expand Down
10 changes: 10 additions & 0 deletions src/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@

from src.api.routes import router as v1_router
from src.api.sessions import SessionStore
from src.observability.logging import setup_logging
from src.observability.tracing import (
instrument_fastapi,
instrument_httpx,
setup_tracing,
)

if TYPE_CHECKING:
from collections.abc import AsyncIterator
Expand All @@ -36,6 +42,10 @@ def _package_version() -> str:
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
"""Application lifespan: initialise process-wide services on startup."""
setup_tracing()
setup_logging()
instrument_httpx()
instrument_fastapi(app)
app.state.session_store = SessionStore()
logger.info("harness-python-react API started (v%s)", _package_version())
yield
Expand Down
54 changes: 54 additions & 0 deletions src/observability/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Structured JSON logging with OpenTelemetry trace correlation."""

from __future__ import annotations

import json
import logging
import os
import sys
from datetime import UTC, datetime


class _JSONFormatter(logging.Formatter):
"""Format log records as single-line JSON with OTel trace context."""

def format(self, record: logging.LogRecord) -> str:
entry: dict[str, str | int | float] = {
"timestamp": datetime.fromtimestamp(record.created, tz=UTC).isoformat(),
"level": record.levelname,
"module": record.module,
"message": record.getMessage(),
}

# Trace context injected by OTel logging instrumentation
entry["trace_id"] = getattr(record, "otelTraceID", "0")
entry["span_id"] = getattr(record, "otelSpanID", "0")

return json.dumps(entry, default=str)


def setup_logging(level: str | None = None) -> None:
"""Configure Python logging with structured JSON output and OTel correlation.

Reads ``LOG_LEVEL`` from the environment if *level* is not provided.

Args:
level: Logging level name (e.g. "INFO", "DEBUG"). Falls back to the
``LOG_LEVEL`` environment variable, then to ``"INFO"``.
"""
from opentelemetry.instrumentation.logging import LoggingInstrumentor

resolved_level = (level if level else os.getenv("LOG_LEVEL", "INFO")).upper()

# Instrument stdlib logging so trace/span IDs are injected
LoggingInstrumentor().instrument(set_logging_format=False)

root = logging.getLogger()
root.setLevel(resolved_level)

# Remove existing handlers to avoid duplicate output
root.handlers.clear()

handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(_JSONFormatter())
root.addHandler(handler)
84 changes: 84 additions & 0 deletions src/observability/spans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Span helper utilities and OTel semantic-convention attribute keys.

Use semconv-defined attribute names where one exists. The full GenAI registry
is at https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/
and the DB registry at https://opentelemetry.io/docs/specs/semconv/database/.

Semconv-stable attribute keys live as module constants so a typo at the
call site is a NameError, not a silently-different attribute.
"""

from __future__ import annotations

from contextlib import contextmanager
from typing import TYPE_CHECKING

from opentelemetry import trace

if TYPE_CHECKING:
from collections.abc import Iterator, Mapping

# ---------------------------------------------------------------------------
# GenAI semantic convention attribute keys
# https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/
# ---------------------------------------------------------------------------

# GenAI — conversation & request
GENAI_CONVERSATION_ID: str = "gen_ai.conversation.id"
GENAI_REQUEST_MODEL: str = "gen_ai.request.model"
GENAI_OPERATION_NAME: str = "gen_ai.operation.name"
GENAI_PROVIDER_NAME: str = "gen_ai.provider.name"

# GenAI — usage (token counts)
GENAI_USAGE_INPUT_TOKENS: str = "gen_ai.usage.input_tokens"
GENAI_USAGE_OUTPUT_TOKENS: str = "gen_ai.usage.output_tokens"

# GenAI — tool calling
GENAI_TOOL_NAME: str = "gen_ai.tool.name"
GENAI_TOOL_CALL_ARGUMENTS: str = "gen_ai.tool.call.arguments"
GENAI_TOOL_CALL_RESULT: str = "gen_ai.tool.call.result"

# DB semantic convention attribute keys
# https://opentelemetry.io/docs/specs/semconv/database/
DB_QUERY_TEXT: str = "db.query.text"
DB_RESPONSE_RETURNED_ROWS: str = "db.response.returned_rows"


# ---------------------------------------------------------------------------
# Span helpers
# ---------------------------------------------------------------------------


@contextmanager
def agent_span(
name: str,
attributes: Mapping[str, str | int | float | bool] | None = None,
) -> Iterator[trace.Span]:
"""Create a span and yield it for further mutation.

Args:
name: Human-readable span name.
attributes: Initial key-value attributes to set on the span.
"""
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(name) as span:
if attributes:
for key, value in attributes.items():
span.set_attribute(key, value)
yield span


def set_span_attributes(
span: trace.Span,
**kwargs: str | int | float | bool | None,
) -> None:
"""Set multiple attributes on a span, filtering ``None`` values.

Args:
span: The span to annotate.
**kwargs: Attribute key-value pairs. ``None`` values are silently
skipped.
"""
for key, value in kwargs.items():
if value is not None:
span.set_attribute(key, value)
75 changes: 75 additions & 0 deletions src/observability/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Tracer provider setup, exporters, and auto-instrumentation."""

from __future__ import annotations

import os
from typing import TYPE_CHECKING

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
)

if TYPE_CHECKING:
from fastapi import FastAPI


def setup_tracing() -> TracerProvider:
"""Create and configure the global tracer provider.

Reads configuration from environment variables:

- ``OTEL_SERVICE_NAME`` — Resource ``service.name`` attribute.
Default ``harness-python-react``.
- ``OTEL_EXPORTER`` — ``otlp`` (default) or ``console``.
- ``OTEL_EXPORTER_OTLP_ENDPOINT`` — gRPC endpoint for OTLP exporter
(default ``http://localhost:4317``;
Jaeger via docker-compose).

Returns the configured TracerProvider.
"""
service_name = os.getenv("OTEL_SERVICE_NAME", "harness-python-react")
resource = Resource.create({"service.name": service_name})
provider = TracerProvider(resource=resource)

exporter_type = os.getenv("OTEL_EXPORTER", "otlp")

if exporter_type == "console":
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
else:
endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)

exporter = OTLPSpanExporter(endpoint=endpoint, insecure=True)
provider.add_span_processor(BatchSpanProcessor(exporter))

trace.set_tracer_provider(provider)
return provider


def get_tracer(name: str) -> trace.Tracer:
"""Get a tracer from the global provider.

Args:
name: Logical name for the tracer, typically the module path.
"""
return trace.get_tracer(name)


def instrument_fastapi(app: FastAPI) -> None:
"""Apply OpenTelemetry auto-instrumentation to a FastAPI application."""
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

FastAPIInstrumentor.instrument_app(app)


def instrument_httpx() -> None:
"""Apply OpenTelemetry auto-instrumentation to httpx HTTP clients."""
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

HTTPXClientInstrumentor().instrument()
124 changes: 124 additions & 0 deletions tests/test_observability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""Tests for src/observability/ — tracing, logging, span helpers."""

from __future__ import annotations

import json
import logging
from io import StringIO

import pytest
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)

from src.observability.logging import _JSONFormatter, setup_logging
from src.observability.spans import (
GENAI_CONVERSATION_ID,
GENAI_OPERATION_NAME,
GENAI_REQUEST_MODEL,
GENAI_TOOL_NAME,
agent_span,
set_span_attributes,
)


@pytest.fixture()
def exporter() -> InMemorySpanExporter:
"""Attach an in-memory span exporter to the (possibly pre-existing)
global tracer provider. Clears captured spans on entry so each test
sees a clean slate even when OTel's provider override has already
been set by a prior test."""
captured = InMemorySpanExporter()
provider = trace.get_tracer_provider()
if not isinstance(provider, TracerProvider):
provider = TracerProvider()
trace.set_tracer_provider(provider)
provider.add_span_processor(SimpleSpanProcessor(captured))
captured.clear()
return captured


def test_agent_span_records(exporter: InMemorySpanExporter) -> None:
with agent_span("op", attributes={GENAI_OPERATION_NAME: "echo"}):
pass

spans = exporter.get_finished_spans()
assert len(spans) == 1
assert spans[0].name == "op"
attrs = dict(spans[0].attributes or {})
assert attrs[GENAI_OPERATION_NAME] == "echo"


def test_set_span_attributes_skips_none(exporter: InMemorySpanExporter) -> None:
with agent_span("op") as span:
set_span_attributes(
span,
**{
GENAI_REQUEST_MODEL: "gpt-4o-mini",
GENAI_CONVERSATION_ID: None,
GENAI_TOOL_NAME: "echo_tool",
},
)

spans = exporter.get_finished_spans()
attrs = dict(spans[0].attributes or {})
assert attrs[GENAI_REQUEST_MODEL] == "gpt-4o-mini"
assert attrs[GENAI_TOOL_NAME] == "echo_tool"
assert GENAI_CONVERSATION_ID not in attrs


def test_semconv_attributes_have_dotted_names() -> None:
"""Sanity check: every exported semconv key uses the official dotted form."""
for key in (
GENAI_CONVERSATION_ID,
GENAI_REQUEST_MODEL,
GENAI_OPERATION_NAME,
GENAI_TOOL_NAME,
):
assert key.startswith("gen_ai.")


def test_json_formatter_emits_trace_and_span_ids() -> None:
formatter = _JSONFormatter()
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="test.py",
lineno=1,
msg="hello %s",
args=("world",),
exc_info=None,
)
record.otelTraceID = "abc123"
record.otelSpanID = "def456"

payload = json.loads(formatter.format(record))
assert payload["message"] == "hello world"
assert payload["trace_id"] == "abc123"
assert payload["span_id"] == "def456"


def test_setup_logging_attaches_json_handler(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setenv("LOG_LEVEL", "DEBUG")
setup_logging()

root = logging.getLogger()
assert root.level == logging.DEBUG
assert any(isinstance(h.formatter, _JSONFormatter) for h in root.handlers)

# Round-trip: emit a log record and assert the JSON is single-line valid.
handler = next(h for h in root.handlers if isinstance(h.formatter, _JSONFormatter))
assert isinstance(handler, logging.StreamHandler)
buf = StringIO()
handler.setStream(buf)
root.info("ping")
line = buf.getvalue().strip()
assert "\n" not in line
parsed = json.loads(line)
assert parsed["message"] == "ping"
assert parsed["level"] == "INFO"
Loading
Loading