From ef58e0ae9cc19c7f8e4ed41c3354162c0536ddcd Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Mon, 30 Mar 2026 17:49:04 -0400 Subject: [PATCH 01/14] initialize conftest.py --- tests/conftest.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 tests/conftest.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..f3ac0e58 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,43 @@ +import numpy as np +import pytest + +from ofdm.config import OFDMConfig +from ofdm.utils.generator import DataGenerator +from ofdm.core.preamble import generate_pilot_symbol, generate_pilot_vals +from ofdm.core.payload import generate_ofdm_data_symbol +from ofdm.core.waveform import create_time_domain_symbol + +@pytest.fixture(scope="session") +def cfg(): + return OFDMConfig(seed=42) + +@pytest.fixture(scope="session") +def active_bins(cfg): + return np.unique(np.concatenate([cfg.data_carriers], cfg.pilot_carriers)) + +@pytest.fixture(scope="session") +def pilot_values(cfg): + return generate_pilot_vals(config=cfg) + +@pytest.fixture(scope="session") +def pilot_symbol_freq(cfg): + return generate_pilot_symbol(config=cfg) + +@pytest.fixture(scope="session") +def data_symbol_freq(cfg, pilot_vals): + gen = DataGenerator(config=cfg, seed = 0) + iq_tx, bits_tx = gen.generate_random_payload() + symbol = generate_ofdm_data_symbol(config=cfg, iq_data=iq_tx, pilot_data=pilot_vals) + return symbol, iq_tx, bits_tx + +@pytest.fixture(scope="session") +def pilot_symbol_time(cfg, pilot_symbol_freq): + return create_time_domain_symbol(pilot_symbol_freq, cp_len=cfg.CP_LEN) + +@pytest.fixture(scope="session") +def data_symbol_time(cfg, data_symbol_freq): + return create_time_domain_symbol(data_symbol_freq, cp_len=cfg.CP_LEN) + +@pytest.fixture +def generator(cfg): + return DataGenerator(config=cfg, seed=0) \ No newline at end of file From 94163fca7edd78040be3dc75266aea16dc727caf Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Mon, 30 Mar 2026 17:59:28 -0400 Subject: [PATCH 02/14] clean --- tests/test_delay.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/test_delay.py b/tests/test_delay.py index a294f8b5..3433d07e 100644 --- a/tests/test_delay.py +++ b/tests/test_delay.py @@ -6,20 +6,17 @@ def test_matched_filter_output_length(): rx = np.random.randn(100) + 1j * np.random.randn(100) ref = np.random.randn(100) + 1j * np.random.randn(100) - z, lags = matched_filter_calc(rx_iq=rx, ref_iq=ref, -fs=100e6) + z, lags = matched_filter_calc(rx_iq=rx, ref_iq=ref, fs=100e6) assert len(z) == len(rx) + len(ref) - 1 def test_matched_filter_lags_length(): rx = np.random.randn(100) + 1j * np.random.randn(100) ref = np.random.randn(100) + 1j * np.random.randn(100) - z, lags = matched_filter_calc(rx_iq=rx, ref_iq=ref, -fs=100e6) + z, lags = matched_filter_calc(rx_iq=rx, ref_iq=ref, fs=100e6) assert len(lags) == len(z) def test_matched_filter_peak_at_zero_delay(): signal = np.random.randn(100) + 1j * np.random.randn(100) - z, lags = matched_filter_calc(rx_iq=signal, ref_iq=signal, -fs=100e6) + z, lags = matched_filter_calc(rx_iq=signal, ref_iq=signal, fs=100e6) peak_lag = lags[np.argmax(np.abs(z))] assert peak_lag == 0.0 \ No newline at end of file From 78ed64b1aba1646c9931d4c670929d939e7e934a Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Mon, 30 Mar 2026 18:35:51 -0400 Subject: [PATCH 03/14] bug fix: scale_rx_signal trying to return value that wasnt created The function would check if the max value was greater than 0 and if it was then it would scale the input array and return if. If the input is all zeros, the scaled array would never be created but it would still try to return it. Changed to if the input is all zeros it just returns the input --- scripts/delay/calc_delay.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/scripts/delay/calc_delay.py b/scripts/delay/calc_delay.py index 1e7c8c11..481c53ff 100644 --- a/scripts/delay/calc_delay.py +++ b/scripts/delay/calc_delay.py @@ -280,12 +280,9 @@ def binary_ref_to_iq(binary_string:str, n_samples:int)->np.ndarray: def scale_rx_signal(raw_rx_data:np.ndarray)->np.ndarray: max_val = np.max(np.abs(raw_rx_data)) - if max_val > 0: - scale_factor = 0.9 / max_val - scaled_rx_data = raw_rx_data * scale_factor - return scaled_rx_data - + return raw_rx_data * (0.9) / max_val + return raw_rx_data # def calculate_precise_delay(rx_signal, ref_signal, fs, upsample_factor=100, debug_plot:bool = False): # """ From f8208ed97e7b07d3d8fe08791e1bd5226812b8cd Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Mon, 30 Mar 2026 18:37:22 -0400 Subject: [PATCH 04/14] add delay tests --- tests/test_delay.py | 51 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/tests/test_delay.py b/tests/test_delay.py index 3433d07e..99eb4e9d 100644 --- a/tests/test_delay.py +++ b/tests/test_delay.py @@ -2,6 +2,57 @@ from ofdm.channel.delay import matched_filter_calc from ofdm.config import OFDMConfig import pytest +from scripts.delay.calc_delay import upsample, scale_rx_signal, calculate_precise_delay + +class TestUpsample: + def test_output_length(self): + x = np.ones(10, dtype=complex) + out = upsample(x, scale_factor=4) + assert len(out) == 40 + + def test_scale_factor_one_unchanged(self): + x = np.random.randn(64) + 1j * np.random.randn(64) + out = upsample(x, scale_factor=1) + np.testing.assert_allclose(np.abs(out), np.abs(x), atol=1e-10) + + def test_pure_tone_preserved(self): + N = 64 + f = 5 + t = np.arange(N) + x = np.exp(1j * 2 * np.pi * f * t / N) + out = upsample(x, scale_factor=4) + assert len(out) == N * 4 + +class TestScaleRxSignal: + def test_max_value_is_09(self): + x = np.array([0.5, 1.0, -2.0, 0.3], dtype=float) + out = scale_rx_signal(x) + assert np.max(np.abs(out)) == pytest.approx(0.9) + + def test_zero_signal_unchanged(self): + x = np.zeros(10) + out = scale_rx_signal(x) + np.testing.assert_array_equal(out, x) + +class TestCalculatePreciseDelay: + def test_zero_delay(self): + ref = np.random.randn(64) + 1j * np.random.rand(64) + delay = calculate_precise_delay(ref, ref, fs=100e6) + assert abs(delay) < 1 / 100e6 + + def test_known_integer_delay(self): + ref=np.zeros(256, dtype=complex) + ref[10:20] = 1.0 + rx = np.zeros(256, dtype=complex) + rx[20:30] = 1.0 + delay = calculate_precise_delay(rx, ref, fs=100e6) + expected = 10 / 100e6 + assert abs(delay - expected) < 2 / 100e6 + + def test_returns_scaler(self): + ref = np.random.randn(64) + 1j * np.random.randn(64) + result = calculate_precise_delay(ref, ref, fs=100e6) + assert np.isscalar(result) or result.ndim == 0 def test_matched_filter_output_length(): rx = np.random.randn(100) + 1j * np.random.randn(100) From 888d17339774f9a13d4972c1f9244edfc2f56494 Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Mon, 30 Mar 2026 18:53:31 -0400 Subject: [PATCH 05/14] refactor: upsample from script function to include in ofdm package --- src/ofdm/channel/delay.py | 34 ++++++++++++++++++++++++++++++---- tests/test_delay.py | 5 +++-- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/src/ofdm/channel/delay.py b/src/ofdm/channel/delay.py index 01e9645d..e3f90006 100644 --- a/src/ofdm/channel/delay.py +++ b/src/ofdm/channel/delay.py @@ -1,14 +1,40 @@ import numpy as np - def matched_filter_calc(rx_iq:np.ndarray, ref_iq:np.ndarray, fs:float) -> np.ndarray: """ - Calculated the delay through matched filter delay estimation + Calculate the delay through matched filter delay estimation + + Args: + rx_iq: Recieved IQ data + ref_iq: Reference IQ data + fs: Sampling Frequency + Returns: + z: Complex corss-correlation output + lags: Time lag corresponding to each element of z, in seconds """ z = np.correlate(rx_iq, ref_iq, mode="full") - lags = np.arange(-len(rx_iq) + 1, len(ref_iq)) / fs + return z, lags + +def upsample(raw_data:np.ndarray, scale_factor:int = 100)->np.ndarray: + """ + Upsamples raw data based on the scale factor for sub sampling matched filter delay estimation + + Args: + raw_data: Raw pilot symbol data to be upsampled + scale_factor: Multiplication factor for umsampling i.e. to upsample from 100MHz to 10Ghz use 100 - return z, lags \ No newline at end of file + Returns: + Upsampled np.ndarray data + """ + n_data = len(raw_data) + n_padded = n_data * scale_factor + + freq = np.fft.fftshift(np.fft.fft(raw_data)) + n_total_zeros = n_padded - n_data + zeros_one_side = np.zeros(n_total_zeros // 2) + freq_zero_padded_shifted = np.fft.ifftshift(np.concatenate([zeros_one_side, freq, zeros_one_side])) + return np.fft.ifft(freq_zero_padded_shifted) * scale_factor + diff --git a/tests/test_delay.py b/tests/test_delay.py index 99eb4e9d..23041316 100644 --- a/tests/test_delay.py +++ b/tests/test_delay.py @@ -1,8 +1,9 @@ import numpy as np -from ofdm.channel.delay import matched_filter_calc +from ofdm.channel.delay import matched_filter_calc, upsample from ofdm.config import OFDMConfig import pytest -from scripts.delay.calc_delay import upsample, scale_rx_signal, calculate_precise_delay +from scripts.delay.calc_delay import scale_rx_signal, calculate_precise_delay + class TestUpsample: def test_output_length(self): From 36fcd50eec260c8c369ffaa6068bda32bfbc8f08 Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Mon, 30 Mar 2026 18:54:57 -0400 Subject: [PATCH 06/14] refactor sclae_rx_signal from calc_delay.py to package definition --- src/ofdm/channel/delay.py | 5 +++++ tests/test_delay.py | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/ofdm/channel/delay.py b/src/ofdm/channel/delay.py index e3f90006..b044bb4b 100644 --- a/src/ofdm/channel/delay.py +++ b/src/ofdm/channel/delay.py @@ -38,3 +38,8 @@ def upsample(raw_data:np.ndarray, scale_factor:int = 100)->np.ndarray: freq_zero_padded_shifted = np.fft.ifftshift(np.concatenate([zeros_one_side, freq, zeros_one_side])) return np.fft.ifft(freq_zero_padded_shifted) * scale_factor +def scale_rx_signal(raw_rx_data:np.ndarray)->np.ndarray: + max_val = np.max(np.abs(raw_rx_data)) + if max_val > 0: + return raw_rx_data * (0.9) / max_val + return raw_rx_data \ No newline at end of file diff --git a/tests/test_delay.py b/tests/test_delay.py index 23041316..5b7f4968 100644 --- a/tests/test_delay.py +++ b/tests/test_delay.py @@ -1,8 +1,8 @@ import numpy as np -from ofdm.channel.delay import matched_filter_calc, upsample +from ofdm.channel.delay import matched_filter_calc, upsample, scale_rx_signal from ofdm.config import OFDMConfig import pytest -from scripts.delay.calc_delay import scale_rx_signal, calculate_precise_delay +from scripts.delay.calc_delay import calculate_precise_delay class TestUpsample: From bfc0d94e2a9a2805d5d4123839968743b348c571 Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Mon, 30 Mar 2026 19:29:51 -0400 Subject: [PATCH 07/14] refactor precise delay functions in calc_delay.py into independent functions in ofdm package --- src/ofdm/channel/delay.py | 79 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/src/ofdm/channel/delay.py b/src/ofdm/channel/delay.py index b044bb4b..d1422230 100644 --- a/src/ofdm/channel/delay.py +++ b/src/ofdm/channel/delay.py @@ -1,5 +1,6 @@ import numpy as np - +import scipy +import scipy.signal def matched_filter_calc(rx_iq:np.ndarray, ref_iq:np.ndarray, fs:float) -> np.ndarray: """ @@ -18,7 +19,7 @@ def matched_filter_calc(rx_iq:np.ndarray, ref_iq:np.ndarray, fs:float) -> np.nda lags = np.arange(-len(rx_iq) + 1, len(ref_iq)) / fs return z, lags -def upsample(raw_data:np.ndarray, scale_factor:int = 100)->np.ndarray: +def zero_padded_upsample(raw_data:np.ndarray, scale_factor:int = 100)->np.ndarray: """ Upsamples raw data based on the scale factor for sub sampling matched filter delay estimation @@ -39,7 +40,79 @@ def upsample(raw_data:np.ndarray, scale_factor:int = 100)->np.ndarray: return np.fft.ifft(freq_zero_padded_shifted) * scale_factor def scale_rx_signal(raw_rx_data:np.ndarray)->np.ndarray: + """ + Scales an array to +- 0.9 + """ max_val = np.max(np.abs(raw_rx_data)) if max_val > 0: return raw_rx_data * (0.9) / max_val - return raw_rx_data \ No newline at end of file + return raw_rx_data + +def _correlate(rx_signal, ref_signal): + corr = scipy.signal.correlate(rx_signal, ref_signal, mode='full') + lags = scipy.signal.correlation_lags(len(rx_signal), len(ref_signal), mode='full') + peak_idx = np.argmax(np.abs(corr)) + return corr, lags, peak_idx + +def calculate_sub_sample_delay_parabolic(rx_signal:np.ndarray, ref_signal:np.ndarray, fs:float)->float: + """ + Calculates sub-sample delay using Parabolic Interpolation + + Correlates rx_signal against ref_signal, then upsamples using parabolic interpolation. + + Args: + rx_signal: Recieved IQ data + ref_signal: Known reference signal (e.g. TX pilot symbol) + fs: Sampling Frequency in HZ + + Returns: + Estimated delay in seconds + """ + corr, lags, peak_idx = _correlate(rx_signal, ref_signal) + corr_mag = np.abs(corr) + + if peak_idx > 0 and peak_idx < len(corr) - 1: + alpha = corr_mag[peak_idx - 1] + beta = corr_mag[peak_idx] + gamma = corr_mag[peak_idx + 1] + fractional_shift = 0.5 * (alpha - gamma) / (alpha - 2*beta + gamma) + else: + fractional_shift = 0.0 + + precise_lag = lags[peak_idx] + fractional_shift + return precise_lag / fs + + +def calculate_sub_sample_delay_zp(rx_signal:np.ndarray, ref_signal:np.ndarray, fs:float, upsample_factor:int = 100, search_radius:int = 16)->float: + """ + Calculates sub sample delay using zero-padded FFT interpolation + + Correlates rx_signal against ref_signal, then upsamples a window around the correlation peak via zero-padding + in the frequency domain to achieve sub-sample timing resolution + + Args: + rx_signal: Recieved IQ samples + ref_signal: Reference IQ samples (e.g. known TX pilot symbol) + fs: Sampling frequency in HZ + upsample_factor: Interpolation factor applied to the correlation window + search_radius: Nuber of samples on each side of the coarse peak to search + + Returns: + Estimated delay in seconds + """ + corr, lags, peak_idx = _correlate(rx_signal, ref_signal) + + start = max(0, peak_idx - search_radius) + end = min(len(corr), peak_idx + search_radius) + search_window = corr[start:end] + + window_upsampled = zero_padded_upsample(search_window, scale_factor=upsample_factor) + upsampled_peak_idx = np.argmax(window_upsampled) + + fractional_offset = upsampled_peak_idx / upsample_factor + final_idx = start + fractional_offset + + zero_lag_index = np.where(lags == 0)[0][0] + final_lag_samples = final_idx - zero_lag_index + return final_lag_samples / fs + From d55f518f24ca2169461ff20c20844c7dcd01893c Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Mon, 30 Mar 2026 19:52:19 -0400 Subject: [PATCH 08/14] add tests for sub sampling delay calculation functions --- tests/test_delay.py | 123 ++++++++++++++++++++++++++++++++------------ 1 file changed, 91 insertions(+), 32 deletions(-) diff --git a/tests/test_delay.py b/tests/test_delay.py index 5b7f4968..be3b9065 100644 --- a/tests/test_delay.py +++ b/tests/test_delay.py @@ -1,19 +1,36 @@ import numpy as np -from ofdm.channel.delay import matched_filter_calc, upsample, scale_rx_signal +from ofdm.channel.delay import matched_filter_calc, zero_padded_upsample, scale_rx_signal, calculate_sub_sample_delay_parabolic, calculate_sub_sample_delay_zp from ofdm.config import OFDMConfig import pytest -from scripts.delay.calc_delay import calculate_precise_delay + +def test_matched_filter_output_length(): + rx = np.random.randn(100) + 1j * np.random.randn(100) + ref = np.random.randn(100) + 1j * np.random.randn(100) + z, lags = matched_filter_calc(rx_iq=rx, ref_iq=ref, fs=100e6) + assert len(z) == len(rx) + len(ref) - 1 + +def test_matched_filter_lags_length(): + rx = np.random.randn(100) + 1j * np.random.randn(100) + ref = np.random.randn(100) + 1j * np.random.randn(100) + z, lags = matched_filter_calc(rx_iq=rx, ref_iq=ref, fs=100e6) + assert len(lags) == len(z) + +def test_matched_filter_peak_at_zero_delay(): + signal = np.random.randn(100) + 1j * np.random.randn(100) + z, lags = matched_filter_calc(rx_iq=signal, ref_iq=signal, fs=100e6) + peak_lag = lags[np.argmax(np.abs(z))] + assert peak_lag == 0.0 class TestUpsample: def test_output_length(self): x = np.ones(10, dtype=complex) - out = upsample(x, scale_factor=4) + out = zero_padded_upsample(x, scale_factor=4) assert len(out) == 40 def test_scale_factor_one_unchanged(self): x = np.random.randn(64) + 1j * np.random.randn(64) - out = upsample(x, scale_factor=1) + out = zero_padded_upsample(x, scale_factor=1) np.testing.assert_allclose(np.abs(out), np.abs(x), atol=1e-10) def test_pure_tone_preserved(self): @@ -21,7 +38,7 @@ def test_pure_tone_preserved(self): f = 5 t = np.arange(N) x = np.exp(1j * 2 * np.pi * f * t / N) - out = upsample(x, scale_factor=4) + out = zero_padded_upsample(x, scale_factor=4) assert len(out) == N * 4 class TestScaleRxSignal: @@ -35,40 +52,82 @@ def test_zero_signal_unchanged(self): out = scale_rx_signal(x) np.testing.assert_array_equal(out, x) -class TestCalculatePreciseDelay: +class TestCalculateSubSampleDelayParabolic: + def test_zero_delay(self): - ref = np.random.randn(64) + 1j * np.random.rand(64) - delay = calculate_precise_delay(ref, ref, fs=100e6) + ref = np.random.randn(256) + 1j*np.random.randn(256) + delay = calculate_sub_sample_delay_parabolic(ref, ref, fs=100e6) assert abs(delay) < 1 / 100e6 def test_known_integer_delay(self): - ref=np.zeros(256, dtype=complex) + ref = np.zeros(256, dtype=complex) ref[10:20] = 1.0 rx = np.zeros(256, dtype=complex) rx[20:30] = 1.0 - delay = calculate_precise_delay(rx, ref, fs=100e6) + delay = calculate_sub_sample_delay_parabolic(rx_signal=rx, ref_signal=ref, fs=100e6) expected = 10 / 100e6 - assert abs(delay - expected) < 2 / 100e6 + np.testing.assert_allclose(delay, expected, atol=2/100e6) - def test_returns_scaler(self): - ref = np.random.randn(64) + 1j * np.random.randn(64) - result = calculate_precise_delay(ref, ref, fs=100e6) - assert np.isscalar(result) or result.ndim == 0 + def test_returns_float(self): + ref = np.zeros(256, dtype=complex) + delay = calculate_sub_sample_delay_parabolic(ref, ref, fs=100e6) + assert isinstance(delay, float) -def test_matched_filter_output_length(): - rx = np.random.randn(100) + 1j * np.random.randn(100) - ref = np.random.randn(100) + 1j * np.random.randn(100) - z, lags = matched_filter_calc(rx_iq=rx, ref_iq=ref, fs=100e6) - assert len(z) == len(rx) + len(ref) - 1 - -def test_matched_filter_lags_length(): - rx = np.random.randn(100) + 1j * np.random.randn(100) - ref = np.random.randn(100) + 1j * np.random.randn(100) - z, lags = matched_filter_calc(rx_iq=rx, ref_iq=ref, fs=100e6) - assert len(lags) == len(z) - -def test_matched_filter_peak_at_zero_delay(): - signal = np.random.randn(100) + 1j * np.random.randn(100) - z, lags = matched_filter_calc(rx_iq=signal, ref_iq=signal, fs=100e6) - peak_lag = lags[np.argmax(np.abs(z))] - assert peak_lag == 0.0 \ No newline at end of file + def test_sub_sample_delay(self): + fs = 100e6 + true_delay_samples = 10.3 + + t = np.arange(256) / fs + freq = 1e6 + ref = np.sin(2 * np.pi * freq * t).astype(complex) + N = len(ref) + freqs = np.fft.fftfreq(N, d=1/fs) + rx = np.fft.ifft(np.fft.fft(ref) * np.exp(-1j * 2 * np.pi * freqs * true_delay_samples / fs)) + delay = calculate_sub_sample_delay_parabolic(rx_signal=rx, ref_signal=ref, fs=fs) + expected = true_delay_samples / fs + np.testing.assert_allclose(delay, expected, atol=0.1/fs) + +class TestCalculateSubSampleDelayZeroPadded: + + def test_zero_delay(self): + ref = np.random.randn(256) + 1j * np.random.randn(256) + delay = calculate_sub_sample_delay_zp(ref, ref, fs=100e6) + assert abs(delay) < 1 / 100e6 + + def test_known_integer_delay(self): + ref = np.zeros(256, dtype=complex) + ref[10:20] = 1.0 + rx = np.zeros(256, dtype=complex) + rx[20:30] = 1.0 + delay = calculate_sub_sample_delay_zp(rx_signal=rx, ref_signal=ref, fs=100e6) + expected = 10 / 100e6 + np.testing.assert_allclose(delay, expected, atol=2/100e6) + + def test_upsample_factor_affects_precision(self): + # higher upsample factor should give equal or better accuracy + ref = np.zeros(256, dtype=complex) + ref[10:20] = 1.0 + rx = np.zeros(256, dtype=complex) + rx[20:30] = 1.0 + expected = 10 / 100e6 + + delay_low = calculate_sub_sample_delay_zp(rx, ref, fs=100e6, upsample_factor=10) + delay_high = calculate_sub_sample_delay_zp(rx, ref, fs=100e6, upsample_factor=100) + + err_low = abs(delay_low - expected) + err_high = abs(delay_high - expected) + assert err_high <= err_low + + def test_sub_sample_delay(self): + fs = 100e6 + true_delay_samples = 10.3 + + t = np.arange(256) / fs + freq = 1e6 + ref = np.sin(2 * np.pi * freq * t).astype(complex) + N = len(ref) + freqs = np.fft.fftfreq(N, d=1/fs) + rx = np.fft.ifft(np.fft.fft(ref) * np.exp(-1j * 2 * np.pi * freqs * true_delay_samples / fs)) + delay = calculate_sub_sample_delay_zp(rx_signal=rx, ref_signal=ref, fs=fs) + expected = true_delay_samples / fs + np.testing.assert_allclose(delay, expected, atol=0.1/fs) \ No newline at end of file From 2cfe1b3405a6e34e581df2e87c660833c3bcab21 Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Mon, 30 Mar 2026 20:23:51 -0400 Subject: [PATCH 09/14] refactor calc_daly script using package functions --- scripts/delay/calc_delay.py | 372 ++++-------------------------------- 1 file changed, 32 insertions(+), 340 deletions(-) diff --git a/scripts/delay/calc_delay.py b/scripts/delay/calc_delay.py index 481c53ff..22d599c4 100644 --- a/scripts/delay/calc_delay.py +++ b/scripts/delay/calc_delay.py @@ -1,163 +1,34 @@ -import subprocess import numpy as np import os -import matplotlib.pyplot as plt -from scipy.interpolate import interp1d import json -from ofdm.channel import delay -from ofdm.modulation import qam -from ofdm.config import OFDMConfig -from ofdm.core import sync import scipy -import scipy.signal import argparse -from ofdm.utils import usrp +from ofdm.utils import usrp +from ofdm.modulation import qam +from ofdm.config import OFDMConfig +from ofdm.channel.delay import matched_filter_calc, scale_rx_signal, calculate_sub_sample_delay_parabolic, calculate_sub_sample_delay_zp #Config CALIBRATION_PATH = "metadata/calibration.json" RX_DATA_PATH = "data_files/rand_ofdm_packet.dat" TX_REF_PATH = "data_files/rand_ofdm_packet_ref.json" #Define OFDM Configuration -ofdm_conf = OFDMConfig() +OFDM_CONF = OFDMConfig() STORE_REF_PATH = "./metadata/ref_delay_calc.json" STORE_NO_REF_PATH = "./metadata/delay_calc.json" USRP_CONFIG_PATH = "./configs/usrp_settings.yaml" START_IDX_PATH = "./data_files/ofdm_performance.json" - - - C = scipy.constants.c #Speed of light -def calc_delay_with_ref(): - """ - Calculate delay between tx and rx using a wired references. - """ - #Get constant - with open(CALIBRATION_PATH, 'r') as f: - cali_data = json.load(f) - CONSTANT = cali_data['constant'] - - #Unpack Sivers RX and TX data - raw_rx_data = np.fromfile("data_files/rand_ofdm_packet_rx.01.dat", dtype=np.complex64) - - #Unpack wired RX data - wired_rx_data = np.fromfile("data_files/rand_ofdm_packet_rx.00.dat", dtype=np.complex64) - - #Unpack TX Pilot symbol - with open("data_files/rand_ofdm_packet_ref.json", "r") as f: - ref_data = json.load(f) - - #Get Tx pilot symbol - tx_pilot = np.array(ref_data['pilot_ref_real']) + 1j * np.array(ref_data['pilot_ref_imag']) - - #Scale raw_rx_data - scaled_wireless_rx = scale_rx_signal(raw_rx_data=raw_rx_data) - scaled_wired_rx = scale_rx_signal(raw_rx_data=wired_rx_data) - - #Upsampel Data - rx_wireless_upsampled = upsample(scaled_wireless_rx, scale_factor = 100) - rx_wired_upsampled = upsample(scaled_wired_rx, scale_factor=100) - tx_upsampled = upsample(tx_pilot, scale_factor=100) - - #Calculate Matched Filter Delay for wireless - z_wireless, lags_wireless = delay.matched_filter_calc(rx_iq = rx_wireless_upsampled, ref_iq=tx_upsampled, fs = (ofdm_conf.FS * 100)) - z_mag_wireless = np.abs(z_wireless) - - #Find Peak of correlation - peak_idx_wireless = np.argmax(z_mag_wireless) - fine_delay_wireless = lags_wireless[peak_idx_wireless] - - #Calculate Matched Filter Delay for wired - z_wired, lags_wired = delay.matched_filter_calc(rx_iq=rx_wired_upsampled, ref_iq=tx_upsampled,fs = (ofdm_conf.FS * 100)) - z_mag_wired = np.abs(z_wired) - - #Fine peak of correlatiokn - peak_idx_wired = np.argmax(z_mag_wired) - fine_delay_wired = lags_wired[peak_idx_wired] - - #print(f"Wired Delay: {fine_delay_wired}, Wireless Delay: {fine_delay_wireless}") - - #print(f"Wired Delay - Wireless Delay = {fine_delay_wired - fine_delay_wireless}") - caled_delay = ((fine_delay_wireless - fine_delay_wired) * 1e9) - print(f"Delay: {caled_delay:.5f}ns") - - #Calculate Distance - #print(f"Constant used is: {CONSTANT}") - raw_distance = ((fine_delay_wireless - fine_delay_wired) * C) - CONSTANT - print(f"Distance: {raw_distance:.5f}") - print(f"{caled_delay:5f},{raw_distance:.5f}") - - json_data = { - "delay":caled_delay, - "raw_distance":raw_distance, - "mode": "ref" - } - - os.makedirs(os.path.dirname(STORE_REF_PATH), exist_ok=True) - with open(STORE_REF_PATH, "w") as f: - json.dump(json_data, f, indent=2) - print(f"Stored delay calc with ref to {STORE_REF_PATH}") - - - - -def calc_delay(rx_path:str, channel:str, start_idx:list, do_DebugPlot:bool = False): - """ - Calculate delay for all cahnnels - """ - # - - #Unpack Sivers RX and TX data - raw_rx_data = np.fromfile(rx_path, dtype=np.complex64) - coarse_sample_idx = start_idx[1] - rx_data = clean_rx(raw_rx_data, coarse_sample_idx) - - #Unpack TX Pilot symbol - with open("data_files/rand_ofdm_packet_ref.json", "r") as f: - ref_data = json.load(f) - - #Get Tx pilot symbol - tx_pilot = np.array(ref_data['pilot_ref_real']) + 1j * np.array(ref_data['pilot_ref_imag']) - - #Scale raw_rx_data - scaled_wireless_rx = scale_rx_signal(raw_rx_data=rx_data) - - - #Calculate Matched Filter Delay for wireless - fine_delay_sec = calculate_precise_delay( - rx_signal=scaled_wireless_rx, - ref_signal=tx_pilot, - fs=ofdm_conf.FS, - debug_plot=do_DebugPlot - ) - - coarse_delay_sec = coarse_sample_idx / ofdm_conf.FS - - total_abosule_delay_sec = coarse_delay_sec + fine_delay_sec - - - #Calculatiosn - calced_delay_ns = ((total_abosule_delay_sec) * 1e9) - - print(f"Channel {channel}:") - print(f"--Delay: {calced_delay_ns:.5f}ns") - return calced_delay_ns - - - - def clean_rx(rx_raw:np.ndarray, start_idx:int)->np.ndarray: """ Removes leading and trialing zeros from the signal """ - - # ------ Prepare RX signal -------- with open(TX_REF_PATH, 'r') as f: ref_data = json.load(f) - # Get total samples in packet - sym_len = ofdm_conf.N + ofdm_conf.CP_LEN + sym_len = OFDM_CONF.N + OFDM_CONF.CP_LEN total_symbols = 1 + 1 + ref_data["n_data_symb"] total_samples = sym_len * total_symbols @@ -170,6 +41,21 @@ def clean_rx(rx_raw:np.ndarray, start_idx:int)->np.ndarray: return rx_raw[start:end] +def unpack_json(json_file_name:str)->dict: + json_file_path = f"data_files/{json_file_name}" + with open(json_file_path, "r") as f: + data = json.load(f) + print(f"[Success] Unpacked {json_file_path}") + return data + + +def binary_ref_to_iq(binary_string:str, n_samples:int)->np.ndarray: + full_string = "".join(binary_string) + word_len = 4 + binary_word_list = np.array([full_string[i:word_len + i] for i in range(0 ,len(full_string), word_len)]) + iq_array = [qam.binary_to_iq(word) for word in binary_word_list] + return np.array(iq_array) * np.sqrt(10) + def main(): parser = argparse.ArgumentParser() @@ -178,36 +64,30 @@ def main(): parser.add_argument("--file", type=str, help="Specify specific file to unpack") args = parser.parse_args() + usrp_conf = usrp.load_config(USRP_CONFIG_PATH) with open(CALIBRATION_PATH, 'r') as f: cali_data = json.load(f) constants = cali_data['constants'] - with open(START_IDX_PATH, 'r') as f: ofdm_performance_data = json.load(f) - start_idx_list = ofdm_performance_data['start_idx'] - + with open(TX_REF_PATH, "r") as f: + ref_data = json.load(f) + tx_pilot = np.array(ref_data['pilot_ref_real']) + 1j * np.array(ref_data['pilot_ref_imag']) + calced_delays = [] - usrp_conf = usrp.load_config(USRP_CONFIG_PATH) rx_channel_idx = usrp_conf.rx_channel_idx.replace(",", "") - if not args.file: for channel in rx_channel_idx: path = f"./data_files/rand_ofdm_packet_rx.0{channel}.dat" - calced_delays.append(calc_delay( - do_DebugPlot=args.plot, - rx_path=path, - start_idx=start_idx_list[int(channel)], - channel=channel)) + rx_data = scale_rx_signal(clean_rx(np.fromfile(path, dtype=np.complex64), start_idx_list[int(channel)][1])) + delay = calculate_sub_sample_delay_parabolic(rx_signal=rx_data, ref_signal=tx_pilot, fs = OFDM_CONF.FS) * 1e9 # convert to ns + calced_delays.append(delay) + print(f"Channel{channel} delay: {delay:.1f}ns") else: path = args.file - calced_delays.append(calc_delay( - do_DebugPlot=args.plot, - rx_path=path, - start_idx=start_idx_list[0], - channel=0 - )) - + rx_data = scale_rx_signal(clean_rx(np.fromfile(path, dtype=np.complex64), start_idx_list[int(0)][1])) + calced_delays.append(calculate_sub_sample_delay_parabolic(rx_signal=rx_data, ref_signal=tx_pilot, fs = OFDM_CONF.FS)) json_data = { "delays":calced_delays, @@ -219,193 +99,5 @@ def main(): json.dump(json_data, f, indent=2) print(f"Stored delay calc to {STORE_NO_REF_PATH}") - - - -def upsample(raw_data:np.ndarray, scale_factor:int = 100)->np.ndarray: - """ - Upsamples raw data based on the scale factor for matched filter delay esiamtion - - Args: - raw_data: Raw RX or Tx pilot data to be upsampled - scale_factor: Multiplication factor for upsamples i.ee to upsample from 100Mhz to 10Ghz use scale_factor = 100 - - Returns: - np.ndarray of upsampled data - """ - N = len(raw_data) - K = scale_factor - N_padded = N * K - - #Convert to freq - freq = np.fft.fftshift(np.fft.fft(raw_data)) - - total_zeros = N_padded - N #Calculate total number of zeros for upsampling - zeros_side = np.zeros(total_zeros // 2) #Get the number of requied zeros to append and prepend to original data - - freq_padded = np.concatenate([zeros_side, freq, zeros_side]) - freq_ready = np.fft.ifftshift(freq_padded) - - upsampled = np.fft.ifft(freq_ready) * K - return upsampled - - -def unpack_json(json_file_name:str)->dict: - """ - Unpacks a json file and returns the json dictionary - - Args: - json_file_name: name of file in the data_files dir (do not include data_files/) - - Returns: - json data dictionary - """ - json_file_path = f"data_files/{json_file_name}" - with open(json_file_path, "r") as f: - data = json.load(f) - print(f"[Success] Unpacked {json_file_path}") - return data - -def binary_ref_to_iq(binary_string:str, n_samples:int)->np.ndarray: - - full_string = "".join(binary_string) - - #Parse String into 4 bit words - word_len = 4 - binary_word_list = np.array([full_string[i:word_len + i] for i in range(0 ,len(full_string), word_len)]) - - #Convert to IQ - iq_array = [qam.binary_to_iq(word) for word in binary_word_list] - return np.array(iq_array) * np.sqrt(10) - -def scale_rx_signal(raw_rx_data:np.ndarray)->np.ndarray: - max_val = np.max(np.abs(raw_rx_data)) - if max_val > 0: - return raw_rx_data * (0.9) / max_val - return raw_rx_data - -# def calculate_precise_delay(rx_signal, ref_signal, fs, upsample_factor=100, debug_plot:bool = False): -# """ -# Calculates delay using coarse correlation zero-padded FFT interpolation -# """ -# corr = scipy.signal.correlate(rx_signal, ref_signal, mode='full') -# lags = scipy.signal.correlation_lags(len(rx_signal), len(ref_signal), mode='full') - -# # Find coarse peak -# mag = np.abs(corr) -# coarse_idx = np.argmax(mag) - -# #Get small window -# radius = 16 -# start = max(0, coarse_idx - radius) -# end = min(len(corr), coarse_idx + radius) - -# window = corr[start:end] - -# #Zero padded interpolation -# window_fft = np.fft.fft(window) -# window_fft_shifted = np.fft.fftshift(window_fft) - -# #Zero pad -# n_original = len(window) -# n_padded = n_original * upsample_factor -# n_zeros = n_padded - n_original - -# zeros_half = np.zeros(n_zeros // 2, dtype=complex) -# fft_padded_shifted = np.concatenate([ -# zeros_half, -# window_fft_shifted, -# zeros_half -# ]) - -# #IFFT -# fft_padded_ready = np.fft.ifftshift(fft_padded_shifted) -# window_upsampled= np.fft.ifft(fft_padded_ready) * upsample_factor - -# #Find precise peak -# upsampled_mag = np.abs(window_upsampled) -# peak_upsampled_idx = np.argmax(upsampled_mag) - -# #Calculate total delay -# fractional_offset = peak_upsampled_idx / upsample_factor -# total_idx = start + fractional_offset - -# #Convert to time -# zero_lag_index = np.where(lags == 0)[0][0] -# final_lag_samples = total_idx - zero_lag_index - -# if debug_plot: -# plt.figure(figsize=(12, 5)) - -# # Plot 1: The full coarse correlation -# plt.subplot(1, 2, 1) -# plt.plot(lags, mag) -# plt.axvline(x=lags[coarse_idx], color='r', linestyle='--', alpha=0.7) -# plt.title("Full Coarse Correlation") -# plt.xlabel("Lags (Samples)") -# plt.ylabel("Magnitude") - -# # Plot 2: The zoomed-in interpolated peak -# plt.subplot(1, 2, 2) -# # Create a sub-sample x-axis for the upsampled window -# upsampled_lags = np.linspace(lags[start], lags[end-1], len(upsampled_mag)) -# plt.plot(upsampled_lags, upsampled_mag, label="Interpolated Curve") - -# plt.plot(lags[start:end], mag[start:end], 'ko', label="Coarse Samples") - -# precise_lag_val = lags[start] + fractional_offset -# plt.axvline(x=precise_lag_val, color='r', linestyle='--', label=f"True Peak: {precise_lag_val:.2f}") - -# plt.title(f"Interpolated Peak (100x)") -# plt.xlabel("Lags (Samples)") -# plt.legend() -# plt.tight_layout() -# plt.show() - - -# return final_lag_samples / fs - -def calculate_precise_delay(rx_signal, ref_signal, fs, debug_plot=False): - """ - Calculates sub-sample delay using precise Parabolic Interpolation. - """ - corr = scipy.signal.correlate(rx_signal, ref_signal, mode='full') - lags = scipy.signal.correlation_lags(len(rx_signal), len(ref_signal), mode='full') - - mag = np.abs(corr) - peak_idx = np.argmax(mag) - - # Parabolic Interpolation Formula - if peak_idx > 0 and peak_idx < len(mag) - 1: - alpha = mag[peak_idx - 1] - beta = mag[peak_idx] - gamma = mag[peak_idx + 1] - - # Calculate the fractional shift (-0.5 to +0.5 samples) - fractional_shift = 0.5 * (alpha - gamma) / (alpha - 2*beta + gamma) - else: - fractional_shift = 0.0 - - precise_lag = lags[peak_idx] + fractional_shift - - if debug_plot: - plt.figure(figsize=(8, 5)) - - # Plot 10 samples on either side of the peak - start = max(0, peak_idx - 10) - end = min(len(mag), peak_idx + 10) - - plt.plot(lags[start:end], mag[start:end], 'ko-', label="Coarse Samples") - plt.axvline(x=precise_lag, color='r', linestyle='--', label=f"Parabolic Peak: {precise_lag:.2f}") - - plt.title("Parabolic Interpolation Peak") - plt.xlabel("Lags (Samples)") - plt.ylabel("Correlation Magnitude") - plt.legend() - plt.grid(True, alpha=0.3) - plt.show() - - return precise_lag / fs - if __name__ == "__main__": main() From 9af048244ab8e89084797233ac579b021f40e16e Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Tue, 31 Mar 2026 15:00:36 -0400 Subject: [PATCH 10/14] update csv unpacking This is the current way that the data needs to be unpacked when using the parabolic interpolation delay calculation method. --- scripts/trilateration.py | 66 +++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 35 deletions(-) diff --git a/scripts/trilateration.py b/scripts/trilateration.py index 4e6c0910..949f8592 100644 --- a/scripts/trilateration.py +++ b/scripts/trilateration.py @@ -11,24 +11,23 @@ RX_COORDS = np.array([ [0.0, 0.0], # ANchor - [0.4699, 0.08255], # RX 1 - [0.53975, 0.14605], # RX 2 - [0.5461, 0.3048] # rx 3 + [0.508, 0.137], # RX 2 + [0.0, 0.615] # RX 3 ]) -TX_TRUE = np.array([0.565, 0.906]) +TX_TRUE = np.array([0.270, 0.970]) def load_tdoa_from_csvs(): - base_dir = "./experiments/multilat_data1" + base_dir = "./experiments/verification_data" - df_rx2_ch0 = pd.read_csv(os.path.join(base_dir, "rx1_channel0.csv")) - df_rx2_ch1 = pd.read_csv(os.path.join(base_dir, "rx1_channel1.csv")) + df_rx2_ch0 = pd.read_csv(os.path.join(base_dir, "rx2_channel0.csv")) + df_rx2_ch1 = pd.read_csv(os.path.join(base_dir, "rx2_channel1.csv")) - df_rx3_ch0 = pd.read_csv(os.path.join(base_dir, "rx2_channel0.csv")) - df_rx3_ch1 = pd.read_csv(os.path.join(base_dir, "rx2_channel1.csv")) + df_rx3_ch0 = pd.read_csv(os.path.join(base_dir, "rx3_channel0.csv")) + df_rx3_ch1 = pd.read_csv(os.path.join(base_dir, "rx3_channel1.csv")) - df_rx4_ch0 = pd.read_csv(os.path.join(base_dir, "rx3_channel0.csv")) - df_rx4_ch1 = pd.read_csv(os.path.join(base_dir, "rx3_channel1.csv")) + #df_rx4_ch0 = pd.read_csv(os.path.join(base_dir, "rx3_channel0.csv")) + #df_rx4_ch1 = pd.read_csv(os.path.join(base_dir, "rx3_channel1.csv")) col_name = 'delay0' @@ -37,31 +36,18 @@ def load_tdoa_from_csvs(): raw_rover_t1_ns = df_rx2_ch0[col_name].values # Ch 0 raw_anchor_t2_ns = df_rx3_ch1[col_name].values # Ch 1 raw_rover_t2_ns = df_rx3_ch0[col_name].values # Ch 0 - raw_anchor_t3_ns = df_rx4_ch0[col_name].values # Ch 1 - raw_rover_t3_ns = df_rx4_ch1[col_name].values # Ch 0 - # calc tdoa - raw_dt_1_ns = raw_rover_t1_ns - raw_anchor_t1_ns - raw_dt_2_ns = raw_rover_t2_ns - raw_anchor_t2_ns - raw_dt_3_ns = raw_rover_t3_ns - raw_anchor_t3_ns - - # correct hardware delays - HARDWARE_BIAS_NS = 3.463 - bias_corrected_dt_1 = raw_dt_1_ns - HARDWARE_BIAS_NS - bias_corrected_dt_2 = raw_dt_2_ns - HARDWARE_BIAS_NS - bias_corrected_dt_3 = raw_dt_3_ns - HARDWARE_BIAS_NS - - # correct SDR clock slip - dt_1_ns = (bias_corrected_dt_1 + 5.0) % 10.0 - 5.0 - dt_2_ns = (bias_corrected_dt_2 + 5.0) % 10.0 - 5.0 - dt_3_ns = (bias_corrected_dt_3 + 5.0) % 10.0 - 5.0 - # convert to seconds - dt_1_sec = dt_1_ns * 1e-9 - dt_2_sec = dt_2_ns * 1e-9 - dt_3_sec = dt_3_ns * 1e-9 - - return np.column_stack((dt_1_sec, dt_2_sec, dt_3_sec)) + # calc tdoa + raw_dt_1_ns = (raw_rover_t1_ns - raw_anchor_t1_ns) * 1e9 + raw_dt_2_ns = (raw_rover_t2_ns - raw_anchor_t2_ns) * 1e9 + #raw_dt_3_ns = raw_rover_t3_ns - raw_anchor_t3_ns + # + HARDWARE_BIAS_NS = 3.461 + dt_1_sec = (raw_dt_1_ns - HARDWARE_BIAS_NS) * 1e-9 + dt_2_sec = (raw_dt_2_ns - HARDWARE_BIAS_NS) * 1e-9 + + return np.column_stack((dt_1_sec, dt_2_sec)) @@ -80,6 +66,11 @@ def main(): estimated_positions = [] errors = [] + for i in range(num_packets): + delay_diffs_sec = all_delay_diffs[i] + print(f"Packet {i}: tdoa_sec={delay_diffs_sec}, tdoa_m={delay_diffs_sec * C}") + break # just check first packet + for i in range(num_packets): delay_diffs_sec = all_delay_diffs[i] @@ -89,8 +80,9 @@ def main(): args=(RX_COORDS, delay_diffs_sec), method='lm' ) + print(f"Packet {i}: cost={result.cost:.2e}, x={result.x}") - if result.success: + if result.cost < 1e-6: est_x, est_y = result.x estimated_positions.append([est_x, est_y]) error_m = np.sqrt((est_x - TX_TRUE[0])**2 + (est_y - TX_TRUE[1])**2) @@ -99,6 +91,10 @@ def main(): estimated_positions = np.array(estimated_positions) errors = np.array(errors) + if len(estimated_positions) == 0: + print("No succesfful solves") + return + centroid_x, centroid_y = np.mean(estimated_positions, axis=0) mean_error = np.mean(errors) centroid_error = np.sqrt((centroid_x - TX_TRUE[0])**2 + (centroid_y - TX_TRUE[1])**2) From cdd2e48e154258647618eb3b9a1b85515c25af18 Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Tue, 31 Mar 2026 15:04:57 -0400 Subject: [PATCH 11/14] clean: remove debugging prints --- scripts/trilateration.py | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/scripts/trilateration.py b/scripts/trilateration.py index 949f8592..22bd602d 100644 --- a/scripts/trilateration.py +++ b/scripts/trilateration.py @@ -26,23 +26,17 @@ def load_tdoa_from_csvs(): df_rx3_ch0 = pd.read_csv(os.path.join(base_dir, "rx3_channel0.csv")) df_rx3_ch1 = pd.read_csv(os.path.join(base_dir, "rx3_channel1.csv")) - #df_rx4_ch0 = pd.read_csv(os.path.join(base_dir, "rx3_channel0.csv")) - #df_rx4_ch1 = pd.read_csv(os.path.join(base_dir, "rx3_channel1.csv")) - col_name = 'delay0' - raw_anchor_t1_ns = df_rx2_ch1[col_name].values # Ch 1 raw_rover_t1_ns = df_rx2_ch0[col_name].values # Ch 0 raw_anchor_t2_ns = df_rx3_ch1[col_name].values # Ch 1 raw_rover_t2_ns = df_rx3_ch0[col_name].values # Ch 0 - # calc tdoa raw_dt_1_ns = (raw_rover_t1_ns - raw_anchor_t1_ns) * 1e9 raw_dt_2_ns = (raw_rover_t2_ns - raw_anchor_t2_ns) * 1e9 - #raw_dt_3_ns = raw_rover_t3_ns - raw_anchor_t3_ns - # + HARDWARE_BIAS_NS = 3.461 dt_1_sec = (raw_dt_1_ns - HARDWARE_BIAS_NS) * 1e-9 dt_2_sec = (raw_dt_2_ns - HARDWARE_BIAS_NS) * 1e-9 @@ -66,11 +60,6 @@ def main(): estimated_positions = [] errors = [] - for i in range(num_packets): - delay_diffs_sec = all_delay_diffs[i] - print(f"Packet {i}: tdoa_sec={delay_diffs_sec}, tdoa_m={delay_diffs_sec * C}") - break # just check first packet - for i in range(num_packets): delay_diffs_sec = all_delay_diffs[i] @@ -80,7 +69,6 @@ def main(): args=(RX_COORDS, delay_diffs_sec), method='lm' ) - print(f"Packet {i}: cost={result.cost:.2e}, x={result.x}") if result.cost < 1e-6: est_x, est_y = result.x From a9fd6f73eb257e1a9819270783bb56e18f09e1c8 Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Tue, 31 Mar 2026 15:07:42 -0400 Subject: [PATCH 12/14] reste coords --- scripts/trilateration.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/trilateration.py b/scripts/trilateration.py index 22bd602d..2a21f294 100644 --- a/scripts/trilateration.py +++ b/scripts/trilateration.py @@ -11,11 +11,11 @@ RX_COORDS = np.array([ [0.0, 0.0], # ANchor - [0.508, 0.137], # RX 2 - [0.0, 0.615] # RX 3 + [0.0, 0.0], # RX 2 + [0.0, 0.0] # RX 3 ]) -TX_TRUE = np.array([0.270, 0.970]) +TX_TRUE = np.array([0.0, 0.0]) def load_tdoa_from_csvs(): base_dir = "./experiments/verification_data" From a2b392bf0b8b7b3c503ede9ad8f07af6372b78b3 Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Tue, 31 Mar 2026 15:09:40 -0400 Subject: [PATCH 13/14] add comments --- scripts/experiment_scripts/unpack_experiment.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/scripts/experiment_scripts/unpack_experiment.py b/scripts/experiment_scripts/unpack_experiment.py index 966f4f46..33fd1816 100644 --- a/scripts/experiment_scripts/unpack_experiment.py +++ b/scripts/experiment_scripts/unpack_experiment.py @@ -22,12 +22,11 @@ REF_DELAY_DATA_PATH = "./metadata/ref_delay_calc.json" PERFORMANCE_DATA_PATH = "./data_files/ofdm_performance.json" USRP_CONFIG_PATH = "./configs/usrp_settings.yaml" -UNPACKED_DATA_CSV = "./experiments/unpacked_data3/rx3_channel1.csv" - - - -data_dir = Path("/home/guoyixu/OFDM_Sense/EXPERIMENTS/synthetic_trilateration2/trilat1_rx3_x0cm_y61_5cm_archive/channel1") +#----- MODIFY ------ +UNPACKED_DATA_CSV = "./experiments/unpacked_data3/rx3_channel1.csv" # ./experiments/[EXPERIMENT NAME]/[UNPACKED NAME.csv] +data_dir = Path("/home/guoyixu/OFDM_Sense/EXPERIMENTS/synthetic_trilateration2/trilat1_rx3_x0cm_y61_5cm_archive/channel1") #PATH TO RAW DATA TO BE UNPACKED +# ----------------- for dat_file in data_dir.glob("*.dat"): print(dat_file) From 47ce9d32e81335c2d6c4793add91bd7148618bfa Mon Sep 17 00:00:00 2001 From: osherlock1 Date: Tue, 31 Mar 2026 15:15:39 -0400 Subject: [PATCH 14/14] add comments --- scripts/experiment_scripts/unpack_experiment.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scripts/experiment_scripts/unpack_experiment.py b/scripts/experiment_scripts/unpack_experiment.py index 33fd1816..1d19d187 100644 --- a/scripts/experiment_scripts/unpack_experiment.py +++ b/scripts/experiment_scripts/unpack_experiment.py @@ -24,7 +24,10 @@ USRP_CONFIG_PATH = "./configs/usrp_settings.yaml" #----- MODIFY ------ +# directory to where the unpacked data will be saved to. Specifiy the name of the csv in path + no files will automatically be created you have to make those manually UNPACKED_DATA_CSV = "./experiments/unpacked_data3/rx3_channel1.csv" # ./experiments/[EXPERIMENT NAME]/[UNPACKED NAME.csv] + +#Path to the raw data that needs to be unpacked data_dir = Path("/home/guoyixu/OFDM_Sense/EXPERIMENTS/synthetic_trilateration2/trilat1_rx3_x0cm_y61_5cm_archive/channel1") #PATH TO RAW DATA TO BE UNPACKED # -----------------