Skip to content

Commit f5e9908

Browse files
feat(extra): better OTEL instrumentation (#407)
* feat(extra): better OTEL instrumentation
* chore: adapt to v2
* chore(deps): adapt opentelemetry deps — will probably be merged earlier in #406
* fix: use public httpx content API
* docs: add some TODOs
* chore: remove custom Mistral AI total tokens span attribute
* chore(extra): remove custom operation id span attribute
1 parent 809234f commit f5e9908

File tree

9 files changed

+3130
-216
lines changed

9 files changed

+3130
-216
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ dependencies = [
1212
"python-dateutil >=2.8.2",
1313
"typing-inspection >=0.4.0",
1414
"opentelemetry-api (>=1.33.1,<2.0.0)",
15-
"opentelemetry-semantic-conventions (>=0.59b0,<0.61)",
15+
"opentelemetry-semantic-conventions (>=0.60b1,<0.61)",
1616
]
1717

1818
[project.optional-dependencies]
@@ -37,6 +37,7 @@ dev = [
3737
"invoke>=2.2.0,<3",
3838
"pyyaml>=6.0.2,<7",
3939
"mypy==1.15.0",
40+
"opentelemetry-sdk (>=1.33.1,<2.0.0)",
4041
"pylint==3.2.3",
4142
"pytest>=8.2.2,<9",
4243
"pytest-asyncio>=0.23.7,<0.24",

src/mistralai/extra/observability/otel.py

Lines changed: 469 additions & 188 deletions
Large diffs are not rendered by default.
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
"""Serialization helpers for converting Mistral API payloads to OTEL GenAI convention formats.
2+
3+
These are pure functions with no OTEL dependencies — they transform dicts to JSON strings
4+
matching the GenAI semantic convention schemas for input/output messages and tool definitions.
5+
6+
Schemas:
7+
- Input messages: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-input-messages.json
8+
- Output messages: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-output-messages.json
9+
- Tool definitions: https://github.com/Cirilla-zmh/semantic-conventions/blob/cc4d07e7e56b80e9aa5904a3d524c134699da37f/docs/gen-ai/gen-ai-tool-definitions.json
10+
"""
11+
12+
import json
13+
from typing import Any
14+
15+
16+
def _content_to_parts(content) -> list[dict]:
17+
"""Convert Mistral message content to OTEL parts array.
18+
19+
Mistral content is either a string or an array of content chunks.
20+
"""
21+
if content is None:
22+
return []
23+
if isinstance(content, str):
24+
return [{"type": "text", "content": content}]
25+
# Content chunks array — map known Mistral types to OTEL part types
26+
parts = []
27+
for chunk in content:
28+
if isinstance(chunk, str):
29+
parts.append({"type": "text", "content": chunk})
30+
elif isinstance(chunk, dict):
31+
chunk_type = chunk.get("type", "")
32+
if chunk_type == "text":
33+
parts.append({"type": "text", "content": chunk.get("text", "")})
34+
elif chunk_type == "thinking":
35+
thinking = chunk.get("thinking", "")
36+
if isinstance(thinking, list):
37+
text_parts = [
38+
sub.get("text", "")
39+
for sub in thinking
40+
if isinstance(sub, dict) and sub.get("type") == "text"
41+
]
42+
content_str = "\n".join(text_parts)
43+
else: # Fallback
44+
content_str = str(thinking)
45+
parts.append({"type": "reasoning", "content": content_str})
46+
elif chunk_type == "image_url":
47+
url = chunk.get("image_url", {})
48+
uri = url.get("url", "") if isinstance(url, dict) else str(url)
49+
parts.append({"type": "uri", "modality": "image", "uri": uri})
50+
else:
51+
# Catch-all for other content chunk types
52+
parts.append({"type": chunk_type})
53+
return parts
54+
55+
56+
def _tool_calls_to_parts(tool_calls: list[dict] | None) -> list[dict]:
57+
"""Convert Mistral tool_calls to OTEL ToolCallRequestPart entries."""
58+
if not tool_calls:
59+
return []
60+
parts = []
61+
for tc in tool_calls:
62+
func = tc.get("function", {}) or {}
63+
part: dict = {
64+
"type": "tool_call",
65+
"name": func.get("name", ""),
66+
}
67+
if (tc_id := tc.get("id")) is not None:
68+
part["id"] = tc_id
69+
if (arguments := func.get("arguments")) is not None:
70+
part["arguments"] = arguments
71+
parts.append(part)
72+
return parts
73+
74+
75+
def serialize_input_message(message: dict[str, Any]) -> str:
    """Serialize one input message following the OTEL GenAI convention.

    Schema: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-input-messages.json
    ChatMessage: {role (required), parts (required), name?}

    Conversation entry objects (e.g. function.result) carry no "role"
    field — they are recognised by their "type" and mapped to the nearest
    OTEL role.
    """
    # Conversation entry: function.result → OTEL "tool" role.
    if message.get("type") == "function.result":
        response_part: dict = {
            "type": "tool_call_response",
            "response": message.get("result"),
        }
        call_id = message.get("tool_call_id")
        if call_id is not None:
            response_part["id"] = call_id
        return json.dumps({"role": "tool", "parts": [response_part]})

    # TODO: may need to handle other types for conversations (e.g. agent handoff)

    role = message.get("role", "unknown")
    if role == "tool":
        # Tool messages are responses to earlier tool calls.
        tool_part: dict = {
            "type": "tool_call_response",
            "response": message.get("content"),
        }
        call_id = message.get("tool_call_id")
        if call_id is not None:
            tool_part["id"] = call_id
        parts: list[dict] = [tool_part]
    else:
        parts = _content_to_parts(message.get("content"))
        parts = parts + _tool_calls_to_parts(message.get("tool_calls"))

    return json.dumps({"role": role, "parts": parts})
113+
114+
115+
def serialize_output_message(choice: dict[str, Any]) -> str:
    """Serialize one output choice/message following the OTEL GenAI convention.

    Schema: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-output-messages.json
    OutputMessage: {role (required), parts (required), finish_reason (required), name?}
    """
    msg = choice.get("message", {}) or {}
    parts = _content_to_parts(msg.get("content")) + _tool_calls_to_parts(
        msg.get("tool_calls")
    )
    payload = {
        "role": msg.get("role", "assistant"),
        "parts": parts,
        "finish_reason": choice.get("finish_reason", ""),
    }
    return json.dumps(payload)
133+
134+
135+
def serialize_tool_definition(tool: dict[str, Any]) -> str | None:
136+
"""Flatten a Mistral tool definition to the OTEL GenAI convention schema.
137+
138+
Mistral format: {"type": "function", "function": {"name": ..., "description": ..., "parameters": ...}}
139+
OTEL format: {"type": "function", "name": ..., "description": ..., "parameters": ...}
140+
141+
Schema, still under review: https://github.com/Cirilla-zmh/semantic-conventions/blob/cc4d07e7e56b80e9aa5904a3d524c134699da37f/docs/gen-ai/gen-ai-tool-definitions.json
142+
"""
143+
# Early exit conditions: only functions supported for now, and name is required
144+
type = tool.get("type", "function")
145+
func = tool.get("function")
146+
if not func:
147+
return None
148+
name = func.get("name")
149+
if not name:
150+
return None
151+
serialized: dict = {"type": type, "name": name}
152+
if (description := func.get("description")) is not None:
153+
serialized["description"] = description
154+
if (parameters := func.get("parameters")) is not None:
155+
serialized["parameters"] = parameters
156+
return json.dumps(serialized)
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""Streaming response helpers for OTEL tracing.
2+
3+
Pure functions that parse SSE byte streams and accumulate CompletionChunk
4+
deltas into a ChatCompletionResponse-shaped dict suitable for span enrichment.
5+
6+
TODO: supports chat and agent completion streaming endpoints. Evolutions will
7+
be necessary to support other streaming endpoints (e.g. conversations).
8+
9+
NOTE: The SSE bytes are re-parsed here even though EventStream already
10+
parsed them during iteration.
11+
TracedResponse sits below EventStream and can only accumulate raw bytes; it
12+
has no access to the decoded events. Hooking into EventStream could eliminate
13+
this double-parse, but EventStream is Speakeasy-generated code.
14+
"""
15+
16+
from typing import Any
17+
18+
from mistralai.client.models import CompletionChunk, UsageInfo
19+
20+
21+
def parse_sse_chunks(raw_sse_bytes: bytes) -> list[CompletionChunk]:
    """Parse raw SSE bytes into a list of typed CompletionChunk models.

    Only CompletionChunk is handled. If new SSE-streamed response types
    are added, parsing and typing here will need updating.

    Per the SSE spec, the single space after the ``data:`` field name is
    optional, so both ``data: {...}`` and ``data:{...}`` frames are accepted.
    Payloads that fail validation are skipped (best effort) rather than
    aborting the whole parse.
    """
    chunks: list[CompletionChunk] = []
    text = raw_sse_bytes.decode("utf-8", errors="replace")
    for line in text.split("\n"):
        line = line.strip()
        if not line.startswith("data:"):
            continue
        # Field value = everything after the field name, minus one optional
        # leading space (WHATWG SSE event-stream parsing rules).
        payload = line[len("data:"):]
        if payload.startswith(" "):
            payload = payload[1:]
        if payload == "[DONE]":
            continue
        try:
            chunks.append(CompletionChunk.model_validate_json(payload))
        except Exception:
            # Best effort: ignore malformed or non-chunk payloads.
            continue
    return chunks
41+
42+
43+
def accumulate_chunks_to_response_dict(
    chunks: list[CompletionChunk],
) -> dict[str, Any]:
    """Fold streaming CompletionChunk deltas into a ChatCompletionResponse-shaped dict."""
    resp_id: str | None = None
    resp_model: str | None = None
    resp_usage: UsageInfo | None = None
    by_index: dict[int, dict[str, Any]] = {}

    for chunk in chunks:
        # First truthy value wins for id/model/usage.
        resp_id = resp_id or chunk.id
        resp_model = resp_model or chunk.model
        resp_usage = resp_usage or chunk.usage

        for choice in chunk.choices:
            acc = by_index.setdefault(
                choice.index,
                {
                    "message": {"role": "assistant", "content": ""},
                    "finish_reason": "",
                },
            )
            message = acc["message"]
            delta = choice.delta
            if isinstance(delta.role, str):
                message["role"] = delta.role
            if isinstance(delta.content, str) and delta.content:
                message["content"] += delta.content
            if isinstance(choice.finish_reason, str):
                acc["finish_reason"] = choice.finish_reason
            if isinstance(delta.tool_calls, list):
                calls = message.setdefault("tool_calls", [])
                for tc in delta.tool_calls:
                    slot = tc.index if tc.index is not None else len(calls)
                    # Grow the list until the target slot exists.
                    while len(calls) <= slot:
                        calls.append(
                            {"id": None, "function": {"name": "", "arguments": ""}}
                        )
                    # ToolCall.id defaults to the string "null" (Speakeasy codegen quirk)
                    if tc.id is not None and tc.id != "null":
                        calls[slot]["id"] = tc.id
                    if tc.function.name:
                        calls[slot]["function"]["name"] += tc.function.name
                    if isinstance(tc.function.arguments, str) and tc.function.arguments:
                        calls[slot]["function"]["arguments"] += tc.function.arguments

    out: dict[str, Any] = {
        "id": resp_id,
        "model": resp_model,
        "choices": [by_index[i] for i in sorted(by_index)],
    }
    if resp_usage is not None:
        out["usage"] = resp_usage.model_dump(mode="json", by_alias=True)
    return out

src/mistralai/extra/run/tools.py

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,28 @@
88
import opentelemetry.semconv._incubating.attributes.gen_ai_attributes as gen_ai_attributes
99
from griffe import (
1010
Docstring,
11-
DocstringSectionKind,
12-
DocstringSectionText,
1311
DocstringParameter,
1412
DocstringSection,
13+
DocstringSectionKind,
14+
DocstringSectionText,
1515
)
1616
from opentelemetry import trace
17+
from opentelemetry.trace import Status, StatusCode
1718
from pydantic import Field, create_model
1819
from pydantic.fields import FieldInfo
1920

20-
from mistralai.extra.exceptions import RunException
21-
from mistralai.extra.mcp.base import MCPClientProtocol
22-
from mistralai.extra.observability.otel import GenAISpanEnum, MistralAIAttributes, set_available_attributes
23-
from mistralai.extra.run.result import RunOutputEntries
2421
from mistralai.client.models import (
25-
FunctionResultEntry,
26-
FunctionTool,
2722
Function,
2823
FunctionCallEntry,
24+
FunctionResultEntry,
25+
FunctionTool,
2926
)
30-
27+
from mistralai.extra.exceptions import RunException
28+
from mistralai.extra.mcp.base import MCPClientProtocol
29+
from mistralai.extra.observability.otel import (
30+
set_available_attributes,
31+
)
32+
from mistralai.extra.run.result import RunOutputEntries
3133

3234
logger = logging.getLogger(__name__)
3335

@@ -193,22 +195,35 @@ async def create_function_result(
193195
else function_call.arguments
194196
)
195197
tracer = trace.get_tracer(__name__)
196-
with tracer.start_as_current_span(GenAISpanEnum.function_call(function_call.name)) as span:
198+
with tracer.start_as_current_span(
199+
f"{gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL.value} {function_call.name}"
200+
) as span:
201+
# Always record identity attributes so the span is useful even on error
202+
function_call_attributes = {
203+
gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL.value,
204+
gen_ai_attributes.GEN_AI_PROVIDER_NAME: gen_ai_attributes.GenAiProviderNameValues.MISTRAL_AI.value,
205+
gen_ai_attributes.GEN_AI_TOOL_CALL_ID: function_call.id,
206+
gen_ai_attributes.GEN_AI_TOOL_CALL_ARGUMENTS: function_call.arguments
207+
if isinstance(function_call.arguments, str)
208+
else json.dumps(function_call.arguments),
209+
gen_ai_attributes.GEN_AI_TOOL_NAME: function_call.name,
210+
gen_ai_attributes.GEN_AI_TOOL_TYPE: "function",
211+
}
212+
set_available_attributes(span, function_call_attributes)
197213
try:
198214
if isinstance(run_tool, RunFunction):
199215
res = run_tool.callable(**arguments)
200216
elif isinstance(run_tool, RunCoroutine):
201217
res = await run_tool.awaitable(**arguments)
202218
elif isinstance(run_tool, RunMCPTool):
203-
res = await run_tool.mcp_client.execute_tool(function_call.name, arguments)
204-
function_call_attributes = {
205-
gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL.value,
206-
gen_ai_attributes.GEN_AI_TOOL_CALL_ID: function_call.id,
207-
MistralAIAttributes.MISTRAL_AI_TOOL_CALL_ARGUMENTS: str(function_call.arguments),
208-
gen_ai_attributes.GEN_AI_TOOL_NAME: function_call.name
209-
}
210-
set_available_attributes(span, function_call_attributes)
219+
res = await run_tool.mcp_client.execute_tool(
220+
function_call.name, arguments
221+
)
222+
result_str = res if isinstance(res, str) else json.dumps(res)
223+
span.set_attribute(gen_ai_attributes.GEN_AI_TOOL_CALL_RESULT, result_str)
211224
except Exception as e:
225+
span.record_exception(e)
226+
span.set_status(Status(StatusCode.ERROR, str(e)))
212227
if continue_on_fn_error is True:
213228
return FunctionResultEntry(
214229
tool_call_id=function_call.tool_call_id,
@@ -219,8 +234,7 @@ async def create_function_result(
219234
) from e
220235

221236
return FunctionResultEntry(
222-
tool_call_id=function_call.tool_call_id,
223-
result=res if isinstance(res, str) else json.dumps(res),
237+
tool_call_id=function_call.tool_call_id, result=result_str
224238
)
225239

226240

0 commit comments

Comments
 (0)