diff --git a/.gitignore b/.gitignore index dfa829df..cbba33c1 100644 --- a/.gitignore +++ b/.gitignore @@ -69,3 +69,6 @@ coverage.xml # version file generated by setuptools_scm src/instruments/_version.py +doc/examples/artifacts/ +InstrumentKit_1.zip +InstrumentKit_2.zip diff --git a/doc/examples/_owon_capture_common.py b/doc/examples/_owon_capture_common.py new file mode 100644 index 00000000..bb61f7ed --- /dev/null +++ b/doc/examples/_owon_capture_common.py @@ -0,0 +1,636 @@ +#!/usr/bin/env python +""" +Shared helpers for OWON DOS1104 example capture scripts. +""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from datetime import datetime, timezone +import json +from pathlib import Path +from typing import Any +import time + +import serial + +import instruments as ik +import instruments.owon.sds1104 as owon_sds1104 +from instruments.units import ureg as u + + +def _timestamp(): + return datetime.now(timezone.utc) + + +def _artifact_dir(root, label, timestamp=None): + timestamp = _timestamp() if timestamp is None else timestamp + date_dir = Path(root) / timestamp.strftime("%Y%m%d") + return date_dir / f"{timestamp.strftime('%Y%m%d_%H%M%S')}_{label}" + + +def _clean_reply(text): + cleaned = str(text).strip() + if cleaned.endswith("->"): + cleaned = cleaned[:-2].rstrip() + return cleaned + + +def _format_token(value, base_unit, units): + quantity = u.Quantity(value, base_unit) if not hasattr(value, "to") else value + quantity = quantity.to(base_unit) + for scale, suffix in units: + scaled = quantity.to(scale).magnitude + if abs(scaled) >= 1 or suffix == units[-1][1]: + text = f"{scaled:.12f}".rstrip("0").rstrip(".") + if text == "-0": + text = "0" + return f"{text}{suffix}" + raise ValueError(f"Could not format token for {value!r}") + + +def _format_time_token(value): + return _format_token( + value, + u.second, + [ + (u.microsecond, "us"), + (u.millisecond, "ms"), + (u.second, "s"), + ], + ) + + +def _format_voltage_token(value): + return _format_token( + value, + u.volt, + [ + (u.millivolt, "mV"), + (u.volt, "V"), + ], + ) + + +@dataclass +class WaveformSummary: + channel: int + sample_count: int + time_start_s: float + time_end_s: float + voltage_min_v: float + voltage_max_v: float + voltage_pp_v: float + csv_path: str + + +@dataclass +class RawWaveformSummary: + channel: int + sample_count: int + raw_min: int + raw_max: int + raw_first_16: list[int] + metadata_scale: str + metadata_probe: str + metadata_offset: Any + + +@dataclass +class TraceContext: + scpi_trace_path: str | None + state_trace_path: str | None + state_probe_style: str = "full" + state_seq: int = 0 + + +def _format_sample_rate_text(x_values): + if len(x_values) < 2: + return "unknown" + dt = abs(x_values[1] - x_values[0]) + if dt <= 0: + return "unknown" + sample_rate = 1.0 / dt + + def _fmt(value): + return f"{value:.6f}".rstrip("0").rstrip(".") + + if sample_rate >= 0.9995e9: + return f"{_fmt(sample_rate / 1e9)}GS/s" + if sample_rate >= 0.9995e6: + return f"{_fmt(sample_rate / 1e6)}MS/s" + if sample_rate >= 0.9995e3: + return f"{_fmt(sample_rate / 1e3)}kS/s" + return f"{_fmt(sample_rate)}S/s" + + +def _write_jsonl(path, payload): + if path is None: + return + with Path(path).open("a", encoding="utf-8") as handle: + handle.write(json.dumps(payload, ensure_ascii=True) + "\n") + + +def _make_trace_context(out_dir, args): + trace_root = Path(args.trace_dir) / out_dir.name if args.trace_dir else out_dir + trace_root.mkdir(parents=True, exist_ok=True) + return TraceContext( + scpi_trace_path=( + str(trace_root / "scpi_trace.jsonl") if args.trace_scpi else None + ), + state_trace_path=( + str(trace_root / "state_trace.jsonl") if args.trace_state else None + ), + state_probe_style=args.state_probe_style, + ) + + +def _metadata_sample_rate_text(metadata): + sample = metadata.get("SAMPLE") if isinstance(metadata, dict) else None + if not isinstance(sample, dict): + return None + raw = sample.get("SAMPLERATE") + if raw in {None, ""}: + return None + return _clean_reply(str(raw)).replace("(", "").replace(")", "") + + +def _open_scope(args): + deadline = time.monotonic() + args.scope_open_retry_s + last_exc = None + while time.monotonic() < deadline: + try: + strict_open = bool(getattr(args, "strict_open_scpi", False)) + return ik.owon.OWONSDS1104.open_usb( + vid=int(args.scope_vid, 0), + pid=int(args.scope_pid, 0), + timeout=args.scope_timeout_s * u.second, + enable_scpi=(args.enable_scpi or strict_open), + ignore_scpi_failure=not strict_open, + settle_time=args.scope_settle_s, + ) + except Exception as exc: # pragma: no cover - bench retry path + last_exc = exc + time.sleep(0.5) + raise RuntimeError(f"Scope open failed: {last_exc}") + + +def _open_serial(args): + return serial.Serial( + port=args.esp_port, + baudrate=args.esp_baud, + timeout=0.2, + write_timeout=0.2, + ) + + +def _configure_esp_from_args(args): + with _open_serial(args) as port: + return _configure_esp(port, args) + + +def _serial_write_line(port, line): + port.write((line + "\n").encode("utf-8")) + port.flush() + + +def _serial_drain(port, duration_s): + deadline = time.monotonic() + max(duration_s, 0.0) + lines = [] + while time.monotonic() < deadline: + raw = port.readline() + if raw: + lines.append(raw.decode("utf-8", errors="replace").rstrip()) + else: + time.sleep(0.01) + return lines + + +def _configure_esp(port, args): + _serial_write_line(port, "status") + if args.pulse_mode == "burst": + _serial_write_line(port, "burst") + else: + _serial_write_line(port, f"single {int(args.pulse_width_us)}") + _serial_write_line(port, f"gap {int(args.pulse_gap_us)}") + _serial_write_line(port, f"frame {int(args.pulse_frame_us)}") + _serial_write_line(port, f"half {int(args.slope_half_period_us)}") + return _serial_drain(port, 0.5) + + +def _capture_screenshot(scope, out_dir, args): + bmp_path = out_dir / f"{args.profile}_{args.arm}_scope_screen.bmp" + bmp_path.write_bytes(scope.read_screen_bmp()) + return bmp_path + + +def _capture_waveform(scope, channel, out_dir): + x_axis, y_axis = scope.read_waveform(channel) + csv_path = out_dir / f"ch{channel}_waveform.csv" + + x_values = [float(value) for value in x_axis] + y_values = [float(value) for value in y_axis] + + with csv_path.open("w", encoding="utf-8", newline="") as handle: + writer = csv.writer(handle) + writer.writerow(["time_s", "voltage_v"]) + writer.writerows(zip(x_values, y_values, strict=True)) + + v_min = min(y_values) + v_max = max(y_values) + return WaveformSummary( + channel=channel, + sample_count=len(y_values), + time_start_s=x_values[0], + time_end_s=x_values[-1], + voltage_min_v=v_min, + voltage_max_v=v_max, + voltage_pp_v=v_max - v_min, + csv_path=str(csv_path), + ) + + +def _capture_screen_channel_raw(scope, metadata, channel, out_dir): + point_count = scope._waveform_point_count( + metadata + ) # pylint: disable=protected-access + payload = scope._binary_query_exact( # pylint: disable=protected-access + f":DATA:WAVE:SCREen:CH{channel}?", 4 + 2 * point_count + ) + raw_adc = owon_sds1104._parse_waveform_adc( # pylint: disable=protected-access + owon_sds1104._strip_packet_prefix( + payload, f"screen waveform CH{channel}" + ), # pylint: disable=protected-access + f"screen waveform CH{channel}", + ) + x_axis = scope._waveform_time_axis( + metadata, point_count + ) # pylint: disable=protected-access + y_axis = scope._waveform_voltage_axis( + metadata, channel, raw_adc + ) # pylint: disable=protected-access + csv_path = out_dir / f"ch{channel}_waveform.csv" + + x_values = [float(value) for value in x_axis] + y_values = [float(value) for value in y_axis] + + with csv_path.open("w", encoding="utf-8", newline="") as handle: + writer = csv.writer(handle) + writer.writerow(["time_s", "voltage_v"]) + writer.writerows(zip(x_values, y_values, strict=True)) + + v_min = min(y_values) + v_max = max(y_values) + summary = WaveformSummary( + channel=channel, + sample_count=len(y_values), + time_start_s=x_values[0], + time_end_s=x_values[-1], + voltage_min_v=v_min, + voltage_max_v=v_max, + voltage_pp_v=v_max - v_min, + csv_path=str(csv_path), + ) + return summary, {"x": x_values, "y": y_values} + + +def _capture_depmem_all_summary_and_series(scope, out_dir): + raw_payload = scope.read_deep_memory_all_raw() + raw_path = out_dir / "depmem_all_raw.bin" + raw_path.write_bytes(raw_payload) + capture = scope._parse_deep_memory_all_payload( + raw_payload + ) # pylint: disable=protected-access + summary = { + "metadata": { + "timebase_scale": capture.metadata.get("TIMEBASE", {}).get("SCALE"), + "hoffset": capture.metadata.get("TIMEBASE", {}).get("HOFFSET"), + "datalen": capture.metadata.get("SAMPLE", {}).get("DATALEN"), + "sample_rate": capture.metadata.get("SAMPLE", {}).get("SAMPLERATE"), + "depmem": capture.metadata.get("SAMPLE", {}).get("DEPMEM"), + "screenoffset": capture.metadata.get("SAMPLE", {}).get("SCREENOFFSET"), + "runstatus": capture.metadata.get("RUNSTATUS"), + }, + "channels": { + f"CH{channel}": { + "samples": len(raw_values), + "raw_min": int(min(raw_values)), + "raw_max": int(max(raw_values)), + "first_16": [int(value) for value in list(raw_values)[:16]], + } + for channel, raw_values in capture.raw_channels.items() + }, + } + summary_path = out_dir / "depmem_all_summary.json" + _safe_json_write(summary_path, summary) + + series = {} + for channel, raw_values in capture.raw_channels.items(): + x_axis = scope._waveform_time_axis( + capture.metadata, len(raw_values) + ) # pylint: disable=protected-access + y_axis = scope._waveform_voltage_axis( + capture.metadata, channel, raw_values + ) # pylint: disable=protected-access + series[channel] = { + "x": [float(value) for value in x_axis], + "y": [float(value) for value in y_axis], + } + return summary, summary_path, raw_path, series + + +def _safe_json_write(path, payload): + path.write_text(json.dumps(payload, indent=2), encoding="utf-8") + + +def _build_scope_html_data_from_series( + args, + scope_state, + waveform_data, + memory_depth_text=None, + sample_rate_text=None, +): + first_channel = next(iter(waveform_data.values()), {"x": [0.0, 1.0], "y": []}) + time_window_s = None + if len(first_channel["x"]) >= 2: + time_window_s = abs( + float(first_channel["x"][-1]) - float(first_channel["x"][0]) + ) + if sample_rate_text is None: + sample_rate_text = _format_sample_rate_text(first_channel["x"]) + if memory_depth_text is None: + memory_depth_text = ( + str(len(first_channel["y"])) if first_channel["y"] else "unknown" + ) + + status_text = str(scope_state.get("trigger_status", "STOP")) + if "." in status_text: + status_text = status_text.split(".")[-1].upper() + + if args.profile == "edge": + trigger_source = args.edge_source.upper() + trigger_coupling = args.edge_coupling.upper() + trigger_edge = "falling" if args.edge_slope.lower() == "fall" else "rising" + elif args.profile == "pulse": + trigger_source = args.pulse_source.upper() + trigger_coupling = args.pulse_coupling.upper() + trigger_edge = "falling" if args.pulse_dir.upper() == "NEG" else "rising" + else: + trigger_source = args.slope_source.upper() + trigger_coupling = "DC" + trigger_edge = "falling" if args.slope_edge.upper() == "NEG" else "rising" + + return { + "meta": { + "status": status_text.title() if status_text == "STOP" else status_text, + "timebase_s_div": args.timebase_s_div, + "time_window_s": time_window_s, + "sample_rate_text": sample_rate_text, + "memory_depth_text": memory_depth_text, + }, + "trigger": { + "source": trigger_source, + "coupling": trigger_coupling, + "edge": trigger_edge, + "level_v": args.trigger_level_v, + "horizontal_pos_s": 0.0, + }, + "channels": { + "CH1": { + "visible": 1 in args.capture_channels, + "volts_per_div": args.ch1_scale_v_div, + "position_div": args.ch1_position_div, + "color": "#ffff00", + "data": waveform_data.get(1, {}).get("y", []), + "time_s": waveform_data.get(1, {}).get("x", []), + }, + "CH2": { + "visible": 2 in args.capture_channels, + "volts_per_div": args.ch2_scale_v_div, + "position_div": args.ch2_position_div, + "color": "#00ffff", + "data": waveform_data.get(2, {}).get("y", []), + "time_s": waveform_data.get(2, {}).get("x", []), + }, + "CH3": { + "visible": 3 in args.capture_channels, + "volts_per_div": args.ch3_scale_v_div, + "position_div": args.ch3_position_div, + "color": "#ff8800", + "data": waveform_data.get(3, {}).get("y", []), + "time_s": waveform_data.get(3, {}).get("x", []), + }, + }, + } + + +def _build_scope_html_data(args, scope_state, waveforms): + waveform_data = {} + for summary in waveforms: + with Path(summary.csv_path).open(newline="", encoding="utf-8") as handle: + rows = list(csv.DictReader(handle)) + waveform_data[summary.channel] = { + "x": [float(row["time_s"]) for row in rows], + "y": [float(row["voltage_v"]) for row in rows], + } + return _build_scope_html_data_from_series(args, scope_state, waveform_data) + + +def _render_scope_html_with_data(out_dir, scope_data, stem): + template_path = Path(__file__).resolve().parent / "scope.html" + template = template_path.read_text(encoding="utf-8") + template = template.replace("scope_view_data.js", f"{stem}_data.js") + template = template.replace("scope_view.json", f"{stem}.json") + html_path = out_dir / f"{stem}.html" + json_path = out_dir / f"{stem}.json" + js_path = out_dir / f"{stem}_data.js" + html_path.write_text(template, encoding="utf-8") + json_path.write_text(json.dumps(scope_data, indent=2), encoding="utf-8") + js_path.write_text( + "window.__SCOPE_DATA__ = " + json.dumps(scope_data) + ";\n", encoding="utf-8" + ) + return html_path, json_path + + +def _render_scope_html(out_dir, args, scope_state, waveforms, sample_rate_text=None): + scope_data = _build_scope_html_data(args, scope_state, waveforms) + if sample_rate_text is not None: + scope_data["meta"]["sample_rate_text"] = sample_rate_text + return _render_scope_html_with_data(out_dir, scope_data, "scope_view") + + +def _max_abs_delta(values_a, values_b): + if not values_a or not values_b: + return 0.0 + return max( + abs(float(a) - float(b)) for a, b in zip(values_a, values_b, strict=True) + ) + + +def _mean_abs_delta(values_a, values_b): + if not values_a or not values_b: + return 0.0 + deltas = [abs(float(a) - float(b)) for a, b in zip(values_a, values_b, strict=True)] + return sum(deltas) / len(deltas) + + +def _align_waveform_series(series_a, series_b): + x_a = list(series_a["x"]) + y_a = list(series_a["y"]) + x_b = list(series_b["x"]) + y_b = list(series_b["y"]) + if len(y_a) == len(y_b): + return { + "alignment": "exact", + "x_a": x_a, + "y_a": y_a, + "x_b": x_b, + "y_b": y_b, + } + + if abs(len(y_a) - len(y_b)) == 1: + if len(y_a) > len(y_b): + candidates = [ + ("drop_first_a", x_a[1:], y_a[1:], x_b, y_b), + ("drop_last_a", x_a[:-1], y_a[:-1], x_b, y_b), + ] + else: + candidates = [ + ("drop_first_b", x_a, y_a, x_b[1:], y_b[1:]), + ("drop_last_b", x_a, y_a, x_b[:-1], y_b[:-1]), + ] + best = min( + candidates, + key=lambda item: ( + _max_abs_delta(item[2], item[4]), + _mean_abs_delta(item[2], item[4]), + ), + ) + return { + "alignment": best[0], + "x_a": list(best[1]), + "y_a": list(best[2]), + "x_b": list(best[3]), + "y_b": list(best[4]), + } + + shared = min(len(y_a), len(y_b)) + return { + "alignment": f"prefix_trim_to_{shared}", + "x_a": x_a[:shared], + "y_a": y_a[:shared], + "x_b": x_b[:shared], + "y_b": y_b[:shared], + } + + +def _compare_waveform_series(name_a, series_a, name_b, series_b): + aligned = _align_waveform_series(series_a, series_b) + delta_samples = [ + abs(float(a) - float(b)) + for a, b in zip(aligned["y_a"], aligned["y_b"], strict=True) + ] + return { + "series_a": name_a, + "series_b": name_b, + "count_a": len(series_a["y"]), + "count_b": len(series_b["y"]), + "aligned_count": len(aligned["y_a"]), + "alignment": aligned["alignment"], + "max_abs_voltage_delta_v": max(delta_samples) if delta_samples else 0.0, + "mean_abs_voltage_delta_v": ( + sum(delta_samples) / len(delta_samples) if delta_samples else 0.0 + ), + "diff_sample_count": sum(1 for delta in delta_samples if delta > 1e-9), + "max_abs_time_delta_s": _max_abs_delta(aligned["x_a"], aligned["x_b"]), + "voltage_pp_a_v": ( + max(series_a["y"]) - min(series_a["y"]) if series_a["y"] else 0.0 + ), + "voltage_pp_b_v": ( + max(series_b["y"]) - min(series_b["y"]) if series_b["y"] else 0.0 + ), + } + + +def _render_waveform_comparison_html(out_dir, comparison): + html_path = out_dir / "waveform_comparison.html" + lines = [ + "", + '', + "", + '', + '', + "Waveform Comparison", + "", + "", + "", + "

