From 37f76c980785188949fbcbd7db608a6ba7affb27 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 25 Apr 2026 12:14:12 +0000 Subject: [PATCH 1/3] Initial plan From bcdb3ee4aee5ee8d1daab1feca8cda8450fa708e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 25 Apr 2026 12:21:46 +0000 Subject: [PATCH 2/3] Add Python port of MassData API with thread-safe implementation and unit tests Agent-Logs-Url: https://github.com/NEVSTOP-LAB/CSM-MassData-Parameter-Support/sessions/23799959-ca0d-43ce-a390-5e22aa6c0f3f Co-authored-by: nevstop <8196752+nevstop@users.noreply.github.com> --- .gitignore | 8 +- python/README.md | 100 +++++++ python/_test/test_csm_massdata.py | 237 +++++++++++++++++ python/csm_massdata.py | 429 ++++++++++++++++++++++++++++++ 4 files changed, 773 insertions(+), 1 deletion(-) create mode 100644 python/README.md create mode 100644 python/_test/test_csm_massdata.py create mode 100644 python/csm_massdata.py diff --git a/.gitignore b/.gitignore index a963972..cd46bc3 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,10 @@ c/**/*.exe c/**/*.pdb c/**/*.ilk c/**/*.idb -c/**/*.tlog \ No newline at end of file +c/**/*.tlog + +# Python build artefacts (under python/) +python/**/__pycache__/ +python/**/*.pyc +python/**/*.pyo +python/**/.pytest_cache/ \ No newline at end of file diff --git a/python/README.md b/python/README.md new file mode 100644 index 0000000..fa713f7 --- /dev/null +++ b/python/README.md @@ -0,0 +1,100 @@ +# CSM MassData Parameter Support —— Python 语言移植 + +本目录提供 CSM MassData Parameter Support 插件的 Python 实现, +接口与 [`addons/MassData-Parameter`](../addons/MassData-Parameter) +中的 LabVIEW VI 一一对应,同时与 [`c/`](../c) 目录下的 C 实现共享相同的 +参考字符串格式与语义,可与 LabVIEW / C 端无缝互通。 + +## 目录结构 + +``` +python/ +├── csm_massdata.py # 公开 Python 接口(中文 docstring 注释) +├── _test/ +│ └── test_csm_massdata.py # 基于 unittest 的独立测试程序 +└── README.md # 本文件 +``` + +## 接口对应关系 + +每个 Python 函数都是对应 LabVIEW VI 的逐字翻译:名称相同、参数顺序相同、 +语义相同;并与 [`c/include/csm_massdata.h`](../c/include/csm_massdata.h) +中的 C 函数一一对应。 + +| LabVIEW VI | Python 函数 | +| ------------------------------------------------------ | ------------------------------------------------------ | +| `CSM - Config MassData Parameter Cache Size.vi` | `CSM_ConfigMassDataParameterCacheSize` | +| `CSM - Convert MassData to Argument.vim` | `CSM_ConvertMassDataToArgument` | +| `CSM - Convert MassData to Argument With DataType.vim` | `CSM_ConvertMassDataToArgumentWithDataType` | +| `CSM - Convert Argument to MassData.vim` | `CSM_ConvertArgumentToMassData` | +| `CSM - MassData Data Type String.vi` | `CSM_MassDataDataTypeString` | +| `CSM - MassData Parameter Status.vi` | `CSM_MassDataParameterStatus` | + +参考字符串格式与 LabVIEW / C 端完全相同: + +``` +Start:;Size:[;DataType:] +``` + +状态码(`CsmMassDataStatus`)的整数值与 C 端 +`csm_massdata_status_t` 一一对应,便于跨语言对照。 + +## 运行环境 + +- Python 3.8 及以上版本 +- 仅依赖 Python 标准库(`enum`、`threading`、`typing`),无第三方依赖 + +## 运行测试 + +在仓库根目录或 `python/` 目录下,任选其一: + +```bash +# 方式 1:直接以脚本运行 +python python/_test/test_csm_massdata.py + +# 方式 2:使用 unittest 模块发现并运行 +cd python +python -m unittest _test.test_csm_massdata -v +``` + +测试覆盖: + +- 缓冲区配置与状态查询 +- 不带 / 带数据类型的编码 / 解码往返 +- 数据类型字段的解析(含缺省与非法字符) +- 解析错误(前缀错误、数字非法、超出 64 位无符号整数范围、`DataType:` + 后缀含非法字符等) +- 环形缓冲区覆盖检测 +- 写入数据超过缓冲容量 +- 多线程并发编码 / 解码下的线程安全 + +## 在自己的工程中使用 + +将 `python/csm_massdata.py` 复制到工程目录,或将 `python/` 加入 +`PYTHONPATH`,即可: + +```python +from csm_massdata import ( + CSM_ConfigMassDataParameterCacheSize, + CSM_ConvertMassDataToArgumentWithDataType, + CSM_ConvertArgumentToMassData, + CsmMassDataStatus, +) + +# 应用启动阶段在单线程上下文中初始化(可选;不调用则使用 50 MiB 默认值) +CSM_ConfigMassDataParameterCacheSize(64 * 1024 * 1024) + +samples = b"\x01\x02\x03\x04" * 256 +status, arg = CSM_ConvertMassDataToArgumentWithDataType(samples, "1D U8") +assert status == CsmMassDataStatus.OK +# ... 通过 CSM 总线传递 arg ... +status, restored = CSM_ConvertArgumentToMassData(arg) +assert status == CsmMassDataStatus.OK +assert restored == samples +``` + +## 线程安全 + +内部环形缓冲区与游标在模块导入(`import`)时即一次性完成初始化, +避免 C 端历史上出现过的延迟初始化竞态。所有公开函数对内部状态的 +访问均通过同一把 `threading.Lock` 串行化,可在多线程环境下安全调用。 diff --git a/python/_test/test_csm_massdata.py b/python/_test/test_csm_massdata.py new file mode 100644 index 0000000..2417225 --- /dev/null +++ b/python/_test/test_csm_massdata.py @@ -0,0 +1,237 @@ +"""CSM MassData Python API 的独立测试套件。 + +覆盖 ``csm_massdata`` 模块公开的所有 API: + + - 缓冲区配置与状态查询 + - 不带数据类型的编码 / 解码往返 + - 带数据类型的编码 / 解码往返 + - 数据类型解析(CSM - MassData Data Type String) + - 解析错误的处理 + - 环形缓冲区覆盖检测 + - 输出缓冲区与缓存容量边界 + - 并发访问下的线程安全 + +任意断言失败时进程以非零状态退出,方便在交互模式与 CI 流水线中 +同时使用。 +""" + +from __future__ import annotations + +import os +import struct +import sys +import threading +import unittest + +# 让脚本以 ``python _test/test_csm_massdata.py`` 直接运行也能找到模块。 +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from csm_massdata import ( # noqa: E402 (path manipulation must come first) + CSM_MASSDATA_MAX_ARGUMENT_LEN, + CSM_MASSDATA_MAX_DATATYPE_LEN, + CSM_ConfigMassDataParameterCacheSize, + CSM_ConvertArgumentToMassData, + CSM_ConvertMassDataToArgument, + CSM_ConvertMassDataToArgumentWithDataType, + CSM_MassDataDataTypeString, + CSM_MassDataParameterStatus, + CsmMassDataStatus, +) + + +class CsmMassDataTests(unittest.TestCase): + """与 ``c/_test/vs/test_main.c`` 中的测试一一对应。""" + + def setUp(self) -> None: + # 每个用例都从一个干净的较小缓冲区开始,避免相互影响。 + self.assertEqual( + CSM_ConfigMassDataParameterCacheSize(4096), CsmMassDataStatus.OK + ) + + # 测试:缓冲区配置与状态查询 + def test_config_and_status(self) -> None: + self.assertEqual( + CSM_ConfigMassDataParameterCacheSize(0), + CsmMassDataStatus.ERR_INVALID_ARG, + ) + self.assertEqual( + CSM_ConfigMassDataParameterCacheSize(-1), + CsmMassDataStatus.ERR_INVALID_ARG, + ) + self.assertEqual( + CSM_ConfigMassDataParameterCacheSize(1024), CsmMassDataStatus.OK + ) + status, r, w, cache = CSM_MassDataParameterStatus() + self.assertEqual(status, CsmMassDataStatus.OK) + self.assertEqual(cache, 1024) + self.assertEqual(r.size, 0) + self.assertEqual(w.size, 0) + + # 测试:不带数据类型的编码 / 解码往返 + def test_roundtrip_plain(self) -> None: + source = struct.pack("<8i", 10, 20, 30, 40, 50, 60, 70, 80) + status, arg = CSM_ConvertMassDataToArgument(source) + self.assertEqual(status, CsmMassDataStatus.OK) + self.assertTrue(arg.startswith("Start:"), + msg=f"unexpected argument: {arg!r}") + + status, restored = CSM_ConvertArgumentToMassData(arg) + self.assertEqual(status, CsmMassDataStatus.OK) + self.assertEqual(len(restored), len(source)) + self.assertEqual(restored, source) + + # 测试:带数据类型的编码 / 解码往返 + def test_roundtrip_with_type(self) -> None: + source = struct.pack("<4d", 1.5, -2.5, 3.5, -4.5) + status, arg = CSM_ConvertMassDataToArgumentWithDataType(source, "1D DBL") + self.assertEqual(status, CsmMassDataStatus.OK) + self.assertIn(";DataType:1D DBL", arg) + + status, dup, dtype = CSM_MassDataDataTypeString(arg) + self.assertEqual(status, CsmMassDataStatus.OK) + self.assertEqual(dtype, "1D DBL") + self.assertEqual(dup, arg) + + status, restored = CSM_ConvertArgumentToMassData(arg) + self.assertEqual(status, CsmMassDataStatus.OK) + self.assertEqual(len(restored), len(source)) + self.assertEqual(restored, source) + + # 测试:参数中没有数据类型字段时,解析结果应为空字符串 + def test_datatype_absent(self) -> None: + payload = bytes([0xAA, 0xBB, 0xCC]) + status, arg = CSM_ConvertMassDataToArgument(payload) + self.assertEqual(status, CsmMassDataStatus.OK) + status, dup, dtype = CSM_MassDataDataTypeString(arg) + self.assertEqual(status, CsmMassDataStatus.OK) + self.assertEqual(dtype, "") + self.assertEqual(dup, arg) + + # 测试:解析非法字符串时返回 PARSE 错误 + def test_parse_errors(self) -> None: + for bad in ( + "garbage", + "Start:abc;Size:1", + "Start:0;Size:1;wrong", + "Start:0;Size:1;DataType:bad;value", + "Start:0;Size:1;DataType:badStart:0;Size:1;DataType:bad>value", + # 超出 64 位无符号整数范围 + "Start:18446744073709551616;Size:0", + ): + status, data = CSM_ConvertArgumentToMassData(bad) + self.assertEqual(status, CsmMassDataStatus.ERR_PARSE, + msg=f"argument should fail to parse: {bad!r}") + self.assertEqual(data, b"") + + # 测试:旧数据被环形缓冲区覆盖后,应报告 OVERWRITTEN + def test_overwrite_detection(self) -> None: + # 缓冲区故意设得很小,后续写入会把第一份数据挤掉。 + self.assertEqual( + CSM_ConfigMassDataParameterCacheSize(32), CsmMassDataStatus.OK + ) + status, first_arg = CSM_ConvertMassDataToArgument(bytes(range(1, 9))) + self.assertEqual(status, CsmMassDataStatus.OK) + status, _filler_arg = CSM_ConvertMassDataToArgument(bytes(range(32))) + self.assertEqual(status, CsmMassDataStatus.OK) + status, restored = CSM_ConvertArgumentToMassData(first_arg) + self.assertEqual(status, CsmMassDataStatus.ERR_OVERWRITTEN) + self.assertEqual(restored, b"") + + # 测试:写入数据大于缓冲区容量时返回 CACHE_TOO_SMALL + def test_cache_too_small(self) -> None: + self.assertEqual( + CSM_ConfigMassDataParameterCacheSize(16), CsmMassDataStatus.OK + ) + status, arg = CSM_ConvertMassDataToArgument(bytes(64)) + self.assertEqual(status, CsmMassDataStatus.ERR_CACHE_TOO_SMALL) + self.assertEqual(arg, "") + + # 测试:状态查询应当反映最近一次的读 / 写操作 + def test_status_reflects_last_ops(self) -> None: + self.assertEqual( + CSM_ConfigMassDataParameterCacheSize(1024), CsmMassDataStatus.OK + ) + payload = bytes([9, 8, 7, 6, 5]) + status, arg = CSM_ConvertMassDataToArgument(payload) + self.assertEqual(status, CsmMassDataStatus.OK) + status, restored = CSM_ConvertArgumentToMassData(arg) + self.assertEqual(status, CsmMassDataStatus.OK) + self.assertEqual(restored, payload) + + status, r, w, cache = CSM_MassDataParameterStatus() + self.assertEqual(status, CsmMassDataStatus.OK) + self.assertEqual(w.size, len(payload)) + self.assertEqual(r.size, len(payload)) + self.assertEqual(cache, 1024) + + # 测试:编码时拒绝包含非法字符的数据类型 + def test_encode_rejects_bad_data_type(self) -> None: + for bad in (";", "<", ">", "1D I32;extra"): + status, arg = CSM_ConvertMassDataToArgumentWithDataType(b"x", bad) + self.assertEqual(status, CsmMassDataStatus.ERR_INVALID_ARG, + msg=f"data_type should be rejected: {bad!r}") + self.assertEqual(arg, "") + + # 数据类型超长时返回 BUFFER_TOO_SMALL(与 C 端常量一致) + too_long = "x" * CSM_MASSDATA_MAX_DATATYPE_LEN + status, arg = CSM_ConvertMassDataToArgumentWithDataType(b"x", too_long) + self.assertEqual(status, CsmMassDataStatus.ERR_BUFFER_TOO_SMALL) + + # 测试:编码后的引用字符串长度受 ``CSM_MASSDATA_MAX_ARGUMENT_LEN`` 约束 + def test_argument_length_bound(self) -> None: + status, arg = CSM_ConvertMassDataToArgument(b"hello world") + self.assertEqual(status, CsmMassDataStatus.OK) + # 含末尾 NUL 的等价长度上限校验 + self.assertLess(len(arg) + 1, CSM_MASSDATA_MAX_ARGUMENT_LEN) + + # 测试:CSM_MassDataDataTypeString 对非法输入返回错误,但 dup 仍为输入 + def test_data_type_string_invalid(self) -> None: + status, dup, dtype = CSM_MassDataDataTypeString("garbage") + self.assertEqual(status, CsmMassDataStatus.ERR_PARSE) + self.assertEqual(dup, "garbage") + self.assertEqual(dtype, "") + + status, dup, dtype = CSM_MassDataDataTypeString(None) # type: ignore[arg-type] + self.assertEqual(status, CsmMassDataStatus.ERR_INVALID_ARG) + self.assertEqual(dup, "") + self.assertEqual(dtype, "") + + # 测试:多线程并发编码 / 解码不会损坏内部状态 + def test_thread_safety(self) -> None: + self.assertEqual( + CSM_ConfigMassDataParameterCacheSize(1024 * 1024), + CsmMassDataStatus.OK, + ) + errors: list[str] = [] + barrier = threading.Barrier(8) + + def worker(tid: int) -> None: + barrier.wait() + for i in range(200): + payload = struct.pack("<2I", tid, i) * 8 # 64 字节 + status, arg = CSM_ConvertMassDataToArgument(payload) + if status != CsmMassDataStatus.OK: + errors.append(f"encode failed: {status!r}") + return + status, restored = CSM_ConvertArgumentToMassData(arg) + if status != CsmMassDataStatus.OK: + errors.append(f"decode failed: {status!r}") + return + if restored != payload: + errors.append( + f"data mismatch on tid={tid} i={i}: " + f"expected {payload!r}, got {restored!r}" + ) + return + + threads = [threading.Thread(target=worker, args=(t,)) for t in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() + self.assertEqual(errors, [], msg="\n".join(errors)) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/python/csm_massdata.py b/python/csm_massdata.py new file mode 100644 index 0000000..50a85ab --- /dev/null +++ b/python/csm_massdata.py @@ -0,0 +1,429 @@ +"""CSM MassData Parameter Support 插件的 Python 移植版本。 + +本模块公开的接口与 LabVIEW 端 +``addons/MassData-Parameter/CSM MassData Parameter Support.lvlib`` 中的 VI +在功能与命名上完全一致;同时与 ``c/`` 目录下的 C 实现共享相同的 +参考字符串格式与语义,可与 LabVIEW / C 端无缝互通 MassData 参数。 + +函数名称、参数顺序与语义均与对应的 LabVIEW VI 严格保持一致。 + +MassData 参数格式 +----------------- +MassData 参数是一段仅包含 ASCII 字符、可读的引用字符串,指向进程内 +一个全局环形缓冲区中的实际数据。支持以下两种形式: + + - 不带数据类型: ``Start:;Size:`` + - 带数据类型: ``Start:;Size:;DataType:`` + +其中 ```` 为非负十进制整数(与 C 端一致,按 64 位无符号整数解析), +```` 为自由格式的数据类型标签(例如 ``1D I32``、``Waveform`` 等), +不允许包含 ``;`` / ``<`` / ``>`` 字符。 + +数据生命周期 +------------ +MassData 内部使用环形字节缓冲区。当缓冲区写满后,新写入的数据将 +从缓冲区起始位置覆盖最早的数据。被覆盖的数据无法恢复,后续对其 +引用进行解码时会返回 :data:`CsmMassDataStatus.ERR_OVERWRITTEN`。 +同一进程内的所有调用者共享同一份 MassData 缓冲区。 + +线程安全 +-------- +内部状态在模块加载(import)时即一次性完成初始化,避免延迟初始化 +带来的并发竞争。所有公开函数对内部状态的访问均通过同一把 +:class:`threading.Lock` 互斥串行化执行,可在多线程环境下安全调用。 +""" + +from __future__ import annotations + +import enum +import threading +from typing import Optional, Tuple, Union + +# --------------------------------------------------------------------------- # +# 常量与类型定义 # +# --------------------------------------------------------------------------- # + +#: MassData 缓冲区的默认大小(字节),与 LabVIEW VI 及 C 端保持一致:50 MiB。 +CSM_MASSDATA_DEFAULT_CACHE_SIZE: int = 50 * 1024 * 1024 + +#: 编码函数返回的 MassData 参数字符串的最大长度(包含末尾 NUL 计入字节数)。 +CSM_MASSDATA_MAX_ARGUMENT_LEN: int = 256 + +#: 数据类型标签字符串的最大长度(包含末尾 NUL 计入字节数)。 +CSM_MASSDATA_MAX_DATATYPE_LEN: int = 128 + +# 引用字符串的固定前缀 +_PREFIX = "" + +# 与 C 端 ``uint64_t`` 对应的最大值,用于显式检测溢出 +_UINT64_MAX = (1 << 64) - 1 + + +class CsmMassDataStatus(enum.IntEnum): + """MassData API 所有函数返回的状态码。 + + 数值与 C 端 ``csm_massdata_status_t`` 完全一致,便于跨语言对照。 + """ + + OK = 0 #: 操作成功完成。 + ERR_INVALID_ARG = -1 #: 参数为 None 或无效。 + ERR_BUFFER_TOO_SMALL = -2 #: 调用方提供的输出缓冲区不足。 + ERR_PARSE = -3 #: MassData 参数字符串无法解析。 + ERR_OVERWRITTEN = -4 #: 引用的数据已被环形缓冲区覆盖。 + ERR_CACHE_TOO_SMALL = -5 #: 待写入的数据大于缓冲区容量。 + ERR_NO_MEMORY = -6 #: 内存分配失败。 + + +class CsmMassDataOperation: + """描述最近一次对 MassData 环形缓冲区的读或写操作。 + + 等价于 ``CSM - MassData Parameter Status.vi`` 返回的 + ``Active Read Operation`` / ``Active Write Operation`` 簇。 + """ + + __slots__ = ("start", "size") + + def __init__(self, start: int = 0, size: int = 0) -> None: + self.start = int(start) #: 在缓冲区中的起始偏移量(字节)。 + self.size = int(size) #: 该次操作的字节数。 + + def __eq__(self, other: object) -> bool: + if not isinstance(other, CsmMassDataOperation): + return NotImplemented + return self.start == other.start and self.size == other.size + + def __repr__(self) -> str: # pragma: no cover - 仅用于调试输出 + return f"CsmMassDataOperation(start={self.start}, size={self.size})" + + +# --------------------------------------------------------------------------- # +# 内部状态 # +# --------------------------------------------------------------------------- # + +class _State: + """内部环形缓冲区与游标状态。仅供本模块自身使用。""" + + __slots__ = ("lock", "buffer", "capacity", "write_total", + "last_read", "last_write") + + def __init__(self) -> None: + # 互斥量保护所有可变成员;模块导入时即创建,避免延迟初始化竞态。 + self.lock = threading.Lock() + self.buffer: bytearray = bytearray(CSM_MASSDATA_DEFAULT_CACHE_SIZE) + self.capacity: int = CSM_MASSDATA_DEFAULT_CACHE_SIZE + # 累计已写入的字节数(与 C 端 ``write_total`` 等价的 64 位单调游标)。 + self.write_total: int = 0 + self.last_read = CsmMassDataOperation() + self.last_write = CsmMassDataOperation() + + +# 模块加载时即完成初始化,避免任何首次调用的并发竞争。 +_state = _State() + + +# --------------------------------------------------------------------------- # +# 内部辅助函数 # +# --------------------------------------------------------------------------- # + +def _ring_write(src: bytes) -> None: + """按 ``write_total`` 暗示的位置把 ``src`` 写入环形缓冲区。 + + 调用方必须已经持有 ``_state.lock``,并已确认 ``len(src) <= capacity``。 + """ + cap = _state.capacity + offset = _state.write_total % cap + first_run = cap - offset + n = len(src) + if n <= first_run: + _state.buffer[offset:offset + n] = src + else: + _state.buffer[offset:offset + first_run] = src[:first_run] + _state.buffer[0:n - first_run] = src[first_run:] + + +def _ring_read(start: int, size: int) -> bytes: + """从环形缓冲区中以绝对游标 ``start`` 起始位置读取 ``size`` 字节。 + + 调用方必须已经持有 ``_state.lock``,并已确认所请求的范围仍然驻留。 + """ + cap = _state.capacity + offset = start % cap + first_run = cap - offset + if size <= first_run: + return bytes(_state.buffer[offset:offset + size]) + return (bytes(_state.buffer[offset:offset + first_run]) + + bytes(_state.buffer[0:size - first_run])) + + +def _parse_uint64(text: str) -> Tuple[Optional[int], int]: + """从 ``text`` 起始位置解析一个非负十进制整数。 + + 返回 ``(value, end_index)``。若没有数字或数值超出 64 位无符号 + 整数表示范围(与 C 端 ``strtoull`` 的 ``ERANGE`` 等价),则 ``value`` + 为 ``None``。 + """ + i = 0 + n = len(text) + while i < n and "0" <= text[i] <= "9": + i += 1 + if i == 0: + return None, 0 + value = int(text[:i]) + if value > _UINT64_MAX: + return None, i + return value, i + + +def _parse_argument(argument: str) -> Tuple[CsmMassDataStatus, int, int, str]: + """解析形如 ``Start:;Size:[;DataType:]`` 的字符串。 + + 返回 ``(status, start, size, data_type)``;当字符串中没有 ``DataType`` + 后缀时 ``data_type`` 为空字符串。 + """ + if not isinstance(argument, str): + return CsmMassDataStatus.ERR_INVALID_ARG, 0, 0, "" + if not argument.startswith(_PREFIX): + return CsmMassDataStatus.ERR_PARSE, 0, 0, "" + p = argument[len(_PREFIX):] + + if not p.startswith("Start:"): + return CsmMassDataStatus.ERR_PARSE, 0, 0, "" + p = p[len("Start:"):] + start, end = _parse_uint64(p) + if start is None or end == 0 or end >= len(p) or p[end] != ";": + return CsmMassDataStatus.ERR_PARSE, 0, 0, "" + p = p[end + 1:] + + if not p.startswith("Size:"): + return CsmMassDataStatus.ERR_PARSE, 0, 0, "" + p = p[len("Size:"):] + size, end = _parse_uint64(p) + if size is None or end == 0: + return CsmMassDataStatus.ERR_PARSE, 0, 0, "" + p = p[end:] + + # 可选的 ``;DataType:`` 后缀。 + if p == "": + return CsmMassDataStatus.OK, start, size, "" + if not p.startswith(";"): + return CsmMassDataStatus.ERR_PARSE, 0, 0, "" + p = p[1:] + if not p.startswith("DataType:"): + return CsmMassDataStatus.ERR_PARSE, 0, 0, "" + data_type = p[len("DataType:"):] + # 验证 DataType 值仅包含合法字符。 + for ch in data_type: + if ch in ";<>": + return CsmMassDataStatus.ERR_PARSE, 0, 0, "" + return CsmMassDataStatus.OK, start, size, data_type + + +def _coerce_data(data: Optional[Union[bytes, bytearray, memoryview]]) -> Optional[bytes]: + """将允许的数据输入归一化为 ``bytes``;无效输入返回 ``None``。 + + 与 C 端 ``data_size == 0`` 时允许 ``data == NULL`` 的语义对应: + 本函数允许 ``data is None``,视作零长度数据。 + """ + if data is None: + return b"" + if isinstance(data, (bytes, bytearray, memoryview)): + return bytes(data) + return None + + +# --------------------------------------------------------------------------- # +# 公开 API —— 每个函数对应一个同名的 LabVIEW VI # +# --------------------------------------------------------------------------- # + +def CSM_ConfigMassDataParameterCacheSize(size: int) -> CsmMassDataStatus: + """配置 MassData 后台缓冲区大小。 + + 对应 ``CSM - Config MassData Parameter Cache Size.vi``。 + + 将内部环形缓冲区重新分配为 ``size`` 字节。与 LabVIEW VI 一致, + 在程序运行过程中调用本函数会丢弃当前已缓存的数据;通常应在 + 任何编码 / 解码调用之前、应用启动阶段调用一次。 + + :param size: 新的缓冲区大小(字节)。若从未调用过本函数, + 则默认值为 :data:`CSM_MASSDATA_DEFAULT_CACHE_SIZE`。 + :return: 成功返回 :attr:`CsmMassDataStatus.OK`; + 若 ``size`` 不是正整数返回 :attr:`CsmMassDataStatus.ERR_INVALID_ARG`; + 若分配失败返回 :attr:`CsmMassDataStatus.ERR_NO_MEMORY`。 + """ + if not isinstance(size, int) or isinstance(size, bool) or size <= 0: + return CsmMassDataStatus.ERR_INVALID_ARG + try: + new_buf = bytearray(size) + except (MemoryError, OverflowError): + return CsmMassDataStatus.ERR_NO_MEMORY + + with _state.lock: + _state.buffer = new_buf + _state.capacity = size + _state.write_total = 0 + _state.last_read = CsmMassDataOperation() + _state.last_write = CsmMassDataOperation() + return CsmMassDataStatus.OK + + +def _encode(data: Optional[Union[bytes, bytearray, memoryview]], + data_type: Optional[str]) -> Tuple[CsmMassDataStatus, str]: + """编码公共实现:对应 C 端 ``csm_massdata_encode``。""" + src = _coerce_data(data) + if src is None: + return CsmMassDataStatus.ERR_INVALID_ARG, "" + + if data_type is not None: + if not isinstance(data_type, str): + return CsmMassDataStatus.ERR_INVALID_ARG, "" + # 与 C 端长度限制对应(含末尾 NUL)。 + if len(data_type) + 1 > CSM_MASSDATA_MAX_DATATYPE_LEN: + return CsmMassDataStatus.ERR_BUFFER_TOO_SMALL, "" + # 拒绝可能破坏引用字符串语法的字符。 + for ch in data_type: + if ch in ";<>": + return CsmMassDataStatus.ERR_INVALID_ARG, "" + + with _state.lock: + if len(src) > _state.capacity: + return CsmMassDataStatus.ERR_CACHE_TOO_SMALL, "" + start_cursor = _state.write_total + if src: + _ring_write(src) + _state.write_total += len(src) + _state.last_write = CsmMassDataOperation(start_cursor, len(src)) + + if data_type is not None: + argument = (f"{_PREFIX}Start:{start_cursor};Size:{len(src)};" + f"DataType:{data_type}") + else: + argument = f"{_PREFIX}Start:{start_cursor};Size:{len(src)}" + # 与 C 端一致:包含末尾 NUL 后不得超过 ``CSM_MASSDATA_MAX_ARGUMENT_LEN``。 + if len(argument) + 1 > CSM_MASSDATA_MAX_ARGUMENT_LEN: + return CsmMassDataStatus.ERR_BUFFER_TOO_SMALL, "" + return CsmMassDataStatus.OK, argument + + +def CSM_ConvertMassDataToArgument( + data: Optional[Union[bytes, bytearray, memoryview]], +) -> Tuple[CsmMassDataStatus, str]: + """将原始数据转换为 MassData 参数(不嵌入数据类型)。 + + 对应 ``CSM - Convert MassData to Argument.vim``。原始数据被复制到 + 环形缓冲区,并返回形如 ``Start:;Size:`` 的引用字符串。 + + :param data: 待保存的原始字节,可为 ``bytes`` / ``bytearray`` / + ``memoryview``;``None`` 视作零长度数据。 + :return: ``(status, argument)``。失败时 ``argument`` 为空字符串。 + """ + return _encode(data, None) + + +def CSM_ConvertMassDataToArgumentWithDataType( + data: Optional[Union[bytes, bytearray, memoryview]], + data_type: str, +) -> Tuple[CsmMassDataStatus, str]: + """将原始数据转换为带数据类型标签的 MassData 参数。 + + 对应 ``CSM - Convert MassData to Argument With DataType.vim``。 + 生成的参数形如 ``Start:;Size:;DataType:``。 + + :param data: 待保存的原始字节。 + :param data_type: 以普通字符串表示的数据类型标签(例如 ``"1D I32"``), + 不允许包含 ``;`` / ``<`` / ``>``。 + :return: ``(status, argument)``。 + """ + if data_type is None: + return CsmMassDataStatus.ERR_INVALID_ARG, "" + return _encode(data, data_type) + + +def CSM_ConvertArgumentToMassData( + argument: str, +) -> Tuple[CsmMassDataStatus, bytes]: + """将 MassData 参数还原为原始数据。 + + 对应 ``CSM - Convert Argument to MassData.vim``。本函数解析 + ``argument`` 中的引用字符串,并返回对应的字节。LabVIEW VI 中可选的 + ``Type`` 输入在此处刻意省略:返回的就是此前写入的原始字节, + 不受嵌入的类型标签影响。 + + :param argument: MassData 参数字符串。 + :return: ``(status, data)``,其中 ``data`` 为 ``bytes``; + 当解析失败、数据已被覆盖或缓存不足时为空 ``bytes``。 + """ + status, start, size, _ = _parse_argument(argument) + if status != CsmMassDataStatus.OK: + return status, b"" + + with _state.lock: + if size > _state.capacity: + return CsmMassDataStatus.ERR_OVERWRITTEN, b"" + # 当前驻留在环形缓冲区中的窗口为 + # ``[write_total - capacity, write_total)``。任何末端落在该窗口 + # 之外的请求都视为已被覆盖。Python 整数为任意精度,加法不会 + # 溢出,但仍按 C 端等价的非溢出比较方式拆开判断,便于跨语言对照。 + oldest = (_state.write_total - _state.capacity + if _state.write_total > _state.capacity else 0) + if start < oldest or start > _state.write_total: + return CsmMassDataStatus.ERR_OVERWRITTEN, b"" + end = start + size + if end > _state.write_total: + return CsmMassDataStatus.ERR_OVERWRITTEN, b"" + result = _ring_read(start, size) if size > 0 else b"" + _state.last_read = CsmMassDataOperation(start, size) + + return CsmMassDataStatus.OK, result + + +def CSM_MassDataDataTypeString( + argument: str, +) -> Tuple[CsmMassDataStatus, str, str]: + """从 MassData 参数中解析出数据类型字符串。 + + 对应 ``CSM - MassData Data Type String.vi``。本函数不会消费输入: + 返回 ``(status, argument_dup, data_type)``,``argument_dup`` 为 + ``argument`` 的副本,以模仿 LabVIEW VI 中“返回输入副本”的数据流 + 行为;若参数中没有 ``DataType`` 字段,``data_type`` 为空字符串。 + """ + if argument is None or not isinstance(argument, str): + return CsmMassDataStatus.ERR_INVALID_ARG, "", "" + status, _, _, data_type = _parse_argument(argument) + if status != CsmMassDataStatus.OK: + # 即便解析失败,也按照 LabVIEW VI 的语义返回输入字符串自身。 + return status, argument, "" + return CsmMassDataStatus.OK, argument, data_type + + +def CSM_MassDataParameterStatus( +) -> Tuple[CsmMassDataStatus, CsmMassDataOperation, CsmMassDataOperation, int]: + """读取 MassData 后台缓冲区的状态信息。 + + 对应 ``CSM - MassData Parameter Status.vi``。 + + :return: ``(status, active_read, active_write, cache_size)``,其中 + ``active_read`` / ``active_write`` 为 :class:`CsmMassDataOperation` + 快照,``cache_size`` 为当前配置的缓冲区大小(字节)。 + """ + with _state.lock: + active_read = CsmMassDataOperation(_state.last_read.start, + _state.last_read.size) + active_write = CsmMassDataOperation(_state.last_write.start, + _state.last_write.size) + cache_size = _state.capacity + return CsmMassDataStatus.OK, active_read, active_write, cache_size + + +__all__ = [ + "CSM_MASSDATA_DEFAULT_CACHE_SIZE", + "CSM_MASSDATA_MAX_ARGUMENT_LEN", + "CSM_MASSDATA_MAX_DATATYPE_LEN", + "CsmMassDataStatus", + "CsmMassDataOperation", + "CSM_ConfigMassDataParameterCacheSize", + "CSM_ConvertMassDataToArgument", + "CSM_ConvertMassDataToArgumentWithDataType", + "CSM_ConvertArgumentToMassData", + "CSM_MassDataDataTypeString", + "CSM_MassDataParameterStatus", +] From a73f5f6343943c49677017e4aa9c5706480ed982 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 25 Apr 2026 15:44:32 +0000 Subject: [PATCH 3/3] Address PR review: harden parser, zero-copy data ingest, uint64 wraparound Agent-Logs-Url: https://github.com/NEVSTOP-LAB/CSM-MassData-Parameter-Support/sessions/c40cd282-956a-404b-9866-3e3e838998a2 Co-authored-by: nevstop <8196752+nevstop@users.noreply.github.com> --- python/_test/test_csm_massdata.py | 32 +++++++++ python/csm_massdata.py | 106 +++++++++++++++++++++--------- 2 files changed, 107 insertions(+), 31 deletions(-) diff --git a/python/_test/test_csm_massdata.py b/python/_test/test_csm_massdata.py index 2417225..3807da9 100644 --- a/python/_test/test_csm_massdata.py +++ b/python/_test/test_csm_massdata.py @@ -233,5 +233,37 @@ def worker(tid: int) -> None: self.assertEqual(errors, [], msg="\n".join(errors)) + # 测试:超长数字串应被快速拒绝(不构造任意精度大整数) + def test_parse_long_digit_run_rejected_fast(self) -> None: + bad = "Start:" + ("9" * 1_000_000) + ";Size:0" + status, data = CSM_ConvertArgumentToMassData(bad) + self.assertEqual(status, CsmMassDataStatus.ERR_PARSE) + self.assertEqual(data, b"") + + # 测试:解析端的 DataType 长度限制与 C 端一致 + def test_parse_rejects_overlong_data_type(self) -> None: + too_long = "x" * CSM_MASSDATA_MAX_DATATYPE_LEN # 含末尾 NUL 后超限 + arg = f"Start:0;Size:0;DataType:{too_long}" + status, dup, dtype = CSM_MassDataDataTypeString(arg) + self.assertEqual(status, CsmMassDataStatus.ERR_BUFFER_TOO_SMALL) + self.assertEqual(dtype, "") + + # 测试:传入 ``memoryview`` / ``bytearray`` 时不会被多余拷贝破坏数据 + def test_accepts_bytearray_and_memoryview(self) -> None: + ba = bytearray(b"hello world") + status, arg = CSM_ConvertMassDataToArgument(ba) + self.assertEqual(status, CsmMassDataStatus.OK) + status, restored = CSM_ConvertArgumentToMassData(arg) + self.assertEqual(status, CsmMassDataStatus.OK) + self.assertEqual(restored, bytes(ba)) + + mv = memoryview(b"another payload") + status, arg = CSM_ConvertMassDataToArgument(mv) + self.assertEqual(status, CsmMassDataStatus.OK) + status, restored = CSM_ConvertArgumentToMassData(arg) + self.assertEqual(status, CsmMassDataStatus.OK) + self.assertEqual(restored, bytes(mv)) + + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/python/csm_massdata.py b/python/csm_massdata.py index 50a85ab..17d691b 100644 --- a/python/csm_massdata.py +++ b/python/csm_massdata.py @@ -109,9 +109,17 @@ class _State: def __init__(self) -> None: # 互斥量保护所有可变成员;模块导入时即创建,避免延迟初始化竞态。 self.lock = threading.Lock() - self.buffer: bytearray = bytearray(CSM_MASSDATA_DEFAULT_CACHE_SIZE) - self.capacity: int = CSM_MASSDATA_DEFAULT_CACHE_SIZE - # 累计已写入的字节数(与 C 端 ``write_total`` 等价的 64 位单调游标)。 + try: + # 默认在导入阶段就分配好缓冲区,避免任何首次调用的并发竞争。 + self.buffer: bytearray = bytearray(CSM_MASSDATA_DEFAULT_CACHE_SIZE) + self.capacity: int = CSM_MASSDATA_DEFAULT_CACHE_SIZE + except MemoryError: + # 在受限环境中允许模块继续导入;公开 API 可基于 ``capacity == 0`` + # 统一返回 ERR_NO_MEMORY,而不是在导入阶段直接抛异常。 + self.buffer = bytearray() + self.capacity = 0 + # 累计已写入的字节数(与 C 端 ``write_total`` 等价的 64 位单调游标, + # 始终保持在 ``[0, 2**64)`` 范围内以匹配跨语言契约)。 self.write_total: int = 0 self.last_read = CsmMassDataOperation() self.last_write = CsmMassDataOperation() @@ -125,10 +133,12 @@ def __init__(self) -> None: # 内部辅助函数 # # --------------------------------------------------------------------------- # -def _ring_write(src: bytes) -> None: +def _ring_write(src) -> None: """按 ``write_total`` 暗示的位置把 ``src`` 写入环形缓冲区。 - 调用方必须已经持有 ``_state.lock``,并已确认 ``len(src) <= capacity``。 + ``src`` 可以是任何长度等于其字节数的 bytes-like 对象(``bytes`` / + ``bytearray`` / 单字节 ``memoryview``)。调用方必须已经持有 + ``_state.lock``,并已确认 ``len(src) <= capacity``。 """ cap = _state.capacity offset = _state.write_total % cap @@ -160,16 +170,31 @@ def _parse_uint64(text: str) -> Tuple[Optional[int], int]: 返回 ``(value, end_index)``。若没有数字或数值超出 64 位无符号 整数表示范围(与 C 端 ``strtoull`` 的 ``ERANGE`` 等价),则 ``value`` - 为 ``None``。 + 为 ``None``;``end_index`` 仍指向连续数字串的末尾,供调用方继续 + 向后定位分隔符。 + + 解析采用增量方式并在每一步检测溢出,以便在面对超长数字串时 + 快速失败,避免构造任意精度大整数带来的 CPU / 内存放大。 """ - i = 0 n = len(text) + i = 0 + value = 0 + overflow = False + max_before_mul = _UINT64_MAX // 10 + max_last_digit = _UINT64_MAX % 10 while i < n and "0" <= text[i] <= "9": + digit = ord(text[i]) - ord("0") + if not overflow: + if value > max_before_mul or ( + value == max_before_mul and digit > max_last_digit + ): + overflow = True + else: + value = value * 10 + digit i += 1 if i == 0: return None, 0 - value = int(text[:i]) - if value > _UINT64_MAX: + if overflow: return None, i return value, i @@ -210,25 +235,42 @@ def _parse_argument(argument: str) -> Tuple[CsmMassDataStatus, int, int, str]: p = p[1:] if not p.startswith("DataType:"): return CsmMassDataStatus.ERR_PARSE, 0, 0, "" - data_type = p[len("DataType:"):] + p = p[len("DataType:"):] # 验证 DataType 值仅包含合法字符。 - for ch in data_type: + for ch in p: if ch in ";<>": return CsmMassDataStatus.ERR_PARSE, 0, 0, "" - return CsmMassDataStatus.OK, start, size, data_type + # 与 C 端保持一致:DataType 必须能放入固定长度缓冲区(含末尾 NUL)。 + if len(p) + 1 > CSM_MASSDATA_MAX_DATATYPE_LEN: + return CsmMassDataStatus.ERR_BUFFER_TOO_SMALL, 0, 0, "" + return CsmMassDataStatus.OK, start, size, p -def _coerce_data(data: Optional[Union[bytes, bytearray, memoryview]]) -> Optional[bytes]: - """将允许的数据输入归一化为 ``bytes``;无效输入返回 ``None``。 +def _coerce_data( + data: Optional[Union[bytes, bytearray, memoryview]] +) -> Optional[memoryview]: + """将允许的数据输入归一化为单字节 ``memoryview``,避免额外拷贝。 与 C 端 ``data_size == 0`` 时允许 ``data == NULL`` 的语义对应: - 本函数允许 ``data is None``,视作零长度数据。 + 本函数允许 ``data is None``,视作零长度数据。返回的 ``memoryview`` + 会直接传给环形缓冲区的切片赋值,不会构造中间 ``bytes`` 对象, + 在大负载下可避免一次额外的全量拷贝。 """ if data is None: - return b"" - if isinstance(data, (bytes, bytearray, memoryview)): - return bytes(data) - return None + return memoryview(b"") + if not isinstance(data, (bytes, bytearray, memoryview)): + return None + try: + mv = memoryview(data) + except TypeError: + return None + if mv.itemsize != 1: + # 多维或非字节视图:转换为单字节平面视图(仅创建视图、不拷贝)。 + try: + mv = mv.cast("B") + except TypeError: + return None + return mv # --------------------------------------------------------------------------- # @@ -285,12 +327,16 @@ def _encode(data: Optional[Union[bytes, bytearray, memoryview]], return CsmMassDataStatus.ERR_INVALID_ARG, "" with _state.lock: + if _state.capacity == 0: + return CsmMassDataStatus.ERR_NO_MEMORY, "" if len(src) > _state.capacity: return CsmMassDataStatus.ERR_CACHE_TOO_SMALL, "" start_cursor = _state.write_total - if src: + if len(src) > 0: _ring_write(src) - _state.write_total += len(src) + # 与 C 端 ``uint64_t`` 一致按 64 位环绕,确保写入足够多数据后 + # 仍能产生合法的 ``Start:`` 字段且能被解码端正确识别。 + _state.write_total = (_state.write_total + len(src)) & _UINT64_MAX _state.last_write = CsmMassDataOperation(start_cursor, len(src)) if data_type is not None: @@ -357,18 +403,16 @@ def CSM_ConvertArgumentToMassData( return status, b"" with _state.lock: + if _state.capacity == 0: + return CsmMassDataStatus.ERR_NO_MEMORY, b"" if size > _state.capacity: return CsmMassDataStatus.ERR_OVERWRITTEN, b"" - # 当前驻留在环形缓冲区中的窗口为 - # ``[write_total - capacity, write_total)``。任何末端落在该窗口 - # 之外的请求都视为已被覆盖。Python 整数为任意精度,加法不会 - # 溢出,但仍按 C 端等价的非溢出比较方式拆开判断,便于跨语言对照。 - oldest = (_state.write_total - _state.capacity - if _state.write_total > _state.capacity else 0) - if start < oldest or start > _state.write_total: - return CsmMassDataStatus.ERR_OVERWRITTEN, b"" - end = start + size - if end > _state.write_total: + # 当前驻留在环形缓冲区中的窗口为 ``[write_total - capacity, write_total)``, + # 但 ``write_total`` 与 ``start`` 都按 64 位无符号语义解释,可能发生 + # 环绕。使用模 2**64 的“反向距离”可在不依赖绝对大小关系的前提下 + # 同时覆盖未环绕与环绕两种情形,与 C 端的 ``uint64_t`` 计算等价。 + distance = (_state.write_total - start) & _UINT64_MAX + if distance > _state.capacity or distance < size: return CsmMassDataStatus.ERR_OVERWRITTEN, b"" result = _ring_read(start, size) if size > 0 else b"" _state.last_read = CsmMassDataOperation(start, size)