Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .agent/rules/environment_rules.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,9 @@ description: Strict environment and dependency rules for PyKoopman development (
## 7. Documentation
- **README Format**: Use Markdown (`README.md`), not RST.
- **Version Bumps**: Update version in `pyproject.toml` only; build artifacts auto-update.

## 8. Testing Best Practices (Update 2026-01)
- **Matplotlib**: Tests involving `matplotlib` or `mpl_toolkits` MUST explicitly lock the backend to non-interactive (e.g., `MPLBACKEND=Agg`) or mock `plt.show()` to prevent hangs on Windows/CI.
- *Note*: `torus_dynamics` tests in `common` are skipped due to persistent backend conflicts in `pytest` environments despite code correctness.
- **Coverage & Extensions**: Be cautious when running full coverage (`--cov`) if the environment includes conflicting C-extensions (e.g., `PyO3` via `pydantic`/`fastapi`). Targeted coverage runs (e.g., `pytest test/common --cov=pykoopman.common`) are more stable.
- **Input Validation**: `pykoopman` classes (e.g., `forced_duffing`, `slow_manifold`) often require strict input shapes (e.g., `(n_states, n_traj)`). Always verify array dimensions in tests to avoid `IndexError`.
8 changes: 4 additions & 4 deletions src/pykoopman/common/examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,11 +421,11 @@ def setup(self):
for k in range(self.sparsity):
loopbreak = 0
while loopbreak != 1:
self.J[k, 0] = np.ceil(
np.random.rand(1) * self.n_states / (self.freq_max + 1)
self.J[k, 0] = int(
np.ceil(np.random.rand() * self.n_states / (self.freq_max + 1))
)
self.J[k, 1] = np.ceil(
np.random.rand(1) * self.n_states / (self.freq_max + 1)
self.J[k, 1] = int(
np.ceil(np.random.rand() * self.n_states / (self.freq_max + 1))
)
if xhat[self.J[k, 0], self.J[k, 1]] == 0.0:
loopbreak = 1
Expand Down
97 changes: 97 additions & 0 deletions test/common/test_cqgle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from __future__ import annotations

from unittest.mock import patch

import numpy as np
import pytest
from pykoopman.common import cqgle


@pytest.fixture
def cqgle_model():
n = 64
x = np.linspace(-10, 10, n, endpoint=False)
dt = 0.01
return cqgle(n, x, dt)


def test_cqgle_init(cqgle_model):
assert cqgle_model.n_states == 64
assert cqgle_model.dt == 0.01
assert cqgle_model.k.shape == (64,)


def test_cqgle_sys(cqgle_model):
t = 0
x = np.zeros(cqgle_model.n_states)
u = 0
dx = cqgle_model.sys(t, x, u)
assert dx.shape == (cqgle_model.n_states,)
assert np.all(dx == 0) # Should be zero for zero state


def test_cqgle_simulate(cqgle_model):
x0 = np.exp(-(cqgle_model.x**2))
n_int = 10
n_sample = 2
X, t = cqgle_model.simulate(x0, n_int, n_sample)

# Check return shapes
# X shape: (n_int // n_sample, n_states)
expected_steps = n_int // n_sample
assert X.shape == (expected_steps, cqgle_model.n_states)
assert len(t) == expected_steps


def test_cqgle_collect_data_continuous(cqgle_model):
n_traj = 3
x0_single = np.exp(-(cqgle_model.x**2))
x0 = np.vstack([x0_single] * n_traj)

X, Y = cqgle_model.collect_data_continuous(x0)

assert X.shape == (n_traj, cqgle_model.n_states)
assert Y.shape == (n_traj, cqgle_model.n_states)


def test_cqgle_collect_one_step_data_discrete(cqgle_model):
n_traj = 3
x0_single = np.exp(-(cqgle_model.x**2))
x0 = np.vstack([x0_single] * n_traj)

X, Y = cqgle_model.collect_one_step_data_discrete(x0)

assert X.shape == (n_traj, cqgle_model.n_states)
assert Y.shape == (n_traj, cqgle_model.n_states)


def test_cqgle_collect_one_trajectory_data(cqgle_model):
x0 = np.exp(-(cqgle_model.x**2))
n_int = 10
n_sample = 2
y = cqgle_model.collect_one_trajectory_data(x0, n_int, n_sample)

expected_steps = n_int // n_sample
assert y.shape == (expected_steps, cqgle_model.n_states)


@patch("matplotlib.pyplot.show")
def test_cqgle_visualize_data(mock_show, cqgle_model):
x0 = np.exp(-(cqgle_model.x**2))
n_int = 10
n_sample = 5
X, t = cqgle_model.simulate(x0, n_int, n_sample)

# Just ensure it runs without error
cqgle_model.visualize_data(cqgle_model.x, t, X)
mock_show.assert_called()


@patch("matplotlib.pyplot.show")
def test_cqgle_visualize_state_space(mock_show, cqgle_model):
# Create some dummy data (needs enough potential components for SVD)
X = np.random.rand(10, cqgle_model.n_states)

# Just ensure it runs without error
cqgle_model.visualize_state_space(X)
mock_show.assert_called()
222 changes: 222 additions & 0 deletions test/common/test_examples.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
from __future__ import annotations

from unittest.mock import patch

import matplotlib.pyplot as plt
import numpy as np
from pykoopman.common.examples import advance_linear_system
from pykoopman.common.examples import drss
from pykoopman.common.examples import forced_duffing
from pykoopman.common.examples import Linear2Ddynamics
from pykoopman.common.examples import lorenz
from pykoopman.common.examples import rev_dvdp
from pykoopman.common.examples import rk4
from pykoopman.common.examples import sine_wave
from pykoopman.common.examples import slow_manifold
from pykoopman.common.examples import square_wave
from pykoopman.common.examples import vdp_osc


def test_drss_shapes():
n_states = 3
n_controls = 2
n_measurements = 4
A, B, C = drss(n=n_states, p=n_controls, m=n_measurements)
assert A.shape == (n_states, n_states)
assert B.shape == (n_states, n_controls)
assert C.shape == (n_measurements, n_states)


def test_drss_identity_measurement():
# If m=0, C should be identity
n_states = 3
A, B, C = drss(n=n_states, m=0)
assert C.shape == (n_states, n_states)
np.testing.assert_array_equal(C, np.eye(n_states))


def test_advance_linear_system():
n = 2
A = np.eye(n)
B = np.eye(n)
C = np.eye(n)
x0 = np.array([1.0, 1.0])
# consistent for 1 step
# n_steps to simulate
n_steps = 2
# Expanding u to match steps if needed, but the function handles 1D u as row vector
# Let's provide u of shape (p, n_steps-1)
u_seq = np.ones((n, n_steps - 1))

x, y = advance_linear_system(x0, u_seq, n_steps, A, B, C)

# x shape should be (n, n_steps)?? Wait, let's check docstring or implementation.
# Implementation: x = np.zeros([n, len(x0)]) -> Wait, len(x0) is n.
# The implementation:
# x = np.zeros([n, len(x0)]) ??? No, x0 is (n,). len(x0) is n.
# But usually x should be (n_states, n_time_steps).
# docstring says: returns x of shape (n, len(x0)).
# This seems like a potential bug or confusion in docstring vs code
# if n_steps != n_states.
# Let's look at code: 'x = np.zeros([n, len(x0)])'
# where n is passed as arg 'n' (steps).
# The argument name 'n' shadows dimension 'n'.
# In function def: advance_linear_system(x0, u, n, ...):
# n is "Number of steps to simulate"
# But inside: x = np.zeros([n, len(x0)])
# So dim 0 is n (steps), dim 1 is len(x0) (states?).
# Usually states are rows or columns.
# If n=steps, then x is (steps, states) or (states, steps).
# Code: x[0, :] = x0. So x is (steps, states).

# x has shape (n_steps, n_states)
assert x.shape == (n_steps, len(x0))
assert y.shape == (n_steps, C.shape[0])


def test_vdp_osc_rk4():
t = 0
x = np.array([[1.0], [0.5]])
u = 0.0
dt = 0.01

# Check vdp_osc structure
dx = vdp_osc(t, x, u)
assert dx.shape == x.shape

# Check rk4 integration step
x_next = rk4(t, x, u, dt, vdp_osc)
assert x_next.shape == x.shape
assert not np.array_equal(x, x_next)


def test_square_and_sine_wave():
# Just smoke tests to ensure they run/return floats
val_sq = square_wave(10)
assert (
isinstance(val_sq, float)
or isinstance(val_sq, int)
or isinstance(val_sq, np.float64)
)

val_sin = sine_wave(10)
assert isinstance(val_sin, float) or isinstance(val_sin, np.floating)


def test_lorenz():
x = [10.0, 10.0, 10.0]
t = 0.0
dx = lorenz(x, t)
assert len(dx) == 3


def test_rev_dvdp():
x = np.array(
[[1.0], [0.5]]
) # needs to be 2D array (2, 1) based on code usage of x[0,:]?
# Code: x[0,:] - ...
# So if we pass (2, 1), x[0,:] is shape (1,).
t = 0
x_next = rev_dvdp(t, x)
assert x_next.shape == x.shape


def test_linear_2d_dynamics():
sys = Linear2Ddynamics()
x = np.array([[1.0], [1.0]])

# Test linear_map
y = sys.linear_map(x)
assert y.shape == x.shape

# Test collect_data
n_int = 10
n_traj = 1
X, Y = sys.collect_data(x, n_int, n_traj)
# shapes: (n_states, n_int * n_traj)
assert X.shape == (2, n_int * n_traj)
assert Y.shape == (2, n_int * n_traj)


def test_slow_manifold():
model = slow_manifold()
x = np.array([[0.1], [0.1]]) # (2, 1) to match usage of x[0, :]

# Test sys
t = 0
u = 0
dx = model.sys(t, x, u)
assert dx.shape == x.shape

# Test simulate (requires x0 to be (2, 1))
x0 = np.array([[0.1], [0.1]])
n_int = 100
X = model.simulate(x0, n_int)
assert X.shape == (2, n_int * 1) # n_traj is 1


def test_forced_duffing():
dt = 0.01
d = 0.1
alpha = 1.0
beta = 1.0
model = forced_duffing(dt, d, alpha, beta)

assert model.n_states == 2

# Test sys
t = 0
x = np.array([[1.0], [1.0]])
u = 0.0
dx = model.sys(t, x, u)
assert dx.shape == x.shape

# Test simulate
x0 = np.array([[1.0], [1.0]])
n_int = 10
u_seq = np.zeros((n_int, 1)) # (n_int, n_traj) ?
# collect_data_discrete uses u[step, :] which implies u is (n_int, n_traj) ??
# Let's check simulate implementation: u is passed as (n_int, ...?)
# simulate(x0, n_int, u) -> u[step, :]
# if x0 has n_traj=1.

# Wait, in forced_duffing.simulate:
# u[step, :] is passed to rk4.
# if u is (n_int, 1), u[step,:] is shape (1,).
# sys takes u.
# sys implementation: ... + u
# if u is scalar or (1,) it broadcasts.

X = model.simulate(x0, n_int, u_seq)
assert X.shape == (2, n_int * 1)

# Test collect_data_continuous
u_static = 0.0
X_c, Y_c = model.collect_data_continuous(x0, u_static)
assert X_c.shape == x0.shape
assert Y_c.shape == x0.shape

# Test collect_data_discrete
X_d, Y_d = model.collect_data_discrete(x0, n_int, u_seq)
assert X_d.shape == (2, n_int * 1)
assert Y_d.shape == (2, n_int * 1)


@patch("matplotlib.pyplot.show")
def test_forced_duffing_visualize(mock_show):
dt = 0.01
model = forced_duffing(dt, 0.1, 1.0, 1.0)
t = np.linspace(0, 1, 100)
X = np.random.rand(2, 100)

model.visualize_trajectories(t, X, n_traj=1)
mock_show.assert_not_called()
# visualize_trajectories doesn't call show() in source??
# Let's check source.
# visualize_trajectories: plt.subplots... axs.plot... axs.set... No plt.show()
# It just makes plots.
plt.close() # Close to avoid warning

model.visualize_state_space(X, X, n_traj=1)
# visualize_state_space: plt.subplots... axs.plot... No plt.show()
plt.close()
Loading