diff --git a/.gitignore b/.gitignore
index dfa829df..cbba33c1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -69,3 +69,6 @@ coverage.xml
# version file generated by setuptools_scm
src/instruments/_version.py
+doc/examples/artifacts/
+InstrumentKit_1.zip
+InstrumentKit_2.zip
diff --git a/doc/examples/_owon_capture_common.py b/doc/examples/_owon_capture_common.py
new file mode 100644
index 00000000..bb61f7ed
--- /dev/null
+++ b/doc/examples/_owon_capture_common.py
@@ -0,0 +1,636 @@
+#!/usr/bin/env python
+"""
+Shared helpers for OWON DOS1104 example capture scripts.
+"""
+
+from __future__ import annotations
+
+import csv
+from dataclasses import dataclass
+from datetime import datetime, timezone
+import json
+from pathlib import Path
+from typing import Any
+import time
+
+import serial
+
+import instruments as ik
+import instruments.owon.sds1104 as owon_sds1104
+from instruments.units import ureg as u
+
+
+def _timestamp():
+ return datetime.now(timezone.utc)
+
+
+def _artifact_dir(root, label, timestamp=None):
+ timestamp = _timestamp() if timestamp is None else timestamp
+ date_dir = Path(root) / timestamp.strftime("%Y%m%d")
+ return date_dir / f"{timestamp.strftime('%Y%m%d_%H%M%S')}_{label}"
+
+
+def _clean_reply(text):
+ cleaned = str(text).strip()
+ if cleaned.endswith("->"):
+ cleaned = cleaned[:-2].rstrip()
+ return cleaned
+
+
+def _format_token(value, base_unit, units):
+ quantity = u.Quantity(value, base_unit) if not hasattr(value, "to") else value
+ quantity = quantity.to(base_unit)
+ for scale, suffix in units:
+ scaled = quantity.to(scale).magnitude
+ if abs(scaled) >= 1 or suffix == units[-1][1]:
+ text = f"{scaled:.12f}".rstrip("0").rstrip(".")
+ if text == "-0":
+ text = "0"
+ return f"{text}{suffix}"
+ raise ValueError(f"Could not format token for {value!r}")
+
+
+def _format_time_token(value):
+ return _format_token(
+ value,
+ u.second,
+ [
+ (u.microsecond, "us"),
+ (u.millisecond, "ms"),
+ (u.second, "s"),
+ ],
+ )
+
+
+def _format_voltage_token(value):
+ return _format_token(
+ value,
+ u.volt,
+ [
+ (u.millivolt, "mV"),
+ (u.volt, "V"),
+ ],
+ )
+
+
+@dataclass
+class WaveformSummary:
+ channel: int
+ sample_count: int
+ time_start_s: float
+ time_end_s: float
+ voltage_min_v: float
+ voltage_max_v: float
+ voltage_pp_v: float
+ csv_path: str
+
+
+@dataclass
+class RawWaveformSummary:
+ channel: int
+ sample_count: int
+ raw_min: int
+ raw_max: int
+ raw_first_16: list[int]
+ metadata_scale: str
+ metadata_probe: str
+ metadata_offset: Any
+
+
+@dataclass
+class TraceContext:
+ scpi_trace_path: str | None
+ state_trace_path: str | None
+ state_probe_style: str = "full"
+ state_seq: int = 0
+
+
+def _format_sample_rate_text(x_values):
+ if len(x_values) < 2:
+ return "unknown"
+ dt = abs(x_values[1] - x_values[0])
+ if dt <= 0:
+ return "unknown"
+ sample_rate = 1.0 / dt
+
+ def _fmt(value):
+ return f"{value:.6f}".rstrip("0").rstrip(".")
+
+ if sample_rate >= 0.9995e9:
+ return f"{_fmt(sample_rate / 1e9)}GS/s"
+ if sample_rate >= 0.9995e6:
+ return f"{_fmt(sample_rate / 1e6)}MS/s"
+ if sample_rate >= 0.9995e3:
+ return f"{_fmt(sample_rate / 1e3)}kS/s"
+ return f"{_fmt(sample_rate)}S/s"
+
+
+def _write_jsonl(path, payload):
+ if path is None:
+ return
+ with Path(path).open("a", encoding="utf-8") as handle:
+ handle.write(json.dumps(payload, ensure_ascii=True) + "\n")
+
+
+def _make_trace_context(out_dir, args):
+ trace_root = Path(args.trace_dir) / out_dir.name if args.trace_dir else out_dir
+ trace_root.mkdir(parents=True, exist_ok=True)
+ return TraceContext(
+ scpi_trace_path=(
+ str(trace_root / "scpi_trace.jsonl") if args.trace_scpi else None
+ ),
+ state_trace_path=(
+ str(trace_root / "state_trace.jsonl") if args.trace_state else None
+ ),
+ state_probe_style=args.state_probe_style,
+ )
+
+
+def _metadata_sample_rate_text(metadata):
+ sample = metadata.get("SAMPLE") if isinstance(metadata, dict) else None
+ if not isinstance(sample, dict):
+ return None
+ raw = sample.get("SAMPLERATE")
+ if raw in {None, ""}:
+ return None
+ return _clean_reply(str(raw)).replace("(", "").replace(")", "")
+
+
+def _open_scope(args):
+ deadline = time.monotonic() + args.scope_open_retry_s
+ last_exc = None
+ while time.monotonic() < deadline:
+ try:
+ strict_open = bool(getattr(args, "strict_open_scpi", False))
+ return ik.owon.OWONSDS1104.open_usb(
+ vid=int(args.scope_vid, 0),
+ pid=int(args.scope_pid, 0),
+ timeout=args.scope_timeout_s * u.second,
+ enable_scpi=(args.enable_scpi or strict_open),
+ ignore_scpi_failure=not strict_open,
+ settle_time=args.scope_settle_s,
+ )
+ except Exception as exc: # pragma: no cover - bench retry path
+ last_exc = exc
+ time.sleep(0.5)
+ raise RuntimeError(f"Scope open failed: {last_exc}")
+
+
+def _open_serial(args):
+ return serial.Serial(
+ port=args.esp_port,
+ baudrate=args.esp_baud,
+ timeout=0.2,
+ write_timeout=0.2,
+ )
+
+
+def _configure_esp_from_args(args):
+ with _open_serial(args) as port:
+ return _configure_esp(port, args)
+
+
+def _serial_write_line(port, line):
+ port.write((line + "\n").encode("utf-8"))
+ port.flush()
+
+
+def _serial_drain(port, duration_s):
+ deadline = time.monotonic() + max(duration_s, 0.0)
+ lines = []
+ while time.monotonic() < deadline:
+ raw = port.readline()
+ if raw:
+ lines.append(raw.decode("utf-8", errors="replace").rstrip())
+ else:
+ time.sleep(0.01)
+ return lines
+
+
+def _configure_esp(port, args):
+ _serial_write_line(port, "status")
+ if args.pulse_mode == "burst":
+ _serial_write_line(port, "burst")
+ else:
+ _serial_write_line(port, f"single {int(args.pulse_width_us)}")
+ _serial_write_line(port, f"gap {int(args.pulse_gap_us)}")
+ _serial_write_line(port, f"frame {int(args.pulse_frame_us)}")
+ _serial_write_line(port, f"half {int(args.slope_half_period_us)}")
+ return _serial_drain(port, 0.5)
+
+
+def _capture_screenshot(scope, out_dir, args):
+ bmp_path = out_dir / f"{args.profile}_{args.arm}_scope_screen.bmp"
+ bmp_path.write_bytes(scope.read_screen_bmp())
+ return bmp_path
+
+
+def _capture_waveform(scope, channel, out_dir):
+ x_axis, y_axis = scope.read_waveform(channel)
+ csv_path = out_dir / f"ch{channel}_waveform.csv"
+
+ x_values = [float(value) for value in x_axis]
+ y_values = [float(value) for value in y_axis]
+
+ with csv_path.open("w", encoding="utf-8", newline="") as handle:
+ writer = csv.writer(handle)
+ writer.writerow(["time_s", "voltage_v"])
+ writer.writerows(zip(x_values, y_values, strict=True))
+
+ v_min = min(y_values)
+ v_max = max(y_values)
+ return WaveformSummary(
+ channel=channel,
+ sample_count=len(y_values),
+ time_start_s=x_values[0],
+ time_end_s=x_values[-1],
+ voltage_min_v=v_min,
+ voltage_max_v=v_max,
+ voltage_pp_v=v_max - v_min,
+ csv_path=str(csv_path),
+ )
+
+
+def _capture_screen_channel_raw(scope, metadata, channel, out_dir):
+ point_count = scope._waveform_point_count(
+ metadata
+ ) # pylint: disable=protected-access
+ payload = scope._binary_query_exact( # pylint: disable=protected-access
+ f":DATA:WAVE:SCREen:CH{channel}?", 4 + 2 * point_count
+ )
+ raw_adc = owon_sds1104._parse_waveform_adc( # pylint: disable=protected-access
+ owon_sds1104._strip_packet_prefix(
+ payload, f"screen waveform CH{channel}"
+ ), # pylint: disable=protected-access
+ f"screen waveform CH{channel}",
+ )
+ x_axis = scope._waveform_time_axis(
+ metadata, point_count
+ ) # pylint: disable=protected-access
+ y_axis = scope._waveform_voltage_axis(
+ metadata, channel, raw_adc
+ ) # pylint: disable=protected-access
+ csv_path = out_dir / f"ch{channel}_waveform.csv"
+
+ x_values = [float(value) for value in x_axis]
+ y_values = [float(value) for value in y_axis]
+
+ with csv_path.open("w", encoding="utf-8", newline="") as handle:
+ writer = csv.writer(handle)
+ writer.writerow(["time_s", "voltage_v"])
+ writer.writerows(zip(x_values, y_values, strict=True))
+
+ v_min = min(y_values)
+ v_max = max(y_values)
+ summary = WaveformSummary(
+ channel=channel,
+ sample_count=len(y_values),
+ time_start_s=x_values[0],
+ time_end_s=x_values[-1],
+ voltage_min_v=v_min,
+ voltage_max_v=v_max,
+ voltage_pp_v=v_max - v_min,
+ csv_path=str(csv_path),
+ )
+ return summary, {"x": x_values, "y": y_values}
+
+
+def _capture_depmem_all_summary_and_series(scope, out_dir):
+ raw_payload = scope.read_deep_memory_all_raw()
+ raw_path = out_dir / "depmem_all_raw.bin"
+ raw_path.write_bytes(raw_payload)
+ capture = scope._parse_deep_memory_all_payload(
+ raw_payload
+ ) # pylint: disable=protected-access
+ summary = {
+ "metadata": {
+ "timebase_scale": capture.metadata.get("TIMEBASE", {}).get("SCALE"),
+ "hoffset": capture.metadata.get("TIMEBASE", {}).get("HOFFSET"),
+ "datalen": capture.metadata.get("SAMPLE", {}).get("DATALEN"),
+ "sample_rate": capture.metadata.get("SAMPLE", {}).get("SAMPLERATE"),
+ "depmem": capture.metadata.get("SAMPLE", {}).get("DEPMEM"),
+ "screenoffset": capture.metadata.get("SAMPLE", {}).get("SCREENOFFSET"),
+ "runstatus": capture.metadata.get("RUNSTATUS"),
+ },
+ "channels": {
+ f"CH{channel}": {
+ "samples": len(raw_values),
+ "raw_min": int(min(raw_values)),
+ "raw_max": int(max(raw_values)),
+ "first_16": [int(value) for value in list(raw_values)[:16]],
+ }
+ for channel, raw_values in capture.raw_channels.items()
+ },
+ }
+ summary_path = out_dir / "depmem_all_summary.json"
+ _safe_json_write(summary_path, summary)
+
+ series = {}
+ for channel, raw_values in capture.raw_channels.items():
+ x_axis = scope._waveform_time_axis(
+ capture.metadata, len(raw_values)
+ ) # pylint: disable=protected-access
+ y_axis = scope._waveform_voltage_axis(
+ capture.metadata, channel, raw_values
+ ) # pylint: disable=protected-access
+ series[channel] = {
+ "x": [float(value) for value in x_axis],
+ "y": [float(value) for value in y_axis],
+ }
+ return summary, summary_path, raw_path, series
+
+
+def _safe_json_write(path, payload):
+ path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
+
+
+def _build_scope_html_data_from_series(
+ args,
+ scope_state,
+ waveform_data,
+ memory_depth_text=None,
+ sample_rate_text=None,
+):
+ first_channel = next(iter(waveform_data.values()), {"x": [0.0, 1.0], "y": []})
+ time_window_s = None
+ if len(first_channel["x"]) >= 2:
+ time_window_s = abs(
+ float(first_channel["x"][-1]) - float(first_channel["x"][0])
+ )
+ if sample_rate_text is None:
+ sample_rate_text = _format_sample_rate_text(first_channel["x"])
+ if memory_depth_text is None:
+ memory_depth_text = (
+ str(len(first_channel["y"])) if first_channel["y"] else "unknown"
+ )
+
+ status_text = str(scope_state.get("trigger_status", "STOP"))
+ if "." in status_text:
+ status_text = status_text.split(".")[-1].upper()
+
+ if args.profile == "edge":
+ trigger_source = args.edge_source.upper()
+ trigger_coupling = args.edge_coupling.upper()
+ trigger_edge = "falling" if args.edge_slope.lower() == "fall" else "rising"
+ elif args.profile == "pulse":
+ trigger_source = args.pulse_source.upper()
+ trigger_coupling = args.pulse_coupling.upper()
+ trigger_edge = "falling" if args.pulse_dir.upper() == "NEG" else "rising"
+ else:
+ trigger_source = args.slope_source.upper()
+ trigger_coupling = "DC"
+ trigger_edge = "falling" if args.slope_edge.upper() == "NEG" else "rising"
+
+ return {
+ "meta": {
+ "status": status_text.title() if status_text == "STOP" else status_text,
+ "timebase_s_div": args.timebase_s_div,
+ "time_window_s": time_window_s,
+ "sample_rate_text": sample_rate_text,
+ "memory_depth_text": memory_depth_text,
+ },
+ "trigger": {
+ "source": trigger_source,
+ "coupling": trigger_coupling,
+ "edge": trigger_edge,
+ "level_v": args.trigger_level_v,
+ "horizontal_pos_s": 0.0,
+ },
+ "channels": {
+ "CH1": {
+ "visible": 1 in args.capture_channels,
+ "volts_per_div": args.ch1_scale_v_div,
+ "position_div": args.ch1_position_div,
+ "color": "#ffff00",
+ "data": waveform_data.get(1, {}).get("y", []),
+ "time_s": waveform_data.get(1, {}).get("x", []),
+ },
+ "CH2": {
+ "visible": 2 in args.capture_channels,
+ "volts_per_div": args.ch2_scale_v_div,
+ "position_div": args.ch2_position_div,
+ "color": "#00ffff",
+ "data": waveform_data.get(2, {}).get("y", []),
+ "time_s": waveform_data.get(2, {}).get("x", []),
+ },
+ "CH3": {
+ "visible": 3 in args.capture_channels,
+ "volts_per_div": args.ch3_scale_v_div,
+ "position_div": args.ch3_position_div,
+ "color": "#ff8800",
+ "data": waveform_data.get(3, {}).get("y", []),
+ "time_s": waveform_data.get(3, {}).get("x", []),
+ },
+ },
+ }
+
+
+def _build_scope_html_data(args, scope_state, waveforms):
+ waveform_data = {}
+ for summary in waveforms:
+ with Path(summary.csv_path).open(newline="", encoding="utf-8") as handle:
+ rows = list(csv.DictReader(handle))
+ waveform_data[summary.channel] = {
+ "x": [float(row["time_s"]) for row in rows],
+ "y": [float(row["voltage_v"]) for row in rows],
+ }
+ return _build_scope_html_data_from_series(args, scope_state, waveform_data)
+
+
+def _render_scope_html_with_data(out_dir, scope_data, stem):
+ template_path = Path(__file__).resolve().parent / "scope.html"
+ template = template_path.read_text(encoding="utf-8")
+ template = template.replace("scope_view_data.js", f"{stem}_data.js")
+ template = template.replace("scope_view.json", f"{stem}.json")
+ html_path = out_dir / f"{stem}.html"
+ json_path = out_dir / f"{stem}.json"
+ js_path = out_dir / f"{stem}_data.js"
+ html_path.write_text(template, encoding="utf-8")
+ json_path.write_text(json.dumps(scope_data, indent=2), encoding="utf-8")
+ js_path.write_text(
+ "window.__SCOPE_DATA__ = " + json.dumps(scope_data) + ";\n", encoding="utf-8"
+ )
+ return html_path, json_path
+
+
+def _render_scope_html(out_dir, args, scope_state, waveforms, sample_rate_text=None):
+ scope_data = _build_scope_html_data(args, scope_state, waveforms)
+ if sample_rate_text is not None:
+ scope_data["meta"]["sample_rate_text"] = sample_rate_text
+ return _render_scope_html_with_data(out_dir, scope_data, "scope_view")
+
+
+def _max_abs_delta(values_a, values_b):
+ if not values_a or not values_b:
+ return 0.0
+ return max(
+ abs(float(a) - float(b)) for a, b in zip(values_a, values_b, strict=True)
+ )
+
+
+def _mean_abs_delta(values_a, values_b):
+ if not values_a or not values_b:
+ return 0.0
+ deltas = [abs(float(a) - float(b)) for a, b in zip(values_a, values_b, strict=True)]
+ return sum(deltas) / len(deltas)
+
+
+def _align_waveform_series(series_a, series_b):
+ x_a = list(series_a["x"])
+ y_a = list(series_a["y"])
+ x_b = list(series_b["x"])
+ y_b = list(series_b["y"])
+ if len(y_a) == len(y_b):
+ return {
+ "alignment": "exact",
+ "x_a": x_a,
+ "y_a": y_a,
+ "x_b": x_b,
+ "y_b": y_b,
+ }
+
+ if abs(len(y_a) - len(y_b)) == 1:
+ if len(y_a) > len(y_b):
+ candidates = [
+ ("drop_first_a", x_a[1:], y_a[1:], x_b, y_b),
+ ("drop_last_a", x_a[:-1], y_a[:-1], x_b, y_b),
+ ]
+ else:
+ candidates = [
+ ("drop_first_b", x_a, y_a, x_b[1:], y_b[1:]),
+ ("drop_last_b", x_a, y_a, x_b[:-1], y_b[:-1]),
+ ]
+ best = min(
+ candidates,
+ key=lambda item: (
+ _max_abs_delta(item[2], item[4]),
+ _mean_abs_delta(item[2], item[4]),
+ ),
+ )
+ return {
+ "alignment": best[0],
+ "x_a": list(best[1]),
+ "y_a": list(best[2]),
+ "x_b": list(best[3]),
+ "y_b": list(best[4]),
+ }
+
+ shared = min(len(y_a), len(y_b))
+ return {
+ "alignment": f"prefix_trim_to_{shared}",
+ "x_a": x_a[:shared],
+ "y_a": y_a[:shared],
+ "x_b": x_b[:shared],
+ "y_b": y_b[:shared],
+ }
+
+
+def _compare_waveform_series(name_a, series_a, name_b, series_b):
+ aligned = _align_waveform_series(series_a, series_b)
+ delta_samples = [
+ abs(float(a) - float(b))
+ for a, b in zip(aligned["y_a"], aligned["y_b"], strict=True)
+ ]
+ return {
+ "series_a": name_a,
+ "series_b": name_b,
+ "count_a": len(series_a["y"]),
+ "count_b": len(series_b["y"]),
+ "aligned_count": len(aligned["y_a"]),
+ "alignment": aligned["alignment"],
+ "max_abs_voltage_delta_v": max(delta_samples) if delta_samples else 0.0,
+ "mean_abs_voltage_delta_v": (
+ sum(delta_samples) / len(delta_samples) if delta_samples else 0.0
+ ),
+ "diff_sample_count": sum(1 for delta in delta_samples if delta > 1e-9),
+ "max_abs_time_delta_s": _max_abs_delta(aligned["x_a"], aligned["x_b"]),
+ "voltage_pp_a_v": (
+ max(series_a["y"]) - min(series_a["y"]) if series_a["y"] else 0.0
+ ),
+ "voltage_pp_b_v": (
+ max(series_b["y"]) - min(series_b["y"]) if series_b["y"] else 0.0
+ ),
+ }
+
+
+def _render_waveform_comparison_html(out_dir, comparison):
+ html_path = out_dir / "waveform_comparison.html"
+ lines = [
+ "",
+ '',
+ "
",
+ '',
+ '',
+ "Waveform Comparison",
+ "",
+ "",
+ "",
+ "Waveform Comparison
",
+ 'BMP is the ground-truth screen capture. The HTML views below render the public waveform APIs grouped by acquisition family.
',
+ ]
+ if comparison.get("bmp_name"):
+ lines.extend(
+ [
+ "Reference BMP
",
+ f'',
+ ]
+ )
+
+ lines.extend(
+ [
+ "Numeric Comparison
",
+ "",
+ "| Family | Channel | A | B | Counts | Alignment | Mean |dV| | Max |dV| | Diff samples | Max |dt| |
",
+ "",
+ ]
+ )
+ for family, channels in comparison.get("families", {}).items():
+ for channel_name, metrics in channels.items():
+ lines.append(
+ ""
+ f"| {family} | "
+ f"{channel_name} | "
+ f"{metrics['series_a']} | "
+ f"{metrics['series_b']} | "
+ f"{metrics['count_a']} vs {metrics['count_b']} (shared {metrics['aligned_count']}) | "
+ f"{metrics['alignment']} | "
+ f"{metrics['mean_abs_voltage_delta_v']:.6g} V | "
+ f"{metrics['max_abs_voltage_delta_v']:.6g} V | "
+ f"{metrics['diff_sample_count']} | "
+ f"{metrics['max_abs_time_delta_s']:.6g} s | "
+ "
"
+ )
+ lines.extend(["", "
"])
+
+ views = comparison.get("views", [])
+ if views:
+ lines.extend(["Rendered Views
", ''])
+ for view in views:
+ lines.extend(
+ [
+ '
",
+ ]
+ )
+ lines.append("
")
+
+ lines.extend(["", ""])
+ html_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
+ return html_path
diff --git a/doc/examples/_owon_capture_debug.py b/doc/examples/_owon_capture_debug.py
new file mode 100644
index 00000000..388fa19f
--- /dev/null
+++ b/doc/examples/_owon_capture_debug.py
@@ -0,0 +1,1489 @@
+#!/usr/bin/env python
+"""
+Debug and validation helpers for the OWON ESP32 trigger jig runner.
+"""
+
+from __future__ import annotations
+
+import argparse
+from dataclasses import asdict
+import json
+from pathlib import Path
+import time
+
+import usb.core
+
+import instruments as ik
+from instruments.units import ureg as u
+
+from _owon_capture_common import (
+ RawWaveformSummary,
+ _build_scope_html_data_from_series,
+ _capture_screenshot,
+ _capture_waveform,
+ _clean_reply,
+ _compare_waveform_series,
+ _format_time_token,
+ _format_voltage_token,
+ _make_trace_context,
+ _metadata_sample_rate_text,
+ _open_scope,
+ _render_scope_html,
+ _render_scope_html_with_data,
+ _render_waveform_comparison_html,
+ _safe_json_write,
+ _timestamp,
+ _write_jsonl,
+)
+
+
+def _raw_waveform_summary(scope, channel):
+ metadata = scope.read_waveform_metadata()
+ point_count = scope._waveform_point_count(
+ metadata
+ ) # pylint: disable=protected-access
+ payload = scope._binary_query_exact( # pylint: disable=protected-access
+ f":DATA:WAVE:SCREEN:CH{channel}?",
+ 4 + 2 * point_count,
+ )
+ raw_adc = ik.owon.sds1104._parse_waveform_adc( # pylint: disable=protected-access
+ ik.owon.sds1104._strip_packet_prefix(payload, f"screen waveform CH{channel}"),
+ f"screen waveform CH{channel}",
+ )
+ raw_list = [int(value) for value in raw_adc]
+ channel_meta = metadata["CHANNEL"][channel - 1]
+ return (
+ RawWaveformSummary(
+ channel=channel,
+ sample_count=len(raw_list),
+ raw_min=min(raw_list),
+ raw_max=max(raw_list),
+ raw_first_16=raw_list[:16],
+ metadata_scale=str(channel_meta.get("SCALE")),
+ metadata_probe=str(channel_meta.get("PROBE")),
+ metadata_offset=channel_meta.get("OFFSET"),
+ ),
+ metadata,
+ )
+
+
+def _summarize_numeric_axis(values):
+ values = [float(value) for value in values]
+ return {
+ "count": len(values),
+ "min": min(values),
+ "max": max(values),
+ "pp": max(values) - min(values),
+ "first_16": values[:16],
+ }
+
+
+def _capture_waveform_truth(
+ scope, out_dir, args, scope_state, capture_channels, deep_first=False
+):
+ truth = {
+ "screen_bmp_path": None,
+ "screen_metadata_path": None,
+ "screen_metadata": None,
+ "screen_waveforms": {},
+ "screen_channel_alias_waveforms": {},
+ "deep_memory_metadata_path": None,
+ "deep_memory_metadata": None,
+ "deep_memory_channels": {},
+ "deep_memory_bundle": None,
+ "errors": [],
+ }
+ screen_series = {}
+ screen_alias_series = {}
+ deep_channel_series = {}
+ deep_bundle_series = {}
+ screen_metadata = None
+ deep_metadata = None
+ comparison = {
+ "bmp_name": None,
+ "views": [],
+ "families": {},
+ }
+
+ try:
+ deep_metadata = scope.read_deep_memory_metadata()
+ deep_metadata_path = out_dir / "deep_memory_metadata.json"
+ _safe_json_write(deep_metadata_path, deep_metadata)
+ truth["deep_memory_metadata_path"] = str(deep_metadata_path)
+ truth["deep_memory_metadata"] = deep_metadata
+ except Exception as exc:
+ truth["errors"].append(f"read_deep_memory_metadata: {exc}")
+ deep_metadata = None
+
+ for channel in capture_channels:
+ try:
+ x_axis, y_axis = scope.read_deep_memory_channel(channel)
+ x_values = [float(value) for value in x_axis]
+ y_values = [float(value) for value in y_axis]
+ truth["deep_memory_channels"][f"CH{channel}"] = {
+ "time": _summarize_numeric_axis(x_values),
+ "voltage": _summarize_numeric_axis(y_values),
+ }
+ deep_channel_series[channel] = {"x": x_values, "y": y_values}
+ except Exception as exc:
+ truth["errors"].append(f"read_deep_memory_channel({channel}): {exc}")
+
+ try:
+ bundle = scope.read_deep_memory_all()
+ truth["deep_memory_bundle"] = {
+ "metadata_keys": sorted(bundle.metadata.keys()),
+ "raw_channels": {
+ f"CH{channel}": {
+ "count": len(raw_values),
+ "min": int(min(raw_values)),
+ "max": int(max(raw_values)),
+ "first_16": [int(value) for value in list(raw_values)[:16]],
+ }
+ for channel, raw_values in bundle.raw_channels.items()
+ },
+ }
+ converted_channels = {}
+ for channel, raw_values in bundle.raw_channels.items():
+ x_axis = scope._waveform_time_axis(
+ bundle.metadata, len(raw_values)
+ ) # pylint: disable=protected-access
+ y_axis = scope._waveform_voltage_axis(
+ bundle.metadata, channel, raw_values
+ ) # pylint: disable=protected-access
+ x_values = [float(value) for value in x_axis]
+ y_values = [float(value) for value in y_axis]
+ converted_channels[f"CH{channel}"] = {
+ "time": _summarize_numeric_axis(x_values),
+ "voltage": _summarize_numeric_axis(y_values),
+ }
+ deep_bundle_series[channel] = {"x": x_values, "y": y_values}
+ truth["deep_memory_bundle"]["converted_channels"] = converted_channels
+ except Exception as exc:
+ truth["errors"].append(f"read_deep_memory_all: {exc}")
+
+ if not deep_first:
+ bmp_path = out_dir / "waveform_truth_screen.bmp"
+ bmp_path.write_bytes(scope.read_screen_bmp())
+ truth["screen_bmp_path"] = str(bmp_path)
+ comparison["bmp_name"] = bmp_path.name
+
+ try:
+ screen_metadata = scope.read_waveform_metadata()
+ screen_metadata_path = out_dir / "screen_waveform_metadata.json"
+ _safe_json_write(screen_metadata_path, screen_metadata)
+ truth["screen_metadata_path"] = str(screen_metadata_path)
+ truth["screen_metadata"] = screen_metadata
+ except Exception as exc:
+ truth["errors"].append(f"read_waveform_metadata: {exc}")
+ screen_metadata = None
+
+ for channel in capture_channels:
+ try:
+ raw_summary, metadata = _raw_waveform_summary(scope, channel)
+ truth["screen_waveforms"][f"CH{channel}"] = {
+ "raw_summary": asdict(raw_summary),
+ "metadata_scale": str(
+ metadata["CHANNEL"][channel - 1].get("SCALE")
+ ),
+ "metadata_probe": str(
+ metadata["CHANNEL"][channel - 1].get("PROBE")
+ ),
+ "metadata_offset": metadata["CHANNEL"][channel - 1].get("OFFSET"),
+ }
+ except Exception as exc:
+ truth["errors"].append(f"raw screen CH{channel}: {exc}")
+
+ try:
+ x_axis, y_axis = scope.read_waveform(channel)
+ x_values = [float(value) for value in x_axis]
+ y_values = [float(value) for value in y_axis]
+ truth["screen_waveforms"].setdefault(f"CH{channel}", {})
+ truth["screen_waveforms"][f"CH{channel}"]["converted_summary"] = {
+ "time": _summarize_numeric_axis(x_values),
+ "voltage": _summarize_numeric_axis(y_values),
+ }
+ screen_series[channel] = {"x": x_values, "y": y_values}
+ except Exception as exc:
+ truth["errors"].append(f"read_waveform({channel}): {exc}")
+
+ try:
+ x_axis, y_axis = scope.channel[channel - 1].read_waveform()
+ x_values = [float(value) for value in x_axis]
+ y_values = [float(value) for value in y_axis]
+ truth["screen_channel_alias_waveforms"][f"CH{channel}"] = {
+ "time": _summarize_numeric_axis(x_values),
+ "voltage": _summarize_numeric_axis(y_values),
+ }
+ screen_alias_series[channel] = {"x": x_values, "y": y_values}
+ except Exception as exc:
+ truth["errors"].append(f"channel[{channel - 1}].read_waveform(): {exc}")
+
+ truth_path = out_dir / "waveform_truth.json"
+ _safe_json_write(truth_path, truth)
+
+ view_specs = [
+ (
+ "screen_read_waveform_view",
+ "Screen `read_waveform(channel)`",
+ screen_series,
+ screen_metadata,
+ ),
+ (
+ "screen_channel_alias_view",
+ "Screen `channel[n].read_waveform()`",
+ screen_alias_series,
+ screen_metadata,
+ ),
+ (
+ "deep_memory_channel_view",
+ "Deep `read_deep_memory_channel(channel)`",
+ deep_channel_series,
+ deep_metadata,
+ ),
+ (
+ "deep_memory_bundle_view",
+ "Deep `read_deep_memory_all()` converted from raw bundle",
+ deep_bundle_series,
+ deep_metadata,
+ ),
+ ]
+ for stem, title, waveform_data, metadata in view_specs:
+ if not waveform_data:
+ continue
+ first_channel = next(iter(waveform_data.values()))
+ scope_data = _build_scope_html_data_from_series(
+ args,
+ scope_state,
+ waveform_data,
+ memory_depth_text=str(len(first_channel["y"])),
+ sample_rate_text=_metadata_sample_rate_text(metadata),
+ )
+ html_path, json_path = _render_scope_html_with_data(out_dir, scope_data, stem)
+ comparison["views"].append(
+ {
+ "stem": stem,
+ "title": title,
+ "html_name": html_path.name,
+ "json_name": json_path.name,
+ }
+ )
+
+ family_map = {
+ "screen": (
+ "read_waveform(channel)",
+ screen_series,
+ "channel[n].read_waveform()",
+ screen_alias_series,
+ ),
+ "deep_memory": (
+ "read_deep_memory_channel(channel)",
+ deep_channel_series,
+ "read_deep_memory_all()",
+ deep_bundle_series,
+ ),
+ }
+ for family_name, (name_a, series_a, name_b, series_b) in family_map.items():
+ channels = {}
+ for channel in sorted(set(series_a) & set(series_b)):
+ channels[f"CH{channel}"] = _compare_waveform_series(
+ name_a,
+ series_a[channel],
+ name_b,
+ series_b[channel],
+ )
+ if channels:
+ comparison["families"][family_name] = channels
+
+ comparison_path = out_dir / "waveform_comparison.json"
+ _safe_json_write(comparison_path, comparison)
+ comparison_html_path = _render_waveform_comparison_html(out_dir, comparison)
+ comparison["json_path"] = str(comparison_path)
+ comparison["html_path"] = str(comparison_html_path)
+
+ return truth, truth_path, comparison
+
+
+def _classify_error(text):
+ lowered = str(text).lower()
+ if "timeout" in lowered or "timed out" in lowered:
+ return "transport_timeout"
+ if "pipe error" in lowered:
+ return "transport_pipe_error"
+ if "length-prefixed body" in lowered or "length mismatch" in lowered:
+ return "malformed_length_prefix"
+ if "deep_memory" in lowered or "deep-memory" in lowered or "depmem" in lowered:
+ return "deep_memory_error"
+ if (
+ "waveform" in lowered
+ or lowered.startswith("ch1:")
+ or lowered.startswith("ch2:")
+ ):
+ return "waveform_read_error"
+ return "unknown_error"
+
+
+def _is_transport_failure(exc):
+ if isinstance(exc, (usb.core.USBTimeoutError, usb.core.USBError)):
+ return True
+ text = str(exc).lower()
+ if isinstance(exc, OSError) and (
+ "timeout" in text
+ or "timed out" in text
+ or "pipe" in text
+ or "access denied" in text
+ ):
+ return True
+ if isinstance(exc, ValueError) and (
+ "length-prefixed" in text
+ or "payload is too short" in text
+ or "packet prefix" in text
+ or "header" in text
+ ):
+ return True
+ return False
+
+
+def _snapshot_scope_state(scope):
+ def capture(fn):
+ try:
+ value = fn()
+ if hasattr(value, "units"):
+ return {"value": value.magnitude, "units": str(value.units)}
+ return str(value)
+ except Exception as exc:
+ return f"error: {type(exc).__name__}: {exc}"
+
+ return {
+ "trigger_status": capture(lambda: scope.trigger_status),
+ "trigger_type": capture(lambda: scope.trigger_type),
+ "single_trigger_mode": capture(lambda: scope.single_trigger_mode),
+ "trigger_sweep": capture(lambda: scope.trigger_sweep),
+ "timebase_scale": capture(lambda: scope.timebase_scale),
+ "horizontal_offset": capture(lambda: scope.horizontal_offset),
+ "memory_depth": capture(lambda: scope.memory_depth),
+ "acquire_mode": capture(lambda: scope.acquire_mode),
+ "acquire_averages": capture(lambda: scope.acquire_averages),
+ "measurement_display_enabled": capture(
+ lambda: scope.measurement_display_enabled
+ ),
+ "raw_edge_level": capture(
+ lambda: _clean_reply(scope.query(":TRIGger:SINGle:EDGE:LEVel?"))
+ ),
+ "trigger_configuration": capture(lambda: scope.read_trigger_configuration()),
+ }
+
+
+def _snapshot_scope_state_minimal(scope):
+ def capture(fn):
+ try:
+ return str(fn())
+ except Exception as exc:
+ return f"error: {type(exc).__name__}: {exc}"
+
+ return {
+ "trigger_status": capture(
+ lambda: _clean_reply(scope.query(":TRIGger:STATUS?"))
+ ),
+ "trigger_sweep": capture(
+ lambda: _clean_reply(scope.query(":TRIGger:SINGle:SWEEp?"))
+ ),
+ "raw_edge_level": capture(
+ lambda: _clean_reply(scope.query(":TRIGger:SINGle:EDGE:LEVel?"))
+ ),
+ "timebase_scale": capture(
+ lambda: _clean_reply(scope.query(":HORIzontal:Scale?"))
+ ),
+ "memory_depth": capture(lambda: _clean_reply(scope.query(":ACQUIRE:DEPMEM?"))),
+ }
+
+
+def _capture_state_for_style(scope, style):
+ if style == "off":
+ return None
+ if style == "minimal":
+ return _snapshot_scope_state_minimal(scope)
+ return _snapshot_scope_state(scope)
+
+
+def _record_state_snapshot(trace_ctx, phase, command_label, snapshot, error=None):
+ if (
+ trace_ctx is None
+ or trace_ctx.state_trace_path is None
+ or trace_ctx.state_probe_style == "off"
+ or snapshot is None
+ ):
+ return
+ trace_ctx.state_seq += 1
+ payload = {
+ "ts": _timestamp().isoformat(timespec="microseconds"),
+ "seq": trace_ctx.state_seq,
+ "phase": phase,
+ "command_label": command_label,
+ "snapshot": snapshot,
+ }
+ if error is not None:
+ payload["error"] = error
+ _write_jsonl(trace_ctx.state_trace_path, payload)
+
+
+def _apply_state_step(scope, label, trace_ctx, callback, settle_s=0.05):
+ style = "full" if trace_ctx is None else trace_ctx.state_probe_style
+ _record_state_snapshot(
+ trace_ctx, "before", label, _capture_state_for_style(scope, style)
+ )
+ try:
+ result = callback()
+ except Exception as exc:
+ _record_state_snapshot(
+ trace_ctx,
+ "error",
+ label,
+ _capture_state_for_style(scope, style),
+ error=f"{type(exc).__name__}: {exc}",
+ )
+ raise
+ _record_state_snapshot(
+ trace_ctx, "after", label, _capture_state_for_style(scope, style)
+ )
+ time.sleep(max(float(settle_s), 0.0))
+ _record_state_snapshot(
+ trace_ctx,
+ "after_settle",
+ label,
+ _capture_state_for_style(scope, style),
+ )
+ return result
+
+
+def _apply_channel_setup(scope, args, trace_ctx):
+ for channel in range(1, 5):
+ _apply_state_step(
+ scope,
+ f"channel[{channel}].display = {channel in args.capture_channels}",
+ trace_ctx,
+ lambda channel=channel: setattr(
+ scope.channel[channel - 1],
+ "display",
+ channel in args.capture_channels,
+ ),
+ )
+
+ _apply_state_step(
+ scope,
+ f"channel[1].probe_attenuation = {int(args.ch1_probe)}",
+ trace_ctx,
+ lambda: setattr(scope.channel[0], "probe_attenuation", int(args.ch1_probe)),
+ )
+ _apply_state_step(
+ scope,
+ f"channel[2].probe_attenuation = {int(args.ch2_probe)}",
+ trace_ctx,
+ lambda: setattr(scope.channel[1], "probe_attenuation", int(args.ch2_probe)),
+ )
+ if 3 in args.capture_channels:
+ _apply_state_step(
+ scope,
+ f"channel[3].probe_attenuation = {int(args.ch3_probe)}",
+ trace_ctx,
+ lambda: setattr(scope.channel[2], "probe_attenuation", int(args.ch3_probe)),
+ )
+
+ if args.ch1_scale_v_div is not None:
+ _apply_state_step(
+ scope,
+ f"channel[1].scale = {args.ch1_scale_v_div} V/div",
+ trace_ctx,
+ lambda: setattr(scope.channel[0], "scale", args.ch1_scale_v_div * u.volt),
+ )
+ if args.ch2_scale_v_div is not None:
+ _apply_state_step(
+ scope,
+ f"channel[2].scale = {args.ch2_scale_v_div} V/div",
+ trace_ctx,
+ lambda: setattr(scope.channel[1], "scale", args.ch2_scale_v_div * u.volt),
+ )
+ if args.ch3_scale_v_div is not None and 3 in args.capture_channels:
+ _apply_state_step(
+ scope,
+ f"channel[3].scale = {args.ch3_scale_v_div} V/div",
+ trace_ctx,
+ lambda: setattr(scope.channel[2], "scale", args.ch3_scale_v_div * u.volt),
+ )
+
+ _apply_state_step(
+ scope,
+ f"channel[1].position = {float(args.ch1_position_div)}",
+ trace_ctx,
+ lambda: setattr(scope.channel[0], "position", float(args.ch1_position_div)),
+ )
+ _apply_state_step(
+ scope,
+ f"channel[2].position = {float(args.ch2_position_div)}",
+ trace_ctx,
+ lambda: setattr(scope.channel[1], "position", float(args.ch2_position_div)),
+ )
+ if 3 in args.capture_channels:
+ _apply_state_step(
+ scope,
+ f"channel[3].position = {float(args.ch3_position_div)}",
+ trace_ctx,
+ lambda: setattr(scope.channel[2], "position", float(args.ch3_position_div)),
+ )
+
+
+def _apply_trigger_common(scope, args, trace_ctx):
+ sweep_readback = None
+ if args.trigger_sweep:
+ try:
+ _apply_state_step(
+ scope,
+ f"trigger_sweep = {args.trigger_sweep}",
+ trace_ctx,
+ lambda: setattr(
+ scope, "trigger_sweep", scope.TriggerSweep(args.trigger_sweep)
+ ),
+ )
+ sweep_readback = str(scope.trigger_sweep)
+ except Exception as exc:
+ sweep_readback = f"error: {exc}"
+ return sweep_readback
+
+
+def _configure_scope(scope, args, trace_ctx):
+ _apply_channel_setup(scope, args, trace_ctx)
+ if args.memory_depth is not None:
+ _apply_state_step(
+ scope,
+ f"memory_depth = {int(args.memory_depth)}",
+ trace_ctx,
+ lambda: setattr(scope, "memory_depth", int(args.memory_depth)),
+ )
+ if args.horizontal_offset_div is not None:
+ _apply_state_step(
+ scope,
+ f"horizontal_offset = {float(args.horizontal_offset_div)}",
+ trace_ctx,
+ lambda: setattr(
+ scope, "horizontal_offset", float(args.horizontal_offset_div)
+ ),
+ )
+ if args.measurement_display == "on":
+ _apply_state_step(
+ scope,
+ "measurement_display_enabled = True",
+ trace_ctx,
+ lambda: setattr(scope, "measurement_display_enabled", True),
+ )
+ elif args.measurement_display == "off":
+ _apply_state_step(
+ scope,
+ "measurement_display_enabled = False",
+ trace_ctx,
+ lambda: setattr(scope, "measurement_display_enabled", False),
+ )
+ _apply_state_step(
+ scope,
+ f"timebase_scale = {args.timebase_s_div}",
+ trace_ctx,
+ lambda: setattr(scope, "timebase_scale", args.timebase_s_div * u.second),
+ )
+ _apply_state_step(
+ scope,
+ ":TRIGger:TYPE SINGle",
+ trace_ctx,
+ lambda: scope.sendcmd(":TRIGger:TYPE SINGle"),
+ )
+
+ if args.profile == "edge":
+ _apply_state_step(
+ scope,
+ "single_trigger_mode = EDGE",
+ trace_ctx,
+ lambda: setattr(scope, "single_trigger_mode", scope.SingleTriggerMode.edge),
+ )
+ _apply_state_step(
+ scope,
+ f"trigger_source = {args.edge_source}",
+ trace_ctx,
+ lambda: setattr(
+ scope,
+ "trigger_source",
+ getattr(scope.TriggerSource, args.edge_source.lower()),
+ ),
+ )
+ _apply_state_step(
+ scope,
+ f"trigger_coupling = {args.edge_coupling}",
+ trace_ctx,
+ lambda: setattr(
+ scope,
+ "trigger_coupling",
+ getattr(scope.TriggerCoupling, args.edge_coupling.lower()),
+ ),
+ )
+ _apply_state_step(
+ scope,
+ f"trigger_slope = {args.edge_slope}",
+ trace_ctx,
+ lambda: setattr(
+ scope,
+ "trigger_slope",
+ getattr(scope.TriggerSlope, args.edge_slope.lower()),
+ ),
+ )
+ if args.trigger_level_v is not None:
+ _apply_state_step(
+ scope,
+ f"trigger_level = {args.trigger_level_v} V",
+ trace_ctx,
+ lambda: setattr(scope, "trigger_level", args.trigger_level_v * u.volt),
+ )
+ elif args.profile == "pulse":
+ _apply_state_step(
+ scope,
+ "single_trigger_mode = PULSe",
+ trace_ctx,
+ lambda: setattr(
+ scope, "single_trigger_mode", scope.SingleTriggerMode.pulse
+ ),
+ )
+ _apply_state_step(
+ scope,
+ f"PULSe:SOURce {args.pulse_source.upper()}",
+ trace_ctx,
+ lambda: scope.sendcmd(
+ f":TRIGger:SINGle:PULSe:SOURce {args.pulse_source.upper()}"
+ ),
+ )
+ _apply_state_step(
+ scope,
+ f"PULSe:SIGN {args.pulse_sign}",
+ trace_ctx,
+ lambda: scope.sendcmd(f":TRIGger:SINGle:PULSe:SIGN {args.pulse_sign}"),
+ )
+ _apply_state_step(
+ scope,
+ f"PULSe:TIME {args.pulse_trigger_time_us} us",
+ trace_ctx,
+ lambda: scope.sendcmd(
+ f":TRIGger:SINGle:PULSe:TIME {_format_time_token(args.pulse_trigger_time_us * u.microsecond)}"
+ ),
+ )
+ _apply_state_step(
+ scope,
+ f"PULSe:DIR {args.pulse_dir.upper()}",
+ trace_ctx,
+ lambda: scope.sendcmd(
+ f":TRIGger:SINGle:PULSe:DIR {args.pulse_dir.upper()}"
+ ),
+ )
+ _apply_state_step(
+ scope,
+ f"PULSe:COUPling {args.pulse_coupling.upper()}",
+ trace_ctx,
+ lambda: scope.sendcmd(
+ f":TRIGger:SINGle:PULSe:COUPling {args.pulse_coupling.upper()}"
+ ),
+ )
+ elif args.profile == "slope":
+ _apply_state_step(
+ scope,
+ "single_trigger_mode = SLOPe",
+ trace_ctx,
+ lambda: setattr(
+ scope, "single_trigger_mode", scope.SingleTriggerMode.slope
+ ),
+ )
+ _apply_state_step(
+ scope,
+ f"SLOPe:SOURce {args.slope_source.upper()}",
+ trace_ctx,
+ lambda: scope.sendcmd(
+ f":TRIGger:SINGle:SLOPe:SOURce {args.slope_source.upper()}"
+ ),
+ )
+ _apply_state_step(
+ scope,
+ f"SLOPe:ULevel {args.slope_upper_v} V",
+ trace_ctx,
+ lambda: scope.sendcmd(
+ f":TRIGger:SINGle:SLOPe:ULevel {_format_voltage_token(args.slope_upper_v * u.volt)}"
+ ),
+ )
+ _apply_state_step(
+ scope,
+ f"SLOPe:LLevel {args.slope_lower_v} V",
+ trace_ctx,
+ lambda: scope.sendcmd(
+ f":TRIGger:SINGle:SLOPe:LLevel {_format_voltage_token(args.slope_lower_v * u.volt)}"
+ ),
+ )
+ _apply_state_step(
+ scope,
+ f"SLOPe:SIGN {args.slope_sign}",
+ trace_ctx,
+ lambda: scope.sendcmd(f":TRIGger:SINGle:SLOPe:SIGN {args.slope_sign}"),
+ )
+ _apply_state_step(
+ scope,
+ f"SLOPe:TIME {args.slope_trigger_time_us} us",
+ trace_ctx,
+ lambda: scope.sendcmd(
+ f":TRIGger:SINGle:SLOPe:TIME {_format_time_token(args.slope_trigger_time_us * u.microsecond)}"
+ ),
+ )
+ _apply_state_step(
+ scope,
+ f"SLOPe:SLOPe {args.slope_edge.upper()}",
+ trace_ctx,
+ lambda: scope.sendcmd(
+ f":TRIGger:SINGle:SLOPe:SLOPe {args.slope_edge.upper()}"
+ ),
+ )
+ else:
+ raise ValueError(f"Unsupported profile: {args.profile}")
+
+ _apply_state_step(
+ scope,
+ f"trigger_holdoff = {args.trigger_holdoff_ns} ns",
+ trace_ctx,
+ lambda: setattr(
+ scope, "trigger_holdoff", args.trigger_holdoff_ns * u.nanosecond
+ ),
+ )
+ sweep_readback = _apply_trigger_common(scope, args, trace_ctx)
+
+ if args.arm == "run":
+ _apply_state_step(scope, "run()", trace_ctx, scope.run)
+ elif args.arm == "single":
+ _apply_state_step(
+ scope,
+ f"single(stop_first={args.single_stop_first}, arm_method={args.arm_method})",
+ trace_ctx,
+ lambda: scope.single(
+ stop_first=args.single_stop_first,
+ arm_method=args.arm_method,
+ ),
+ )
+ elif args.arm == "stop":
+ _apply_state_step(scope, "stop()", trace_ctx, scope.stop)
+ return {
+ "trigger_sweep": sweep_readback,
+ "trigger_status": str(scope.trigger_status),
+ "trigger_snapshot": str(scope.read_trigger_configuration()),
+ "raw_edge_level": _clean_reply(scope.query(":TRIGger:SINGle:EDGE:LEVel?")),
+ "trigger_type": str(scope.trigger_type),
+ }
+
+
+def _restore_stable_edge(scope, args):
+ try:
+ scope.sendcmd(":TRIGger:TYPE SINGle")
+ scope.sendcmd(":TRIGger:SINGle:MODE EDGE")
+ scope.sendcmd(f":TRIGger:SINGle:EDGE:SOURce {args.edge_source.upper()}")
+ scope.sendcmd(f":TRIGger:SINGle:EDGE:COUPling {args.edge_coupling.upper()}")
+ scope.sendcmd(f":TRIGger:SINGle:EDGE:SLOPe {args.edge_slope.upper()}")
+ if args.trigger_level_v is not None:
+ scope.sendcmd(
+ f":TRIGger:SINGle:EDGE:LEVel {_format_voltage_token(args.trigger_level_v * u.volt)}"
+ )
+ except Exception:
+ pass
+
+
+def _collect_scope_state(scope, style="full"):
+ if style == "off":
+ return {}
+ if style == "minimal":
+ return _snapshot_scope_state_minimal(scope)
+ state = {}
+ try:
+ state["trigger_status"] = str(scope.trigger_status)
+ except Exception as exc:
+ state["trigger_status"] = f"error: {exc}"
+ try:
+ state["trigger_type"] = str(scope.trigger_type)
+ except Exception as exc:
+ state["trigger_type"] = f"error: {exc}"
+ try:
+ state["timebase_scale"] = str(scope.timebase_scale)
+ except Exception as exc:
+ state["timebase_scale"] = f"error: {exc}"
+ try:
+ state["trigger_sweep"] = str(scope.trigger_sweep)
+ except Exception as exc:
+ state["trigger_sweep"] = f"error: {exc}"
+ try:
+ state["raw_edge_level"] = _clean_reply(
+ scope.query(":TRIGger:SINGle:EDGE:LEVel?")
+ )
+ except Exception as exc:
+ state["raw_edge_level"] = f"error: {exc}"
+ try:
+ state["trigger_snapshot"] = str(scope.read_trigger_configuration())
+ except Exception as exc:
+ state["trigger_snapshot"] = f"error: {exc}"
+ return state
+
+
+def _namespace_with_overrides(args, **updates):
+ data = vars(args).copy()
+ data.update(updates)
+ return argparse.Namespace(**data)
+
+
+def _write_report(
+ out_dir,
+ args,
+ esp_lines,
+ scope_state,
+ after_arm_state,
+ after_finalize_state,
+ screenshot_path,
+ html_view_path,
+ html_data_path,
+ trace_ctx,
+ screenshot_error,
+ waveforms,
+ waveform_errors,
+ waveform_truth_path=None,
+ waveform_truth=None,
+ waveform_comparison=None,
+):
+ report_path = out_dir / "report.md"
+ json_path = out_dir / "report.json"
+
+ waveform_rows = []
+ for summary in waveforms:
+ waveform_rows.append(
+ f"| CH{summary.channel} | {summary.sample_count} | "
+ f"{summary.time_start_s:.6g} | {summary.time_end_s:.6g} | "
+ f"{summary.voltage_min_v:.6g} | {summary.voltage_max_v:.6g} | "
+ f"{summary.voltage_pp_v:.6g} | `{Path(summary.csv_path).name}` |"
+ )
+
+ report_lines = [
+ f"# OWON ESP32 Trigger Jig Run: {args.label}",
+ "",
+ f"- Timestamp: `{_timestamp().isoformat(timespec='seconds')}`",
+ f"- Profile: `{args.profile}`",
+ f"- ESP32 port: `{args.esp_port}`",
+ f"- Scope VID:PID: `{args.scope_vid}:{args.scope_pid}`",
+ f"- Arm command: `{args.arm}`",
+ f"- Screenshot: `{screenshot_path.name if screenshot_path is not None else 'not captured'}`",
+ "",
+ "## Primary Scope Screen",
+ "",
+ *(
+ [f".name})", ""]
+ if screenshot_path is not None
+ else []
+ ),
+ "",
+ "## Derived Scope View",
+ "",
+ *(
+ [
+ f"- [Open `scope_view.html`]({Path(html_view_path).name})",
+ f"- [Open `scope_view.json`]({Path(html_data_path).name})",
+ "",
+ "> VS Code Markdown preview disables local scripts/iframes here.",
+ "> Open the HTML file directly to view the rendered scope screen.",
+ "",
+ ]
+ if html_view_path is not None and html_data_path is not None
+ else []
+ ),
+ *(
+ [
+ "",
+ "## Trace Artifacts",
+ "",
+ f"- [Open `scpi_trace.jsonl`]({Path(trace_ctx.scpi_trace_path).name})",
+ f"- [Open `state_trace.jsonl`]({Path(trace_ctx.state_trace_path).name})",
+ "",
+ ]
+ if trace_ctx is not None
+ and (
+ trace_ctx.scpi_trace_path is not None
+ or trace_ctx.state_trace_path is not None
+ )
+ else []
+ ),
+ "",
+ "## ESP32 Configuration",
+ "",
+ f"- Pulse mode: `{args.pulse_mode}`",
+ f"- Pulse width: `{args.pulse_width_us} us`",
+ f"- Pulse gap: `{args.pulse_gap_us} us`",
+ f"- Frame gap: `{args.pulse_frame_us} us`",
+ f"- Slope half-period: `{args.slope_half_period_us} us`",
+ "",
+ "## Scope State",
+ "",
+ f"- Trigger status: `{scope_state['trigger_status']}`",
+ f"- Timebase scale: `{scope_state['timebase_scale']}`",
+ f"- Trigger snapshot: `{scope_state['trigger_snapshot']}`",
+ "",
+ "## Arming States",
+ "",
+ f"- After arm: `{after_arm_state}`",
+ f"- After finalize: `{after_finalize_state}`",
+ "",
+ "## ESP32 Serial Output",
+ "",
+ "```text",
+ *esp_lines,
+ "```",
+ "",
+ "## Waveforms",
+ "",
+ "| Channel | Samples | t_start (s) | t_end (s) | v_min (V) | v_max (V) | v_pp (V) | CSV |",
+ "| --- | ---: | ---: | ---: | ---: | ---: | ---: | --- |",
+ *waveform_rows,
+ "",
+ *(
+ [
+ "## Waveform Truth",
+ "",
+ f"- [Open `waveform_truth.json`]({Path(waveform_truth_path).name})",
+ "",
+ f"- Errors: `{'; '.join(waveform_truth['errors']) if waveform_truth and waveform_truth.get('errors') else 'none'}`",
+ "",
+ ]
+ if waveform_truth_path is not None and waveform_truth is not None
+ else []
+ ),
+ *(
+ [
+ "## Waveform Comparison",
+ "",
+ f"- [Open `waveform_comparison.html`]({Path(waveform_comparison['html_path']).name})",
+ f"- [Open `waveform_comparison.json`]({Path(waveform_comparison['json_path']).name})",
+ "",
+ "| Family | Channel | A | B | Counts | Alignment | Mean dV | Max dV | Diff samples | Max dt |",
+ "| --- | --- | --- | --- | --- | --- | ---: | ---: | ---: | ---: |",
+ *[
+ f"| `{family}` | `{channel_name}` | "
+ f"`{metrics['series_a']}` | `{metrics['series_b']}` | "
+ f"`{metrics['count_a']} vs {metrics['count_b']} (shared {metrics['aligned_count']})` | "
+ f"`{metrics['alignment']}` | "
+ f"`{metrics['mean_abs_voltage_delta_v']:.6g}` | "
+ f"`{metrics['max_abs_voltage_delta_v']:.6g}` | "
+ f"`{metrics['diff_sample_count']}` | "
+ f"`{metrics['max_abs_time_delta_s']:.6g}` |"
+ for family, channels in waveform_comparison["families"].items()
+ for channel_name, metrics in channels.items()
+ ],
+ "",
+ ]
+ if waveform_comparison is not None
+ else []
+ ),
+ "",
+ "## Notes",
+ "",
+ "- This report is generated by `owon_esp32_trigger_jig_runner.py`.",
+ "- The BMP is the primary evidence artifact.",
+ "- `scope_view.html` is a derived view built from `scope_view.json`.",
+ "- `waveform_comparison.html` groups the screen and deep-memory APIs side by side.",
+ "- HTML waveform views keep the discrete front-panel `M:` scale and show a separate derived `Span:` from the returned x-axis.",
+ "- It records the exact run inputs and captured artifacts, but it does not yet make a formal pass/fail verdict.",
+ ]
+ if waveform_errors:
+ report_lines.extend(
+ [
+ "",
+ "## Waveform Errors",
+ "",
+ *[f"- {item}" for item in waveform_errors],
+ ]
+ )
+ if screenshot_error:
+ report_lines.extend(
+ [
+ "",
+ "## Screenshot Error",
+ "",
+ f"- {screenshot_error}",
+ ]
+ )
+ report_path.write_text("\n".join(report_lines) + "\n", encoding="utf-8")
+
+ payload = {
+ "label": args.label,
+ "profile": args.profile,
+ "esp_port": args.esp_port,
+ "scope_vid": args.scope_vid,
+ "scope_pid": args.scope_pid,
+ "arm": args.arm,
+ "arm_method": args.arm_method,
+ "single_stop_first": args.single_stop_first,
+ "finalize_method": args.finalize_method,
+ "finalize_delay_s": args.finalize_delay_s,
+ "trigger_status": scope_state["trigger_status"],
+ "timebase_scale": scope_state["timebase_scale"],
+ "trigger_snapshot": scope_state["trigger_snapshot"],
+ "after_arm_state": after_arm_state,
+ "after_finalize_state": after_finalize_state,
+ "screenshot_path": None if screenshot_path is None else str(screenshot_path),
+ "html_view_path": None if html_view_path is None else str(html_view_path),
+ "html_data_path": None if html_data_path is None else str(html_data_path),
+ "scpi_trace_path": None if trace_ctx is None else trace_ctx.scpi_trace_path,
+ "state_trace_path": None if trace_ctx is None else trace_ctx.state_trace_path,
+ "waveform_truth_path": (
+ None if waveform_truth_path is None else str(waveform_truth_path)
+ ),
+ "waveform_comparison_path": (
+ None
+ if waveform_comparison is None
+ else str(waveform_comparison["html_path"])
+ ),
+ "waveform_comparison_json_path": (
+ None
+ if waveform_comparison is None
+ else str(waveform_comparison["json_path"])
+ ),
+ "screenshot_error": screenshot_error,
+ "waveforms": [asdict(summary) for summary in waveforms],
+ "waveform_errors": waveform_errors,
+ "esp_serial_lines": esp_lines,
+ }
+ json_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
+
+ return report_path, json_path
+
+
+def _run_capture_case(scope, args, out_dir, esp_lines):
+ out_dir.mkdir(parents=True, exist_ok=True)
+ trace_ctx = _make_trace_context(out_dir, args)
+ if trace_ctx.scpi_trace_path is not None:
+ scope.enable_trace(trace_ctx.scpi_trace_path)
+ extra_state = _configure_scope(scope, args, trace_ctx)
+ time.sleep(max(args.capture_delay_s, 0.0))
+ after_arm_state = {}
+ if not args.immediate_deep_probe:
+ after_arm_state = _collect_scope_state(scope, style=args.state_probe_style)
+ after_arm_state.update(extra_state)
+ if args.finalize_method != "none":
+ _apply_state_step(
+ scope,
+ f"freeze_acquisition(method={args.finalize_method})",
+ trace_ctx,
+ lambda: scope.freeze_acquisition(
+ method=args.finalize_method,
+ settle_time=args.finalize_delay_s,
+ ),
+ settle_s=0.0,
+ )
+ after_finalize_state = {}
+ if not args.immediate_deep_probe:
+ after_finalize_state = _collect_scope_state(scope, style=args.state_probe_style)
+ scope_state = dict(after_finalize_state)
+ else:
+ scope_state = dict(extra_state)
+
+ screenshot_path = None
+ html_view_path = None
+ html_data_path = None
+ screenshot_error = None
+ waveform_truth = None
+ waveform_truth_path = None
+ waveform_comparison = None
+ waveforms = []
+ waveform_errors = []
+
+ if args.validate_waveforms:
+ try:
+ waveform_truth, waveform_truth_path, waveform_comparison = (
+ _capture_waveform_truth(
+ scope,
+ out_dir,
+ args,
+ scope_state,
+ args.capture_channels,
+ deep_first=args.immediate_deep_probe,
+ )
+ )
+ except Exception as exc:
+ waveform_errors.append(f"waveform truth capture: {exc}")
+
+ if args.immediate_deep_probe:
+ if waveform_truth is not None:
+ after_arm_state = _collect_scope_state(scope, style="minimal")
+ after_arm_state.update(extra_state)
+ after_finalize_state = dict(after_arm_state)
+ scope_state = dict(after_arm_state)
+ else:
+ try:
+ screenshot_path = _capture_screenshot(scope, out_dir, args)
+ except Exception as exc:
+ screenshot_error = str(exc)
+
+ for channel in args.capture_channels:
+ try:
+ waveforms.append(_capture_waveform(scope, channel, out_dir))
+ except Exception as exc:
+ waveform_errors.append(f"CH{channel}: {exc}")
+
+ scope_view_sample_rate_text = None
+ try:
+ scope_view_sample_rate_text = _metadata_sample_rate_text(
+ scope.read_waveform_metadata()
+ )
+ except Exception as exc:
+ waveform_errors.append(f"scope_view sample-rate metadata: {exc}")
+
+ try:
+ html_view_path, html_data_path = _render_scope_html(
+ out_dir,
+ args,
+ scope_state,
+ waveforms,
+ sample_rate_text=scope_view_sample_rate_text,
+ )
+ except Exception as exc:
+ waveform_errors.append(f"scope_view.html: {exc}")
+
+ report_path, json_path = _write_report(
+ out_dir,
+ args,
+ esp_lines,
+ scope_state,
+ after_arm_state,
+ after_finalize_state,
+ screenshot_path,
+ html_view_path,
+ html_data_path,
+ trace_ctx,
+ screenshot_error,
+ waveforms,
+ waveform_errors,
+ waveform_truth_path=waveform_truth_path,
+ waveform_truth=waveform_truth,
+ waveform_comparison=waveform_comparison,
+ )
+ primary_error = None
+ if screenshot_error:
+ primary_error = screenshot_error
+ elif waveform_errors:
+ primary_error = waveform_errors[0]
+ return (
+ {
+ "label": args.label,
+ "profile": args.profile,
+ "trigger_status": scope_state.get("trigger_status"),
+ "trigger_status_before_reads": after_arm_state.get("trigger_status"),
+ "trigger_status_after_finalize": after_finalize_state.get("trigger_status"),
+ "trigger_sweep": scope_state.get("trigger_sweep"),
+ "arm_method": args.arm_method,
+ "single_stop_first": args.single_stop_first,
+ "finalize_method": args.finalize_method,
+ "screenshot_path": (
+ None if screenshot_path is None else str(screenshot_path)
+ ),
+ "screenshot_error": screenshot_error,
+ "failure_class": (
+ None if primary_error is None else _classify_error(primary_error)
+ ),
+ "deep_metadata_ok": bool(
+ waveform_truth is not None
+ and waveform_truth.get("deep_memory_metadata") is not None
+ ),
+ "deep_ch1_ok": bool(
+ waveform_truth is not None
+ and "CH1" in waveform_truth.get("deep_memory_channels", {})
+ ),
+ "deep_bundle_ok": bool(
+ waveform_truth is not None
+ and waveform_truth.get("deep_memory_bundle") is not None
+ ),
+ "session_error": None,
+ "waveform_errors": list(waveform_errors),
+ "report_path": str(report_path),
+ "json_path": str(json_path),
+ },
+ report_path,
+ json_path,
+ waveforms,
+ )
+
+
+def _build_verification_cases(args):
+ cases = []
+ if args.verify_pulse:
+ trigger_time_us = int(args.pulse_trigger_time_us)
+ width_map = {
+ ">": max(trigger_time_us * 4, trigger_time_us + 100),
+ "<": max(10, trigger_time_us // 2),
+ "=": trigger_time_us,
+ }
+ for sign, suffix in ((">", "gt"), ("<", "lt"), ("=", "eq")):
+ case_width_us = width_map[sign]
+ cases.append(
+ _namespace_with_overrides(
+ args,
+ label=(
+ f"{args.label}_pulse_{suffix}_{trigger_time_us}us_"
+ f"src_{case_width_us}us"
+ ),
+ profile="pulse",
+ pulse_mode="single",
+ pulse_width_us=case_width_us,
+ pulse_sign=sign,
+ )
+ )
+ elif args.verify_slope:
+ for edge in ("POS", "NEG"):
+ for sign, suffix in ((">", "gt"), ("<", "lt"), ("=", "eq")):
+ cases.append(
+ _namespace_with_overrides(
+ args,
+ label=(
+ f"{args.label}_slope_{edge.lower()}_{suffix}_"
+ f"{int(args.slope_trigger_time_us)}us"
+ ),
+ profile="slope",
+ slope_edge=edge,
+ slope_sign=sign,
+ )
+ )
+ return cases
+
+
+def _build_edge_arming_cases(args):
+ base = {
+ "profile": "edge",
+ "capture_channels": [1, 2],
+ "pulse_mode": "single",
+ "pulse_width_us": 100,
+ "pulse_gap_us": 2000,
+ "pulse_frame_us": 2000,
+ "slope_half_period_us": 2000,
+ "ch1_probe": 1,
+ "ch2_probe": 1,
+ "ch1_scale_v_div": 1.0,
+ "ch2_scale_v_div": 1.0,
+ "ch1_position_div": 1.0,
+ "ch2_position_div": -2.0,
+ "timebase_s_div": 200e-6,
+ "trigger_level_v": 1.65,
+ "trigger_holdoff_ns": 100,
+ "validate_waveforms": True,
+ "arm": "single",
+ }
+ cases = [
+ ("edge_auto_legacy_stopfirst", "AUTO", "legacy_single", True, "none"),
+ ("edge_normal_legacy_stopfirst", "NORMal", "legacy_single", True, "none"),
+ ("edge_normal_legacy_nostopfirst", "NORMal", "legacy_single", False, "none"),
+ ("edge_single_legacy_stopfirst", "SINGle", "legacy_single", True, "none"),
+ (
+ "edge_auto_running_stopfirst_runningstop",
+ "AUTO",
+ "running_run",
+ True,
+ "running_stop",
+ ),
+ (
+ "edge_normal_running_stopfirst_runningstop",
+ "NORMal",
+ "running_run",
+ True,
+ "running_stop",
+ ),
+ (
+ "edge_normal_running_nostopfirst_runningstop",
+ "NORMal",
+ "running_run",
+ False,
+ "running_stop",
+ ),
+ (
+ "edge_single_running_stopfirst_runningstop",
+ "SINGle",
+ "running_run",
+ True,
+ "running_stop",
+ ),
+ (
+ "edge_normal_legacy_nostopfirst_runningstop",
+ "NORMal",
+ "legacy_single",
+ False,
+ "running_stop",
+ ),
+ (
+ "edge_normal_legacy_stopfirst_runningstop",
+ "NORMal",
+ "legacy_single",
+ True,
+ "running_stop",
+ ),
+ ]
+ built = []
+ for label, sweep, arm_method, stop_first, finalize_method in cases:
+ built.append(
+ _namespace_with_overrides(
+ args,
+ label=label,
+ trigger_sweep=sweep,
+ arm_method=arm_method,
+ single_stop_first=stop_first,
+ finalize_method=finalize_method,
+ **base,
+ )
+ )
+ return built
+
+
+def _run_case_with_fresh_scope(args, case_args, out_dir, esp_lines):
+ scope = None
+ transport_failure = False
+ clean_success = False
+ try:
+ scope = _open_scope(case_args)
+ summary, report_path, json_path, waveforms = _run_capture_case(
+ scope, case_args, out_dir, esp_lines
+ )
+ transport_failure = bool(
+ summary.get("failure_class")
+ in {"transport_timeout", "transport_pipe_error", "malformed_length_prefix"}
+ or (
+ summary.get("session_error") is not None
+ and _is_transport_failure(summary["session_error"])
+ )
+ )
+ clean_success = not transport_failure
+ return summary, report_path, json_path, waveforms
+ except Exception as exc:
+ failure_class = _classify_error(exc)
+ transport_failure = _is_transport_failure(exc)
+ report_path = out_dir / "case_error.txt"
+ report_path.write_text(f"{type(exc).__name__}: {exc}\n", encoding="utf-8")
+ summary = {
+ "label": case_args.label,
+ "profile": case_args.profile,
+ "trigger_status": None,
+ "trigger_status_before_reads": None,
+ "trigger_status_after_finalize": None,
+ "trigger_sweep": case_args.trigger_sweep,
+ "arm_method": case_args.arm_method,
+ "single_stop_first": case_args.single_stop_first,
+ "finalize_method": case_args.finalize_method,
+ "screenshot_path": None,
+ "screenshot_error": None,
+ "failure_class": failure_class,
+ "deep_metadata_ok": False,
+ "deep_ch1_ok": False,
+ "deep_bundle_ok": False,
+ "waveform_errors": [],
+ "session_error": f"{type(exc).__name__}: {exc}",
+ "report_path": str(report_path),
+ "json_path": None,
+ }
+ return summary, report_path, None, []
+ finally:
+ if scope is not None:
+ try:
+ scope.disable_trace()
+ except Exception:
+ pass
+ try:
+ if transport_failure:
+ if case_args.restore_after_failure:
+ try:
+ _restore_stable_edge(scope, case_args)
+ except Exception:
+ pass
+ scope.close(
+ reset_device=case_args.hard_reset_on_failure,
+ settle_time=case_args.reset_settle_s,
+ )
+ else:
+ if clean_success and case_args.restore_after_success:
+ try:
+ _restore_stable_edge(scope, case_args)
+ except Exception:
+ pass
+ scope.close()
+ except Exception:
+ pass
+
+
+def _write_verification_report(out_dir, args, summaries):
+ report_path = out_dir / "verification_report.md"
+ json_path = out_dir / "verification_report.json"
+
+ rows = []
+ for summary in summaries:
+ rows.append(
+ f"| `{summary['label']}` | `{summary['profile']}` | "
+ f"`{summary['trigger_status']}` | `{summary['trigger_sweep']}` | "
+ f"`{Path(summary['screenshot_path']).name if summary['screenshot_path'] else 'none'}` | "
+ f"`{Path(summary['report_path']).name}` | "
+ f"`{'; '.join(summary['waveform_errors']) if summary['waveform_errors'] else 'none'}` |"
+ )
+
+ lines = [
+ f"# OWON Trigger Verification: {args.label}",
+ "",
+ f"- Timestamp: `{_timestamp().isoformat(timespec='seconds')}`",
+ f"- ESP32 port: `{args.esp_port}`",
+ f"- Scope VID:PID: `{args.scope_vid}:{args.scope_pid}`",
+ "",
+ "| Case | Profile | Trigger status | Sweep | Screenshot | Report | Waveform errors |",
+ "| --- | --- | --- | --- | --- | --- | --- |",
+ *rows,
+ "",
+ "## Case Reports",
+ "",
+ *[
+ f"- [{Path(item['report_path']).name}]({Path(item['label']) / Path(item['report_path']).name})"
+ for item in summaries
+ ],
+ ]
+ report_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
+ json_path.write_text(json.dumps(summaries, indent=2), encoding="utf-8")
+ return report_path, json_path
+
+
+def _write_edge_arming_report(out_dir, args, summaries):
+ report_path = out_dir / "edge_arming_report.md"
+ json_path = out_dir / "edge_arming_report.json"
+
+ rows = []
+ for summary in summaries:
+ rows.append(
+ f"| `{summary['label']}` | `{summary['trigger_sweep']}` | "
+ f"`{summary['arm_method']}` | `{summary['single_stop_first']}` | "
+ f"`{summary['finalize_method']}` | "
+ f"`{summary['trigger_status_before_reads']}` | "
+ f"`{summary['trigger_status_after_finalize']}` | "
+ f"`{summary['deep_metadata_ok']}` | `{summary['deep_ch1_ok']}` | "
+ f"`{summary['deep_bundle_ok']}` | "
+ f"`{summary['failure_class'] or 'none'}` | "
+ f"`{Path(summary['report_path']).name if summary.get('report_path') else 'none'}` |"
+ )
+
+ lines = [
+ f"# OWON Edge Arming Matrix: {args.label}",
+ "",
+ f"- Timestamp: `{_timestamp().isoformat(timespec='seconds')}`",
+ f"- ESP32 port: `{args.esp_port}`",
+ f"- Scope VID:PID: `{args.scope_vid}:{args.scope_pid}`",
+ "",
+ "| Case | Sweep | Arm | Stop First | Finalize | Status Before Reads | Status After Finalize | Deep Meta | Deep CH1 | Deep Bundle | Failure Class | Report |",
+ "| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |",
+ *rows,
+ "",
+ "## Case Reports",
+ "",
+ *[
+ f"- [{Path(item['report_path']).name}]({Path(item['label']) / Path(item['report_path']).name})"
+ for item in summaries
+ if item.get("report_path")
+ ],
+ ]
+ report_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
+ json_path.write_text(json.dumps(summaries, indent=2), encoding="utf-8")
+ return report_path, json_path
diff --git a/doc/examples/_owon_capture_stable.py b/doc/examples/_owon_capture_stable.py
new file mode 100644
index 00000000..5c09b47d
--- /dev/null
+++ b/doc/examples/_owon_capture_stable.py
@@ -0,0 +1,390 @@
+#!/usr/bin/env python
+"""
+Stable capture helpers for the OWON ESP32 trigger jig runner.
+
+This module contains the promoted, supported example flows:
+
+- --capture-edge-pretty
+- --capture-edge-pretty-burst
+"""
+
+from __future__ import annotations
+
+from dataclasses import asdict
+import json
+from pathlib import Path
+from types import SimpleNamespace
+import time
+
+import instruments.owon.sds1104 as owon_sds1104
+
+from _owon_capture_common import (
+ _build_scope_html_data_from_series,
+ _capture_depmem_all_summary_and_series,
+ _capture_screen_channel_raw,
+ _capture_screenshot,
+ _clean_reply,
+ _compare_waveform_series,
+ _configure_esp_from_args,
+ _make_trace_context,
+ _metadata_sample_rate_text,
+ _open_scope,
+ _render_scope_html_with_data,
+ _render_waveform_comparison_html,
+ _safe_json_write,
+ _timestamp,
+)
+
+
+def _is_pretty_edge_mode(args):
+ return bool(args.capture_edge_pretty or args.capture_edge_pretty_burst)
+
+
+def _apply_pretty_edge_defaults(args):
+ if not _is_pretty_edge_mode(args):
+ return args
+
+ args.profile = "edge"
+ args.arm = "single"
+ # Bench finding on the connected DOS1104: forcing SWEEp NORMal in this
+ # promoted path is unsafe. The stable sequence uses raw setup plus
+ # `:TRIGger:SINGle:SWEEp SINGle`, then waits for a genuine trigger event.
+ args.trigger_sweep = None
+ args.trigger_level_v = 1.65
+ args.ch1_probe = 1
+ args.ch2_probe = 1
+ args.ch1_position_div = 1.0
+ args.ch2_position_div = -2.0
+ args.ch1_scale_v_div = 1.0
+ args.ch2_scale_v_div = 1.0
+ args.timebase_s_div = 0.0002
+ args.pulse_mode = "burst" if args.capture_edge_pretty_burst else "single"
+ args.pulse_width_us = 100
+ args.pulse_gap_us = 2000
+ args.pulse_frame_us = 2000
+ args.slope_half_period_us = 2000
+ if args.label == "owon_esp32_trigger_jig":
+ args.label = (
+ "pretty_edge_capture_burst"
+ if args.capture_edge_pretty_burst
+ else "pretty_edge_capture"
+ )
+ return args
+
+
+def _run_proven_edge_capture_case(args, out_dir):
+ out_dir.mkdir(parents=True, exist_ok=True)
+ trace_ctx = _make_trace_context(out_dir, args)
+ scope = _open_scope(args)
+ try:
+ if trace_ctx.scpi_trace_path is not None:
+ scope.enable_trace(trace_ctx.scpi_trace_path)
+
+ initial_status = _clean_reply(scope.query(":TRIGger:STATUS?"))
+ resumed_before_setup = False
+ if initial_status.upper() == "STOP":
+ scope.run()
+ time.sleep(0.2)
+ resumed_before_setup = True
+
+ sequence = [
+ ":CH1:DISP ON",
+ ":CH2:DISP ON",
+ ":CH3:DISP OFF",
+ ":CH4:DISP OFF",
+ ":CH1:PROB 1X",
+ ":CH2:PROB 1X",
+ ":CH1:SCAL 1v",
+ ":CH2:SCAL 1v",
+ ":CH1:POS 1.0",
+ ":CH2:POS -2.0",
+ ":ACQUire:Mode SAMPle",
+ ":ACQUIRE:DEPMEM 10K",
+ ":ACQUIRE:DEPMEM?",
+ ":HORIzontal:Scale 500us",
+ ":HORIzontal:Scale?",
+ ":TRIGger:TYPE SINGle",
+ ":TRIGger:SINGle:MODE EDGE",
+ ":TRIGger:SINGle:EDGE:SOURce CH1",
+ ":TRIGger:SINGle:EDGE:COUPling DC",
+ ":TRIGger:SINGle:EDGE:SLOPe RISE",
+ ":TRIGger:SINGle:EDGE:LEVel 1.64V",
+ ":TRIGger:SINGle:EDGE:LEVel?",
+ ":TRIGger:SINGle:HOLDoff 100ns",
+ ":TRIGger:SINGle:SWEEp SINGle",
+ ]
+ for command in sequence:
+ if command.endswith("?"):
+ scope.query(command)
+ else:
+ scope.sendcmd(command)
+
+ esp_lines = _configure_esp_from_args(args)
+ time.sleep(0.5)
+ time.sleep(0.5)
+
+ status_text = _clean_reply(scope.query(":TRIGger:STATUS?"))
+ screen_metadata = (
+ owon_sds1104._parse_json_payload( # pylint: disable=protected-access
+ scope._binary_query(
+ ":DATA:WAVE:SCREen:HEAD?"
+ ), # pylint: disable=protected-access
+ "waveform metadata",
+ )
+ )
+ screen_metadata_path = out_dir / "screen_waveform_metadata.json"
+ _safe_json_write(screen_metadata_path, screen_metadata)
+
+ waveforms = []
+ screen_series = {}
+ for channel in (1, 2):
+ summary, series = _capture_screen_channel_raw(
+ scope, screen_metadata, channel, out_dir
+ )
+ waveforms.append(summary)
+ screen_series[channel] = series
+
+ screenshot_path = _capture_screenshot(
+ scope, out_dir, SimpleNamespace(profile="edge", arm="single")
+ )
+ (
+ depmem_summary,
+ depmem_summary_path,
+ depmem_raw_path,
+ depmem_series,
+ ) = _capture_depmem_all_summary_and_series(scope, out_dir)
+
+ scope_state = {
+ "trigger_status": f"TriggerStatus.{status_text.lower()}",
+ "initial_trigger_status": initial_status,
+ "resumed_before_setup": resumed_before_setup,
+ "timebase_scale": "0.0005 second",
+ "trigger_snapshot": (
+ "SDS1104TriggerConfiguration("
+ f"status=<{status_text}>, "
+ "trigger_type=, "
+ "single_trigger_mode=, "
+ "holdoff=, "
+ "edge_source=, "
+ "edge_coupling=, "
+ "edge_slope=, "
+ "edge_level=, "
+ "video_source=None, video_standard=None, video_sync=None, video_line_number=None)"
+ ),
+ "trigger_sweep": "TriggerSweep.single",
+ "raw_edge_level": _clean_reply(scope.query(":TRIGger:SINGle:EDGE:LEVel?")),
+ }
+
+ sample_rate_text = _metadata_sample_rate_text(screen_metadata)
+ scope_html_data = _build_scope_html_data_from_series(
+ args,
+ scope_state,
+ screen_series,
+ memory_depth_text=str(len(next(iter(screen_series.values()))["y"])),
+ sample_rate_text=sample_rate_text,
+ )
+ html_view_path, html_data_path = _render_scope_html_with_data(
+ out_dir, scope_html_data, "scope_view"
+ )
+
+ depmem_view_path = None
+ depmem_view_json_path = None
+ if depmem_series:
+ depmem_scope_data = _build_scope_html_data_from_series(
+ args,
+ scope_state,
+ depmem_series,
+ memory_depth_text=str(len(next(iter(depmem_series.values()))["y"])),
+ sample_rate_text=depmem_summary["metadata"]["sample_rate"],
+ )
+ depmem_view_path, depmem_view_json_path = _render_scope_html_with_data(
+ out_dir, depmem_scope_data, "depmem_all_view"
+ )
+
+ report_path = out_dir / "report.md"
+ json_path = out_dir / "report.json"
+ lines = [
+ f"# OWON ESP32 Trigger Jig Run: {args.label}",
+ "",
+ f"- Timestamp: `{_timestamp().isoformat(timespec='seconds')}`",
+ "- Profile: `edge`",
+ f"- ESP32 port: `{args.esp_port}`",
+ f"- Scope VID:PID: `{args.scope_vid}:{args.scope_pid}`",
+ "- Arm command: `sequenced_raw_edge`",
+ f"- Screenshot: `{Path(screenshot_path).name}`",
+ "",
+ "## Primary Scope Screen",
+ "",
+ f".name})",
+ "",
+ "## Derived Scope View",
+ "",
+ f"- [Open `scope_view.html`]({Path(html_view_path).name})",
+ f"- [Open `scope_view.json`]({Path(html_data_path).name})",
+ "",
+ "## Scope State",
+ "",
+ f"- Initial trigger status: `{scope_state['initial_trigger_status']}`",
+ f"- Resumed before setup: `{scope_state['resumed_before_setup']}`",
+ f"- Trigger status: `{scope_state['trigger_status']}`",
+ "- Timebase scale: `0.0005 second`",
+ f"- Trigger sweep: `{scope_state['trigger_sweep']}`",
+ f"- Raw edge level: `{scope_state['raw_edge_level']}`",
+ "",
+ "## ESP32 Serial Output",
+ "",
+ "```text",
+ *esp_lines,
+ "```",
+ "",
+ "## Waveforms",
+ "",
+ "| Channel | Samples | t_start (s) | t_end (s) | v_min (V) | v_max (V) | v_pp (V) | CSV | Plot |",
+ "| --- | ---: | ---: | ---: | ---: | ---: | ---: | --- | --- |",
+ ]
+ for summary in waveforms:
+ lines.append(
+ f"| CH{summary.channel} | {summary.sample_count} | {summary.time_start_s:.6g} | "
+ f"{summary.time_end_s:.6g} | {summary.voltage_min_v:.6g} | "
+ f"{summary.voltage_max_v:.6g} | {summary.voltage_pp_v:.6g} | "
+ f"`{Path(summary.csv_path).name}` | "
+ f"`screen_ch{summary.channel}_plot.html` |"
+ )
+ lines.extend(
+ [
+ "",
+ "## Screen Metadata",
+ "",
+ f"- [Open `screen_waveform_metadata.json`]({screen_metadata_path.name})",
+ "",
+ "## DEPMem:All Summary",
+ "",
+ f"- [Open `depmem_all_summary.json`]({depmem_summary_path.name})",
+ f"- [Open `depmem_all_raw.bin`]({depmem_raw_path.name})",
+ ]
+ )
+ if depmem_view_path is not None:
+ lines.extend(
+ [
+ f"- [Open `depmem_all_view.html`]({Path(depmem_view_path).name})",
+ f"- [Open `depmem_all_view.json`]({Path(depmem_view_json_path).name})",
+ ]
+ )
+
+ waveform_comparison = {
+ "bmp_name": Path(screenshot_path).name,
+ "views": [
+ {
+ "stem": "scope_view",
+ "title": "Screen `:DATA:WAVE:SCREen:CH?`",
+ "html_name": Path(html_view_path).name,
+ "json_name": Path(html_data_path).name,
+ }
+ ],
+ "families": {},
+ }
+ if depmem_view_path is not None:
+ waveform_comparison["views"].append(
+ {
+ "stem": "depmem_all_view",
+ "title": "Deep `:DATA:WAVE:DEPMem:All?` converted from raw bundle",
+ "html_name": Path(depmem_view_path).name,
+ "json_name": Path(depmem_view_json_path).name,
+ }
+ )
+
+ channels = {}
+ for channel in sorted(set(screen_series) & set(depmem_series)):
+ channels[f"CH{channel}"] = _compare_waveform_series(
+ "screen `:DATA:WAVE:SCREen:CH?`",
+ screen_series[channel],
+ "deep `:DATA:WAVE:DEPMem:All?`",
+ depmem_series[channel],
+ )
+ if channels:
+ waveform_comparison["families"]["screen_vs_depmem_all"] = channels
+
+ waveform_comparison_path = out_dir / "waveform_comparison.json"
+ _safe_json_write(waveform_comparison_path, waveform_comparison)
+ waveform_comparison_html_path = _render_waveform_comparison_html(
+ out_dir, waveform_comparison
+ )
+
+ lines.extend(
+ [
+ "",
+ "## Waveform Comparison",
+ "",
+ f"- [Open `waveform_comparison.html`]({waveform_comparison_html_path.name})",
+ f"- [Open `waveform_comparison.json`]({waveform_comparison_path.name})",
+ "",
+ "| Family | Channel | A | B | Counts | Alignment | Mean dV | Max dV | Diff samples | Max dt |",
+ "| --- | --- | --- | --- | --- | --- | ---: | ---: | ---: | ---: |",
+ ]
+ )
+ for family, family_channels in waveform_comparison["families"].items():
+ for channel_name, metrics in family_channels.items():
+ lines.append(
+ f"| `{family}` | `{channel_name}` | `{metrics['series_a']}` | "
+ f"`{metrics['series_b']}` | "
+ f"`{metrics['count_a']} vs {metrics['count_b']} (shared {metrics['aligned_count']})` | "
+ f"`{metrics['alignment']}` | "
+ f"{metrics['mean_abs_voltage_delta_v']:.6g} | "
+ f"{metrics['max_abs_voltage_delta_v']:.6g} | "
+ f"{metrics['diff_sample_count']} | "
+ f"{metrics['max_abs_time_delta_s']:.6g} |"
+ )
+
+ if trace_ctx.scpi_trace_path is not None:
+ lines.extend(
+ [
+ "",
+ "## Trace",
+ "",
+ f"- [Open `scpi_trace.jsonl`]({Path(trace_ctx.scpi_trace_path).name})",
+ ]
+ )
+ report_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
+
+ payload = {
+ "label": args.label,
+ "profile": "edge",
+ "esp_port": args.esp_port,
+ "scope_vid": args.scope_vid,
+ "scope_pid": args.scope_pid,
+ "arm": "sequenced_raw_edge",
+ "trigger_status": scope_state["trigger_status"],
+ "initial_trigger_status": scope_state["initial_trigger_status"],
+ "resumed_before_setup": scope_state["resumed_before_setup"],
+ "timebase_scale": scope_state["timebase_scale"],
+ "trigger_snapshot": scope_state["trigger_snapshot"],
+ "trigger_sweep": scope_state["trigger_sweep"],
+ "raw_edge_level": scope_state["raw_edge_level"],
+ "screenshot_path": str(screenshot_path),
+ "html_view_path": str(html_view_path),
+ "html_data_path": str(html_data_path),
+ "screen_waveform_metadata_path": str(screen_metadata_path),
+ "depmem_all_summary_path": str(depmem_summary_path),
+ "depmem_all_raw_path": str(depmem_raw_path),
+ "depmem_all_view_path": (
+ None if depmem_view_path is None else str(depmem_view_path)
+ ),
+ "waveform_comparison_path": str(waveform_comparison_html_path),
+ "waveform_comparison_json_path": str(waveform_comparison_path),
+ "scpi_trace_path": (
+ None if trace_ctx.scpi_trace_path is None else trace_ctx.scpi_trace_path
+ ),
+ "waveforms": [asdict(summary) for summary in waveforms],
+ "esp_serial_lines": esp_lines,
+ }
+ json_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
+ return payload, report_path, json_path, waveforms
+ finally:
+ try:
+ scope.disable_trace()
+ except Exception:
+ pass
+ try:
+ scope.close()
+ except Exception:
+ pass
diff --git a/doc/examples/ex_owon_sds1104.py b/doc/examples/ex_owon_sds1104.py
new file mode 100644
index 00000000..aefe968f
--- /dev/null
+++ b/doc/examples/ex_owon_sds1104.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+"""
+Minimal example for connecting to an OWON SDS1104-family oscilloscope.
+"""
+
+import instruments as ik
+from instruments.units import ureg as u
+
+
+def main():
+ """
+ Open the scope over raw USB, print a few stable values, and read the
+ current CH1 screen waveform. The trigger example below only uses the
+ conservative trigger paths promoted in the driver.
+ """
+ scope = ik.owon.OWONSDS1104.open_usb(
+ enable_scpi=False,
+ ignore_scpi_failure=True,
+ settle_time=0.1,
+ )
+ try:
+ print(f"Identity: {scope.name}")
+ print(f"Timebase scale: {scope.timebase_scale}")
+ print(f"CH1 displayed: {scope.channel[0].display}")
+ print(f"CH1 coupling: {scope.channel[0].coupling}")
+ print(f"Trigger type: {scope.trigger_type}")
+ print(f"Single trigger mode: {scope.single_trigger_mode}")
+
+ scope.stop()
+ scope.single_trigger_mode = scope.SingleTriggerMode.edge
+ scope.trigger_source = scope.TriggerSource.ch1
+ scope.trigger_coupling = scope.TriggerCoupling.dc
+ scope.trigger_slope = scope.TriggerSlope.rise
+ scope.trigger_level = 25 * u.millivolt
+ scope.trigger_holdoff = 100 * u.nanosecond
+ print(f"Trigger snapshot: {scope.read_trigger_configuration()}")
+
+ time_s, voltage_v = scope.channel[0].read_waveform()
+ print(f"CH1 waveform samples: {len(voltage_v)}")
+ print(f"First sample: t={time_s[0]!r}, v={voltage_v[0]!r}")
+ finally:
+ scope.close()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/doc/examples/kiprim/ex_kiprim_dc310s.py b/doc/examples/kiprim/ex_kiprim_dc310s.py
new file mode 100644
index 00000000..30129444
--- /dev/null
+++ b/doc/examples/kiprim/ex_kiprim_dc310s.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+
+import instruments as ik
+
+psu = ik.kiprim.DC310S.open_serial("COM8", baud=115200, timeout=0.5)
+
+print(psu.name)
+print(f"Setpoint: {psu.voltage}, {psu.current}, output={psu.output}")
+print(f"Measured: {psu.voltage_sense}, {psu.current_sense}, {psu.power_sense}")
+
+# Uncomment to configure the supply explicitly.
+# psu.voltage = 5 * ik.units.volt
+# psu.current = 0.25 * ik.units.ampere
+# psu.output = True
diff --git a/doc/examples/owon_esp32_trigger_jig.md b/doc/examples/owon_esp32_trigger_jig.md
new file mode 100644
index 00000000..785da772
--- /dev/null
+++ b/doc/examples/owon_esp32_trigger_jig.md
@@ -0,0 +1,301 @@
+# ESP32 Trigger Test Jig For OWON SDS1104 / DOS1104
+
+This note describes a very small bench jig for validating the SDS1104 / DOS1104
+trigger families that now look real on hardware but are not fully exercised in
+the front-panel UI:
+
+- `PULSe`
+- `SLOPe`
+- `EDGE:COUPling HF`
+- `EDGE:SOURce ACLine`
+
+The goal is not RF-grade timing accuracy. The goal is to generate clean,
+repeatable, low-voltage bench signals that are good enough to validate trigger
+behavior at easy time scales such as `10 us`, `50 us`, `100 us`, and `1 ms`.
+
+## Hardware
+
+Use any ordinary 3.3 V ESP32 dev board with exposed GPIOs.
+
+Preferred firmware path:
+
+- PlatformIO project: [owon_esp32_trigger_jig_pio/platformio.ini](./owon_esp32_trigger_jig_pio/platformio.ini)
+- PlatformIO source: [owon_esp32_trigger_jig_pio/src/main.cpp](./owon_esp32_trigger_jig_pio/src/main.cpp)
+- Command-line scope runner: [owon_esp32_trigger_jig_runner.py](./owon_esp32_trigger_jig_runner.py)
+
+Recommended parts:
+
+- 1 x ESP32 dev board
+- 2 x `330 ohm` series resistors
+- 1 x `1 kohm` series resistor
+- 1 x `100 kohm` resistor, optional
+- 4 x capacitors for the slope node
+ - `4.7 nF`
+ - `22 nF`
+ - `47 nF`
+ - `470 nF`
+- jumper wires
+- optional small breadboard
+
+Use `10x` probes if possible. They load the signal less and make the RC slope
+node behave more predictably.
+
+## Exact Wiring
+
+Common ground:
+
+- ESP32 `GND` -> scope ground clips for all channels used
+
+CH1 direct pulse output:
+
+- ESP32 `GPIO18` -> `330 ohm` -> scope `CH1` probe tip
+
+CH2 slope output node:
+
+- ESP32 `GPIO19` -> `1 kohm` -> node `SLOPE_OUT`
+- `SLOPE_OUT` -> scope `CH2` probe tip
+- `SLOPE_OUT` -> selected capacitor -> `GND`
+- optional: `SLOPE_OUT` -> `100 kohm` -> `GND`
+
+CH3 frame marker, optional:
+
+- ESP32 `GPIO21` -> `330 ohm` -> scope `CH3` probe tip
+
+Do not connect the scope probe tips directly to `3V3`.
+
+### RC Values For Approximate Slope Times
+
+The CH2 node is a first-order RC edge shaper. The approximate `10%` to `90%`
+rise or fall time is about `2.2 * R * C`.
+
+With `R = 1 kohm`:
+
+- `4.7 nF` -> about `10 us`
+- `22 nF` -> about `48 us`
+- `47 nF` -> about `103 us`
+- `470 nF` -> about `1.03 ms`
+
+This is good enough for conservative trigger validation.
+
+## What Each Channel Is For
+
+`CH1`
+
+- direct digital pulse train
+- use this for `PULSe` trigger tests
+
+`CH2`
+
+- RC-shaped version of a square wave
+- use this for `SLOPe` trigger tests
+
+`CH3`
+
+- optional burst marker
+- useful when checking where one pulse frame starts
+
+## Scope Setup Suggestions
+
+For pulse tests on `CH1`:
+
+- source: `CH1`
+- trigger mode: `PULSe`
+- coupling: start with `DC`
+- slope or direction: match the pulse edge you care about
+
+For slope tests on `CH2`:
+
+- source: `CH2`
+- trigger mode: `SLOPe`
+- set upper and lower levels around the middle of the RC swing, for example
+ `0.8 V` and `2.4 V`
+- set `TIME` near the expected slope time from the RC table
+
+For `HF` coupling tests:
+
+- start from ordinary `EDGE` mode on `CH1`
+- compare trigger behavior with `DC`, `AC`, and `HF`
+- `HF` is easiest to see later with a signal that has slow baseline movement
+ plus a fast edge component
+
+For `ACLine`:
+
+- the ESP32 jig does not generate line-synchronous AC
+- use a separate isolated low-voltage AC source later, for example a small
+ transformer secondary
+- never connect mains directly to the scope or to the ESP32
+
+## Minimal Test Procedure
+
+### Pulse Trigger
+
+The firmware emits a repeating burst on `CH1` with pulse widths:
+
+- `10 us`
+- `50 us`
+- `100 us`
+- `1000 us`
+
+Suggested checks:
+
+- set `PULSe:TIME 50us`
+- try `PULSe:SIGN >`, `<`, and `=`
+- verify the scope prefers the matching pulse width or class
+
+### Slope Trigger
+
+Pick one capacitor on `CH2`, for example:
+
+- `22 nF` for about `50 us`
+
+Suggested checks:
+
+- set `SLOPe:TIME 50us`
+- try `SLOPe:SIGN >`, `<`, and `=`
+- try `SLOPe:SLOPe POS` and `NEG`
+- verify that triggering changes when you swap the capacitor to another
+ timescale
+
+### HF Coupling
+
+Start with the CH1 pulse source.
+
+Suggested checks:
+
+- compare `EDGE:COUPling DC`, `AC`, and `HF`
+- later, if needed, add a slow baseline modulation source and confirm `HF`
+ ignores it better than `DC`
+
+## Limitations
+
+- `delayMicroseconds()` on ESP32 is good enough for bench trigger validation,
+ not for calibrated timing metrology
+- the RC slope node is only approximately linear
+- for sub-microsecond work, use RP2040 PIO or a function generator instead
+
+## Firmware
+
+The maintained firmware path is the PlatformIO project:
+
+- [owon_esp32_trigger_jig_pio/platformio.ini](./owon_esp32_trigger_jig_pio/platformio.ini)
+- [owon_esp32_trigger_jig_pio/src/main.cpp](./owon_esp32_trigger_jig_pio/src/main.cpp)
+
+## PlatformIO Build And Upload
+
+From the repo root:
+
+```powershell
+cd doc/examples/owon_esp32_trigger_jig_pio
+pio run
+pio run -t upload
+pio device monitor
+```
+
+## Command-Line Runner
+
+The Python runner configures the ESP32 over serial, configures the scope,
+captures a BMP screenshot, saves waveform CSV files, and writes a Markdown and
+JSON report into a timestamped artifact directory.
+
+Basic pulse example:
+
+```powershell
+python doc/examples/owon_esp32_trigger_jig_runner.py `
+ --esp-port COM7 `
+ --profile pulse `
+ --pulse-mode burst `
+ --pulse-trigger-time-us 50 `
+ --capture-channels 1 `
+ --timebase-s-div 0.00005
+```
+
+Basic slope example:
+
+```powershell
+python doc/examples/owon_esp32_trigger_jig_runner.py `
+ --esp-port COM7 `
+ --profile slope `
+ --slope-trigger-time-us 50 `
+ --slope-upper-v 2.4 `
+ --slope-lower-v 0.8 `
+ --capture-channels 2 `
+ --timebase-s-div 0.001
+```
+
+Pretty stable edge capture:
+
+```powershell
+python doc/examples/owon_esp32_trigger_jig_runner.py `
+ --esp-port COM7 `
+ --capture-edge-pretty
+```
+
+Waveform truth validation capture:
+
+```powershell
+python doc/examples/owon_esp32_trigger_jig_runner.py `
+ --esp-port COM7 `
+ --capture-edge-pretty `
+ --validate-waveforms
+```
+
+The runner saves artifacts under:
+
+```text
+doc/examples/artifacts/YYYYMMDD_HHMMSS_