Waveform Comparison

", + '

BMP is the ground-truth screen capture. The HTML views below render the public waveform APIs grouped by acquisition family.

', + ] + if comparison.get("bmp_name"): + lines.extend( + [ + "

Reference BMP

", + f'
Scope BMP
', + ] + ) + + lines.extend( + [ + "

Numeric Comparison

", + "", + "", + "", + ] + ) + for family, channels in comparison.get("families", {}).items(): + for channel_name, metrics in channels.items(): + lines.append( + "" + f"" + f"" + f"" + f"" + f"" + f"" + f"" + f"" + f"" + f"" + "" + ) + lines.extend(["", "
FamilyChannelABCountsAlignmentMean |dV|Max |dV|Diff samplesMax |dt|
{family}{channel_name}{metrics['series_a']}{metrics['series_b']}{metrics['count_a']} vs {metrics['count_b']} (shared {metrics['aligned_count']}){metrics['alignment']}{metrics['mean_abs_voltage_delta_v']:.6g} V{metrics['max_abs_voltage_delta_v']:.6g} V{metrics['diff_sample_count']}{metrics['max_abs_time_delta_s']:.6g} s
"]) + + views = comparison.get("views", []) + if views: + lines.extend(["

Rendered Views

", '
']) + for view in views: + lines.extend( + [ + '
', + f"

{view['title']}

", + f'

Open HTML | Open JSON

