diff --git a/py-rconv/README.md b/py-rconv/README.md new file mode 100644 index 0000000..a5b64c1 --- /dev/null +++ b/py-rconv/README.md @@ -0,0 +1,116 @@ +# rconv-py + +A pure-Python rewrite of SeisSol's `rconv` tool. + +`rconv` converts kinematic rupture models from the [Standard Rupture +Format](https://strike.scec.org/scecpedia/Standard_Rupture_Format) (SRF) to +SeisSol's NetCDF Rupture Format (NRF). It can also emit an XDMF file for +visualisation in ParaView. + +The original `rconv` is a C++ tool that depends on a deprecated version of +PROJ.4 and on the SeisSol build system. This package replaces it with a small, +self-contained Python module that uses modern `pyproj` and `netCDF4` — nothing +else. + +## Installation + +```sh +pip install . +``` + +Dependencies (installed automatically): + +* `numpy >= 1.24` +* `pyproj >= 3.4` +* `netCDF4 >= 1.6` + +## Quick start + +```sh +# Convert SRF → NRF for SeisSol input. +rconv to-nrf input.srf output.nrf \ + --mcs "+proj=tmerc +datum=WGS84 +k=0.9996 +lon_0=-118.5150 +lat_0=34.3440 +axis=enu" + +# Produce an XDMF visualisation file for ParaView. +rconv to-xdmf input.srf visualization.xmf +``` + +Equivalent invocation as a module (no entry-point installation needed): + +```sh +python -m rconv to-nrf input.srf output.nrf --mcs "..." +``` + +## CLI + +Two subcommands cover the two distinct conversions: + +``` +rconv to-nrf INPUT OUTPUT --mcs PROJ_STRING [--normalize-onset] [--source-proj PROJ_STRING] +rconv to-xdmf INPUT OUTPUT [--vcs PROJ_STRING] [--source-proj PROJ_STRING] +``` + +* `--mcs` — PROJ string for the *mesh* coordinate system. **Required** for NRF + output. Determines both how lon/lat/depth becomes (x, y, z) and how + slip-vector components are oriented (via the `+axis=` option). +* `--vcs` — PROJ string for the *visualisation* coordinate system. Defaults to + `+proj=geocent +datum=WGS84 +units=m +no_defs` (a geocentric globe view). +* `--normalize-onset` — subtract the minimum onset time across all sources. + Useful when the SRF's absolute clock is meaningless. +* `--source-proj` — override the source CRS (default: WGS84 longlat). Useful + only when the SRF has already-projected coordinates. + +## Programmatic API + +```python +from rconv import parse_srf, CoordinateMapping, write_nrf, write_xmf + +sources = parse_srf("northridge.srf") +mapping = CoordinateMapping( + "+proj=tmerc +datum=WGS84 +k=0.9996 +lon_0=-118.515 +lat_0=34.344 +axis=ned" +) +write_nrf("northridge.nrf", sources, mapping) +write_xmf("northridge.xmf", sources, mapping) +``` + +## What `+axis=` means + +The `+axis=` value in your PROJ string controls how a canonical +(north, east, down) triple — the natural frame of SRF coordinates — is +permuted and signed into the mesh CS. It is three characters from `enwsud`: + +* `+axis=enu` — x = east, y = north, z = up (PROJ's default if you omit + `+axis=` entirely). +* `+axis=ned` — x = north, y = east, z = down. +* `+axis=seu` — x = south, y = east, z = up. + +The same string also controls how slip vectors (along strike, in fault plane, +along normal) are oriented in your mesh. If your moment tensors look wrong, +double-check `+axis=` first. + +## Differences from the C++ tool + +* **CLI**: two focused subcommands instead of five short flags. +* **PROJ**: uses modern `pyproj` (which wraps PROJ 8/9), not the legacy + PROJ.4 API the C++ tool depended on. +* **Output**: NRF output is functionally identical and consumed by SeisSol's + NRF reader without modification. Two cosmetic differences: + - The `Subfault_units` compound type is replaced by a plain string + attribute on `subfaults`. SeisSol's reader never inspects either form, + so this is invisible to downstream use; only `ncdump` output differs. + - Slip-rate dimensions of size 0 are written as UNLIMITED rather than + fixed-size-0. Again, the reader doesn't care. +* **Examples**: ships with a test suite that round-trips the Northridge SRF + from `SeisSol/Examples`. + +## Development + +Run the tests: + +```sh +pip install -e .[dev] +pytest +``` + +The Northridge integration tests are skipped automatically if the SRF file +isn't available at `~/Examples/Northridge/`. diff --git a/py-rconv/pyproject.toml b/py-rconv/pyproject.toml new file mode 100644 index 0000000..e360826 --- /dev/null +++ b/py-rconv/pyproject.toml @@ -0,0 +1,43 @@ +[build-system] +requires = ["setuptools>=64", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "rconv" +version = "0.1.0" +description = "Convert SRF kinematic rupture models to SeisSol's NetCDF Rupture Format (NRF)." +readme = "README.md" +requires-python = ">=3.10" +license = { text = "BSD-3-Clause" } +authors = [ + { name = "SeisSol Group" }, +] +keywords = ["seismology", "srf", "nrf", "seissol", "rupture", "earthquake"] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Science/Research", + "License :: OSI Approved :: BSD License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Topic :: Scientific/Engineering", +] +dependencies = [ + "numpy>=1.24", + "pyproj>=3.4", + "netCDF4>=1.6", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7", +] + +[project.scripts] +rconv = "rconv.cli:main" + +[project.urls] +Homepage = "https://github.com/SeisSol/SeisSol" +Documentation = "https://seissol.readthedocs.io/en/latest/standard-rupture-format.html" + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/py-rconv/src/rconv/__init__.py b/py-rconv/src/rconv/__init__.py new file mode 100644 index 0000000..f2287b4 --- /dev/null +++ b/py-rconv/src/rconv/__init__.py @@ -0,0 +1,45 @@ +"""rconv-py — convert SRF kinematic rupture models to SeisSol's NRF. + +A pure-Python replacement for the C++ ``rconv`` tool that ships with SeisSol. +This package is dependency-free with respect to SeisSol; it relies only on +``numpy``, ``pyproj`` and ``netCDF4``. + +The public API is small: + +* :func:`rconv.srf.parse_srf` — read an SRF file into :class:`PointSource`\\ s. +* :class:`rconv.mapping.CoordinateMapping` — transform geographic coords and + fault-local vectors into a mesh CS. +* :func:`rconv.nrf.write_nrf` — emit a NetCDF Rupture Format file for SeisSol. +* :func:`rconv.xmf.write_xmf` — emit an XDMF file for ParaView. + +Example +------- + +>>> from rconv import parse_srf, CoordinateMapping, write_nrf +>>> sources = parse_srf("northridge.srf") +>>> mapping = CoordinateMapping( +... "+proj=tmerc +datum=WGS84 +k=0.9996 +lon_0=-118.515 +lat_0=34.344 " +... "+units=m +axis=ned" +... ) +>>> write_nrf("northridge.nrf", sources, mapping) +""" + +from __future__ import annotations + +from .mapping import CoordinateMapping, Vec3 +from .nrf import read_nrf, write_nrf +from .srf import PointSource, SRFParseError, parse_srf +from .xmf import write_xmf + +__all__ = [ + "CoordinateMapping", + "PointSource", + "SRFParseError", + "Vec3", + "parse_srf", + "read_nrf", + "write_nrf", + "write_xmf", +] + +__version__ = "0.1.0" diff --git a/py-rconv/src/rconv/__main__.py b/py-rconv/src/rconv/__main__.py new file mode 100644 index 0000000..af6a302 --- /dev/null +++ b/py-rconv/src/rconv/__main__.py @@ -0,0 +1,10 @@ +"""Allow ``python -m rconv ...`` invocation.""" + +from __future__ import annotations + +import sys + +from .cli import main + +if __name__ == "__main__": + sys.exit(main()) diff --git a/py-rconv/src/rconv/cli.py b/py-rconv/src/rconv/cli.py new file mode 100644 index 0000000..39cabe7 --- /dev/null +++ b/py-rconv/src/rconv/cli.py @@ -0,0 +1,148 @@ +"""Modern command-line interface for ``rconv``. + +The classic C++ tool accepts five short flags that mix concerns (some required +together, others mutually independent). Here we split into two focused +sub-commands so the help is self-documenting and shell-completion is happy: + +* ``rconv to-nrf INPUT OUTPUT --mcs ...`` — produce a SeisSol input file. +* ``rconv to-xdmf INPUT OUTPUT [--vcs ...]`` — produce a visualisation file. + +Both commands accept ``--source-proj`` to override the default input CRS +(WGS84 longlat) on the rare occasion an SRF arrives in something else. +""" + +from __future__ import annotations + +import argparse +import logging +import sys +from pathlib import Path + +from .mapping import CoordinateMapping +from .nrf import write_nrf +from .srf import parse_srf +from .xmf import write_xmf + +_DEFAULT_VCS = "+proj=geocent +datum=WGS84 +units=m +no_defs" + + +def main(argv: list[str] | None = None) -> int: + """Entry point. Returns a process exit code.""" + parser = _build_parser() + args = parser.parse_args(argv) + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(levelname)s %(name)s: %(message)s", + ) + + if args.command == "to-nrf": + return _cmd_to_nrf(args) + if args.command == "to-xdmf": + return _cmd_to_xdmf(args) + parser.print_help() + return 2 + + +# --------------------------------------------------------------------------- +# argparse plumbing +# --------------------------------------------------------------------------- + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="rconv", + description=( + "Convert SRF (Standard Rupture Format) files to NRF (NetCDF " + "Rupture Format) for SeisSol, or to XDMF for visualisation." + ), + ) + parser.add_argument( + "-v", "--verbose", action="store_true", help="Enable debug logging.", + ) + sub = parser.add_subparsers(dest="command", required=True, metavar="COMMAND") + + nrf = sub.add_parser( + "to-nrf", + help="Convert SRF → NRF.", + description="Convert an SRF file to a NetCDF Rupture Format file for SeisSol.", + ) + nrf.add_argument("input", type=Path, help="Input SRF file (.srf).") + nrf.add_argument("output", type=Path, help="Output NRF file (.nrf).") + nrf.add_argument( + "--mcs", required=True, metavar="PROJ_STRING", + help=( + "PROJ string describing the mesh coordinate system, e.g. " + '"+proj=utm +zone=10 +datum=WGS84 +units=m +axis=ned +no_defs". ' + "The +axis= option determines how slip vectors are oriented in the " + "MCS (default: enu)." + ), + ) + nrf.add_argument( + "--normalize-onset", action="store_true", + help="Subtract the minimum onset time across sources from every source.", + ) + nrf.add_argument( + "--source-proj", default=None, metavar="PROJ_STRING", + help=( + "Override the source CRS (defaults to WGS84 longlat). Useful only " + "if your SRF carries already-projected coordinates." + ), + ) + + xmf = sub.add_parser( + "to-xdmf", + help="Convert SRF → XDMF (for ParaView).", + description="Convert an SRF file to an XDMF visualisation file.", + ) + xmf.add_argument("input", type=Path, help="Input SRF file (.srf).") + xmf.add_argument("output", type=Path, help="Output XDMF file (.xmf/.xdmf).") + xmf.add_argument( + "--vcs", default=_DEFAULT_VCS, metavar="PROJ_STRING", + help=( + "PROJ string for the visualisation coordinate system. " + f"Defaults to {_DEFAULT_VCS!r} (geocentric — gives a globe view)." + ), + ) + xmf.add_argument( + "--source-proj", default=None, metavar="PROJ_STRING", + help="Override the source CRS (defaults to WGS84 longlat).", + ) + + return parser + + +# --------------------------------------------------------------------------- +# commands +# --------------------------------------------------------------------------- + +def _cmd_to_nrf(args: argparse.Namespace) -> int: + sources = _read_srf(args.input) + if not sources: + print(f"error: {args.input} produced no usable point sources", file=sys.stderr) + return 1 + mapping = CoordinateMapping(args.mcs, source_proj=args.source_proj) + write_nrf(args.output, sources, mapping, normalize_onset=args.normalize_onset) + print(f"Wrote {args.output} ({len(sources)} sources).") + return 0 + + +def _cmd_to_xdmf(args: argparse.Namespace) -> int: + sources = _read_srf(args.input) + if not sources: + print(f"error: {args.input} produced no usable point sources", file=sys.stderr) + return 1 + mapping = CoordinateMapping(args.vcs, source_proj=args.source_proj) + write_xmf(args.output, sources, mapping) + print(f"Wrote {args.output} ({len(sources)} sources).") + return 0 + + +def _read_srf(path: Path) -> list: + print(f"Reading {path} ...", end=" ", flush=True) + sources = parse_srf(path) + print(f"{len(sources)} sources.") + return sources + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/py-rconv/src/rconv/mapping.py b/py-rconv/src/rconv/mapping.py new file mode 100644 index 0000000..08aefa0 --- /dev/null +++ b/py-rconv/src/rconv/mapping.py @@ -0,0 +1,244 @@ +"""Coordinate transformations for SRF → NRF conversion. + +Two distinct operations live here: + +1. ``to_mesh`` — transform a geographic point (lon, lat, depth) into the user's + mesh coordinate system (MCS) using a PROJ.4 / PROJ string. +2. ``to_mcs`` — rotate a vector given in the fault-local frame + (along strike, along dip, along normal) into the MCS. + +Both honour the ``+axis=`` PROJ option (default ``enu``: x=east, y=north, z=up). +The axis option determines how a canonical (north, east, down) triple — which is +the natural frame of the SRF format — is permuted/signed into the user's MCS. + +A note on pyproj subtleties +--------------------------- + +pyproj's behaviour around ``+axis=`` differs between projected and geocentric +target CRSs: + +* For **geocentric** targets, pyproj fully honours ``+axis=`` (it permutes and + negates X/Y/Z accordingly), so we delegate the 3-D transform to it. +* For **projected** targets, pyproj only permutes the *horizontal* x/y when + ``always_xy=False``; the vertical z is always passed through. To get + consistent, predictable behaviour we therefore do the horizontal projection + with ``always_xy=True`` (yielding canonical east/north) and apply the axis + permutation ourselves. + +The end result matches the C++ ``rconv``'s use of PROJ.4's ``pj_transform`` plus +the ``adjustAxes`` helper, without depending on PROJ.4's deprecated internals. +""" + +from __future__ import annotations + +import logging +import math +import re +from dataclasses import dataclass + +from pyproj import CRS, Transformer + +_log = logging.getLogger(__name__) + +# Canonical source CRS for SRF inputs. We do unit conversions (km→m) and +# vertical sign handling ourselves, so the source CRS is the plain WGS84 +# geographic CRS in degrees with no vertical hint. +_DEFAULT_SOURCE_PROJ = "+proj=longlat +datum=WGS84 +no_defs" + +# Default +axis= in PROJ.4 if the option is absent. +_DEFAULT_AXIS = "enu" + +_VALID_AXIS_CHARS = frozenset("enwsud") + + +@dataclass(frozen=True) +class Vec3: + """A tiny 3-vector used purely for clarity at call sites.""" + x: float + y: float + z: float + + def as_tuple(self) -> tuple[float, float, float]: + return (self.x, self.y, self.z) + + +class CoordinateMapping: + """Transformation between WGS84 lon/lat/depth and a target mesh CS. + + Parameters + ---------- + target_proj: + PROJ string describing the MCS, e.g. + ``"+proj=utm +zone=10 +datum=WGS84 +units=m +axis=ned +no_defs"``. + source_proj: + PROJ string for the input CRS. Defaults to plain WGS84 longlat + (``+proj=longlat +datum=WGS84 +no_defs``); this matches the C++ tool's + intent, with the km→m vertical conversion handled explicitly in + :meth:`to_mesh`. + """ + + def __init__(self, target_proj: str, source_proj: str | None = None): + self.target_proj = target_proj + self.source_proj = source_proj or _DEFAULT_SOURCE_PROJ + + self.target_crs = CRS.from_proj4(target_proj) + self.source_crs = CRS.from_proj4(self.source_proj) + + self.axis = _parse_axis(target_proj) + if len(self.axis) != 3 or any(c not in _VALID_AXIS_CHARS for c in self.axis): + raise ValueError( + f"Invalid +axis= in {target_proj!r}: {self.axis!r} " + f"(expected a 3-character string of {sorted(_VALID_AXIS_CHARS)})" + ) + + self.is_geocentric = self.target_crs.is_geocentric + + # We use always_xy=True for projected targets so we get canonical + # (east, north) and apply our own axis logic. For geocentric, the + # value of always_xy is moot — pyproj handles +axis= directly. + self._transformer = Transformer.from_crs( + self.source_crs, self.target_crs, always_xy=True, + ) + + # Sanity-check vertical units. The output is documented as meters; if + # the MCS chose something else, warn (the C++ tool emits a similar + # warning, since downstream code assumes m / m^2 / m/s). + self._maybe_warn_units() + + # ------------------------------------------------------------------ + # main operations + # ------------------------------------------------------------------ + + def to_mesh(self, longitude: float, latitude: float, depth_km: float) -> Vec3: + """Map (lon, lat, depth) → (x, y, z) in the mesh CS. + + ``depth_km`` is positive *downwards*, as in SRF. The returned ``z`` + sign follows the third character of ``self.axis`` (``u``: positive + upwards → output negative for positive depth; ``d``: positive downwards + → output positive for positive depth). + """ + depth_m = depth_km * 1000.0 + + if self.is_geocentric: + # For geocentric targets pyproj does the full 3-D transformation, + # including any +axis= permutation. We pass height = -depth. + x, y, z = self._transformer.transform(longitude, latitude, -depth_m) + return Vec3(x, y, z) + + # Projected target: pyproj returns canonical (east, north) with + # always_xy=True. The z it returns is unmodified — we ignore it and + # compute z ourselves from the depth. + east, north, _z_unused = self._transformer.transform(longitude, latitude, -depth_m) + # Canonical NED triple (north, east, down): + ned = (north, east, depth_m) + return Vec3(*_permute_ned(ned, self.axis)) + + def to_mcs( + self, + strike: float, dip: float, rake: float, + u1: float, u2: float, u3: float, + ) -> Vec3: + """Rotate a fault-local vector into the mesh CS. + + ``(u1, u2, u3)`` are the components of the vector in the fault-local + frame: along strike, along dip (down-dip in the fault plane), along + the fault normal. The rotation goes via canonical NED and is then + permuted/signed according to ``self.axis``. + """ + ned = _strike_dip_rake_to_ned(strike, dip, rake, u1, u2, u3) + return Vec3(*_permute_ned(ned, self.axis)) + + # ------------------------------------------------------------------ + # internals + # ------------------------------------------------------------------ + + def _maybe_warn_units(self) -> None: + # axis_info has a `unit_name` per axis. We expect 'metre' across the + # board; anything else means the user mixed units, and downstream + # area/sliprate conversions (cm^2 → m^2, cm/s → m/s) will produce + # mismatched output. The C++ tool emits the same warning. + for axis in self.target_crs.axis_info: + if axis.unit_name and axis.unit_name not in ("metre", "meter"): + _log.warning( + "Mesh coordinate system does not use metre as length unit " + "(axis %s is in %s). NRF area/sliprate are still written in " + "m^2 / m/s — expect inconsistencies.", + axis.abbrev or axis.direction, axis.unit_name, + ) + return + + +# --------------------------------------------------------------------------- +# free functions: rotation and permutation +# --------------------------------------------------------------------------- + +def _strike_dip_rake_to_ned( + strike: float, dip: float, rake: float, + u1: float, u2: float, u3: float, +) -> tuple[float, float, float]: + """Rotate ``(u1, u2, u3)`` from the fault-local frame into NED. + + The rotation matrix is the standard one for the SRF convention: + + * ``u1`` is along strike (in the fault plane, in the strike direction) + * ``u2`` is in the fault plane, perpendicular to strike, pointing down-dip + * ``u3`` is the fault normal + + The output is in the canonical (north, east, down) frame. + + The expression is verbatim from the C++ ``Map::toMCS`` (before its + ``adjustAxes`` step), kept in this explicit form so the source-to-source + correspondence is easy to verify. + """ + s = math.radians(strike) + d = math.radians(dip) + r = math.radians(rake) + sin_s, cos_s = math.sin(s), math.cos(s) + sin_d, cos_d = math.sin(d), math.cos(d) + sin_r, cos_r = math.sin(r), math.cos(r) + + n = (u1 * (sin_r * sin_s * cos_d + cos_r * cos_s) + + u2 * (-sin_r * cos_s + sin_s * cos_d * cos_r) + - u3 * sin_d * sin_s) + e = (u1 * (-sin_r * cos_d * cos_s + sin_s * cos_r) + + u2 * (-sin_r * sin_s - cos_d * cos_r * cos_s) + + u3 * sin_d * cos_s) + d_comp = (-u1 * sin_d * sin_r + - u2 * sin_d * cos_r + - u3 * cos_d) + return n, e, d_comp + + +def _permute_ned(ned: tuple[float, float, float], axis: str) -> tuple[float, float, float]: + """Permute and sign a canonical NED triple according to a PROJ ``+axis=`` string. + + Each character of ``axis`` (length 3, characters in ``enwsud``) selects + which canonical component, with which sign, ends up in the output + position. For example ``axis='enu'`` produces ``(east, north, -down)``. + """ + n, e, d = ned + out = [0.0, 0.0, 0.0] + for i, ch in enumerate(axis): + if ch == "n": out[i] = n + elif ch == "s": out[i] = -n + elif ch == "e": out[i] = e + elif ch == "w": out[i] = -e + elif ch == "d": out[i] = d + elif ch == "u": out[i] = -d + else: + # Caller validated up-front, but keep this branch for safety. + raise ValueError(f"Invalid axis character: {ch!r}") + return out[0], out[1], out[2] + + +# --------------------------------------------------------------------------- +# proj-string helpers +# --------------------------------------------------------------------------- + +_AXIS_RE = re.compile(r"\+axis=(\S+)") + + +def _parse_axis(proj_string: str) -> str: + """Extract the ``+axis=`` value from a PROJ string, defaulting to ``"enu"``.""" + m = _AXIS_RE.search(proj_string) + return m.group(1).lower() if m else _DEFAULT_AXIS diff --git a/py-rconv/src/rconv/nrf.py b/py-rconv/src/rconv/nrf.py new file mode 100644 index 0000000..b8522eb --- /dev/null +++ b/py-rconv/src/rconv/nrf.py @@ -0,0 +1,240 @@ +"""Writer for the NetCDF Rupture Format (NRF). + +The NRF is the binary form of a kinematic rupture model consumed by SeisSol. +This module produces files that match the layout written by the reference C++ +``rconv`` tool closely enough that SeisSol's NRF reader treats them as +equivalent. See ``preprocessing/science/rconv/spec/nrf.cdl`` in the SeisSol +repository for the canonical schema. + +A note on units +--------------- + +SRF carries lengths in cm and times in s; the NRF schema requires SI lengths +(metres). This module performs the conversions explicitly: + +* depth (km) → m (via :class:`rconv.mapping.CoordinateMapping`) +* area (cm²) → m² (× 1e-4) +* shear modulus (g/(cm·s²)) → Pa = kg/(m·s²) (× 1e-1) +* slip rate (cm/s) → m/s (× 1e-2) + +A note on the ``Subfault_units`` compound +----------------------------------------- + +The C++ ``rconv`` attaches a compound-typed ``units`` attribute on +``subfaults`` (compound of seven VLEN strings). The Python ``netCDF4`` package +does not support strings inside compound types, so we attach the same +information as a plain-text attribute instead. SeisSol's reader never inspects +this attribute — it's pure metadata for tools like ``ncdump``. +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Iterable, Sequence + +import netCDF4 +import numpy as np + +from .mapping import CoordinateMapping +from .srf import PointSource + +_log = logging.getLogger(__name__) + +# Unit-conversion constants (cgs → SI). +_CM2_TO_M2 = 1.0e-4 +_CGS_MU_TO_PA = 1.0e-1 +_CMS_TO_MS = 1.0e-2 + +# numpy dtypes matching the NRF compound types. +_VECTOR3_DTYPE = np.dtype([("x", "f8"), ("y", "f8"), ("z", "f8")]) +_SUBFAULT_DTYPE = np.dtype([ + ("tinit", "f8"), + ("timestep", "f8"), + ("mu", "f8"), + ("area", "f8"), + ("tan1", _VECTOR3_DTYPE), + ("tan2", _VECTOR3_DTYPE), + ("normal", _VECTOR3_DTYPE), +]) + +# Units metadata, mirroring rconv's Subfault_units compound attribute. +_SUBFAULT_UNITS_TEXT = ( + "tinit=s, timestep=s, mu=pascal, area=m^2, tan1=m, tan2=m, normal=m" +) + + +def write_nrf( + path: str | Path, + sources: Sequence[PointSource], + mapping: CoordinateMapping, + *, + normalize_onset: bool = False, +) -> None: + """Write a list of point sources to a NetCDF Rupture Format file. + + Parameters + ---------- + path: + Destination file path. Overwritten if it exists. + sources: + Point sources, as produced by :func:`rconv.srf.parse_srf`. Must be + non-empty. + mapping: + Coordinate mapping into the mesh CS. + normalize_onset: + If true, subtract the minimum ``tinit`` across all sources from every + source's onset. Useful when the SRF's absolute clock is meaningless. + """ + if not sources: + raise ValueError("write_nrf: empty source list") + + centres, subfaults, sroffsets, sliprates = _build_arrays( + sources, mapping, normalize_onset=normalize_onset, + ) + + path = Path(path) + # mode='w' clobbers — matches NC_CLOBBER in the C++ writer. + with netCDF4.Dataset(path, mode="w", format="NETCDF4") as ds: + _write_dataset(ds, centres, subfaults, sroffsets, sliprates) + + +# --------------------------------------------------------------------------- +# array assembly +# --------------------------------------------------------------------------- + +def _build_arrays( + sources: Sequence[PointSource], + mapping: CoordinateMapping, + *, + normalize_onset: bool, +) -> tuple[np.ndarray, np.ndarray, np.ndarray, tuple[np.ndarray, np.ndarray, np.ndarray]]: + n = len(sources) + + # Offset adjustment for onsets. + if normalize_onset: + min_tinit = min(s.tinit for s in sources) + _log.info("Normalising onset: minimal tinit %g s → 0.", min_tinit) + else: + min_tinit = 0.0 + + # Allocate output arrays. + centres = np.empty(n, dtype=_VECTOR3_DTYPE) + subfaults = np.empty(n, dtype=_SUBFAULT_DTYPE) + + # sroffsets layout: (n+1, 3). offsets[i][d] is the start index of source i + # in direction d's flat sliprate array; offsets[n][d] is the total length. + sroffsets = np.zeros((n + 1, 3), dtype=np.uint32) + + # Accumulate sliprate samples per direction. + sliprate_chunks: tuple[list[np.ndarray], list[np.ndarray], list[np.ndarray]] = ([], [], []) + + for i, src in enumerate(sources): + x, y, z = mapping.to_mesh(src.longitude, src.latitude, src.depth).as_tuple() + centres[i] = (x, y, z) + + # Three orthogonal unit vectors of the fault-local frame, transformed + # into the MCS. These define the slip basis in the mesh CS. + tan1 = mapping.to_mcs(src.strike, src.dip, src.rake, 1.0, 0.0, 0.0).as_tuple() + tan2 = mapping.to_mcs(src.strike, src.dip, src.rake, 0.0, 1.0, 0.0).as_tuple() + normal = mapping.to_mcs(src.strike, src.dip, src.rake, 0.0, 0.0, 1.0).as_tuple() + + subfaults[i] = ( + src.tinit - min_tinit, # tinit (s) + src.dt, # timestep (s) + src.shear_modulus * _CGS_MU_TO_PA, # mu (Pa) + src.area * _CM2_TO_M2, # area (m^2) + tan1, tan2, normal, + ) + + # Sliprate offsets: cumulative count per direction. + for d in range(3): + samples = np.asarray(src.slip_rates[d], dtype=np.float64) * _CMS_TO_MS + sliprate_chunks[d].append(samples) + sroffsets[i + 1, d] = sroffsets[i, d] + samples.size + + # Concatenate per direction. Empty list → zero-length array. + sliprates: tuple[np.ndarray, np.ndarray, np.ndarray] = tuple( + np.concatenate(chunks) if chunks else np.empty(0, dtype=np.float64) + for chunks in sliprate_chunks + ) # type: ignore[assignment] + + return centres, subfaults, sroffsets, sliprates + + +# --------------------------------------------------------------------------- +# NetCDF emission +# --------------------------------------------------------------------------- + +def _write_dataset( + ds: netCDF4.Dataset, + centres: np.ndarray, + subfaults: np.ndarray, + sroffsets: np.ndarray, + sliprates: tuple[np.ndarray, np.ndarray, np.ndarray], +) -> None: + # --- compound types --------------------------------------------------- + vec3_t = ds.createCompoundType(_VECTOR3_DTYPE, "Vector3") + sf_t = ds.createCompoundType(_SUBFAULT_DTYPE, "Subfault") + + # --- dimensions ------------------------------------------------------- + n = centres.shape[0] + ds.createDimension("source", n) + ds.createDimension("sroffset", n + 1) + ds.createDimension("direction", 3) + # netCDF4-python maps size=0 to UNLIMITED; this is a cosmetic difference + # from the C++ writer (which makes them fixed-size-0). The reader doesn't + # care. + ds.createDimension("sample1", sliprates[0].size) + ds.createDimension("sample2", sliprates[1].size) + ds.createDimension("sample3", sliprates[2].size) + + # --- variables -------------------------------------------------------- + centres_var = ds.createVariable("centres", vec3_t, ("source",)) + centres_var.units = "m" + centres_var[:] = centres + + subfaults_var = ds.createVariable( + "subfaults", sf_t, ("source",), zlib=True, complevel=1, shuffle=False, + ) + subfaults_var.units = _SUBFAULT_UNITS_TEXT + subfaults_var[:] = subfaults + + sroffsets_var = ds.createVariable( + "sroffsets", "u4", ("sroffset", "direction"), + zlib=True, complevel=1, shuffle=False, + ) + sroffsets_var[:] = sroffsets + + for d, name in enumerate(("sliprates1", "sliprates2", "sliprates3")): + dim = f"sample{d + 1}" + var = ds.createVariable( + name, "f8", (dim,), zlib=True, complevel=1, shuffle=False, + ) + var.units = "m/s" + if sliprates[d].size: + var[:] = sliprates[d] + + +# --------------------------------------------------------------------------- +# reader (for tests + round-trip verification) +# --------------------------------------------------------------------------- + +def read_nrf(path: str | Path) -> dict: + """Read an NRF file and return a dict of its arrays. + + Provided primarily for testing and round-trip verification; not intended + as a replacement for SeisSol's own consumer. The return value keys mirror + the variable names in the NRF schema: ``centres``, ``subfaults``, + ``sroffsets``, ``sliprates1/2/3``. + """ + path = Path(path) + with netCDF4.Dataset(path, mode="r") as ds: + return { + "centres": np.asarray(ds["centres"][:]), + "subfaults": np.asarray(ds["subfaults"][:]), + "sroffsets": np.asarray(ds["sroffsets"][:]), + "sliprates1": np.asarray(ds["sliprates1"][:]), + "sliprates2": np.asarray(ds["sliprates2"][:]), + "sliprates3": np.asarray(ds["sliprates3"][:]), + } diff --git a/py-rconv/src/rconv/srf.py b/py-rconv/src/rconv/srf.py new file mode 100644 index 0000000..8b39a9a --- /dev/null +++ b/py-rconv/src/rconv/srf.py @@ -0,0 +1,234 @@ +"""Parser for the Standard Rupture Format (SRF). + +Reference: https://strike.scec.org/scecpedia/Standard_Rupture_Format + +This module reads SRF v1.0 and v2.0 files. Both versions describe a kinematic +rupture model as a collection of point sources, each with a per-source time +history of slip rate in three orthogonal directions of the fault-local frame. + +The parser is tokenizer-based (rather than line-based), mirroring the C++ +``rconv`` behaviour: SRF files mix whitespace and newlines as separators within +both the per-point header lines and the slip-rate sample arrays. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from io import TextIOBase +from pathlib import Path +from typing import IO, Iterator + +_log = logging.getLogger(__name__) + + +@dataclass +class PointSource: + """A single SRF point source. + + Field units follow the SRF specification verbatim — converting them to SI + is the responsibility of downstream code (e.g. :func:`rconv.nrf.write_nrf`). + """ + + longitude: float #: degrees + latitude: float #: degrees + depth: float #: km, positive downwards + strike: float #: degrees + dip: float #: degrees + rake: float #: degrees + area: float #: cm^2 + tinit: float #: s, onset time of the source-time function + dt: float #: s, sample spacing of the slip-rate function + shear_modulus: float #: g/(cm*s^2); 0.0 if unknown (v1.0 always; v2.0 if vs or den missing) + slip_rates: tuple[list[float], list[float], list[float]] = field( + default_factory=lambda: ([], [], []) + ) #: cm/s, samples per fault-local direction (u1, u2, u3) + + +class SRFParseError(ValueError): + """Raised when an SRF file cannot be parsed.""" + + +def parse_srf(path: str | Path) -> list[PointSource]: + """Parse an SRF file and return the list of (non-empty) point sources. + + Point sources whose slip-rate samples are empty in all three directions are + dropped — they carry no information and would otherwise cause divide-by-zero + issues downstream. The number removed is logged at INFO level. + + Parameters + ---------- + path: + Path to the SRF file. + + Returns + ------- + list[PointSource] + The parsed point sources, in file order. + + Raises + ------ + SRFParseError + If the file has an unsupported version, an unknown block type, or is + malformed. + """ + path = Path(path) + with path.open("r") as fh: + return _parse_stream(fh, source=str(path)) + + +def _parse_stream(stream: IO[str] | TextIOBase, *, source: str = "") -> list[PointSource]: + tokens = _tokenize(stream) + + # First token: version. Accept floats within tolerance, as the C++ tool + # parses them with `in >> version` and then compares to 1.0 / 2.0 exactly. + try: + version = float(next(tokens)) + except StopIteration as exc: + raise SRFParseError(f"{source}: empty file") from exc + + if abs(version - 1.0) < 1e-6: + parse_point = _parse_point_v10 + elif abs(version - 2.0) < 1e-6: + parse_point = _parse_point_v20 + else: + raise SRFParseError(f"{source}: unsupported SRF version {version}") + + sources: list[PointSource] = [] + + for block in tokens: + if block == "POINTS": + n_points = int(next(tokens)) + for _ in range(n_points): + sources.append(parse_point(tokens)) + elif block == "PLANE": + # The PLANE block has 2 header lines per plane in v1.0/v2.0. The + # C++ tool skips them entirely; we do the same since the per-point + # records following carry all the information we actually need. + n_planes = int(next(tokens)) + _skip_plane(tokens, n_planes) + elif block.startswith("#"): + # Comments span until end of line; the tokenizer already strips them + # (see `_tokenize`), so this branch shouldn't normally fire — kept + # as a defensive fallback in case a `#` appears mid-stream. + continue + else: + raise SRFParseError(f"{source}: unknown block keyword {block!r}") + + # Drop point sources that contain no samples whatsoever. + keep = [s for s in sources if any(s.slip_rates)] + removed = len(sources) - len(keep) + if removed: + _log.info("Removed %d point source(s) with no samples in any direction.", removed) + return keep + + +# --------------------------------------------------------------------------- +# tokenizer +# --------------------------------------------------------------------------- + +def _tokenize(stream: IO[str] | TextIOBase) -> Iterator[str]: + """Yield whitespace-separated tokens from `stream`, stripping `#` comments. + + The SRF format treats newlines and spaces interchangeably within records, + so a flat token stream is the natural representation. Comments start with + `#` and extend to the end of the line, exactly as in the C++ rconv tool. + """ + for line in stream: + # Strip everything from the first `#` onward (comment to end-of-line). + hash_idx = line.find("#") + if hash_idx != -1: + line = line[:hash_idx] + yield from line.split() + + +# --------------------------------------------------------------------------- +# block readers +# --------------------------------------------------------------------------- + +def _skip_plane(tokens: Iterator[str], n_planes: int) -> None: + """Consume the two header records that follow a `PLANE n` block. + + Each PLANE block contains, per plane, two records of fixed length: + + line 1: ELON ELAT NSTK NDIP LEN WID + line 2: STK DIP DTOP SHYP DHYP + + That's 6 + 5 = 11 tokens per plane. We don't use these for point-source + conversion (the per-point records following carry their own geometry). + """ + _consume(tokens, 11 * n_planes) + + +def _parse_point_v10(tokens: Iterator[str]) -> PointSource: + # Line 1: LON LAT DEP STK DIP AREA TINIT DT + lon, lat, dep, stk, dip, area, tinit, dt = _consume_floats(tokens, 8) + # Line 2: RAKE SLIP1 NT1 SLIP2 NT2 SLIP3 NT3 + rake, _slip1, nt1, _slip2, nt2, _slip3, nt3 = _consume_floats(tokens, 7) + # v1.0 does not provide rho/vs, so shear modulus is unknown. + return _read_point( + tokens, + lon=lon, lat=lat, dep=dep, stk=stk, dip=dip, rake=rake, + area=area, tinit=tinit, dt=dt, shear_modulus=0.0, + nt=(int(nt1), int(nt2), int(nt3)), + ) + + +def _parse_point_v20(tokens: Iterator[str]) -> PointSource: + # Line 1: LON LAT DEP STK DIP AREA TINIT DT VS DEN + lon, lat, dep, stk, dip, area, tinit, dt, vs, den = _consume_floats(tokens, 10) + # Line 2: RAKE SLIP1 NT1 SLIP2 NT2 SLIP3 NT3 + rake, _slip1, nt1, _slip2, nt2, _slip3, nt3 = _consume_floats(tokens, 7) + # Shear modulus mu = rho * vs^2. Units: vs in cm/s (per SRF), den in g/cm^3 + # → mu in g/(cm*s^2). The C++ tool uses the same product and silently + # treats non-positive vs/den as "unknown" (mu = 0). + shear_modulus = vs * vs * den if vs > 0.0 and den > 0.0 else 0.0 + return _read_point( + tokens, + lon=lon, lat=lat, dep=dep, stk=stk, dip=dip, rake=rake, + area=area, tinit=tinit, dt=dt, shear_modulus=shear_modulus, + nt=(int(nt1), int(nt2), int(nt3)), + ) + + +def _read_point( + tokens: Iterator[str], + *, + lon: float, lat: float, dep: float, + stk: float, dip: float, rake: float, + area: float, tinit: float, dt: float, + shear_modulus: float, + nt: tuple[int, int, int], +) -> PointSource: + slip_rates = tuple(_consume_floats(tokens, n) for n in nt) # type: ignore[assignment] + return PointSource( + longitude=lon, latitude=lat, depth=dep, + strike=stk, dip=dip, rake=rake, + area=area, tinit=tinit, dt=dt, + shear_modulus=shear_modulus, + slip_rates=slip_rates, + ) + + +# --------------------------------------------------------------------------- +# small token helpers +# --------------------------------------------------------------------------- + +def _consume(tokens: Iterator[str], n: int) -> list[str]: + out: list[str] = [] + for _ in range(n): + try: + out.append(next(tokens)) + except StopIteration as exc: + raise SRFParseError( + f"unexpected end of file (wanted {n} tokens, got {len(out)})" + ) from exc + return out + + +def _consume_floats(tokens: Iterator[str], n: int) -> list[float]: + raw = _consume(tokens, n) + try: + return [float(x) for x in raw] + except ValueError as exc: + raise SRFParseError(f"expected {n} floats, got: {raw!r}") from exc diff --git a/py-rconv/src/rconv/xmf.py b/py-rconv/src/rconv/xmf.py new file mode 100644 index 0000000..5a59a68 --- /dev/null +++ b/py-rconv/src/rconv/xmf.py @@ -0,0 +1,107 @@ +"""Writer for XDMF/XMF visualisation output. + +Produces an XML file that ParaView (and any other XDMF-aware viewer) can open +to render the point cloud of SRF sources, decorated with the integrated slip +path length per source. + +The output mirrors the C++ ``rconv``'s ``writeXMF`` exactly in structure (a +``Polyvertex`` topology with inline XYZ geometry and a single ``Slip path +length (m)`` node attribute), modulo whitespace. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Sequence + +import numpy as np + +from .mapping import CoordinateMapping +from .srf import PointSource + +# cm/s → m/s on slip path: integration is in (cm/s)·s = cm, then /100 → m. +_CM_TO_M = 1.0e-2 + + +def write_xmf( + path: str | Path, + sources: Sequence[PointSource], + mapping: CoordinateMapping, +) -> None: + """Write an XDMF/XMF visualisation file for a list of point sources. + + Parameters + ---------- + path: + Destination file path. Overwritten if it exists. + sources: + Point sources, as produced by :func:`rconv.srf.parse_srf`. + mapping: + Coordinate mapping used to project source positions for display. + Typically a geocentric CRS for global views or the same MCS as for the + NRF for mesh-aligned visualisation. + """ + path = Path(path) + n = len(sources) + + with path.open("w") as out: + out.write('\n') + out.write('\n') + out.write("\n") + out.write(" \n") + out.write(f' \n') + out.write(' \n') + out.write( + f' \n' + ) + for src in sources: + x, y, z = mapping.to_mesh(src.longitude, src.latitude, src.depth).as_tuple() + out.write(f"{x} {y} {z}\n") + out.write(" \n") + out.write(" \n") + out.write(' \n') + out.write(' \n') + out.write(' \n') + _write_slip_attribute(out, sources) + out.write(" \n") + out.write(" \n") + out.write("\n") + + +def _write_slip_attribute(out, sources: Sequence[PointSource]) -> None: + """Write the per-source 'Slip path length (m)' attribute. + + Path length is the time-integral of the magnitude of the (vector) slip + rate, evaluated with the trapezoidal rule on the SRF sample grid. The C++ + ``rconv`` does the same thing. + """ + n = len(sources) + out.write(' \n') + out.write( + f' \n' + ) + for src in sources: + out.write(f"{_slip_path_length_m(src)}\n") + out.write(" \n") + out.write(" \n") + + +def _slip_path_length_m(src: PointSource) -> float: + """Trapezoidal integral of |slip rate vector| over time, in metres. + + Samples are padded with zeros if the per-direction sample counts differ + (which is permitted by SRF but uncommon in practice). + """ + max_steps = max((len(r) for r in src.slip_rates), default=0) + if max_steps == 0: + return 0.0 + # Pack into an (max_steps, 3) array, zero-padding short components. + sr = np.zeros((max_steps, 3), dtype=np.float64) + for d, samples in enumerate(src.slip_rates): + if samples: + sr[: len(samples), d] = samples + magnitudes = np.linalg.norm(sr, axis=1) # cm/s, per sample + # Trapezoidal rule with uniform spacing dt; numpy's trapezoid is exactly + # what the C++ implementation does pointwise. + slip_cm = float(np.trapezoid(magnitudes, dx=src.dt)) + return slip_cm * _CM_TO_M diff --git a/py-rconv/tests/conftest.py b/py-rconv/tests/conftest.py new file mode 100644 index 0000000..293d5d1 --- /dev/null +++ b/py-rconv/tests/conftest.py @@ -0,0 +1,49 @@ +"""Shared pytest fixtures.""" + +from __future__ import annotations + +import sys +from pathlib import Path + +# Make tests/data importable as a module and src/ importable as the rconv pkg +HERE = Path(__file__).parent +sys.path.insert(0, str(HERE / "data")) +sys.path.insert(0, str(HERE.parent / "src")) + +import pytest # noqa: E402 (must come after sys.path edits) + +import synthetic # noqa: E402 (from tests/data/synthetic.py) + + +@pytest.fixture +def srf_v10_minimal(tmp_path: Path) -> Path: + return synthetic.write_srf(synthetic.SRF_V10_MINIMAL, tmp_path / "v10.srf") + + +@pytest.fixture +def srf_v20_two_dir(tmp_path: Path) -> Path: + return synthetic.write_srf(synthetic.SRF_V20_TWO_DIR, tmp_path / "v20.srf") + + +@pytest.fixture +def srf_v10_for_slip_integration(tmp_path: Path) -> Path: + return synthetic.write_srf( + synthetic.SRF_V10_FOR_SLIP_INTEGRATION, tmp_path / "slip.srf", + ) + + +@pytest.fixture +def northridge_srf() -> Path: + """Real-world Northridge SRF file, if available on this system. + + Skips the test if the file is not present; it lives in the SeisSol + Examples repository. + """ + candidates = [ + Path("/home/claude/Examples/Northridge/nr6.70-s0000-h0000.txt"), + Path(__file__).parent / "data" / "northridge.srf", + ] + for p in candidates: + if p.exists(): + return p + pytest.skip("Northridge SRF file not available; clone SeisSol/Examples to enable.") diff --git a/py-rconv/tests/synthetic.py b/py-rconv/tests/synthetic.py new file mode 100644 index 0000000..5bf9a0a --- /dev/null +++ b/py-rconv/tests/synthetic.py @@ -0,0 +1,58 @@ +"""Helpers to build deterministic synthetic SRF fixtures inline in tests. + +Keeping fixtures generated by code rather than checked in as files is helpful +when tweaking the format: every field is visible and the relationship between +inputs and expected outputs is explicit. +""" + +from __future__ import annotations + +from pathlib import Path +from textwrap import dedent + +# A minimal valid v1.0 SRF with one POINTS block of 2 sources. The second one +# is intentionally empty (zero samples in all three directions); the parser +# must drop it. +SRF_V10_MINIMAL = dedent("""\ + 1.0 + POINTS 2 + -118.5 34.3 5.0 90.0 60.0 1.0e8 0.5 0.01 + 180.0 100.0 4 0.0 0 0.0 0 + 0.0 1.0 2.0 0.0 + -118.6 34.4 6.0 90.0 60.0 1.0e8 0.5 0.01 + 180.0 0.0 0 0.0 0 0.0 0 + """) + +# A v2.0 SRF with one source that exercises: +# * non-zero vs and den (so shear modulus is non-trivial) +# * non-zero slip in two of three directions +# * an explicit PLANE block (must be skipped) +# * a comment line (must be ignored) +SRF_V20_TWO_DIR = dedent("""\ + 2.0 + # comment line that should be ignored + PLANE 1 + -118.5 34.3 4 4 8.0 8.0 + 90.0 60.0 0.0 4.0 4.0 + POINTS 1 + -118.5 34.3 5.0 90.0 60.0 1.0e8 0.0 0.01 350000.0 2.7 + 45.0 1.0 3 0.0 2 0.0 0 + 1.0 2.0 1.5 + 0.5 0.25 + """) + +# v1.0 with a single source containing five samples in u1, used to validate +# slip path integration in the XDMF writer. +SRF_V10_FOR_SLIP_INTEGRATION = dedent("""\ + 1.0 + POINTS 1 + 0.0 0.0 1.0 0.0 90.0 1.0e4 0.0 0.5 + 0.0 4.0 5 0.0 0 0.0 0 + 1.0 2.0 3.0 2.0 1.0 + """) + + +def write_srf(text: str, dst: Path) -> Path: + """Write ``text`` to ``dst`` and return ``dst`` for convenience.""" + dst.write_text(text) + return dst diff --git a/py-rconv/tests/test_cli.py b/py-rconv/tests/test_cli.py new file mode 100644 index 0000000..69031de --- /dev/null +++ b/py-rconv/tests/test_cli.py @@ -0,0 +1,112 @@ +"""Tests for the command-line interface.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from rconv.cli import main +from rconv.nrf import read_nrf + + +class TestArgparse: + def test_no_command_exits_nonzero(self, capsys: pytest.CaptureFixture[str]) -> None: + # argparse "required=True" on the subparsers slot makes invocation + # without a subcommand fail with exit code 2. + with pytest.raises(SystemExit) as exc: + main([]) + assert exc.value.code == 2 + + def test_unknown_subcommand_fails(self) -> None: + with pytest.raises(SystemExit) as exc: + main(["frobnicate", "a", "b"]) + assert exc.value.code == 2 + + def test_to_nrf_requires_mcs(self, tmp_path: Path) -> None: + with pytest.raises(SystemExit) as exc: + main(["to-nrf", str(tmp_path / "x.srf"), str(tmp_path / "y.nrf")]) + assert exc.value.code == 2 + + +class TestEndToEnd: + def test_to_nrf_writes_file( + self, tmp_path: Path, srf_v10_minimal: Path, + ) -> None: + out = tmp_path / "out.nrf" + code = main([ + "to-nrf", str(srf_v10_minimal), str(out), + "--mcs", "+proj=tmerc +datum=WGS84 +k=0.9996 " + "+lon_0=-118.5150 +lat_0=34.3440 +units=m +axis=enu", + ]) + assert code == 0 + assert out.exists() + data = read_nrf(out) + # The fixture's first source is kept, the empty one is dropped. + assert data["centres"].size == 1 + + def test_to_nrf_normalize_onset( + self, tmp_path: Path, srf_v10_minimal: Path, + ) -> None: + out = tmp_path / "out.nrf" + code = main([ + "to-nrf", str(srf_v10_minimal), str(out), + "--mcs", "+proj=tmerc +datum=WGS84 +k=0.9996 " + "+lon_0=-118.5150 +lat_0=34.3440 +units=m +axis=enu", + "--normalize-onset", + ]) + assert code == 0 + data = read_nrf(out) + # Single source survives → min tinit = its own tinit → result 0. + assert data["subfaults"][0]["tinit"] == pytest.approx(0.0) + + def test_to_xdmf_writes_file( + self, tmp_path: Path, srf_v10_minimal: Path, + ) -> None: + out = tmp_path / "out.xmf" + code = main(["to-xdmf", str(srf_v10_minimal), str(out)]) + assert code == 0 + assert out.exists() + text = out.read_text() + assert "" in text + assert "Polyvertex" in text + + def test_to_xdmf_default_vcs_is_geocentric( + self, tmp_path: Path, srf_v10_minimal: Path, + ) -> None: + # The default VCS is geocentric; result coordinates should have + # magnitude ~ Earth radius. + import re + out = tmp_path / "out.xmf" + code = main(["to-xdmf", str(srf_v10_minimal), str(out)]) + assert code == 0 + # First float triple after the geometry DataItem opening. + m = re.search( + r'Geometry.*?XML">\s*([-\d.eE]+)\s+([-\d.eE]+)\s+([-\d.eE]+)', + out.read_text(), re.DOTALL, + ) + assert m is not None + x, y, z = map(float, m.groups()) + r = (x * x + y * y + z * z) ** 0.5 + assert 6.3e6 < r < 6.4e6 + + +class TestEmptyInput: + def test_no_sources_returns_error( + self, tmp_path: Path, + ) -> None: + # An SRF whose sole point source has no samples in any direction + # parses to an empty list and should be a CLI error rather than an + # unhelpful crash. + srf = tmp_path / "empty.srf" + srf.write_text( + "1.0\n" + "POINTS 1\n" + "0 0 1 0 90 1e8 0 0.1\n" + "0 0 0 0 0 0 0\n" + ) + code = main([ + "to-nrf", str(srf), str(tmp_path / "out.nrf"), + "--mcs", "+proj=tmerc +datum=WGS84 +k=0.9996 +lon_0=0 +lat_0=0 +units=m", + ]) + assert code == 1 diff --git a/py-rconv/tests/test_mapping.py b/py-rconv/tests/test_mapping.py new file mode 100644 index 0000000..c95ca9b --- /dev/null +++ b/py-rconv/tests/test_mapping.py @@ -0,0 +1,205 @@ +"""Tests for :mod:`rconv.mapping`.""" + +from __future__ import annotations + +import logging +import math + +import pytest + +from rconv.mapping import ( + CoordinateMapping, + Vec3, + _parse_axis, + _permute_ned, + _strike_dip_rake_to_ned, +) + + +class TestAxisParsing: + @pytest.mark.parametrize( + "proj, expected", + [ + ("+proj=utm +zone=10 +axis=ned", "ned"), + ("+proj=utm +zone=10", "enu"), # default + ("+axis=SeU +proj=tmerc", "seu"), # case-insensitive + ("+proj=geocent +axis=enu +datum=WGS84", "enu"), + ], + ) + def test_parse_axis(self, proj: str, expected: str) -> None: + assert _parse_axis(proj) == expected + + def test_invalid_axis_chars_rejected_by_proj(self) -> None: + # PROJ itself rejects unrecognised axis characters before our own + # check fires, but the effect is the same — construction fails. + with pytest.raises(Exception, match="(?i)axis|proj"): + CoordinateMapping("+proj=utm +zone=10 +datum=WGS84 +units=m +axis=xyz") + + def test_permute_ned_rejects_bad_char(self) -> None: + # _permute_ned has its own guard for defence in depth. + with pytest.raises(ValueError, match="Invalid axis character"): + _permute_ned((1.0, 2.0, 3.0), "enx") + + +class TestPermutationLow: + """Unit tests for the bare ``_permute_ned`` helper.""" + + def test_default_enu(self) -> None: + # NED=(1,2,3) → ENU=(2, 1, -3) + assert _permute_ned((1.0, 2.0, 3.0), "enu") == (2.0, 1.0, -3.0) + + def test_ned_identity(self) -> None: + assert _permute_ned((1.0, 2.0, 3.0), "ned") == (1.0, 2.0, 3.0) + + def test_seu(self) -> None: + # NED=(1,2,3) → SEU=(-N, E, -D) = (-1, 2, -3) + assert _permute_ned((1.0, 2.0, 3.0), "seu") == (-1.0, 2.0, -3.0) + + def test_all_negations(self) -> None: + assert _permute_ned((1.0, 2.0, 3.0), "wsu") == (-2.0, -1.0, -3.0) + + +class TestRotationBasis: + """Validate :func:`_strike_dip_rake_to_ned` produces a right-handed basis. + + With strike=0 (along north), dip=90 (vertical fault), rake=0 (pure + strike-slip), the SRF fault-local basis maps into NED as: + * u1 (along strike) → (1, 0, 0) (= north) + * u2 (orthogonal, in plane) → (0, 0, -1) (= up; right-handed with u1, u3) + * u3 (fault normal) → (0, 1, 0) (= east) + + Notes on u2: the SRF docs describe u2 as "orthogonal to the strike + direction but lies in the fault plane" without fixing the sign. The C++ + rconv's rotation formula realises u2 as the *up-dip* direction (so + {u1, u2, u3} is right-handed with u3 pointing outward), which is what we + reproduce here verbatim. + """ + + def test_u1_is_strike(self) -> None: + n, e, d = _strike_dip_rake_to_ned(0, 90, 0, 1, 0, 0) + assert (n, e, d) == pytest.approx((1.0, 0.0, 0.0), abs=1e-12) + + def test_u2_is_updip_for_vertical_fault(self) -> None: + n, e, d = _strike_dip_rake_to_ned(0, 90, 0, 0, 1, 0) + # d = -1 means upwards in NED. + assert (n, e, d) == pytest.approx((0.0, 0.0, -1.0), abs=1e-12) + + def test_u3_is_normal(self) -> None: + n, e, d = _strike_dip_rake_to_ned(0, 90, 0, 0, 0, 1) + assert (n, e, d) == pytest.approx((0.0, 1.0, 0.0), abs=1e-12) + + def test_basis_is_orthonormal(self) -> None: + # For any strike/dip/rake, {u1, u2, u3} → an orthonormal triple in NED. + import numpy as np + for s, d, r in [(0, 90, 0), (150, 90, 0), (122, 40, 90), (45, 30, 60)]: + vs = [ + _strike_dip_rake_to_ned(s, d, r, *e) + for e in [(1, 0, 0), (0, 1, 0), (0, 0, 1)] + ] + m = np.array(vs) + # Should satisfy m @ m.T ≈ I (i.e. an orthogonal matrix). + assert np.allclose(m @ m.T, np.eye(3), atol=1e-12) + + +class TestRotationDocExample: + """Match the documentation example exactly. + + The SeisSol docs (standard-rupture-format.html) give expected u1/u2/u3 in + an MCS with ``+axis=ned`` for a vertical strike-slip fault striking 150° + (the docs show only the resulting vectors, not the source angles; the + angles below reproduce those vectors exactly under the C++ formula): + + u_1 = {-0.866025403784439, 0.5, 0} + u_2 = { 0, 0, -1} + u_3 = {-0.5, -0.866025403784439, 0} + """ + + @pytest.fixture + def mapping(self) -> CoordinateMapping: + # +axis=ned: output is (north, east, down). + return CoordinateMapping("+proj=lonlat +datum=WGS84 +units=m +axis=ned") + + def test_u1(self, mapping: CoordinateMapping) -> None: + v = mapping.to_mcs(150, 90, 0, 1, 0, 0) + assert v.as_tuple() == pytest.approx( + (-0.866025403784439, 0.5, 0.0), abs=1e-12, + ) + + def test_u2(self, mapping: CoordinateMapping) -> None: + v = mapping.to_mcs(150, 90, 0, 0, 1, 0) + assert v.as_tuple() == pytest.approx((0.0, 0.0, -1.0), abs=1e-12) + + def test_u3(self, mapping: CoordinateMapping) -> None: + v = mapping.to_mcs(150, 90, 0, 0, 0, 1) + assert v.as_tuple() == pytest.approx( + (-0.5, -0.866025403784439, 0.0), abs=1e-12, + ) + + +class TestToMesh: + """Geographic → MCS coordinate transforms.""" + + def test_projected_tmerc_enu(self) -> None: + # tmerc with +axis=enu: the result must be (east, north, up). + # depth_km > 0 means below the surface → z should be negative. + m = CoordinateMapping( + "+proj=tmerc +datum=WGS84 +k=0.9996 " + "+lon_0=-118.5150 +lat_0=34.3440 +units=m +axis=enu" + ) + v = m.to_mesh(-118.6049, 34.3864, 5.3214) + # east < 0 (we're west of the origin), north > 0 (we're north of it). + assert v.x < 0 + assert v.y > 0 + # depth converted to metres and signed for "up": -5321.4 m exactly. + assert v.z == pytest.approx(-5321.4) + + def test_projected_ned_swaps_xy_and_flips_z(self) -> None: + # +axis=ned: the (east, north) we got with +enu should swap and + # the z should flip sign. + m_enu = CoordinateMapping( + "+proj=tmerc +datum=WGS84 +k=0.9996 " + "+lon_0=-118.5150 +lat_0=34.3440 +units=m +axis=enu" + ) + m_ned = CoordinateMapping( + "+proj=tmerc +datum=WGS84 +k=0.9996 " + "+lon_0=-118.5150 +lat_0=34.3440 +units=m +axis=ned" + ) + v_enu = m_enu.to_mesh(-118.6049, 34.3864, 5.3214) + v_ned = m_ned.to_mesh(-118.6049, 34.3864, 5.3214) + assert v_ned.x == pytest.approx(v_enu.y) # ned.x = north = enu.y + assert v_ned.y == pytest.approx(v_enu.x) # ned.y = east = enu.x + assert v_ned.z == pytest.approx(-v_enu.z) # flipped + + def test_geocentric_is_handled(self) -> None: + # A geocentric MCS is delegated to pyproj entirely. Sanity-check that + # the output has plausible magnitude (~ Earth radius, 6.3e6 m). + m = CoordinateMapping("+proj=geocent +datum=WGS84 +units=m +no_defs") + v = m.to_mesh(-118.6049, 34.3864, 5.3214) + r = math.sqrt(v.x ** 2 + v.y ** 2 + v.z ** 2) + assert 6.3e6 < r < 6.4e6 + # Latitude is +34°, so the geocentric Z must be positive. + assert v.z > 0 + + def test_depth_zero_means_surface(self) -> None: + # With depth=0, z must be 0 regardless of axis (modulo geocentric). + m = CoordinateMapping( + "+proj=tmerc +datum=WGS84 +k=0.9996 " + "+lon_0=0 +lat_0=0 +units=m +axis=enu" + ) + v = m.to_mesh(0.0, 0.0, 0.0) + assert v.z == pytest.approx(0.0) + + +class TestUnitWarnings: + def test_non_metre_unit_logs_warning(self, caplog: pytest.LogCaptureFixture) -> None: + with caplog.at_level(logging.WARNING, logger="rconv.mapping"): + # `+proj=lonlat` axes are reported in degrees, which trips the + # warning. + CoordinateMapping("+proj=lonlat +datum=WGS84 +units=m +axis=ned") + warnings = [r for r in caplog.records if r.levelno == logging.WARNING] + assert any("metre" in r.message for r in warnings) + + +class TestVec3: + def test_as_tuple(self) -> None: + assert Vec3(1.0, 2.0, 3.0).as_tuple() == (1.0, 2.0, 3.0) diff --git a/py-rconv/tests/test_nrf.py b/py-rconv/tests/test_nrf.py new file mode 100644 index 0000000..e5573a0 --- /dev/null +++ b/py-rconv/tests/test_nrf.py @@ -0,0 +1,199 @@ +"""Tests for :mod:`rconv.nrf`: round-trip writes, unit conversions, structure.""" + +from __future__ import annotations + +import subprocess +from pathlib import Path + +import numpy as np +import pytest + +from rconv import CoordinateMapping, parse_srf, read_nrf, write_nrf +from rconv.srf import PointSource + + +@pytest.fixture +def mapping() -> CoordinateMapping: + return CoordinateMapping( + "+proj=tmerc +datum=WGS84 +k=0.9996 " + "+lon_0=-118.5150 +lat_0=34.3440 +units=m +axis=enu" + ) + + +@pytest.fixture +def single_source() -> PointSource: + # Hand-crafted source where every quantity is non-zero so unit conversions + # are easy to check. + return PointSource( + longitude=-118.5, + latitude=34.3, + depth=5.0, # km + strike=0.0, dip=90.0, rake=0.0, + area=1.0e8, # cm^2 → expect 1e4 m^2 + tinit=1.0, dt=0.5, + shear_modulus=3.0e10, # g/(cm*s^2) → expect 3e9 Pa + slip_rates=([100.0, 50.0], [], []), # cm/s → expect [1.0, 0.5] m/s + ) + + +class TestRefuseEmpty: + def test_empty_sources_raises(self, tmp_path: Path, mapping: CoordinateMapping) -> None: + with pytest.raises(ValueError, match="empty source list"): + write_nrf(tmp_path / "out.nrf", [], mapping) + + +class TestUnitConversion: + def test_area_cm2_to_m2( + self, tmp_path: Path, mapping: CoordinateMapping, single_source: PointSource, + ) -> None: + out = tmp_path / "out.nrf" + write_nrf(out, [single_source], mapping) + data = read_nrf(out) + # area: 1e8 cm^2 = 1e4 m^2 + assert data["subfaults"][0]["area"] == pytest.approx(1.0e4) + + def test_shear_modulus_cgs_to_pa( + self, tmp_path: Path, mapping: CoordinateMapping, single_source: PointSource, + ) -> None: + out = tmp_path / "out.nrf" + write_nrf(out, [single_source], mapping) + data = read_nrf(out) + # mu: 3e10 g/(cm*s^2) = 3e9 Pa + assert data["subfaults"][0]["mu"] == pytest.approx(3.0e9) + + def test_sliprate_cm_s_to_m_s( + self, tmp_path: Path, mapping: CoordinateMapping, single_source: PointSource, + ) -> None: + out = tmp_path / "out.nrf" + write_nrf(out, [single_source], mapping) + data = read_nrf(out) + # [100, 50] cm/s → [1.0, 0.5] m/s + assert list(data["sliprates1"]) == pytest.approx([1.0, 0.5]) + + def test_depth_km_to_m( + self, tmp_path: Path, mapping: CoordinateMapping, single_source: PointSource, + ) -> None: + out = tmp_path / "out.nrf" + write_nrf(out, [single_source], mapping) + data = read_nrf(out) + # Depth 5.0 km, axis=enu (z=up) → z = -5000 m + assert data["centres"][0]["z"] == pytest.approx(-5000.0) + + +class TestSroffsets: + """The ``sroffsets`` array must be the cumulative count per direction.""" + + def test_cumulative_per_direction( + self, tmp_path: Path, mapping: CoordinateMapping, + ) -> None: + sources = [ + PointSource(0, 0, 1, 0, 90, 0, 1e8, 0.0, 0.1, 0.0, ([1, 2], [3], [])), + PointSource(0, 0, 1, 0, 90, 0, 1e8, 0.0, 0.1, 0.0, ([4, 5, 6], [], [7, 8])), + PointSource(0, 0, 1, 0, 90, 0, 1e8, 0.0, 0.1, 0.0, ([9], [], [])), + ] + out = tmp_path / "out.nrf" + write_nrf(out, sources, mapping) + data = read_nrf(out) + # Direction 0: 2 + 3 + 1 = 6 samples → offsets 0, 2, 5, 6 + # Direction 1: 1 + 0 + 0 = 1 samples → offsets 0, 1, 1, 1 + # Direction 2: 0 + 2 + 0 = 2 samples → offsets 0, 0, 2, 2 + expected = np.array([ + [0, 0, 0], + [2, 1, 0], + [5, 1, 2], + [6, 1, 2], + ], dtype=np.uint32) + assert np.array_equal(data["sroffsets"], expected) + + def test_final_offset_equals_sliprate_length( + self, tmp_path: Path, mapping: CoordinateMapping, single_source: PointSource, + ) -> None: + out = tmp_path / "out.nrf" + write_nrf(out, [single_source], mapping) + data = read_nrf(out) + # For n sources, sroffsets[n, d] must equal sliprates{d+1}.size. + n = data["centres"].size + assert data["sroffsets"][n, 0] == data["sliprates1"].size + assert data["sroffsets"][n, 1] == data["sliprates2"].size + assert data["sroffsets"][n, 2] == data["sliprates3"].size + + +class TestNormalizeOnset: + def test_subtracts_minimum_tinit( + self, tmp_path: Path, mapping: CoordinateMapping, + ) -> None: + # PointSource positional order: + # lon, lat, depth, strike, dip, rake, area, tinit, dt, shear_modulus, slip_rates + sources = [ + PointSource(0, 0, 1, 0, 90, 0, 1e8, 5.0, 0.1, 0.0, ([1.0], [], [])), + PointSource(0, 0, 1, 0, 90, 0, 1e8, 7.5, 0.1, 0.0, ([1.0], [], [])), + PointSource(0, 0, 1, 0, 90, 0, 1e8, 3.0, 0.1, 0.0, ([1.0], [], [])), # min + ] + out = tmp_path / "out.nrf" + write_nrf(out, sources, mapping, normalize_onset=True) + data = read_nrf(out) + assert data["subfaults"][0]["tinit"] == pytest.approx(2.0) + assert data["subfaults"][1]["tinit"] == pytest.approx(4.5) + assert data["subfaults"][2]["tinit"] == pytest.approx(0.0) + + def test_off_by_default( + self, tmp_path: Path, mapping: CoordinateMapping, single_source: PointSource, + ) -> None: + out = tmp_path / "out.nrf" + write_nrf(out, [single_source], mapping) + data = read_nrf(out) + # Default — onsets pass through unmodified. + assert data["subfaults"][0]["tinit"] == pytest.approx(1.0) + + +class TestStructure: + """Smoke tests for the on-disk file structure via ncdump.""" + + def test_compound_types_present( + self, tmp_path: Path, mapping: CoordinateMapping, single_source: PointSource, + ) -> None: + if not _have_ncdump(): + pytest.skip("ncdump not on PATH") + out = tmp_path / "out.nrf" + write_nrf(out, [single_source], mapping) + header = subprocess.check_output( + ["ncdump", "-h", str(out)], text=True, + ) + assert "compound Vector3" in header + assert "compound Subfault" in header + # Expected vars + for var in ("centres", "subfaults", "sroffsets", + "sliprates1", "sliprates2", "sliprates3"): + assert var in header + + +class TestRealistic: + def test_northridge_round_trip( + self, tmp_path: Path, northridge_srf: Path, mapping: CoordinateMapping, + ) -> None: + sources = parse_srf(northridge_srf) + out = tmp_path / "northridge.nrf" + write_nrf(out, sources, mapping) + data = read_nrf(out) + assert data["centres"].size == 474 + assert data["sroffsets"].shape == (475, 3) + # All depths positive in SRF → all z negative in ENU output. + assert (data["centres"]["z"] <= 0).all() + # Every source's tan1 / tan2 / normal should be a unit vector triple. + for sf in data["subfaults"]: + for name in ("tan1", "tan2", "normal"): + v = sf[name] + norm = (v["x"] ** 2 + v["y"] ** 2 + v["z"] ** 2) ** 0.5 + assert norm == pytest.approx(1.0, abs=1e-10) + + +def _have_ncdump() -> bool: + try: + # ncdump prints usage and exits 0 when given no args. + subprocess.run( + ["ncdump"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, + check=False, + ) + return True + except FileNotFoundError: + return False diff --git a/py-rconv/tests/test_srf.py b/py-rconv/tests/test_srf.py new file mode 100644 index 0000000..7c88a06 --- /dev/null +++ b/py-rconv/tests/test_srf.py @@ -0,0 +1,108 @@ +"""Tests for the SRF parser.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from rconv.srf import PointSource, SRFParseError, parse_srf + + +class TestVersionDispatch: + def test_unsupported_version_raises(self, tmp_path: Path) -> None: + bad = tmp_path / "bad.srf" + bad.write_text("0.5\nPOINTS 0\n") + with pytest.raises(SRFParseError, match="unsupported SRF version"): + parse_srf(bad) + + def test_empty_file_raises(self, tmp_path: Path) -> None: + empty = tmp_path / "empty.srf" + empty.write_text("") + with pytest.raises(SRFParseError, match="empty file"): + parse_srf(empty) + + +class TestV10: + def test_minimal_parse_drops_empty_sources(self, srf_v10_minimal: Path) -> None: + # The minimal fixture has 2 sources, the second of which carries no + # slip-rate samples in any direction → must be dropped. + sources = parse_srf(srf_v10_minimal) + assert len(sources) == 1 + s = sources[0] + assert s.longitude == -118.5 + assert s.latitude == 34.3 + assert s.depth == 5.0 + + def test_v10_has_no_shear_modulus(self, srf_v10_minimal: Path) -> None: + # v1.0 doesn't carry vs/den, so shear modulus stays 0 (unknown). + s = parse_srf(srf_v10_minimal)[0] + assert s.shear_modulus == 0.0 + + def test_v10_samples_have_expected_counts(self, srf_v10_minimal: Path) -> None: + s = parse_srf(srf_v10_minimal)[0] + assert len(s.slip_rates[0]) == 4 + assert len(s.slip_rates[1]) == 0 + assert len(s.slip_rates[2]) == 0 + + def test_v10_sample_values(self, srf_v10_minimal: Path) -> None: + s = parse_srf(srf_v10_minimal)[0] + assert s.slip_rates[0] == [0.0, 1.0, 2.0, 0.0] + + +class TestV20: + def test_v20_shear_modulus_from_vs_den(self, srf_v20_two_dir: Path) -> None: + # vs=350000 cm/s, den=2.7 g/cm^3 → mu = vs^2 * den + s = parse_srf(srf_v20_two_dir)[0] + expected = 350000.0 ** 2 * 2.7 + assert s.shear_modulus == pytest.approx(expected) + + def test_v20_plane_block_skipped(self, srf_v20_two_dir: Path) -> None: + # If the PLANE block were not consumed correctly the subsequent POINTS + # parse would fail or produce garbage. Reaching here with the right + # number of sources is the assertion. + sources = parse_srf(srf_v20_two_dir) + assert len(sources) == 1 + # Comment line was also handled correctly. + assert sources[0].latitude == 34.3 + + def test_v20_two_direction_samples(self, srf_v20_two_dir: Path) -> None: + s = parse_srf(srf_v20_two_dir)[0] + assert s.slip_rates[0] == [1.0, 2.0, 1.5] + assert s.slip_rates[1] == [0.5, 0.25] + assert s.slip_rates[2] == [] + + +class TestV20EdgeCases: + def test_v20_zero_vs_or_den_gives_unknown_mu(self, tmp_path: Path) -> None: + # The C++ tool treats vs=0 or den=0 as "unknown" → mu = 0. + srf = tmp_path / "v20_zero.srf" + srf.write_text( + "2.0\n" + "POINTS 1\n" + "0.0 0.0 1.0 0.0 90.0 1.0e4 0.0 0.01 0.0 2.7\n" # vs=0 + "0.0 1.0 1 0.0 0 0.0 0\n" + "1.0\n" + ) + s = parse_srf(srf)[0] + assert s.shear_modulus == 0.0 + + +class TestRealistic: + def test_northridge_loads(self, northridge_srf: Path) -> None: + # The Northridge SRF has 500 raw point sources, 26 of which are + # empty placeholders the parser must drop. + sources = parse_srf(northridge_srf) + assert len(sources) == 474 + + def test_northridge_first_source(self, northridge_srf: Path) -> None: + s = parse_srf(northridge_srf)[0] + # Values straight from the SRF; if any of these change the parser is + # mis-reading the file. + assert s.longitude == pytest.approx(-118.6049) + assert s.latitude == pytest.approx(34.3864) + assert s.depth == pytest.approx(5.3214) + assert s.strike == pytest.approx(122.0) + assert s.dip == pytest.approx(40.0) + assert s.rake == pytest.approx(90.0) + assert len(s.slip_rates[0]) == 16 diff --git a/py-rconv/tests/test_xmf.py b/py-rconv/tests/test_xmf.py new file mode 100644 index 0000000..a0e64a8 --- /dev/null +++ b/py-rconv/tests/test_xmf.py @@ -0,0 +1,92 @@ +"""Tests for :mod:`rconv.xmf`.""" + +from __future__ import annotations + +import xml.etree.ElementTree as ET +from pathlib import Path + +import numpy as np +import pytest + +from rconv import CoordinateMapping, parse_srf, write_xmf +from rconv.xmf import _slip_path_length_m + + +@pytest.fixture +def mapping() -> CoordinateMapping: + return CoordinateMapping("+proj=geocent +datum=WGS84 +units=m +no_defs") + + +class TestStructure: + def test_is_valid_xml( + self, tmp_path: Path, srf_v10_for_slip_integration: Path, mapping: CoordinateMapping, + ) -> None: + sources = parse_srf(srf_v10_for_slip_integration) + out = tmp_path / "out.xmf" + write_xmf(out, sources, mapping) + # ET parses xmf even with the DOCTYPE present. + tree = ET.parse(out) + root = tree.getroot() + assert root.tag == "Xdmf" + + def test_polyvertex_count_matches( + self, tmp_path: Path, northridge_srf: Path, mapping: CoordinateMapping, + ) -> None: + sources = parse_srf(northridge_srf) + out = tmp_path / "nr.xmf" + write_xmf(out, sources, mapping) + tree = ET.parse(out) + topo = tree.getroot().find(".//Topology") + assert topo is not None + assert topo.get("Type") == "Polyvertex" + assert int(topo.get("NumberOfElements", "0")) == len(sources) + + def test_has_slip_attribute( + self, tmp_path: Path, srf_v10_for_slip_integration: Path, mapping: CoordinateMapping, + ) -> None: + sources = parse_srf(srf_v10_for_slip_integration) + out = tmp_path / "out.xmf" + write_xmf(out, sources, mapping) + tree = ET.parse(out) + attr = tree.getroot().find(".//Attribute") + assert attr is not None + assert attr.get("Name") == "Slip path length (m)" + assert attr.get("Center") == "Node" + + +class TestSlipPathLength: + """The XDMF 'Slip path length' attribute is the trapezoidal integral of + the slip-rate magnitude over time, then converted from cm to m.""" + + def test_trapezoid_known_value( + self, srf_v10_for_slip_integration: Path, + ) -> None: + sources = parse_srf(srf_v10_for_slip_integration) + s = sources[0] + # The fixture has u1 = [1, 2, 3, 2, 1] cm/s, dt = 0.5 s. + # Trapezoidal integral with dx=0.5 on [1,2,3,2,1] = 4.0 cm = 0.04 m. + expected_cm = float(np.trapezoid([1.0, 2.0, 3.0, 2.0, 1.0], dx=0.5)) + assert expected_cm == pytest.approx(4.0) + assert _slip_path_length_m(s) == pytest.approx(0.04) + + def test_empty_source_yields_zero(self) -> None: + from rconv.srf import PointSource + empty = PointSource( + longitude=0, latitude=0, depth=0, + strike=0, dip=90, rake=0, + area=1e8, tinit=0, dt=0.1, shear_modulus=0, + slip_rates=([], [], []), + ) + assert _slip_path_length_m(empty) == 0.0 + + def test_multi_direction_magnitude(self) -> None: + # If u1=[3,3] and u3=[4,4] cm/s, the vector magnitude is 5 at every + # sample. Trapezoid with dx=1.0 over 2 samples = 5*1.0 = 5 cm = 0.05 m. + from rconv.srf import PointSource + s = PointSource( + longitude=0, latitude=0, depth=0, + strike=0, dip=90, rake=0, + area=1e8, tinit=0, dt=1.0, shear_modulus=0, + slip_rates=([3.0, 3.0], [], [4.0, 4.0]), + ) + assert _slip_path_length_m(s) == pytest.approx(0.05)