feat(query): add get_raw_metrics for SDK access to un-deduped writes #105
asaiacai wants to merge 2 commits into
Conversation
Backend PR Trainy-ai/server-private#448 added `/api/runs/metrics/raw` alongside the dedup migration: most reads go through the deduped v2 tables and summaries, but power users debugging duplicate writes (resumes, re-logs, manual corrections) need to see every row that actually landed in ClickHouse, with the original write time. This is the SDK side of that endpoint.

`pluto.query.get_raw_metrics` takes a single `(run, metric)` — keeping payloads bounded — and returns rows with `logGroup`, `step`, `time`, `value`, and a `nonFiniteFlags` bitmask (`bit0=NaN`, `bit1=+Inf`, `bit2=-Inf`). The bitmask is the only reliable signal of non-finite values once the payload is JSON-serialized; `value` is `0` when any flag is set.

The server caps at 50_000 rows per request. When the cap is hit the SDK emits a `UserWarning` telling the caller to narrow `step_min`/`step_max` — making truncation impossible to miss in audit work without changing the return shape away from the DataFrame/list pattern `get_metrics` already uses.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
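For context, a standalone sketch of how a caller might decode the `nonFiniteFlags` bitmask described above. The bit values and the `value=0` convention come from this PR; the `decode_value` helper and the literal rows are illustrative, not part of the SDK:

```python
import math

# Bit positions from the PR description: bit0=NaN, bit1=+Inf, bit2=-Inf.
FLAG_NAN, FLAG_POS_INF, FLAG_NEG_INF = 1 << 0, 1 << 1, 1 << 2

def decode_value(row: dict) -> float:
    """Recover the original float from a raw-metrics row.

    When any flag is set, `value` was serialized as 0 and the bitmask
    is the only reliable signal of the non-finite value.
    """
    flags = row.get('nonFiniteFlags', 0)
    if flags & FLAG_NAN:
        return math.nan
    if flags & FLAG_POS_INF:
        return math.inf
    if flags & FLAG_NEG_INF:
        return -math.inf
    return row['value']

# Example rows shaped like the endpoint's payload.
rows = [
    {'logGroup': 'train', 'step': 1, 'time': 0.0, 'value': 0.25, 'nonFiniteFlags': 0},
    {'logGroup': 'train', 'step': 2, 'time': 1.0, 'value': 0, 'nonFiniteFlags': FLAG_NAN},
    {'logGroup': 'train', 'step': 3, 'time': 2.0, 'value': 0, 'nonFiniteFlags': FLAG_NEG_INF},
]
decoded = [decode_value(r) for r in rows]
```
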
Code Review
This pull request adds the get_raw_metrics method to the Pluto SDK, enabling the retrieval of un-deduped raw metric data. The implementation includes support for step-based filtering, response truncation warnings, and automatic conversion to pandas DataFrames. Review feedback suggests renaming the logGroup field to metric for consistency with other SDK methods and adopting a safer pattern for handling potentially null response data.
```python
rows = resp.get('rows', [])
if resp.get('truncated'):
    warnings.warn(
        f'get_raw_metrics: result truncated at {len(rows)} rows. '
        f'Narrow step_min/step_max to retrieve more.',
        stacklevel=2,
    )

try:
    import pandas as pd

    if not rows:
        return pd.DataFrame(
            columns=['logGroup', 'step', 'time', 'value', 'nonFiniteFlags']
        )
    return pd.DataFrame(rows)
except ImportError:
    return rows
```
Consider renaming the logGroup column to metric in the returned data for consistency with get_metrics. In the Pluto SDK, metric is the standard name for the metric identifier column. Additionally, using resp.get('rows') or [] is safer than resp.get('rows', []) as it handles cases where the server might explicitly return null for the rows key, preventing potential iteration errors.
```python
rows = resp.get('rows') or []
if resp.get('truncated'):
    warnings.warn(
        f'get_raw_metrics: result truncated at {len(rows)} rows. '
        f'Narrow step_min/step_max to retrieve more.',
        stacklevel=2,
    )
# For consistency with get_metrics, map logGroup to metric
for row in rows:
    if 'logGroup' in row and 'metric' not in row:
        row['metric'] = row.pop('logGroup')
try:
    import pandas as pd

    if not rows:
        return pd.DataFrame(
            columns=['metric', 'step', 'time', 'value', 'nonFiniteFlags']
        )
    return pd.DataFrame(rows)
except ImportError:
    return rows
```
Pushing back on both:
1. logGroup → metric rename. Two different fields:
- `logName` is the full metric identifier (`train/loss`) — that's what `get_metrics` renames to `metric` (see pluto/query.py:247-249).
- `logGroup` is the prefix only (`train` for `train/loss`) — a separate, narrower field.
Renaming logGroup → metric would put the prefix string into a column named metric, which is semantically wrong and would mislead callers. Also, metric_name is required on get_raw_metrics so every row in a response is already for the same metric — there's no need for a metric column to disambiguate (which is the actual reason get_metrics has one: it can be called for many metrics at once and merges results).
2. resp.get('rows') or []. The server endpoint's Zod schema is rows: z.array(...) — non-null by contract. The current .get('rows', []) is already more defensive than peer methods in this file (e.g. get_metrics does bare result['metrics'] indexing at pluto/query.py:241/244). Adding or [] would guard a state the response schema can't produce and would be inconsistent with the rest of the module.
Not applying either.
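To make the `logName`/`logGroup` distinction above concrete, a tiny sketch (the split rule is inferred from the `train` / `train/loss` example in this thread; `split_log_name` is not an SDK function):

```python
def split_log_name(log_name: str) -> tuple[str, str]:
    """Return (logGroup, logName): logGroup is the segment before the
    first '/', logName is the full identifier, per the discussion above."""
    return log_name.split('/', 1)[0], log_name

group, name = split_log_name('train/loss')
```
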
The four e2e tests that log a metric and immediately verify it on the server were going through pq.get_metric_names, which reads from mlop_metric_summaries_v2 — a refreshable MV with a 5-minute interval. Worst-case staleness exceeds the 60s poll window, so the check times out before the summary has refreshed even though the underlying row landed in mlop_metrics within milliseconds. (See ingest/docker-setup/sql/07_metric_summaries_v2_refresh_mv.sql in server-private — the schema comment calls out interval + compute_time as the staleness budget.)

Switch the affected sites to a new _poll_metric_present helper that reads the un-deduped mlop_metrics table directly via the new pq.get_raw_metrics method. That endpoint has no MV in its path, so write-then-read is consistent the moment run.finish() drains the sync process. The follow-up pq.get_metrics value checks already use a real-time path (mlop_metrics_v2 FINAL, fed by a non-refreshable mirror MV) and remain unchanged.

Tests fixed:

- test_e2e_metrics_logged
- test_e2e_multiple_metrics_single_log
- test_e2e_full_lifecycle
- test_fork_e2e_log_metrics

This does not address the underlying server-side behavior of the /metric-names endpoint — real callers (UI, MCP, SDK list flows) still see up to ~5min of staleness on freshly-logged metrics. Tracking that as a separate server-private follow-up.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
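The `_poll_metric_present` helper itself isn't shown in this diff; a minimal sketch of the polling pattern it describes, under stated assumptions (helper name, signature, and the injected `fetch_rows` callable — which in the tests would wrap `pq.get_raw_metrics(run_id, metric_name)` — are all illustrative):

```python
import time

def poll_until_present(fetch_rows, timeout_s: float = 60.0, interval_s: float = 1.0):
    """Poll fetch_rows() until it returns a non-empty result or the window closes.

    Because the raw endpoint has no refreshable MV in its path, rows become
    visible as soon as the sync process drains, so this loop normally exits
    on the first or second attempt.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        rows = fetch_rows()
        if len(rows) > 0:
            return rows
        if time.monotonic() >= deadline:
            raise TimeoutError('metric not visible within poll window')
        time.sleep(interval_s)

# Demo with a stub that becomes non-empty on the second call,
# standing in for a lambda around pq.get_raw_metrics(...).
calls = {'n': 0}
def fake_fetch():
    calls['n'] += 1
    return [] if calls['n'] < 2 else [{'step': 1, 'value': 0.5}]

rows = poll_until_present(fake_fetch, timeout_s=5.0, interval_s=0.01)
```
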
Summary
Companion to server-private#448, which added `GET /api/runs/metrics/raw` alongside the dedup migration. This is the SDK side: `pluto.query.get_raw_metrics`.

- Analysis surfaces (`get_metrics`, `get_statistics`, MCP analysis) stay on the deduped v2 tables and pre-computed summaries — those are the right surfaces for analysis or chart rendering and they can't be skewed by historical bad writes.
- Power users debugging duplicate writes need to see every row that actually landed, with the original write `time`. That's what this method exposes.

API

- `metric_name` is required — every raw scan is scoped to a single `(run, metric)` so payloads stay bounded.
- `nonFiniteFlags` bitmask: `bit0=NaN`, `bit1=+Inf`, `bit2=-Inf`. When any flag is set, `value` is `0` and the flag is the source of truth (JSON serializes non-finite floats as null).
- The server caps responses at 50_000 rows; on truncation the SDK emits a `UserWarning` telling the caller to narrow `step_min`/`step_max`. Truncation can't be silently missed in audit work.

Return shape mirrors
`get_metrics`: `pandas.DataFrame` when pandas is installed, `list[dict]` otherwise.

Test plan

- `poetry run pytest tests/test_query.py` — 57 passed (9 new in `TestGetRawMetrics`)
- `poetry run ruff check pluto/query.py tests/test_query.py` — clean
- `poetry run ruff format pluto/query.py tests/test_query.py` — clean
- `poetry run mypy pluto/query.py` — clean

🤖 Generated with Claude Code