Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pytest-embedded-wokwi/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ requires-python = ">=3.10"
dependencies = [
"pytest-embedded~=2.7.0",
"wokwi-client>=0.4.0",
"tomli>=1.1.0; python_version < '3.11'",
]

[project.optional-dependencies]
Expand Down
131 changes: 130 additions & 1 deletion pytest-embedded-wokwi/pytest_embedded_wokwi/wokwi.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import json
import logging
import os
import sys
import typing as t
from pathlib import Path

if sys.version_info >= (3, 11):
import tomllib
else:
import tomli as tomllib # type: ignore[no-redef]

from pytest_embedded.log import DuplicateStdoutPopen, MessageQueue
from pytest_embedded.utils import Meta
from wokwi_client import GET_TOKEN_URL, WokwiClientSync
Expand Down Expand Up @@ -79,19 +85,142 @@ def _setup_simulation(self, diagram: str, firmware_path: str, elf_path: str):
hello = self.client.connect()
logging.info('Connected to Wokwi Simulator, server version: %s', hello.get('version', 'unknown'))

# Upload files
# Upload custom chips before diagram so the server can resolve chip
# references in the diagram at upload time.
custom_chips = self._upload_custom_chips(Path(diagram).parent)

# Upload diagram and ELF
self.client.upload_file('diagram.json', Path(diagram))
self.client.upload_file('pytest.elf', Path(elf_path))

if firmware_path.endswith('flasher_args.json'):
firmware = self.client.upload_idf_firmware(firmware_path)
kwargs = {'firmware': firmware.firmware, 'elf': 'pytest.elf', 'flash_size': firmware.flash_size}
else:
firmware = self.client.upload_file('pytest.bin', Path(firmware_path))
kwargs = {'firmware': firmware, 'elf': 'pytest.elf'}

if custom_chips:
kwargs['chips'] = custom_chips

logging.info('Uploaded diagram and firmware to Wokwi. Starting simulation...')
self.client.start_simulation(**kwargs)

def _upload_custom_chips(self, diagram_dir: Path) -> list[str]:
"""Upload custom chip files and return chip names.

Reads chip definitions from ``wokwi.toml`` if present in *diagram_dir*.
Each ``[[chip]]`` entry must supply a ``name`` and a ``binary`` path
(relative to the ``wokwi.toml`` file). When ``wokwi.toml`` is absent
or contains no chip entries the method falls back to scanning a
``chips/`` sub-directory for ``*.chip.json`` / ``*.chip.wasm`` pairs.
"""
toml_path = diagram_dir / 'wokwi.toml'
if toml_path.is_file():
chip_specs = self._chip_specs_from_toml(toml_path)
if chip_specs is not None:
return self._upload_chip_specs(chip_specs)

# Fallback: auto-detect from chips/ directory
return self._upload_chip_specs(self._chip_specs_from_dir(diagram_dir))

def _chip_specs_from_toml(self, toml_path: Path) -> list[tuple[Path, Path, str]] | None:
"""Parse ``[[chip]]`` entries from *toml_path*.

All relative paths in the ``[[chip]]`` entries are resolved relative to
the directory containing *toml_path* (i.e. the sketch directory where
``wokwi.toml`` lives), which matches the convention used by the
``generate_wokwi_toml`` script.

Returns a list of ``(json_path, binary_path, chip_name)`` tuples, or
``None`` if the file cannot be parsed or contains no chip entries.
"""
base_dir = toml_path.parent
try:
with open(toml_path, 'rb') as f:
data = tomllib.load(f)
except (OSError, tomllib.TOMLDecodeError) as e:
logging.warning('Could not parse wokwi.toml: %s', e)
return None

chip_entries = data.get('chip', [])
if not chip_entries:
return None

specs: list[tuple[Path, Path, str]] = []
for entry in chip_entries:
name = entry.get('name')
binary_rel = entry.get('binary')
if not name or not binary_rel:
logging.warning('Skipping chip entry missing name or binary: %s', entry)
continue

binary_path = (base_dir / binary_rel).resolve()
json_path = binary_path.parent / (name + '.chip.json')

if not binary_path.exists():
logging.warning('Chip binary not found: %s', binary_path)
continue
if not json_path.exists():
logging.warning('Chip JSON not found: %s', json_path)
continue

specs.append((json_path, binary_path, name))

return specs if specs else None

def _chip_specs_from_dir(self, diagram_dir: Path) -> list[tuple[Path, Path, str]]:
"""Auto-detect chip specs by scanning ``chips/`` under *diagram_dir*.

Returns a list of ``(json_path, binary_path, chip_name)`` tuples.
"""
chips_dir = diagram_dir / 'chips'
if not chips_dir.is_dir():
return []

specs: list[tuple[Path, Path, str]] = []
for chip_json in chips_dir.glob('*.chip.json'):
chip_name = chip_json.name.removesuffix('.chip.json')

chip_binary = None
for ext in ['.chip.wasm', '.chip.bin']:
candidate = chips_dir / (chip_name + ext)
if candidate.exists():
chip_binary = candidate
break

if chip_binary is None:
logging.warning('No binary file found for chip %s, skipping', chip_name)
continue

specs.append((chip_json, chip_binary, chip_name))

return specs

def _upload_chip_specs(self, specs: list[tuple[Path, Path, str]]) -> list[str]:
"""Upload chip files described by *specs* and return the chip names.

The Wokwi server requires chip JSON files to be uploaded via the
``text`` field of the ``file:upload`` command (not base64-encoded
binary), matching the behaviour of the official ``wokwi-cli`` TypeScript
client. The wokwi-python-client public API only supports binary
uploads, so we send the JSON via the transport's ``request`` method
directly. The binary (``.chip.wasm``) is uploaded normally under its
original filename.
"""
chip_names = []
for json_path, binary_path, chip_name in specs:
# Send chip JSON as text (server rejects binary-encoded chip JSON).
self.client._call(
self.client._async_client._transport.request(
'file:upload', {'name': json_path.name, 'text': json_path.read_text(encoding='utf-8')}
)
)
self.client.upload_file(binary_path.name, binary_path)
chip_names.append(chip_name)
logging.info('Uploaded custom chip: %s', chip_name)
return chip_names

def _start_serial_monitoring(self):
"""Start monitoring serial output and forward to stdout and message queue."""

Expand Down
Loading