diff --git a/configs/layout.json.example b/configs/layout.json.example new file mode 100644 index 00000000..ce0ac344 --- /dev/null +++ b/configs/layout.json.example @@ -0,0 +1,10 @@ +{ + "rx_coords" : [ + [0.0, 0.0], + [0.508, 0.137], + [0.0, 0.615], + [0.7, 0.1] + ], + "tx_true" : + [0.565, 0.906] +} diff --git a/scripts/experiment_scripts/collect_raw_data.py b/scripts/experiment_scripts/collect_raw_data.py index 60e3d7a2..8c4215a4 100644 --- a/scripts/experiment_scripts/collect_raw_data.py +++ b/scripts/experiment_scripts/collect_raw_data.py @@ -11,8 +11,8 @@ SOURCE_DAT_FILE = "./data_files/rand_ofdm_packet_rx" # --------- MODIFY -------------- -EXPERIMENT_NAME = "virtual_multilateration_3" # CHOOSE NAME OF EXPERIMENT TO BE RUN -ROAMING_DEVICES = ["RX2ch1"] # NAME OF DEVICE THAT IS MOVED (WILL ASK FOR POSITIONS EACH RUN) +EXPERIMENT_NAME = "rj_virtual_multilateration_300" # CHOOSE NAME OF EXPERIMENT TO BE RUN +ROAMING_DEVICES = ["RX4ch1"] # NAME OF DEVICE THAT IS MOVED (WILL ASK FOR POSITIONS EACH RUN) FIXED_DEVICES = ["ANCHORch0", "TX"] # NAME OF DEVICES THAT ARE FIXED (WILL ONLY ASK ONCE PER EXPERIMENT) # ------------------------------- diff --git a/scripts/simulation/heapmap.py b/scripts/simulation/heapmap.py new file mode 100644 index 00000000..45849b7d --- /dev/null +++ b/scripts/simulation/heapmap.py @@ -0,0 +1,46 @@ +import numpy as np +import matplotlib.pyplot as plt +from ofdm.simulation.monte_carlo import run_monte_carlo +from ofdm.config import loadLayout + + +def main(): + layout_config_pth = "./configs/layout.json" + rx_coords, tx_true = loadLayout(layout_config_pth) + + rx_x = rx_coords[:, 0].reshape(-1, 1, 1) + rx_y = rx_coords[:, 1].reshape(-1, 1, 1) + + x_range = np.linspace(0, 1, num=40) + y_range = np.linspace(0, 1, num=40) + X, Y = np.meshgrid(x_range, y_range) + + error_heatmap = np.zeros(X.shape) + + for i in range(len(x_range)): + for j in range(len(y_range)): + tx_coords = np.array([X[i][j], Y[i][j]]) + results = run_monte_carlo( + rx_coords=rx_coords, + tx_pos=tx_coords, + sigma_ns=0.1, + n_trials=10, + seed=42, + ) + error_heatmap[i][j] = results['rmse'] + + plt.figure(figsize=(10, 8)) + v_max = 1 + v_min = 0 + cp = plt.pcolormesh(X, Y, error_heatmap,vmin=v_min, vmax=v_max, shading='auto', cmap='viridis') + plt.colorbar(cp, label='RMSE (meters)') + plt.scatter(rx_x, rx_y, marker='^', color='red', label='Receivers') + plt.title('OFDM Localization Error Heatmap') + plt.xlabel('X Position (m)') + plt.ylabel('Y Position (m)') + plt.legend() + plt.show() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/simulation/monte_carlo.py b/scripts/simulation/monte_carlo.py new file mode 100644 index 00000000..80525f35 --- /dev/null +++ b/scripts/simulation/monte_carlo.py @@ -0,0 +1,49 @@ +import argparse +import numpy as np +import matplotlib.pyplot as plt +from ofdm.viz.sim_plotter import plot_mc_results, plot_tdoa_hyperbolas, DraggableSimulation +from ofdm.simulation.monte_carlo import run_monte_carlo +from ofdm.config import loadLayout + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--tx", nargs=2, type=float) + parser.add_argument("--sigma-ns", type=float, default=0.05) + parser.add_argument("--trials", type=int, default=1000) + args = parser.parse_args() + + layout_conf_path = "./configs/layout.json" + rx_coords, tx_true = loadLayout(layout_conf_path) + + if args.tx is not None: + tx_true = np.array(args.tx) + + results = run_monte_carlo( + tx_pos=tx_true, + rx_coords=rx_coords, + sigma_ns=args.sigma_ns, + n_trials=args.trials, + seed=42, + ) + + print(f"Converged: {results['n_converged']}/{results['n_trials']}") + print(f"RMSE: {results['rmse']*100:.2f} cm") + print(f"Mean error: {results['mean_error']*100:.2f} cm") + print(f"P95 errors: {results['p95_error']*100:.2f} cm") + print(f"Centroid: X={results['centroid'][0]:.4f} cm, Y={results['centroid'][1]:.4f} cm") + + ax = plot_mc_results(results, tx_true, rx_coords, args.sigma_ns) + plot_tdoa_hyperbolas(tx_true, rx_coords, results, ax) + plt.tight_layout() + sim = DraggableSimulation( + ax=ax, + tx_pos=tx_true, + rx_coords=rx_coords, + sigma_ns=args.sigma_ns, + n_trials=args.trials + ) + + plt.show() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/trilateration.py b/scripts/trilateration.py index 4e6c0910..81fcb015 100644 --- a/scripts/trilateration.py +++ b/scripts/trilateration.py @@ -137,7 +137,6 @@ def toa_cost_function(guess, rx_coords, measured_distance): Calculates difference between theoretical distances based on guessed (x,y) and the actual measured distnace. """ - x, y = guess residuals = np.zeros(len(rx_coords)) diff --git a/src/ofdm/config.py b/src/ofdm/config.py index 0b641aa7..2ac2b00f 100644 --- a/src/ofdm/config.py +++ b/src/ofdm/config.py @@ -1,5 +1,7 @@ from dataclasses import dataclass import random +import json +import numpy as np @dataclass @@ -49,8 +51,6 @@ def _load_random_map(self): self.pilot_carriers.sort() self.data_carriers.sort() - - def _load_default_map(self): """ Define Default OFDM mapping if no Config is provided @@ -71,3 +71,12 @@ def _idx(self, k: int) -> int: Helper to convert from python indexing to freq bin indexing """ return (k + self.N) % self.N + + +def loadLayout(layout_config_path:str)->np.ndarray: + """ + Loads configs/layout.json. Returns rx_coords, tx_true as np.ndarrays + """ + with open(layout_config_path, "r") as f: + coords = json.load(f) + return np.array(coords['rx_coords']), np.array(coords['tx_true']) diff --git a/src/ofdm/simulation/__init__.py b/src/ofdm/simulation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/ofdm/simulation/geometry.py b/src/ofdm/simulation/geometry.py new file mode 100644 index 00000000..1ac198cd --- /dev/null +++ b/src/ofdm/simulation/geometry.py @@ -0,0 +1,15 @@ +import numpy as np +from scipy import constants + +C = constants.c + +def compute_toa(tx_pos, rx_coords): + distances = np.linalg.norm(rx_coords - tx_pos, axis = 1) + return distances / C + +def compute_tdoa(tx_pos, rx_coords): + toa = compute_toa(tx_pos, rx_coords) + return toa[1:] - toa[0] # roaming rx - anchor + +def compute_distances(tx_pos, rx_coords): + return np.linalg.norm(rx_coords - tx_pos, axis = 1) \ No newline at end of file diff --git a/src/ofdm/simulation/monte_carlo.py b/src/ofdm/simulation/monte_carlo.py new file mode 100644 index 00000000..0ea0ee13 --- /dev/null +++ b/src/ofdm/simulation/monte_carlo.py @@ -0,0 +1,33 @@ +import numpy as np +from ofdm.simulation.geometry import compute_tdoa +from ofdm.simulation.noise_model import add_gausian_nosie +from ofdm.simulation.solver import solve_tdoa + + +def run_monte_carlo(tx_pos, rx_coords, sigma_ns, n_trials=1000, seed=None): + """ + Run monte carlo TDOA localization simluation. + Returns dict with estimates, errors, and summary stats + """ + rng = np.random.default_rng(seed) + ideal_tdoas = compute_tdoa(tx_pos, rx_coords) + + estimates = [] + for _ in range(n_trials): + noisy_tdoas = add_gausian_nosie(ideal_tdoas, sigma_ns, rng=rng) + est = solve_tdoa(rx_coords, noisy_tdoas) + if est is not None: + estimates.append(est) + estimates = np.array(estimates) + errors = np.linalg.norm(estimates - tx_pos, axis=1) + return { + "estimates": estimates, + "errors": errors, + "rmse": np.sqrt(np.mean(errors**2)), + "mean_error": np.mean(errors), + "p95_error": np.percentile(errors, 95), + "centroid": np.mean(estimates, axis=0), + "n_converged": len(estimates), + "n_trials": n_trials, + } + diff --git a/src/ofdm/simulation/noise_model.py b/src/ofdm/simulation/noise_model.py new file mode 100644 index 00000000..c4fa4b62 --- /dev/null +++ b/src/ofdm/simulation/noise_model.py @@ -0,0 +1,16 @@ +import numpy as np + +def add_gausian_nosie(tdoa_values, sigma_ns, rng=None): + """ + Add gausian noise to time difference of arrival delay values. + signa_ns: noise standard deviation in ns + rng: optional rng for repoducibility + """ + if rng is None: + rng = np.random.default_rng() + sigma_sec = sigma_ns * 1e-9 + noise = rng.normal(0, sigma_sec, size=tdoa_values.shape) + return tdoa_values + noise + + + \ No newline at end of file diff --git a/src/ofdm/simulation/solver.py b/src/ofdm/simulation/solver.py new file mode 100644 index 00000000..cb85203f --- /dev/null +++ b/src/ofdm/simulation/solver.py @@ -0,0 +1,60 @@ +import numpy as np +from scipy import constants +from scipy.optimize import least_squares + +C = constants.c + +def tdoa_cost_function(guess, rx_coords, delay_diffs_sec): + """ + Calculates residuals for TDOA least-squares solve. + rx_coords[0] must be the anchor receiver. + delay_diffs_sec: array of length len(rx_coords)-1, t_i - t_anchor in seconds. + """ + x, y = guess + residuals = np.zeros(len(rx_coords) - 1) + + anchor_x, anchor_y = rx_coords[0] + dist_to_anchor = np.sqrt((x - anchor_x)**2 + (y - anchor_y)**2) + + for i in range(1, len(rx_coords)): + rx_x, rx_y = rx_coords[i] + dist_to_rx = np.sqrt((x - rx_x)**2 + (y - rx_y)**2) + theoretical_diff = dist_to_rx - dist_to_anchor + measured_diff = delay_diffs_sec[i-1] * C + residuals[i-1] = theoretical_diff - measured_diff + + return residuals + +def solve_tdoa(rx_coords, delay_diffs_sec, initial_guess=None): + """ + Runs LM least-squares TDOA solve for a single set of delay measurements. + Returns (x, y) estimate or None if solve failed. + """ + + if initial_guess is None: + initial_guess = np.mean(rx_coords, axis=0) # centroid of recievers + + result = least_squares( + tdoa_cost_function, + initial_guess, + args=(rx_coords, delay_diffs_sec), + method='lm' + ) + + if result.success: + return result.x + return None + + +def toa_cost_function(guess, rx_coords, measured_distance): + """ + PLACEHOLDER + """ + x, y = guess + residuals = np.zeros(len(rx_coords)) + + for i in range(len(rx_coords)): + rx_x, rx_y = rx_coords[i] + theoretical_dist = np.sqrt((x - rx_x)**2 + (y-rx_y)**2) + residuals[i] = theoretical_dist - measured_distance + return residuals \ No newline at end of file diff --git a/src/ofdm/viz/sim_plotter.py b/src/ofdm/viz/sim_plotter.py new file mode 100644 index 00000000..6825cc7d --- /dev/null +++ b/src/ofdm/viz/sim_plotter.py @@ -0,0 +1,133 @@ +import numpy as np +import matplotlib.pyplot as plt +from matplotlib.patches import Ellipse +from typing import Optional +from scipy import constants +from ofdm.simulation.geometry import compute_tdoa +from ofdm.simulation.monte_carlo import run_monte_carlo + +C = constants.c + +def plot_mc_results( + results: dict, + tx_pos: np.ndarray, + rx_coords: np.ndarray, + sigma_ns: float, + ax: Optional[plt.Axes] = None, +): + """ + Plot Monte Carlo localizaiton results. + results: dict returned by run_monte_carlo() + """ + if ax is None: + fig, ax = plt.subplots(figsize=(9, 9)) + + estimates = results["estimates"] + centroid = results["centroid"] + + ax.scatter(estimates[:, 0], estimates[:, 1], + c='red', marker='o', s=20, alpha=0.3, label='Estimates', zorder=2) + + # centroid + ax.scatter(*centroid, c='purple', marker='X', s=200, edgecolor='black', linewidths=1.5, label='Centroid', zorder=5) + + # tx positions + ax.scatter(*tx_pos, c='green', marker='s', s=150, label='TX (Ground Truth)', zorder=5) + + # rx positions + ax.scatter(*rx_coords[0], c='blue', marker='^', s=150, label='Anchor RX', zorder=4) + ax.scatter(rx_coords[1:, 0], rx_coords[1:, 1], c='cyan', marker='^', s= 100, label='Roaming RX', zorder=4) + + stats = ( + f"$\\sigma$ = {sigma_ns} ns\n" + f"RMSE = {results['rmse']*100:.2f} cm\n" + f"Mean err = {results['mean_error']*100:.2f} cm\n" + f"P95 err = {results['p95_error']*100:2.2f} cm\n" + f"Converged: = {results['n_converged']}/{results['n_trials']}" + ) + ax.text(0.02, 0.98, stats, transform=ax.transAxes, fontsize=9, + verticalalignment='top', family='monospace', + bbox=dict(boxstyle='round', facecolor='white', alpha=0.8)) + + ax.set_title(f"OFDM Monte Carlo Simulation ({results['n_trials']})") + ax.set_xlabel("X (meters)") + ax.set_ylabel("Y (meters)") + ax.legend(loc='lower right') + ax.grid(True, linestyle='--', alpha=0.6) + ax.set_aspect('equal') + + return ax + +def plot_tdoa_hyperbolas(tx_pos, rx_coords, results, ax, sigma_ns=None): + """ + For each non-anchor RX, plot the hyperbola defined by the ideal TDOA + """ + + ideal_tdoas = compute_tdoa(tx_pos, rx_coords) + + x = np.linspace(-0.5, 1.5, 800) + y = np.linspace(-0.5, 1.5, 800) + X, Y = np.meshgrid(x, y) + + anchor = rx_coords[0] + dist_to_anchor = np.sqrt((X - anchor[0])**2 + (Y - anchor[1])**2) + + for i in range(1, len(rx_coords)): + rx = rx_coords[i] + dist_to_rx = np.sqrt((X - rx[0])**2 + (Y - rx[1])**2) + diff = dist_to_rx - dist_to_anchor - (ideal_tdoas[i - 1] * C) + ax.contour(X, Y, diff, levels=[0], linewidths=1.5, linestyles='-') + + +class DraggableSimulation: + def __init__(self, ax, tx_pos, rx_coords, sigma_ns, n_trials=1000): + self.ax = ax + self.tx_pos = tx_pos.copy() + self.rx_coords = rx_coords.copy() + self.sigma_ns = sigma_ns + self.n_trials = n_trials + self._dragging = None + self._pick_radius = 0.05 + fig = ax.get_figure() + fig.canvas.mpl_connect('button_press_event', self._on_press) + fig.canvas.mpl_connect('motion_notify_event', self._on_motion) + fig.canvas.mpl_connect('button_release_event', self._on_release) + + def _hit(self, event, pos): + if event.xdata is None: + return False + return np.hypot(event.xdata - pos[0], event.ydata - pos[1]) < self._pick_radius + + def _on_press(self, event): + if self._hit(event, self.tx_pos): + self._dragging = ('tx', None) + else: + for i, rx in enumerate(self.rx_coords): + if self._hit(event, rx): + self._dragging = ('rx', i) + break + + def _on_motion(self, event): + if self._dragging is None or event.xdata is None: + return + kind, idx = self._dragging + if kind == 'tx': + self.tx_pos[:] = [event.xdata, event.ydata] + else: + self.rx_coords[idx] = [event.xdata, event.ydata] + self._redraw() + + def _on_release(self, event): + self._dragging = None + + def _redraw(self): + xlim = self.ax.get_xlim() + ylim = self.ax.get_ylim() + self.ax.cla() + results = run_monte_carlo(self.tx_pos, self.rx_coords, self.sigma_ns, self.n_trials, seed=42) + plot_mc_results(results, self.tx_pos, self.rx_coords, self.sigma_ns, ax=self.ax) + plot_tdoa_hyperbolas(self.tx_pos, self.rx_coords, results, self.ax) + self.ax.set_xlim(xlim) + self.ax.set_ylim(ylim) + self.ax.get_figure().canvas.draw_idle() + diff --git a/tests/test_geometry.py b/tests/test_geometry.py new file mode 100644 index 00000000..a6d3788b --- /dev/null +++ b/tests/test_geometry.py @@ -0,0 +1,38 @@ +import numpy as np +import pytest +from scipy import constants +from ofdm.simulation.geometry import compute_toa, compute_tdoa, compute_distances + +C = constants.c + +@pytest.fixture +def four_rx_layout(): + rx_coords = np.array([ + [0.0, 0.0], # anchor + [0.508, 0.137], # rx 2 + [0.0, 0.615], # rx 3 + [0.13, 0.22] + ]) + known_tx = np.array([0.270, 0.970]) + return rx_coords, known_tx + + +def test_toa_single_receiver(): + """TOA should equal distance / c.""" + tx = np.array([1.0, 0.0]) + rx = np.array([[0.0, 0.0]]) + toa = compute_toa(tx, rx) + np.testing.assert_allclose(toa[0], 1.0 / C, rtol=1e-9) + +def test_tdoa_zero_when_equidistant(): + """TX at center of symmetric layout should produce all-zero TDOAs.""" + rx_coords = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]) + tx = np.array([0.5, 0.5]) # equidistant from all four + tdoa = compute_tdoa(tx, rx_coords) + np.testing.assert_allclose(tdoa, 0.0, atol=1e-12) + + +def test_tdoa_output_shape(four_rx_layout): + rx_coords, tx = four_rx_layout + tdoa = compute_tdoa(tx, rx_coords) + assert tdoa.shape == (len(rx_coords) - 1,) \ No newline at end of file diff --git a/tests/test_noise_model.py b/tests/test_noise_model.py new file mode 100644 index 00000000..81cf8e10 --- /dev/null +++ b/tests/test_noise_model.py @@ -0,0 +1,25 @@ +import numpy as np +import pytest + +from ofdm.simulation.noise_model import add_gausian_nosie + +def test_output_shape(): + tdoa = np.array([1e-9, 2e-9, 3e-9]) + noisy = add_gausian_nosie(tdoa, sigma_ns=2.0) + assert noisy.shape == tdoa.shape + +def test_noise_is_applied(): + tdoa = np.array([1e-9, 2e-9, 3e-9]) + noisy = add_gausian_nosie(tdoa, sigma_ns=2.0, rng=np.random.default_rng(42)) + assert not np.allclose(noisy, tdoa, atol=1e-12) + +def test_reproducible_with_seed(): + tdoa = np.array([1e-9, 2e-9, 3e-9]) + noisy = add_gausian_nosie(tdoa, sigma_ns=2.0, rng=np.random.default_rng(0)) + noisy2 = add_gausian_nosie(tdoa,sigma_ns=2.0, rng=np.random.default_rng(0)) + np.testing.assert_array_equal(noisy, noisy2) + +def test_zero_noise_unchanged(): + tdoa = np.array([1e-9, 2e-9, 3e-9]) + noisy = add_gausian_nosie(tdoa, sigma_ns=0) + np.testing.assert_array_equal(noisy, tdoa) \ No newline at end of file diff --git a/tests/test_solver.py b/tests/test_solver.py new file mode 100644 index 00000000..4ec6d5b3 --- /dev/null +++ b/tests/test_solver.py @@ -0,0 +1,78 @@ +import numpy as np +import pytest +from scipy import constants +from ofdm.simulation.solver import solve_tdoa, tdoa_cost_function, toa_cost_function + + +C = constants.c + +@pytest.fixture +def four_rx_layout(): + rx_coords = np.array([ + [0.0, 0.0], # anchor + [0.508, 0.137], # rx 2 + [0.0, 0.615], # rx 3 + [0.13, 0.22] + ]) + known_tx = np.array([0.270, 0.970]) + return rx_coords, known_tx + +def ideal_tdoa(tx_pos, rx_coords): + distances = np.linalg.norm(rx_coords - tx_pos, axis=1) + toa = distances / C + return toa[1:] - toa[0] + +class TestTDoACostFunction: + def test_cost_function_zero_at_true_position(self, four_rx_layout): + rx_coords, tx_true = four_rx_layout + tdoas = ideal_tdoa(tx_true, rx_coords) + residuals = tdoa_cost_function(tx_true, rx_coords, tdoas) + np.testing.assert_allclose(residuals, 0.0, atol = 1e-9) + + def test_cost_function_residuals_shape(self, four_rx_layout): + rx_coords, tx_true = four_rx_layout + tdoas = ideal_tdoa(tx_true, rx_coords) + residuals = tdoa_cost_function(tx_true, rx_coords, tdoas) + assert residuals.shape == (len(rx_coords) - 1,) + + def test_cost_function_not_zero_at_different_position(self, four_rx_layout): + rx_coords, tx_true = four_rx_layout + wrong_guess = np.array([0.1, 0.1]) + tdoas = ideal_tdoa(tx_true, rx_coords) + residuals = tdoa_cost_function(wrong_guess, rx_coords, tdoas) + assert np.any(np.abs(residuals) > 1e-6) + +class TestSolveTDoA: + def test_solve_noiseless_recovers_true_position(self, four_rx_layout): + rx_coords, tx_true = four_rx_layout + tdoas = ideal_tdoa(tx_true, rx_coords) + est = solve_tdoa(rx_coords, tdoas) + assert est is not None + np.testing.assert_allclose(est, tx_true, atol=1e-3) + + def test_with_default_initial_guess(self, four_rx_layout): + rx_coords, tx_true = four_rx_layout + tdoas = ideal_tdoa(tx_true, rx_coords) + est = solve_tdoa(rx_coords, tdoas, initial_guess=None) + assert est is not None + np.testing.assert_allclose(est, tx_true, atol=1e-3) + + def solve_with_three_rx(self): + rx_coords = np.array([ + [0.0, 0.0], + [0.9, 0.0], + [0.0, 0.9] + ]) + tx_true = np.array([0.7, 0.4]) + tdoas = ideal_tdoa(tx_true, rx_coords) + est = solve_tdoa(rx_coords, tdoas) + assert est is not None + np.testing.assert_allclose(est, tx_true, atol=1e-3) + + def solver_returns_ndarray(self, four_rx_layout): + rx_coords, tx_true = four_rx_layout + tdoas = ideal_tdoa(tx_true, rx_coords) + est = solve_tdoa(rx_coords, tdoas) + assert est is not None + assert isinstance(est, np.ndarray) + assert est.shape == (2,) \ No newline at end of file