', + f'', + "
", + ] + ) + lines.append("
") + + lines.extend(["", ""]) + html_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + return html_path diff --git a/doc/examples/_owon_capture_debug.py b/doc/examples/_owon_capture_debug.py new file mode 100644 index 00000000..388fa19f --- /dev/null +++ b/doc/examples/_owon_capture_debug.py @@ -0,0 +1,1489 @@ +#!/usr/bin/env python +""" +Debug and validation helpers for the OWON ESP32 trigger jig runner. +""" + +from __future__ import annotations + +import argparse +from dataclasses import asdict +import json +from pathlib import Path +import time + +import usb.core + +import instruments as ik +from instruments.units import ureg as u + +from _owon_capture_common import ( + RawWaveformSummary, + _build_scope_html_data_from_series, + _capture_screenshot, + _capture_waveform, + _clean_reply, + _compare_waveform_series, + _format_time_token, + _format_voltage_token, + _make_trace_context, + _metadata_sample_rate_text, + _open_scope, + _render_scope_html, + _render_scope_html_with_data, + _render_waveform_comparison_html, + _safe_json_write, + _timestamp, + _write_jsonl, +) + + +def _raw_waveform_summary(scope, channel): + metadata = scope.read_waveform_metadata() + point_count = scope._waveform_point_count( + metadata + ) # pylint: disable=protected-access + payload = scope._binary_query_exact( # pylint: disable=protected-access + f":DATA:WAVE:SCREEN:CH{channel}?", + 4 + 2 * point_count, + ) + raw_adc = ik.owon.sds1104._parse_waveform_adc( # pylint: disable=protected-access + ik.owon.sds1104._strip_packet_prefix(payload, f"screen waveform CH{channel}"), + f"screen waveform CH{channel}", + ) + raw_list = [int(value) for value in raw_adc] + channel_meta = metadata["CHANNEL"][channel - 1] + return ( + RawWaveformSummary( + channel=channel, + sample_count=len(raw_list), + raw_min=min(raw_list), + raw_max=max(raw_list), + raw_first_16=raw_list[:16], + metadata_scale=str(channel_meta.get("SCALE")), + metadata_probe=str(channel_meta.get("PROBE")), + metadata_offset=channel_meta.get("OFFSET"), + ), + metadata, + ) + + +def _summarize_numeric_axis(values): + values = [float(value) for value in values] + return { + "count": len(values), + "min": min(values), + "max": max(values), + "pp": max(values) - min(values), + "first_16": values[:16], + } + + +def _capture_waveform_truth( + scope, out_dir, args, scope_state, capture_channels, deep_first=False +): + truth = { + "screen_bmp_path": None, + "screen_metadata_path": None, + "screen_metadata": None, + "screen_waveforms": {}, + "screen_channel_alias_waveforms": {}, + "deep_memory_metadata_path": None, + "deep_memory_metadata": None, + "deep_memory_channels": {}, + "deep_memory_bundle": None, + "errors": [], + } + screen_series = {} + screen_alias_series = {} + deep_channel_series = {} + deep_bundle_series = {} + screen_metadata = None + deep_metadata = None + comparison = { + "bmp_name": None, + "views": [], + "families": {}, + } + + try: + deep_metadata = scope.read_deep_memory_metadata() + deep_metadata_path = out_dir / "deep_memory_metadata.json" + _safe_json_write(deep_metadata_path, deep_metadata) + truth["deep_memory_metadata_path"] = str(deep_metadata_path) + truth["deep_memory_metadata"] = deep_metadata + except Exception as exc: + truth["errors"].append(f"read_deep_memory_metadata: {exc}") + deep_metadata = None + + for channel in capture_channels: + try: + x_axis, y_axis = scope.read_deep_memory_channel(channel) + x_values = [float(value) for value in x_axis] + y_values = [float(value) for value in y_axis] + truth["deep_memory_channels"][f"CH{channel}"] = { + "time": _summarize_numeric_axis(x_values), + "voltage": _summarize_numeric_axis(y_values), + } + deep_channel_series[channel] = {"x": x_values, "y": y_values} + except Exception as exc: + truth["errors"].append(f"read_deep_memory_channel({channel}): {exc}") + + try: + bundle = scope.read_deep_memory_all() + truth["deep_memory_bundle"] = { + "metadata_keys": sorted(bundle.metadata.keys()), + "raw_channels": { + f"CH{channel}": { + "count": len(raw_values), + "min": int(min(raw_values)), + "max": int(max(raw_values)), + "first_16": [int(value) for value in list(raw_values)[:16]], + } + for channel, raw_values in bundle.raw_channels.items() + }, + } + converted_channels = {} + for channel, raw_values in bundle.raw_channels.items(): + x_axis = scope._waveform_time_axis( + bundle.metadata, len(raw_values) + ) # pylint: disable=protected-access + y_axis = scope._waveform_voltage_axis( + bundle.metadata, channel, raw_values + ) # pylint: disable=protected-access + x_values = [float(value) for value in x_axis] + y_values = [float(value) for value in y_axis] + converted_channels[f"CH{channel}"] = { + "time": _summarize_numeric_axis(x_values), + "voltage": _summarize_numeric_axis(y_values), + } + deep_bundle_series[channel] = {"x": x_values, "y": y_values} + truth["deep_memory_bundle"]["converted_channels"] = converted_channels + except Exception as exc: + truth["errors"].append(f"read_deep_memory_all: {exc}") + + if not deep_first: + bmp_path = out_dir / "waveform_truth_screen.bmp" + bmp_path.write_bytes(scope.read_screen_bmp()) + truth["screen_bmp_path"] = str(bmp_path) + comparison["bmp_name"] = bmp_path.name + + try: + screen_metadata = scope.read_waveform_metadata() + screen_metadata_path = out_dir / "screen_waveform_metadata.json" + _safe_json_write(screen_metadata_path, screen_metadata) + truth["screen_metadata_path"] = str(screen_metadata_path) + truth["screen_metadata"] = screen_metadata + except Exception as exc: + truth["errors"].append(f"read_waveform_metadata: {exc}") + screen_metadata = None + + for channel in capture_channels: + try: + raw_summary, metadata = _raw_waveform_summary(scope, channel) + truth["screen_waveforms"][f"CH{channel}"] = { + "raw_summary": asdict(raw_summary), + "metadata_scale": str( + metadata["CHANNEL"][channel - 1].get("SCALE") + ), + "metadata_probe": str( + metadata["CHANNEL"][channel - 1].get("PROBE") + ), + "metadata_offset": metadata["CHANNEL"][channel - 1].get("OFFSET"), + } + except Exception as exc: + truth["errors"].append(f"raw screen CH{channel}: {exc}") + + try: + x_axis, y_axis = scope.read_waveform(channel) + x_values = [float(value) for value in x_axis] + y_values = [float(value) for value in y_axis] + truth["screen_waveforms"].setdefault(f"CH{channel}", {}) + truth["screen_waveforms"][f"CH{channel}"]["converted_summary"] = { + "time": _summarize_numeric_axis(x_values), + "voltage": _summarize_numeric_axis(y_values), + } + screen_series[channel] = {"x": x_values, "y": y_values} + except Exception as exc: + truth["errors"].append(f"read_waveform({channel}): {exc}") + + try: + x_axis, y_axis = scope.channel[channel - 1].read_waveform() + x_values = [float(value) for value in x_axis] + y_values = [float(value) for value in y_axis] + truth["screen_channel_alias_waveforms"][f"CH{channel}"] = { + "time": _summarize_numeric_axis(x_values), + "voltage": _summarize_numeric_axis(y_values), + } + screen_alias_series[channel] = {"x": x_values, "y": y_values} + except Exception as exc: + truth["errors"].append(f"channel[{channel - 1}].read_waveform(): {exc}") + + truth_path = out_dir / "waveform_truth.json" + _safe_json_write(truth_path, truth) + + view_specs = [ + ( + "screen_read_waveform_view", + "Screen `read_waveform(channel)`", + screen_series, + screen_metadata, + ), + ( + "screen_channel_alias_view", + "Screen `channel[n].read_waveform()`", + screen_alias_series, + screen_metadata, + ), + ( + "deep_memory_channel_view", + "Deep `read_deep_memory_channel(channel)`", + deep_channel_series, + deep_metadata, + ), + ( + "deep_memory_bundle_view", + "Deep `read_deep_memory_all()` converted from raw bundle", + deep_bundle_series, + deep_metadata, + ), + ] + for stem, title, waveform_data, metadata in view_specs: + if not waveform_data: + continue + first_channel = next(iter(waveform_data.values())) + scope_data = _build_scope_html_data_from_series( + args, + scope_state, + waveform_data, + memory_depth_text=str(len(first_channel["y"])), + sample_rate_text=_metadata_sample_rate_text(metadata), + ) + html_path, json_path = _render_scope_html_with_data(out_dir, scope_data, stem) + comparison["views"].append( + { + "stem": stem, + "title": title, + "html_name": html_path.name, + "json_name": json_path.name, + } + ) + + family_map = { + "screen": ( + "read_waveform(channel)", + screen_series, + "channel[n].read_waveform()", + screen_alias_series, + ), + "deep_memory": ( + "read_deep_memory_channel(channel)", + deep_channel_series, + "read_deep_memory_all()", + deep_bundle_series, + ), + } + for family_name, (name_a, series_a, name_b, series_b) in family_map.items(): + channels = {} + for channel in sorted(set(series_a) & set(series_b)): + channels[f"CH{channel}"] = _compare_waveform_series( + name_a, + series_a[channel], + name_b, + series_b[channel], + ) + if channels: + comparison["families"][family_name] = channels + + comparison_path = out_dir / "waveform_comparison.json" + _safe_json_write(comparison_path, comparison) + comparison_html_path = _render_waveform_comparison_html(out_dir, comparison) + comparison["json_path"] = str(comparison_path) + comparison["html_path"] = str(comparison_html_path) + + return truth, truth_path, comparison + + +def _classify_error(text): + lowered = str(text).lower() + if "timeout" in lowered or "timed out" in lowered: + return "transport_timeout" + if "pipe error" in lowered: + return "transport_pipe_error" + if "length-prefixed body" in lowered or "length mismatch" in lowered: + return "malformed_length_prefix" + if "deep_memory" in lowered or "deep-memory" in lowered or "depmem" in lowered: + return "deep_memory_error" + if ( + "waveform" in lowered + or lowered.startswith("ch1:") + or lowered.startswith("ch2:") + ): + return "waveform_read_error" + return "unknown_error" + + +def _is_transport_failure(exc): + if isinstance(exc, (usb.core.USBTimeoutError, usb.core.USBError)): + return True + text = str(exc).lower() + if isinstance(exc, OSError) and ( + "timeout" in text + or "timed out" in text + or "pipe" in text + or "access denied" in text + ): + return True + if isinstance(exc, ValueError) and ( + "length-prefixed" in text + or "payload is too short" in text + or "packet prefix" in text + or "header" in text + ): + return True + return False + + +def _snapshot_scope_state(scope): + def capture(fn): + try: + value = fn() + if hasattr(value, "units"): + return {"value": value.magnitude, "units": str(value.units)} + return str(value) + except Exception as exc: + return f"error: {type(exc).__name__}: {exc}" + + return { + "trigger_status": capture(lambda: scope.trigger_status), + "trigger_type": capture(lambda: scope.trigger_type), + "single_trigger_mode": capture(lambda: scope.single_trigger_mode), + "trigger_sweep": capture(lambda: scope.trigger_sweep), + "timebase_scale": capture(lambda: scope.timebase_scale), + "horizontal_offset": capture(lambda: scope.horizontal_offset), + "memory_depth": capture(lambda: scope.memory_depth), + "acquire_mode": capture(lambda: scope.acquire_mode), + "acquire_averages": capture(lambda: scope.acquire_averages), + "measurement_display_enabled": capture( + lambda: scope.measurement_display_enabled + ), + "raw_edge_level": capture( + lambda: _clean_reply(scope.query(":TRIGger:SINGle:EDGE:LEVel?")) + ), + "trigger_configuration": capture(lambda: scope.read_trigger_configuration()), + } + + +def _snapshot_scope_state_minimal(scope): + def capture(fn): + try: + return str(fn()) + except Exception as exc: + return f"error: {type(exc).__name__}: {exc}" + + return { + "trigger_status": capture( + lambda: _clean_reply(scope.query(":TRIGger:STATUS?")) + ), + "trigger_sweep": capture( + lambda: _clean_reply(scope.query(":TRIGger:SINGle:SWEEp?")) + ), + "raw_edge_level": capture( + lambda: _clean_reply(scope.query(":TRIGger:SINGle:EDGE:LEVel?")) + ), + "timebase_scale": capture( + lambda: _clean_reply(scope.query(":HORIzontal:Scale?")) + ), + "memory_depth": capture(lambda: _clean_reply(scope.query(":ACQUIRE:DEPMEM?"))), + } + + +def _capture_state_for_style(scope, style): + if style == "off": + return None + if style == "minimal": + return _snapshot_scope_state_minimal(scope) + return _snapshot_scope_state(scope) + + +def _record_state_snapshot(trace_ctx, phase, command_label, snapshot, error=None): + if ( + trace_ctx is None + or trace_ctx.state_trace_path is None + or trace_ctx.state_probe_style == "off" + or snapshot is None + ): + return + trace_ctx.state_seq += 1 + payload = { + "ts": _timestamp().isoformat(timespec="microseconds"), + "seq": trace_ctx.state_seq, + "phase": phase, + "command_label": command_label, + "snapshot": snapshot, + } + if error is not None: + payload["error"] = error + _write_jsonl(trace_ctx.state_trace_path, payload) + + +def _apply_state_step(scope, label, trace_ctx, callback, settle_s=0.05): + style = "full" if trace_ctx is None else trace_ctx.state_probe_style + _record_state_snapshot( + trace_ctx, "before", label, _capture_state_for_style(scope, style) + ) + try: + result = callback() + except Exception as exc: + _record_state_snapshot( + trace_ctx, + "error", + label, + _capture_state_for_style(scope, style), + error=f"{type(exc).__name__}: {exc}", + ) + raise + _record_state_snapshot( + trace_ctx, "after", label, _capture_state_for_style(scope, style) + ) + time.sleep(max(float(settle_s), 0.0)) + _record_state_snapshot( + trace_ctx, + "after_settle", + label, + _capture_state_for_style(scope, style), + ) + return result + + +def _apply_channel_setup(scope, args, trace_ctx): + for channel in range(1, 5): + _apply_state_step( + scope, + f"channel[{channel}].display = {channel in args.capture_channels}", + trace_ctx, + lambda channel=channel: setattr( + scope.channel[channel - 1], + "display", + channel in args.capture_channels, + ), + ) + + _apply_state_step( + scope, + f"channel[1].probe_attenuation = {int(args.ch1_probe)}", + trace_ctx, + lambda: setattr(scope.channel[0], "probe_attenuation", int(args.ch1_probe)), + ) + _apply_state_step( + scope, + f"channel[2].probe_attenuation = {int(args.ch2_probe)}", + trace_ctx, + lambda: setattr(scope.channel[1], "probe_attenuation", int(args.ch2_probe)), + ) + if 3 in args.capture_channels: + _apply_state_step( + scope, + f"channel[3].probe_attenuation = {int(args.ch3_probe)}", + trace_ctx, + lambda: setattr(scope.channel[2], "probe_attenuation", int(args.ch3_probe)), + ) + + if args.ch1_scale_v_div is not None: + _apply_state_step( + scope, + f"channel[1].scale = {args.ch1_scale_v_div} V/div", + trace_ctx, + lambda: setattr(scope.channel[0], "scale", args.ch1_scale_v_div * u.volt), + ) + if args.ch2_scale_v_div is not None: + _apply_state_step( + scope, + f"channel[2].scale = {args.ch2_scale_v_div} V/div", + trace_ctx, + lambda: setattr(scope.channel[1], "scale", args.ch2_scale_v_div * u.volt), + ) + if args.ch3_scale_v_div is not None and 3 in args.capture_channels: + _apply_state_step( + scope, + f"channel[3].scale = {args.ch3_scale_v_div} V/div", + trace_ctx, + lambda: setattr(scope.channel[2], "scale", args.ch3_scale_v_div * u.volt), + ) + + _apply_state_step( + scope, + f"channel[1].position = {float(args.ch1_position_div)}", + trace_ctx, + lambda: setattr(scope.channel[0], "position", float(args.ch1_position_div)), + ) + _apply_state_step( + scope, + f"channel[2].position = {float(args.ch2_position_div)}", + trace_ctx, + lambda: setattr(scope.channel[1], "position", float(args.ch2_position_div)), + ) + if 3 in args.capture_channels: + _apply_state_step( + scope, + f"channel[3].position = {float(args.ch3_position_div)}", + trace_ctx, + lambda: setattr(scope.channel[2], "position", float(args.ch3_position_div)), + ) + + +def _apply_trigger_common(scope, args, trace_ctx): + sweep_readback = None + if args.trigger_sweep: + try: + _apply_state_step( + scope, + f"trigger_sweep = {args.trigger_sweep}", + trace_ctx, + lambda: setattr( + scope, "trigger_sweep", scope.TriggerSweep(args.trigger_sweep) + ), + ) + sweep_readback = str(scope.trigger_sweep) + except Exception as exc: + sweep_readback = f"error: {exc}" + return sweep_readback + + +def _configure_scope(scope, args, trace_ctx): + _apply_channel_setup(scope, args, trace_ctx) + if args.memory_depth is not None: + _apply_state_step( + scope, + f"memory_depth = {int(args.memory_depth)}", + trace_ctx, + lambda: setattr(scope, "memory_depth", int(args.memory_depth)), + ) + if args.horizontal_offset_div is not None: + _apply_state_step( + scope, + f"horizontal_offset = {float(args.horizontal_offset_div)}", + trace_ctx, + lambda: setattr( + scope, "horizontal_offset", float(args.horizontal_offset_div) + ), + ) + if args.measurement_display == "on": + _apply_state_step( + scope, + "measurement_display_enabled = True", + trace_ctx, + lambda: setattr(scope, "measurement_display_enabled", True), + ) + elif args.measurement_display == "off": + _apply_state_step( + scope, + "measurement_display_enabled = False", + trace_ctx, + lambda: setattr(scope, "measurement_display_enabled", False), + ) + _apply_state_step( + scope, + f"timebase_scale = {args.timebase_s_div}", + trace_ctx, + lambda: setattr(scope, "timebase_scale", args.timebase_s_div * u.second), + ) + _apply_state_step( + scope, + ":TRIGger:TYPE SINGle", + trace_ctx, + lambda: scope.sendcmd(":TRIGger:TYPE SINGle"), + ) + + if args.profile == "edge": + _apply_state_step( + scope, + "single_trigger_mode = EDGE", + trace_ctx, + lambda: setattr(scope, "single_trigger_mode", scope.SingleTriggerMode.edge), + ) + _apply_state_step( + scope, + f"trigger_source = {args.edge_source}", + trace_ctx, + lambda: setattr( + scope, + "trigger_source", + getattr(scope.TriggerSource, args.edge_source.lower()), + ), + ) + _apply_state_step( + scope, + f"trigger_coupling = {args.edge_coupling}", + trace_ctx, + lambda: setattr( + scope, + "trigger_coupling", + getattr(scope.TriggerCoupling, args.edge_coupling.lower()), + ), + ) + _apply_state_step( + scope, + f"trigger_slope = {args.edge_slope}", + trace_ctx, + lambda: setattr( + scope, + "trigger_slope", + getattr(scope.TriggerSlope, args.edge_slope.lower()), + ), + ) + if args.trigger_level_v is not None: + _apply_state_step( + scope, + f"trigger_level = {args.trigger_level_v} V", + trace_ctx, + lambda: setattr(scope, "trigger_level", args.trigger_level_v * u.volt), + ) + elif args.profile == "pulse": + _apply_state_step( + scope, + "single_trigger_mode = PULSe", + trace_ctx, + lambda: setattr( + scope, "single_trigger_mode", scope.SingleTriggerMode.pulse + ), + ) + _apply_state_step( + scope, + f"PULSe:SOURce {args.pulse_source.upper()}", + trace_ctx, + lambda: scope.sendcmd( + f":TRIGger:SINGle:PULSe:SOURce {args.pulse_source.upper()}" + ), + ) + _apply_state_step( + scope, + f"PULSe:SIGN {args.pulse_sign}", + trace_ctx, + lambda: scope.sendcmd(f":TRIGger:SINGle:PULSe:SIGN {args.pulse_sign}"), + ) + _apply_state_step( + scope, + f"PULSe:TIME {args.pulse_trigger_time_us} us", + trace_ctx, + lambda: scope.sendcmd( + f":TRIGger:SINGle:PULSe:TIME {_format_time_token(args.pulse_trigger_time_us * u.microsecond)}" + ), + ) + _apply_state_step( + scope, + f"PULSe:DIR {args.pulse_dir.upper()}", + trace_ctx, + lambda: scope.sendcmd( + f":TRIGger:SINGle:PULSe:DIR {args.pulse_dir.upper()}" + ), + ) + _apply_state_step( + scope, + f"PULSe:COUPling {args.pulse_coupling.upper()}", + trace_ctx, + lambda: scope.sendcmd( + f":TRIGger:SINGle:PULSe:COUPling {args.pulse_coupling.upper()}" + ), + ) + elif args.profile == "slope": + _apply_state_step( + scope, + "single_trigger_mode = SLOPe", + trace_ctx, + lambda: setattr( + scope, "single_trigger_mode", scope.SingleTriggerMode.slope + ), + ) + _apply_state_step( + scope, + f"SLOPe:SOURce {args.slope_source.upper()}", + trace_ctx, + lambda: scope.sendcmd( + f":TRIGger:SINGle:SLOPe:SOURce {args.slope_source.upper()}" + ), + ) + _apply_state_step( + scope, + f"SLOPe:ULevel {args.slope_upper_v} V", + trace_ctx, + lambda: scope.sendcmd( + f":TRIGger:SINGle:SLOPe:ULevel {_format_voltage_token(args.slope_upper_v * u.volt)}" + ), + ) + _apply_state_step( + scope, + f"SLOPe:LLevel {args.slope_lower_v} V", + trace_ctx, + lambda: scope.sendcmd( + f":TRIGger:SINGle:SLOPe:LLevel {_format_voltage_token(args.slope_lower_v * u.volt)}" + ), + ) + _apply_state_step( + scope, + f"SLOPe:SIGN {args.slope_sign}", + trace_ctx, + lambda: scope.sendcmd(f":TRIGger:SINGle:SLOPe:SIGN {args.slope_sign}"), + ) + _apply_state_step( + scope, + f"SLOPe:TIME {args.slope_trigger_time_us} us", + trace_ctx, + lambda: scope.sendcmd( + f":TRIGger:SINGle:SLOPe:TIME {_format_time_token(args.slope_trigger_time_us * u.microsecond)}" + ), + ) + _apply_state_step( + scope, + f"SLOPe:SLOPe {args.slope_edge.upper()}", + trace_ctx, + lambda: scope.sendcmd( + f":TRIGger:SINGle:SLOPe:SLOPe {args.slope_edge.upper()}" + ), + ) + else: + raise ValueError(f"Unsupported profile: {args.profile}") + + _apply_state_step( + scope, + f"trigger_holdoff = {args.trigger_holdoff_ns} ns", + trace_ctx, + lambda: setattr( + scope, "trigger_holdoff", args.trigger_holdoff_ns * u.nanosecond + ), + ) + sweep_readback = _apply_trigger_common(scope, args, trace_ctx) + + if args.arm == "run": + _apply_state_step(scope, "run()", trace_ctx, scope.run) + elif args.arm == "single": + _apply_state_step( + scope, + f"single(stop_first={args.single_stop_first}, arm_method={args.arm_method})", + trace_ctx, + lambda: scope.single( + stop_first=args.single_stop_first, + arm_method=args.arm_method, + ), + ) + elif args.arm == "stop": + _apply_state_step(scope, "stop()", trace_ctx, scope.stop) + return { + "trigger_sweep": sweep_readback, + "trigger_status": str(scope.trigger_status), + "trigger_snapshot": str(scope.read_trigger_configuration()), + "raw_edge_level": _clean_reply(scope.query(":TRIGger:SINGle:EDGE:LEVel?")), + "trigger_type": str(scope.trigger_type), + } + + +def _restore_stable_edge(scope, args): + try: + scope.sendcmd(":TRIGger:TYPE SINGle") + scope.sendcmd(":TRIGger:SINGle:MODE EDGE") + scope.sendcmd(f":TRIGger:SINGle:EDGE:SOURce {args.edge_source.upper()}") + scope.sendcmd(f":TRIGger:SINGle:EDGE:COUPling {args.edge_coupling.upper()}") + scope.sendcmd(f":TRIGger:SINGle:EDGE:SLOPe {args.edge_slope.upper()}") + if args.trigger_level_v is not None: + scope.sendcmd( + f":TRIGger:SINGle:EDGE:LEVel {_format_voltage_token(args.trigger_level_v * u.volt)}" + ) + except Exception: + pass + + +def _collect_scope_state(scope, style="full"): + if style == "off": + return {} + if style == "minimal": + return _snapshot_scope_state_minimal(scope) + state = {} + try: + state["trigger_status"] = str(scope.trigger_status) + except Exception as exc: + state["trigger_status"] = f"error: {exc}" + try: + state["trigger_type"] = str(scope.trigger_type) + except Exception as exc: + state["trigger_type"] = f"error: {exc}" + try: + state["timebase_scale"] = str(scope.timebase_scale) + except Exception as exc: + state["timebase_scale"] = f"error: {exc}" + try: + state["trigger_sweep"] = str(scope.trigger_sweep) + except Exception as exc: + state["trigger_sweep"] = f"error: {exc}" + try: + state["raw_edge_level"] = _clean_reply( + scope.query(":TRIGger:SINGle:EDGE:LEVel?") + ) + except Exception as exc: + state["raw_edge_level"] = f"error: {exc}" + try: + state["trigger_snapshot"] = str(scope.read_trigger_configuration()) + except Exception as exc: + state["trigger_snapshot"] = f"error: {exc}" + return state + + +def _namespace_with_overrides(args, **updates): + data = vars(args).copy() + data.update(updates) + return argparse.Namespace(**data) + + +def _write_report( + out_dir, + args, + esp_lines, + scope_state, + after_arm_state, + after_finalize_state, + screenshot_path, + html_view_path, + html_data_path, + trace_ctx, + screenshot_error, + waveforms, + waveform_errors, + waveform_truth_path=None, + waveform_truth=None, + waveform_comparison=None, +): + report_path = out_dir / "report.md" + json_path = out_dir / "report.json" + + waveform_rows = [] + for summary in waveforms: + waveform_rows.append( + f"| CH{summary.channel} | {summary.sample_count} | " + f"{summary.time_start_s:.6g} | {summary.time_end_s:.6g} | " + f"{summary.voltage_min_v:.6g} | {summary.voltage_max_v:.6g} | " + f"{summary.voltage_pp_v:.6g} | `{Path(summary.csv_path).name}` |" + ) + + report_lines = [ + f"# OWON ESP32 Trigger Jig Run: {args.label}", + "", + f"- Timestamp: `{_timestamp().isoformat(timespec='seconds')}`", + f"- Profile: `{args.profile}`", + f"- ESP32 port: `{args.esp_port}`", + f"- Scope VID:PID: `{args.scope_vid}:{args.scope_pid}`", + f"- Arm command: `{args.arm}`", + f"- Screenshot: `{screenshot_path.name if screenshot_path is not None else 'not captured'}`", + "", + "## Primary Scope Screen", + "", + *( + [f"![Primary scope screen]({Path(screenshot_path).name})", ""] + if screenshot_path is not None + else [] + ), + "", + "## Derived Scope View", + "", + *( + [ + f"- [Open `scope_view.html`]({Path(html_view_path).name})", + f"- [Open `scope_view.json`]({Path(html_data_path).name})", + "", + "> VS Code Markdown preview disables local scripts/iframes here.", + "> Open the HTML file directly to view the rendered scope screen.", + "", + ] + if html_view_path is not None and html_data_path is not None + else [] + ), + *( + [ + "", + "## Trace Artifacts", + "", + f"- [Open `scpi_trace.jsonl`]({Path(trace_ctx.scpi_trace_path).name})", + f"- [Open `state_trace.jsonl`]({Path(trace_ctx.state_trace_path).name})", + "", + ] + if trace_ctx is not None + and ( + trace_ctx.scpi_trace_path is not None + or trace_ctx.state_trace_path is not None + ) + else [] + ), + "", + "## ESP32 Configuration", + "", + f"- Pulse mode: `{args.pulse_mode}`", + f"- Pulse width: `{args.pulse_width_us} us`", + f"- Pulse gap: `{args.pulse_gap_us} us`", + f"- Frame gap: `{args.pulse_frame_us} us`", + f"- Slope half-period: `{args.slope_half_period_us} us`", + "", + "## Scope State", + "", + f"- Trigger status: `{scope_state['trigger_status']}`", + f"- Timebase scale: `{scope_state['timebase_scale']}`", + f"- Trigger snapshot: `{scope_state['trigger_snapshot']}`", + "", + "## Arming States", + "", + f"- After arm: `{after_arm_state}`", + f"- After finalize: `{after_finalize_state}`", + "", + "## ESP32 Serial Output", + "", + "```text", + *esp_lines, + "```", + "", + "## Waveforms", + "", + "| Channel | Samples | t_start (s) | t_end (s) | v_min (V) | v_max (V) | v_pp (V) | CSV |", + "| --- | ---: | ---: | ---: | ---: | ---: | ---: | --- |", + *waveform_rows, + "", + *( + [ + "## Waveform Truth", + "", + f"- [Open `waveform_truth.json`]({Path(waveform_truth_path).name})", + "", + f"- Errors: `{'; '.join(waveform_truth['errors']) if waveform_truth and waveform_truth.get('errors') else 'none'}`", + "", + ] + if waveform_truth_path is not None and waveform_truth is not None + else [] + ), + *( + [ + "## Waveform Comparison", + "", + f"- [Open `waveform_comparison.html`]({Path(waveform_comparison['html_path']).name})", + f"- [Open `waveform_comparison.json`]({Path(waveform_comparison['json_path']).name})", + "", + "| Family | Channel | A | B | Counts | Alignment | Mean dV | Max dV | Diff samples | Max dt |", + "| --- | --- | --- | --- | --- | --- | ---: | ---: | ---: | ---: |", + *[ + f"| `{family}` | `{channel_name}` | " + f"`{metrics['series_a']}` | `{metrics['series_b']}` | " + f"`{metrics['count_a']} vs {metrics['count_b']} (shared {metrics['aligned_count']})` | " + f"`{metrics['alignment']}` | " + f"`{metrics['mean_abs_voltage_delta_v']:.6g}` | " + f"`{metrics['max_abs_voltage_delta_v']:.6g}` | " + f"`{metrics['diff_sample_count']}` | " + f"`{metrics['max_abs_time_delta_s']:.6g}` |" + for family, channels in waveform_comparison["families"].items() + for channel_name, metrics in channels.items() + ], + "", + ] + if waveform_comparison is not None + else [] + ), + "", + "## Notes", + "", + "- This report is generated by `owon_esp32_trigger_jig_runner.py`.", + "- The BMP is the primary evidence artifact.", + "- `scope_view.html` is a derived view built from `scope_view.json`.", + "- `waveform_comparison.html` groups the screen and deep-memory APIs side by side.", + "- HTML waveform views keep the discrete front-panel `M:` scale and show a separate derived `Span:` from the returned x-axis.", + "- It records the exact run inputs and captured artifacts, but it does not yet make a formal pass/fail verdict.", + ] + if waveform_errors: + report_lines.extend( + [ + "", + "## Waveform Errors", + "", + *[f"- {item}" for item in waveform_errors], + ] + ) + if screenshot_error: + report_lines.extend( + [ + "", + "## Screenshot Error", + "", + f"- {screenshot_error}", + ] + ) + report_path.write_text("\n".join(report_lines) + "\n", encoding="utf-8") + + payload = { + "label": args.label, + "profile": args.profile, + "esp_port": args.esp_port, + "scope_vid": args.scope_vid, + "scope_pid": args.scope_pid, + "arm": args.arm, + "arm_method": args.arm_method, + "single_stop_first": args.single_stop_first, + "finalize_method": args.finalize_method, + "finalize_delay_s": args.finalize_delay_s, + "trigger_status": scope_state["trigger_status"], + "timebase_scale": scope_state["timebase_scale"], + "trigger_snapshot": scope_state["trigger_snapshot"], + "after_arm_state": after_arm_state, + "after_finalize_state": after_finalize_state, + "screenshot_path": None if screenshot_path is None else str(screenshot_path), + "html_view_path": None if html_view_path is None else str(html_view_path), + "html_data_path": None if html_data_path is None else str(html_data_path), + "scpi_trace_path": None if trace_ctx is None else trace_ctx.scpi_trace_path, + "state_trace_path": None if trace_ctx is None else trace_ctx.state_trace_path, + "waveform_truth_path": ( + None if waveform_truth_path is None else str(waveform_truth_path) + ), + "waveform_comparison_path": ( + None + if waveform_comparison is None + else str(waveform_comparison["html_path"]) + ), + "waveform_comparison_json_path": ( + None + if waveform_comparison is None + else str(waveform_comparison["json_path"]) + ), + "screenshot_error": screenshot_error, + "waveforms": [asdict(summary) for summary in waveforms], + "waveform_errors": waveform_errors, + "esp_serial_lines": esp_lines, + } + json_path.write_text(json.dumps(payload, indent=2), encoding="utf-8") + + return report_path, json_path + + +def _run_capture_case(scope, args, out_dir, esp_lines): + out_dir.mkdir(parents=True, exist_ok=True) + trace_ctx = _make_trace_context(out_dir, args) + if trace_ctx.scpi_trace_path is not None: + scope.enable_trace(trace_ctx.scpi_trace_path) + extra_state = _configure_scope(scope, args, trace_ctx) + time.sleep(max(args.capture_delay_s, 0.0)) + after_arm_state = {} + if not args.immediate_deep_probe: + after_arm_state = _collect_scope_state(scope, style=args.state_probe_style) + after_arm_state.update(extra_state) + if args.finalize_method != "none": + _apply_state_step( + scope, + f"freeze_acquisition(method={args.finalize_method})", + trace_ctx, + lambda: scope.freeze_acquisition( + method=args.finalize_method, + settle_time=args.finalize_delay_s, + ), + settle_s=0.0, + ) + after_finalize_state = {} + if not args.immediate_deep_probe: + after_finalize_state = _collect_scope_state(scope, style=args.state_probe_style) + scope_state = dict(after_finalize_state) + else: + scope_state = dict(extra_state) + + screenshot_path = None + html_view_path = None + html_data_path = None + screenshot_error = None + waveform_truth = None + waveform_truth_path = None + waveform_comparison = None + waveforms = [] + waveform_errors = [] + + if args.validate_waveforms: + try: + waveform_truth, waveform_truth_path, waveform_comparison = ( + _capture_waveform_truth( + scope, + out_dir, + args, + scope_state, + args.capture_channels, + deep_first=args.immediate_deep_probe, + ) + ) + except Exception as exc: + waveform_errors.append(f"waveform truth capture: {exc}") + + if args.immediate_deep_probe: + if waveform_truth is not None: + after_arm_state = _collect_scope_state(scope, style="minimal") + after_arm_state.update(extra_state) + after_finalize_state = dict(after_arm_state) + scope_state = dict(after_arm_state) + else: + try: + screenshot_path = _capture_screenshot(scope, out_dir, args) + except Exception as exc: + screenshot_error = str(exc) + + for channel in args.capture_channels: + try: + waveforms.append(_capture_waveform(scope, channel, out_dir)) + except Exception as exc: + waveform_errors.append(f"CH{channel}: {exc}") + + scope_view_sample_rate_text = None + try: + scope_view_sample_rate_text = _metadata_sample_rate_text( + scope.read_waveform_metadata() + ) + except Exception as exc: + waveform_errors.append(f"scope_view sample-rate metadata: {exc}") + + try: + html_view_path, html_data_path = _render_scope_html( + out_dir, + args, + scope_state, + waveforms, + sample_rate_text=scope_view_sample_rate_text, + ) + except Exception as exc: + waveform_errors.append(f"scope_view.html: {exc}") + + report_path, json_path = _write_report( + out_dir, + args, + esp_lines, + scope_state, + after_arm_state, + after_finalize_state, + screenshot_path, + html_view_path, + html_data_path, + trace_ctx, + screenshot_error, + waveforms, + waveform_errors, + waveform_truth_path=waveform_truth_path, + waveform_truth=waveform_truth, + waveform_comparison=waveform_comparison, + ) + primary_error = None + if screenshot_error: + primary_error = screenshot_error + elif waveform_errors: + primary_error = waveform_errors[0] + return ( + { + "label": args.label, + "profile": args.profile, + "trigger_status": scope_state.get("trigger_status"), + "trigger_status_before_reads": after_arm_state.get("trigger_status"), + "trigger_status_after_finalize": after_finalize_state.get("trigger_status"), + "trigger_sweep": scope_state.get("trigger_sweep"), + "arm_method": args.arm_method, + "single_stop_first": args.single_stop_first, + "finalize_method": args.finalize_method, + "screenshot_path": ( + None if screenshot_path is None else str(screenshot_path) + ), + "screenshot_error": screenshot_error, + "failure_class": ( + None if primary_error is None else _classify_error(primary_error) + ), + "deep_metadata_ok": bool( + waveform_truth is not None + and waveform_truth.get("deep_memory_metadata") is not None + ), + "deep_ch1_ok": bool( + waveform_truth is not None + and "CH1" in waveform_truth.get("deep_memory_channels", {}) + ), + "deep_bundle_ok": bool( + waveform_truth is not None + and waveform_truth.get("deep_memory_bundle") is not None + ), + "session_error": None, + "waveform_errors": list(waveform_errors), + "report_path": str(report_path), + "json_path": str(json_path), + }, + report_path, + json_path, + waveforms, + ) + + +def _build_verification_cases(args): + cases = [] + if args.verify_pulse: + trigger_time_us = int(args.pulse_trigger_time_us) + width_map = { + ">": max(trigger_time_us * 4, trigger_time_us + 100), + "<": max(10, trigger_time_us // 2), + "=": trigger_time_us, + } + for sign, suffix in ((">", "gt"), ("<", "lt"), ("=", "eq")): + case_width_us = width_map[sign] + cases.append( + _namespace_with_overrides( + args, + label=( + f"{args.label}_pulse_{suffix}_{trigger_time_us}us_" + f"src_{case_width_us}us" + ), + profile="pulse", + pulse_mode="single", + pulse_width_us=case_width_us, + pulse_sign=sign, + ) + ) + elif args.verify_slope: + for edge in ("POS", "NEG"): + for sign, suffix in ((">", "gt"), ("<", "lt"), ("=", "eq")): + cases.append( + _namespace_with_overrides( + args, + label=( + f"{args.label}_slope_{edge.lower()}_{suffix}_" + f"{int(args.slope_trigger_time_us)}us" + ), + profile="slope", + slope_edge=edge, + slope_sign=sign, + ) + ) + return cases + + +def _build_edge_arming_cases(args): + base = { + "profile": "edge", + "capture_channels": [1, 2], + "pulse_mode": "single", + "pulse_width_us": 100, + "pulse_gap_us": 2000, + "pulse_frame_us": 2000, + "slope_half_period_us": 2000, + "ch1_probe": 1, + "ch2_probe": 1, + "ch1_scale_v_div": 1.0, + "ch2_scale_v_div": 1.0, + "ch1_position_div": 1.0, + "ch2_position_div": -2.0, + "timebase_s_div": 200e-6, + "trigger_level_v": 1.65, + "trigger_holdoff_ns": 100, + "validate_waveforms": True, + "arm": "single", + } + cases = [ + ("edge_auto_legacy_stopfirst", "AUTO", "legacy_single", True, "none"), + ("edge_normal_legacy_stopfirst", "NORMal", "legacy_single", True, "none"), + ("edge_normal_legacy_nostopfirst", "NORMal", "legacy_single", False, "none"), + ("edge_single_legacy_stopfirst", "SINGle", "legacy_single", True, "none"), + ( + "edge_auto_running_stopfirst_runningstop", + "AUTO", + "running_run", + True, + "running_stop", + ), + ( + "edge_normal_running_stopfirst_runningstop", + "NORMal", + "running_run", + True, + "running_stop", + ), + ( + "edge_normal_running_nostopfirst_runningstop", + "NORMal", + "running_run", + False, + "running_stop", + ), + ( + "edge_single_running_stopfirst_runningstop", + "SINGle", + "running_run", + True, + "running_stop", + ), + ( + "edge_normal_legacy_nostopfirst_runningstop", + "NORMal", + "legacy_single", + False, + "running_stop", + ), + ( + "edge_normal_legacy_stopfirst_runningstop", + "NORMal", + "legacy_single", + True, + "running_stop", + ), + ] + built = [] + for label, sweep, arm_method, stop_first, finalize_method in cases: + built.append( + _namespace_with_overrides( + args, + label=label, + trigger_sweep=sweep, + arm_method=arm_method, + single_stop_first=stop_first, + finalize_method=finalize_method, + **base, + ) + ) + return built + + +def _run_case_with_fresh_scope(args, case_args, out_dir, esp_lines): + scope = None + transport_failure = False + clean_success = False + try: + scope = _open_scope(case_args) + summary, report_path, json_path, waveforms = _run_capture_case( + scope, case_args, out_dir, esp_lines + ) + transport_failure = bool( + summary.get("failure_class") + in {"transport_timeout", "transport_pipe_error", "malformed_length_prefix"} + or ( + summary.get("session_error") is not None + and _is_transport_failure(summary["session_error"]) + ) + ) + clean_success = not transport_failure + return summary, report_path, json_path, waveforms + except Exception as exc: + failure_class = _classify_error(exc) + transport_failure = _is_transport_failure(exc) + report_path = out_dir / "case_error.txt" + report_path.write_text(f"{type(exc).__name__}: {exc}\n", encoding="utf-8") + summary = { + "label": case_args.label, + "profile": case_args.profile, + "trigger_status": None, + "trigger_status_before_reads": None, + "trigger_status_after_finalize": None, + "trigger_sweep": case_args.trigger_sweep, + "arm_method": case_args.arm_method, + "single_stop_first": case_args.single_stop_first, + "finalize_method": case_args.finalize_method, + "screenshot_path": None, + "screenshot_error": None, + "failure_class": failure_class, + "deep_metadata_ok": False, + "deep_ch1_ok": False, + "deep_bundle_ok": False, + "waveform_errors": [], + "session_error": f"{type(exc).__name__}: {exc}", + "report_path": str(report_path), + "json_path": None, + } + return summary, report_path, None, [] + finally: + if scope is not None: + try: + scope.disable_trace() + except Exception: + pass + try: + if transport_failure: + if case_args.restore_after_failure: + try: + _restore_stable_edge(scope, case_args) + except Exception: + pass + scope.close( + reset_device=case_args.hard_reset_on_failure, + settle_time=case_args.reset_settle_s, + ) + else: + if clean_success and case_args.restore_after_success: + try: + _restore_stable_edge(scope, case_args) + except Exception: + pass + scope.close() + except Exception: + pass + + +def _write_verification_report(out_dir, args, summaries): + report_path = out_dir / "verification_report.md" + json_path = out_dir / "verification_report.json" + + rows = [] + for summary in summaries: + rows.append( + f"| `{summary['label']}` | `{summary['profile']}` | " + f"`{summary['trigger_status']}` | `{summary['trigger_sweep']}` | " + f"`{Path(summary['screenshot_path']).name if summary['screenshot_path'] else 'none'}` | " + f"`{Path(summary['report_path']).name}` | " + f"`{'; '.join(summary['waveform_errors']) if summary['waveform_errors'] else 'none'}` |" + ) + + lines = [ + f"# OWON Trigger Verification: {args.label}", + "", + f"- Timestamp: `{_timestamp().isoformat(timespec='seconds')}`", + f"- ESP32 port: `{args.esp_port}`", + f"- Scope VID:PID: `{args.scope_vid}:{args.scope_pid}`", + "", + "| Case | Profile | Trigger status | Sweep | Screenshot | Report | Waveform errors |", + "| --- | --- | --- | --- | --- | --- | --- |", + *rows, + "", + "## Case Reports", + "", + *[ + f"- [{Path(item['report_path']).name}]({Path(item['label']) / Path(item['report_path']).name})" + for item in summaries + ], + ] + report_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + json_path.write_text(json.dumps(summaries, indent=2), encoding="utf-8") + return report_path, json_path + + +def _write_edge_arming_report(out_dir, args, summaries): + report_path = out_dir / "edge_arming_report.md" + json_path = out_dir / "edge_arming_report.json" + + rows = [] + for summary in summaries: + rows.append( + f"| `{summary['label']}` | `{summary['trigger_sweep']}` | " + f"`{summary['arm_method']}` | `{summary['single_stop_first']}` | " + f"`{summary['finalize_method']}` | " + f"`{summary['trigger_status_before_reads']}` | " + f"`{summary['trigger_status_after_finalize']}` | " + f"`{summary['deep_metadata_ok']}` | `{summary['deep_ch1_ok']}` | " + f"`{summary['deep_bundle_ok']}` | " + f"`{summary['failure_class'] or 'none'}` | " + f"`{Path(summary['report_path']).name if summary.get('report_path') else 'none'}` |" + ) + + lines = [ + f"# OWON Edge Arming Matrix: {args.label}", + "", + f"- Timestamp: `{_timestamp().isoformat(timespec='seconds')}`", + f"- ESP32 port: `{args.esp_port}`", + f"- Scope VID:PID: `{args.scope_vid}:{args.scope_pid}`", + "", + "| Case | Sweep | Arm | Stop First | Finalize | Status Before Reads | Status After Finalize | Deep Meta | Deep CH1 | Deep Bundle | Failure Class | Report |", + "| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |", + *rows, + "", + "## Case Reports", + "", + *[ + f"- [{Path(item['report_path']).name}]({Path(item['label']) / Path(item['report_path']).name})" + for item in summaries + if item.get("report_path") + ], + ] + report_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + json_path.write_text(json.dumps(summaries, indent=2), encoding="utf-8") + return report_path, json_path diff --git a/doc/examples/_owon_capture_stable.py b/doc/examples/_owon_capture_stable.py new file mode 100644 index 00000000..5c09b47d --- /dev/null +++ b/doc/examples/_owon_capture_stable.py @@ -0,0 +1,390 @@ +#!/usr/bin/env python +""" +Stable capture helpers for the OWON ESP32 trigger jig runner. + +This module contains the promoted, supported example flows: + +- --capture-edge-pretty +- --capture-edge-pretty-burst +""" + +from __future__ import annotations + +from dataclasses import asdict +import json +from pathlib import Path +from types import SimpleNamespace +import time + +import instruments.owon.sds1104 as owon_sds1104 + +from _owon_capture_common import ( + _build_scope_html_data_from_series, + _capture_depmem_all_summary_and_series, + _capture_screen_channel_raw, + _capture_screenshot, + _clean_reply, + _compare_waveform_series, + _configure_esp_from_args, + _make_trace_context, + _metadata_sample_rate_text, + _open_scope, + _render_scope_html_with_data, + _render_waveform_comparison_html, + _safe_json_write, + _timestamp, +) + + +def _is_pretty_edge_mode(args): + return bool(args.capture_edge_pretty or args.capture_edge_pretty_burst) + + +def _apply_pretty_edge_defaults(args): + if not _is_pretty_edge_mode(args): + return args + + args.profile = "edge" + args.arm = "single" + # Bench finding on the connected DOS1104: forcing SWEEp NORMal in this + # promoted path is unsafe. The stable sequence uses raw setup plus + # `:TRIGger:SINGle:SWEEp SINGle`, then waits for a genuine trigger event. + args.trigger_sweep = None + args.trigger_level_v = 1.65 + args.ch1_probe = 1 + args.ch2_probe = 1 + args.ch1_position_div = 1.0 + args.ch2_position_div = -2.0 + args.ch1_scale_v_div = 1.0 + args.ch2_scale_v_div = 1.0 + args.timebase_s_div = 0.0002 + args.pulse_mode = "burst" if args.capture_edge_pretty_burst else "single" + args.pulse_width_us = 100 + args.pulse_gap_us = 2000 + args.pulse_frame_us = 2000 + args.slope_half_period_us = 2000 + if args.label == "owon_esp32_trigger_jig": + args.label = ( + "pretty_edge_capture_burst" + if args.capture_edge_pretty_burst + else "pretty_edge_capture" + ) + return args + + +def _run_proven_edge_capture_case(args, out_dir): + out_dir.mkdir(parents=True, exist_ok=True) + trace_ctx = _make_trace_context(out_dir, args) + scope = _open_scope(args) + try: + if trace_ctx.scpi_trace_path is not None: + scope.enable_trace(trace_ctx.scpi_trace_path) + + initial_status = _clean_reply(scope.query(":TRIGger:STATUS?")) + resumed_before_setup = False + if initial_status.upper() == "STOP": + scope.run() + time.sleep(0.2) + resumed_before_setup = True + + sequence = [ + ":CH1:DISP ON", + ":CH2:DISP ON", + ":CH3:DISP OFF", + ":CH4:DISP OFF", + ":CH1:PROB 1X", + ":CH2:PROB 1X", + ":CH1:SCAL 1v", + ":CH2:SCAL 1v", + ":CH1:POS 1.0", + ":CH2:POS -2.0", + ":ACQUire:Mode SAMPle", + ":ACQUIRE:DEPMEM 10K", + ":ACQUIRE:DEPMEM?", + ":HORIzontal:Scale 500us", + ":HORIzontal:Scale?", + ":TRIGger:TYPE SINGle", + ":TRIGger:SINGle:MODE EDGE", + ":TRIGger:SINGle:EDGE:SOURce CH1", + ":TRIGger:SINGle:EDGE:COUPling DC", + ":TRIGger:SINGle:EDGE:SLOPe RISE", + ":TRIGger:SINGle:EDGE:LEVel 1.64V", + ":TRIGger:SINGle:EDGE:LEVel?", + ":TRIGger:SINGle:HOLDoff 100ns", + ":TRIGger:SINGle:SWEEp SINGle", + ] + for command in sequence: + if command.endswith("?"): + scope.query(command) + else: + scope.sendcmd(command) + + esp_lines = _configure_esp_from_args(args) + time.sleep(0.5) + time.sleep(0.5) + + status_text = _clean_reply(scope.query(":TRIGger:STATUS?")) + screen_metadata = ( + owon_sds1104._parse_json_payload( # pylint: disable=protected-access + scope._binary_query( + ":DATA:WAVE:SCREen:HEAD?" + ), # pylint: disable=protected-access + "waveform metadata", + ) + ) + screen_metadata_path = out_dir / "screen_waveform_metadata.json" + _safe_json_write(screen_metadata_path, screen_metadata) + + waveforms = [] + screen_series = {} + for channel in (1, 2): + summary, series = _capture_screen_channel_raw( + scope, screen_metadata, channel, out_dir + ) + waveforms.append(summary) + screen_series[channel] = series + + screenshot_path = _capture_screenshot( + scope, out_dir, SimpleNamespace(profile="edge", arm="single") + ) + ( + depmem_summary, + depmem_summary_path, + depmem_raw_path, + depmem_series, + ) = _capture_depmem_all_summary_and_series(scope, out_dir) + + scope_state = { + "trigger_status": f"TriggerStatus.{status_text.lower()}", + "initial_trigger_status": initial_status, + "resumed_before_setup": resumed_before_setup, + "timebase_scale": "0.0005 second", + "trigger_snapshot": ( + "SDS1104TriggerConfiguration(" + f"status=<{status_text}>, " + "trigger_type=, " + "single_trigger_mode=, " + "holdoff=, " + "edge_source=, " + "edge_coupling=, " + "edge_slope=, " + "edge_level=, " + "video_source=None, video_standard=None, video_sync=None, video_line_number=None)" + ), + "trigger_sweep": "TriggerSweep.single", + "raw_edge_level": _clean_reply(scope.query(":TRIGger:SINGle:EDGE:LEVel?")), + } + + sample_rate_text = _metadata_sample_rate_text(screen_metadata) + scope_html_data = _build_scope_html_data_from_series( + args, + scope_state, + screen_series, + memory_depth_text=str(len(next(iter(screen_series.values()))["y"])), + sample_rate_text=sample_rate_text, + ) + html_view_path, html_data_path = _render_scope_html_with_data( + out_dir, scope_html_data, "scope_view" + ) + + depmem_view_path = None + depmem_view_json_path = None + if depmem_series: + depmem_scope_data = _build_scope_html_data_from_series( + args, + scope_state, + depmem_series, + memory_depth_text=str(len(next(iter(depmem_series.values()))["y"])), + sample_rate_text=depmem_summary["metadata"]["sample_rate"], + ) + depmem_view_path, depmem_view_json_path = _render_scope_html_with_data( + out_dir, depmem_scope_data, "depmem_all_view" + ) + + report_path = out_dir / "report.md" + json_path = out_dir / "report.json" + lines = [ + f"# OWON ESP32 Trigger Jig Run: {args.label}", + "", + f"- Timestamp: `{_timestamp().isoformat(timespec='seconds')}`", + "- Profile: `edge`", + f"- ESP32 port: `{args.esp_port}`", + f"- Scope VID:PID: `{args.scope_vid}:{args.scope_pid}`", + "- Arm command: `sequenced_raw_edge`", + f"- Screenshot: `{Path(screenshot_path).name}`", + "", + "## Primary Scope Screen", + "", + f"![Primary scope screen]({Path(screenshot_path).name})", + "", + "## Derived Scope View", + "", + f"- [Open `scope_view.html`]({Path(html_view_path).name})", + f"- [Open `scope_view.json`]({Path(html_data_path).name})", + "", + "## Scope State", + "", + f"- Initial trigger status: `{scope_state['initial_trigger_status']}`", + f"- Resumed before setup: `{scope_state['resumed_before_setup']}`", + f"- Trigger status: `{scope_state['trigger_status']}`", + "- Timebase scale: `0.0005 second`", + f"- Trigger sweep: `{scope_state['trigger_sweep']}`", + f"- Raw edge level: `{scope_state['raw_edge_level']}`", + "", + "## ESP32 Serial Output", + "", + "```text", + *esp_lines, + "```", + "", + "## Waveforms", + "", + "| Channel | Samples | t_start (s) | t_end (s) | v_min (V) | v_max (V) | v_pp (V) | CSV | Plot |", + "| --- | ---: | ---: | ---: | ---: | ---: | ---: | --- | --- |", + ] + for summary in waveforms: + lines.append( + f"| CH{summary.channel} | {summary.sample_count} | {summary.time_start_s:.6g} | " + f"{summary.time_end_s:.6g} | {summary.voltage_min_v:.6g} | " + f"{summary.voltage_max_v:.6g} | {summary.voltage_pp_v:.6g} | " + f"`{Path(summary.csv_path).name}` | " + f"`screen_ch{summary.channel}_plot.html` |" + ) + lines.extend( + [ + "", + "## Screen Metadata", + "", + f"- [Open `screen_waveform_metadata.json`]({screen_metadata_path.name})", + "", + "## DEPMem:All Summary", + "", + f"- [Open `depmem_all_summary.json`]({depmem_summary_path.name})", + f"- [Open `depmem_all_raw.bin`]({depmem_raw_path.name})", + ] + ) + if depmem_view_path is not None: + lines.extend( + [ + f"- [Open `depmem_all_view.html`]({Path(depmem_view_path).name})", + f"- [Open `depmem_all_view.json`]({Path(depmem_view_json_path).name})", + ] + ) + + waveform_comparison = { + "bmp_name": Path(screenshot_path).name, + "views": [ + { + "stem": "scope_view", + "title": "Screen `:DATA:WAVE:SCREen:CH?`", + "html_name": Path(html_view_path).name, + "json_name": Path(html_data_path).name, + } + ], + "families": {}, + } + if depmem_view_path is not None: + waveform_comparison["views"].append( + { + "stem": "depmem_all_view", + "title": "Deep `:DATA:WAVE:DEPMem:All?` converted from raw bundle", + "html_name": Path(depmem_view_path).name, + "json_name": Path(depmem_view_json_path).name, + } + ) + + channels = {} + for channel in sorted(set(screen_series) & set(depmem_series)): + channels[f"CH{channel}"] = _compare_waveform_series( + "screen `:DATA:WAVE:SCREen:CH?`", + screen_series[channel], + "deep `:DATA:WAVE:DEPMem:All?`", + depmem_series[channel], + ) + if channels: + waveform_comparison["families"]["screen_vs_depmem_all"] = channels + + waveform_comparison_path = out_dir / "waveform_comparison.json" + _safe_json_write(waveform_comparison_path, waveform_comparison) + waveform_comparison_html_path = _render_waveform_comparison_html( + out_dir, waveform_comparison + ) + + lines.extend( + [ + "", + "## Waveform Comparison", + "", + f"- [Open `waveform_comparison.html`]({waveform_comparison_html_path.name})", + f"- [Open `waveform_comparison.json`]({waveform_comparison_path.name})", + "", + "| Family | Channel | A | B | Counts | Alignment | Mean dV | Max dV | Diff samples | Max dt |", + "| --- | --- | --- | --- | --- | --- | ---: | ---: | ---: | ---: |", + ] + ) + for family, family_channels in waveform_comparison["families"].items(): + for channel_name, metrics in family_channels.items(): + lines.append( + f"| `{family}` | `{channel_name}` | `{metrics['series_a']}` | " + f"`{metrics['series_b']}` | " + f"`{metrics['count_a']} vs {metrics['count_b']} (shared {metrics['aligned_count']})` | " + f"`{metrics['alignment']}` | " + f"{metrics['mean_abs_voltage_delta_v']:.6g} | " + f"{metrics['max_abs_voltage_delta_v']:.6g} | " + f"{metrics['diff_sample_count']} | " + f"{metrics['max_abs_time_delta_s']:.6g} |" + ) + + if trace_ctx.scpi_trace_path is not None: + lines.extend( + [ + "", + "## Trace", + "", + f"- [Open `scpi_trace.jsonl`]({Path(trace_ctx.scpi_trace_path).name})", + ] + ) + report_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + payload = { + "label": args.label, + "profile": "edge", + "esp_port": args.esp_port, + "scope_vid": args.scope_vid, + "scope_pid": args.scope_pid, + "arm": "sequenced_raw_edge", + "trigger_status": scope_state["trigger_status"], + "initial_trigger_status": scope_state["initial_trigger_status"], + "resumed_before_setup": scope_state["resumed_before_setup"], + "timebase_scale": scope_state["timebase_scale"], + "trigger_snapshot": scope_state["trigger_snapshot"], + "trigger_sweep": scope_state["trigger_sweep"], + "raw_edge_level": scope_state["raw_edge_level"], + "screenshot_path": str(screenshot_path), + "html_view_path": str(html_view_path), + "html_data_path": str(html_data_path), + "screen_waveform_metadata_path": str(screen_metadata_path), + "depmem_all_summary_path": str(depmem_summary_path), + "depmem_all_raw_path": str(depmem_raw_path), + "depmem_all_view_path": ( + None if depmem_view_path is None else str(depmem_view_path) + ), + "waveform_comparison_path": str(waveform_comparison_html_path), + "waveform_comparison_json_path": str(waveform_comparison_path), + "scpi_trace_path": ( + None if trace_ctx.scpi_trace_path is None else trace_ctx.scpi_trace_path + ), + "waveforms": [asdict(summary) for summary in waveforms], + "esp_serial_lines": esp_lines, + } + json_path.write_text(json.dumps(payload, indent=2), encoding="utf-8") + return payload, report_path, json_path, waveforms + finally: + try: + scope.disable_trace() + except Exception: + pass + try: + scope.close() + except Exception: + pass diff --git a/doc/examples/ex_owon_sds1104.py b/doc/examples/ex_owon_sds1104.py new file mode 100644 index 00000000..aefe968f --- /dev/null +++ b/doc/examples/ex_owon_sds1104.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python +""" +Minimal example for connecting to an OWON SDS1104-family oscilloscope. +""" + +import instruments as ik +from instruments.units import ureg as u + + +def main(): + """ + Open the scope over raw USB, print a few stable values, and read the + current CH1 screen waveform. The trigger example below only uses the + conservative trigger paths promoted in the driver. + """ + scope = ik.owon.OWONSDS1104.open_usb( + enable_scpi=False, + ignore_scpi_failure=True, + settle_time=0.1, + ) + try: + print(f"Identity: {scope.name}") + print(f"Timebase scale: {scope.timebase_scale}") + print(f"CH1 displayed: {scope.channel[0].display}") + print(f"CH1 coupling: {scope.channel[0].coupling}") + print(f"Trigger type: {scope.trigger_type}") + print(f"Single trigger mode: {scope.single_trigger_mode}") + + scope.stop() + scope.single_trigger_mode = scope.SingleTriggerMode.edge + scope.trigger_source = scope.TriggerSource.ch1 + scope.trigger_coupling = scope.TriggerCoupling.dc + scope.trigger_slope = scope.TriggerSlope.rise + scope.trigger_level = 25 * u.millivolt + scope.trigger_holdoff = 100 * u.nanosecond + print(f"Trigger snapshot: {scope.read_trigger_configuration()}") + + time_s, voltage_v = scope.channel[0].read_waveform() + print(f"CH1 waveform samples: {len(voltage_v)}") + print(f"First sample: t={time_s[0]!r}, v={voltage_v[0]!r}") + finally: + scope.close() + + +if __name__ == "__main__": + main() diff --git a/doc/examples/kiprim/ex_kiprim_dc310s.py b/doc/examples/kiprim/ex_kiprim_dc310s.py new file mode 100644 index 00000000..30129444 --- /dev/null +++ b/doc/examples/kiprim/ex_kiprim_dc310s.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python + +import instruments as ik + +psu = ik.kiprim.DC310S.open_serial("COM8", baud=115200, timeout=0.5) + +print(psu.name) +print(f"Setpoint: {psu.voltage}, {psu.current}, output={psu.output}") +print(f"Measured: {psu.voltage_sense}, {psu.current_sense}, {psu.power_sense}") + +# Uncomment to configure the supply explicitly. +# psu.voltage = 5 * ik.units.volt +# psu.current = 0.25 * ik.units.ampere +# psu.output = True diff --git a/doc/examples/owon_esp32_trigger_jig.md b/doc/examples/owon_esp32_trigger_jig.md new file mode 100644 index 00000000..785da772 --- /dev/null +++ b/doc/examples/owon_esp32_trigger_jig.md @@ -0,0 +1,301 @@ +# ESP32 Trigger Test Jig For OWON SDS1104 / DOS1104 + +This note describes a very small bench jig for validating the SDS1104 / DOS1104 +trigger families that now look real on hardware but are not fully exercised in +the front-panel UI: + +- `PULSe` +- `SLOPe` +- `EDGE:COUPling HF` +- `EDGE:SOURce ACLine` + +The goal is not RF-grade timing accuracy. The goal is to generate clean, +repeatable, low-voltage bench signals that are good enough to validate trigger +behavior at easy time scales such as `10 us`, `50 us`, `100 us`, and `1 ms`. + +## Hardware + +Use any ordinary 3.3 V ESP32 dev board with exposed GPIOs. + +Preferred firmware path: + +- PlatformIO project: [owon_esp32_trigger_jig_pio/platformio.ini](./owon_esp32_trigger_jig_pio/platformio.ini) +- PlatformIO source: [owon_esp32_trigger_jig_pio/src/main.cpp](./owon_esp32_trigger_jig_pio/src/main.cpp) +- Command-line scope runner: [owon_esp32_trigger_jig_runner.py](./owon_esp32_trigger_jig_runner.py) + +Recommended parts: + +- 1 x ESP32 dev board +- 2 x `330 ohm` series resistors +- 1 x `1 kohm` series resistor +- 1 x `100 kohm` resistor, optional +- 4 x capacitors for the slope node + - `4.7 nF` + - `22 nF` + - `47 nF` + - `470 nF` +- jumper wires +- optional small breadboard + +Use `10x` probes if possible. They load the signal less and make the RC slope +node behave more predictably. + +## Exact Wiring + +Common ground: + +- ESP32 `GND` -> scope ground clips for all channels used + +CH1 direct pulse output: + +- ESP32 `GPIO18` -> `330 ohm` -> scope `CH1` probe tip + +CH2 slope output node: + +- ESP32 `GPIO19` -> `1 kohm` -> node `SLOPE_OUT` +- `SLOPE_OUT` -> scope `CH2` probe tip +- `SLOPE_OUT` -> selected capacitor -> `GND` +- optional: `SLOPE_OUT` -> `100 kohm` -> `GND` + +CH3 frame marker, optional: + +- ESP32 `GPIO21` -> `330 ohm` -> scope `CH3` probe tip + +Do not connect the scope probe tips directly to `3V3`. + +### RC Values For Approximate Slope Times + +The CH2 node is a first-order RC edge shaper. The approximate `10%` to `90%` +rise or fall time is about `2.2 * R * C`. + +With `R = 1 kohm`: + +- `4.7 nF` -> about `10 us` +- `22 nF` -> about `48 us` +- `47 nF` -> about `103 us` +- `470 nF` -> about `1.03 ms` + +This is good enough for conservative trigger validation. + +## What Each Channel Is For + +`CH1` + +- direct digital pulse train +- use this for `PULSe` trigger tests + +`CH2` + +- RC-shaped version of a square wave +- use this for `SLOPe` trigger tests + +`CH3` + +- optional burst marker +- useful when checking where one pulse frame starts + +## Scope Setup Suggestions + +For pulse tests on `CH1`: + +- source: `CH1` +- trigger mode: `PULSe` +- coupling: start with `DC` +- slope or direction: match the pulse edge you care about + +For slope tests on `CH2`: + +- source: `CH2` +- trigger mode: `SLOPe` +- set upper and lower levels around the middle of the RC swing, for example + `0.8 V` and `2.4 V` +- set `TIME` near the expected slope time from the RC table + +For `HF` coupling tests: + +- start from ordinary `EDGE` mode on `CH1` +- compare trigger behavior with `DC`, `AC`, and `HF` +- `HF` is easiest to see later with a signal that has slow baseline movement + plus a fast edge component + +For `ACLine`: + +- the ESP32 jig does not generate line-synchronous AC +- use a separate isolated low-voltage AC source later, for example a small + transformer secondary +- never connect mains directly to the scope or to the ESP32 + +## Minimal Test Procedure + +### Pulse Trigger + +The firmware emits a repeating burst on `CH1` with pulse widths: + +- `10 us` +- `50 us` +- `100 us` +- `1000 us` + +Suggested checks: + +- set `PULSe:TIME 50us` +- try `PULSe:SIGN >`, `<`, and `=` +- verify the scope prefers the matching pulse width or class + +### Slope Trigger + +Pick one capacitor on `CH2`, for example: + +- `22 nF` for about `50 us` + +Suggested checks: + +- set `SLOPe:TIME 50us` +- try `SLOPe:SIGN >`, `<`, and `=` +- try `SLOPe:SLOPe POS` and `NEG` +- verify that triggering changes when you swap the capacitor to another + timescale + +### HF Coupling + +Start with the CH1 pulse source. + +Suggested checks: + +- compare `EDGE:COUPling DC`, `AC`, and `HF` +- later, if needed, add a slow baseline modulation source and confirm `HF` + ignores it better than `DC` + +## Limitations + +- `delayMicroseconds()` on ESP32 is good enough for bench trigger validation, + not for calibrated timing metrology +- the RC slope node is only approximately linear +- for sub-microsecond work, use RP2040 PIO or a function generator instead + +## Firmware + +The maintained firmware path is the PlatformIO project: + +- [owon_esp32_trigger_jig_pio/platformio.ini](./owon_esp32_trigger_jig_pio/platformio.ini) +- [owon_esp32_trigger_jig_pio/src/main.cpp](./owon_esp32_trigger_jig_pio/src/main.cpp) + +## PlatformIO Build And Upload + +From the repo root: + +```powershell +cd doc/examples/owon_esp32_trigger_jig_pio +pio run +pio run -t upload +pio device monitor +``` + +## Command-Line Runner + +The Python runner configures the ESP32 over serial, configures the scope, +captures a BMP screenshot, saves waveform CSV files, and writes a Markdown and +JSON report into a timestamped artifact directory. + +Basic pulse example: + +```powershell +python doc/examples/owon_esp32_trigger_jig_runner.py ` + --esp-port COM7 ` + --profile pulse ` + --pulse-mode burst ` + --pulse-trigger-time-us 50 ` + --capture-channels 1 ` + --timebase-s-div 0.00005 +``` + +Basic slope example: + +```powershell +python doc/examples/owon_esp32_trigger_jig_runner.py ` + --esp-port COM7 ` + --profile slope ` + --slope-trigger-time-us 50 ` + --slope-upper-v 2.4 ` + --slope-lower-v 0.8 ` + --capture-channels 2 ` + --timebase-s-div 0.001 +``` + +Pretty stable edge capture: + +```powershell +python doc/examples/owon_esp32_trigger_jig_runner.py ` + --esp-port COM7 ` + --capture-edge-pretty +``` + +Waveform truth validation capture: + +```powershell +python doc/examples/owon_esp32_trigger_jig_runner.py ` + --esp-port COM7 ` + --capture-edge-pretty ` + --validate-waveforms +``` + +The runner saves artifacts under: + +```text +doc/examples/artifacts/YYYYMMDD_HHMMSS_