diff --git a/scripts/delay/calc_delay.py b/scripts/delay/calc_delay.py index 1e7c8c11..22d599c4 100644 --- a/scripts/delay/calc_delay.py +++ b/scripts/delay/calc_delay.py @@ -1,163 +1,34 @@ -import subprocess import numpy as np import os -import matplotlib.pyplot as plt -from scipy.interpolate import interp1d import json -from ofdm.channel import delay -from ofdm.modulation import qam -from ofdm.config import OFDMConfig -from ofdm.core import sync import scipy -import scipy.signal import argparse -from ofdm.utils import usrp +from ofdm.utils import usrp +from ofdm.modulation import qam +from ofdm.config import OFDMConfig +from ofdm.channel.delay import matched_filter_calc, scale_rx_signal, calculate_sub_sample_delay_parabolic, calculate_sub_sample_delay_zp #Config CALIBRATION_PATH = "metadata/calibration.json" RX_DATA_PATH = "data_files/rand_ofdm_packet.dat" TX_REF_PATH = "data_files/rand_ofdm_packet_ref.json" #Define OFDM Configuration -ofdm_conf = OFDMConfig() +OFDM_CONF = OFDMConfig() STORE_REF_PATH = "./metadata/ref_delay_calc.json" STORE_NO_REF_PATH = "./metadata/delay_calc.json" USRP_CONFIG_PATH = "./configs/usrp_settings.yaml" START_IDX_PATH = "./data_files/ofdm_performance.json" - - - C = scipy.constants.c #Speed of light -def calc_delay_with_ref(): - """ - Calculate delay between tx and rx using a wired references. - """ - #Get constant - with open(CALIBRATION_PATH, 'r') as f: - cali_data = json.load(f) - CONSTANT = cali_data['constant'] - - #Unpack Sivers RX and TX data - raw_rx_data = np.fromfile("data_files/rand_ofdm_packet_rx.01.dat", dtype=np.complex64) - - #Unpack wired RX data - wired_rx_data = np.fromfile("data_files/rand_ofdm_packet_rx.00.dat", dtype=np.complex64) - - #Unpack TX Pilot symbol - with open("data_files/rand_ofdm_packet_ref.json", "r") as f: - ref_data = json.load(f) - - #Get Tx pilot symbol - tx_pilot = np.array(ref_data['pilot_ref_real']) + 1j * np.array(ref_data['pilot_ref_imag']) - - #Scale raw_rx_data - scaled_wireless_rx = scale_rx_signal(raw_rx_data=raw_rx_data) - scaled_wired_rx = scale_rx_signal(raw_rx_data=wired_rx_data) - - #Upsampel Data - rx_wireless_upsampled = upsample(scaled_wireless_rx, scale_factor = 100) - rx_wired_upsampled = upsample(scaled_wired_rx, scale_factor=100) - tx_upsampled = upsample(tx_pilot, scale_factor=100) - - #Calculate Matched Filter Delay for wireless - z_wireless, lags_wireless = delay.matched_filter_calc(rx_iq = rx_wireless_upsampled, ref_iq=tx_upsampled, fs = (ofdm_conf.FS * 100)) - z_mag_wireless = np.abs(z_wireless) - - #Find Peak of correlation - peak_idx_wireless = np.argmax(z_mag_wireless) - fine_delay_wireless = lags_wireless[peak_idx_wireless] - - #Calculate Matched Filter Delay for wired - z_wired, lags_wired = delay.matched_filter_calc(rx_iq=rx_wired_upsampled, ref_iq=tx_upsampled,fs = (ofdm_conf.FS * 100)) - z_mag_wired = np.abs(z_wired) - - #Fine peak of correlatiokn - peak_idx_wired = np.argmax(z_mag_wired) - fine_delay_wired = lags_wired[peak_idx_wired] - - #print(f"Wired Delay: {fine_delay_wired}, Wireless Delay: {fine_delay_wireless}") - - #print(f"Wired Delay - Wireless Delay = {fine_delay_wired - fine_delay_wireless}") - caled_delay = ((fine_delay_wireless - fine_delay_wired) * 1e9) - print(f"Delay: {caled_delay:.5f}ns") - - #Calculate Distance - #print(f"Constant used is: {CONSTANT}") - raw_distance = ((fine_delay_wireless - fine_delay_wired) * C) - CONSTANT - print(f"Distance: {raw_distance:.5f}") - print(f"{caled_delay:5f},{raw_distance:.5f}") - - json_data = { - "delay":caled_delay, - "raw_distance":raw_distance, - "mode": "ref" - } - - os.makedirs(os.path.dirname(STORE_REF_PATH), exist_ok=True) - with open(STORE_REF_PATH, "w") as f: - json.dump(json_data, f, indent=2) - print(f"Stored delay calc with ref to {STORE_REF_PATH}") - - - - -def calc_delay(rx_path:str, channel:str, start_idx:list, do_DebugPlot:bool = False): - """ - Calculate delay for all cahnnels - """ - # - - #Unpack Sivers RX and TX data - raw_rx_data = np.fromfile(rx_path, dtype=np.complex64) - coarse_sample_idx = start_idx[1] - rx_data = clean_rx(raw_rx_data, coarse_sample_idx) - - #Unpack TX Pilot symbol - with open("data_files/rand_ofdm_packet_ref.json", "r") as f: - ref_data = json.load(f) - - #Get Tx pilot symbol - tx_pilot = np.array(ref_data['pilot_ref_real']) + 1j * np.array(ref_data['pilot_ref_imag']) - - #Scale raw_rx_data - scaled_wireless_rx = scale_rx_signal(raw_rx_data=rx_data) - - - #Calculate Matched Filter Delay for wireless - fine_delay_sec = calculate_precise_delay( - rx_signal=scaled_wireless_rx, - ref_signal=tx_pilot, - fs=ofdm_conf.FS, - debug_plot=do_DebugPlot - ) - - coarse_delay_sec = coarse_sample_idx / ofdm_conf.FS - - total_abosule_delay_sec = coarse_delay_sec + fine_delay_sec - - - #Calculatiosn - calced_delay_ns = ((total_abosule_delay_sec) * 1e9) - - print(f"Channel {channel}:") - print(f"--Delay: {calced_delay_ns:.5f}ns") - return calced_delay_ns - - - - def clean_rx(rx_raw:np.ndarray, start_idx:int)->np.ndarray: """ Removes leading and trialing zeros from the signal """ - - # ------ Prepare RX signal -------- with open(TX_REF_PATH, 'r') as f: ref_data = json.load(f) - # Get total samples in packet - sym_len = ofdm_conf.N + ofdm_conf.CP_LEN + sym_len = OFDM_CONF.N + OFDM_CONF.CP_LEN total_symbols = 1 + 1 + ref_data["n_data_symb"] total_samples = sym_len * total_symbols @@ -170,6 +41,21 @@ def clean_rx(rx_raw:np.ndarray, start_idx:int)->np.ndarray: return rx_raw[start:end] +def unpack_json(json_file_name:str)->dict: + json_file_path = f"data_files/{json_file_name}" + with open(json_file_path, "r") as f: + data = json.load(f) + print(f"[Success] Unpacked {json_file_path}") + return data + + +def binary_ref_to_iq(binary_string:str, n_samples:int)->np.ndarray: + full_string = "".join(binary_string) + word_len = 4 + binary_word_list = np.array([full_string[i:word_len + i] for i in range(0 ,len(full_string), word_len)]) + iq_array = [qam.binary_to_iq(word) for word in binary_word_list] + return np.array(iq_array) * np.sqrt(10) + def main(): parser = argparse.ArgumentParser() @@ -178,36 +64,30 @@ def main(): parser.add_argument("--file", type=str, help="Specify specific file to unpack") args = parser.parse_args() + usrp_conf = usrp.load_config(USRP_CONFIG_PATH) with open(CALIBRATION_PATH, 'r') as f: cali_data = json.load(f) constants = cali_data['constants'] - with open(START_IDX_PATH, 'r') as f: ofdm_performance_data = json.load(f) - start_idx_list = ofdm_performance_data['start_idx'] - + with open(TX_REF_PATH, "r") as f: + ref_data = json.load(f) + tx_pilot = np.array(ref_data['pilot_ref_real']) + 1j * np.array(ref_data['pilot_ref_imag']) + calced_delays = [] - usrp_conf = usrp.load_config(USRP_CONFIG_PATH) rx_channel_idx = usrp_conf.rx_channel_idx.replace(",", "") - if not args.file: for channel in rx_channel_idx: path = f"./data_files/rand_ofdm_packet_rx.0{channel}.dat" - calced_delays.append(calc_delay( - do_DebugPlot=args.plot, - rx_path=path, - start_idx=start_idx_list[int(channel)], - channel=channel)) + rx_data = scale_rx_signal(clean_rx(np.fromfile(path, dtype=np.complex64), start_idx_list[int(channel)][1])) + delay = calculate_sub_sample_delay_parabolic(rx_signal=rx_data, ref_signal=tx_pilot, fs = OFDM_CONF.FS) * 1e9 # convert to ns + calced_delays.append(delay) + print(f"Channel{channel} delay: {delay:.1f}ns") else: path = args.file - calced_delays.append(calc_delay( - do_DebugPlot=args.plot, - rx_path=path, - start_idx=start_idx_list[0], - channel=0 - )) - + rx_data = scale_rx_signal(clean_rx(np.fromfile(path, dtype=np.complex64), start_idx_list[int(0)][1])) + calced_delays.append(calculate_sub_sample_delay_parabolic(rx_signal=rx_data, ref_signal=tx_pilot, fs = OFDM_CONF.FS)) json_data = { "delays":calced_delays, @@ -219,196 +99,5 @@ def main(): json.dump(json_data, f, indent=2) print(f"Stored delay calc to {STORE_NO_REF_PATH}") - - - -def upsample(raw_data:np.ndarray, scale_factor:int = 100)->np.ndarray: - """ - Upsamples raw data based on the scale factor for matched filter delay esiamtion - - Args: - raw_data: Raw RX or Tx pilot data to be upsampled - scale_factor: Multiplication factor for upsamples i.ee to upsample from 100Mhz to 10Ghz use scale_factor = 100 - - Returns: - np.ndarray of upsampled data - """ - N = len(raw_data) - K = scale_factor - N_padded = N * K - - #Convert to freq - freq = np.fft.fftshift(np.fft.fft(raw_data)) - - total_zeros = N_padded - N #Calculate total number of zeros for upsampling - zeros_side = np.zeros(total_zeros // 2) #Get the number of requied zeros to append and prepend to original data - - freq_padded = np.concatenate([zeros_side, freq, zeros_side]) - freq_ready = np.fft.ifftshift(freq_padded) - - upsampled = np.fft.ifft(freq_ready) * K - return upsampled - - -def unpack_json(json_file_name:str)->dict: - """ - Unpacks a json file and returns the json dictionary - - Args: - json_file_name: name of file in the data_files dir (do not include data_files/) - - Returns: - json data dictionary - """ - json_file_path = f"data_files/{json_file_name}" - with open(json_file_path, "r") as f: - data = json.load(f) - print(f"[Success] Unpacked {json_file_path}") - return data - -def binary_ref_to_iq(binary_string:str, n_samples:int)->np.ndarray: - - full_string = "".join(binary_string) - - #Parse String into 4 bit words - word_len = 4 - binary_word_list = np.array([full_string[i:word_len + i] for i in range(0 ,len(full_string), word_len)]) - - #Convert to IQ - iq_array = [qam.binary_to_iq(word) for word in binary_word_list] - return np.array(iq_array) * np.sqrt(10) - -def scale_rx_signal(raw_rx_data:np.ndarray)->np.ndarray: - max_val = np.max(np.abs(raw_rx_data)) - - if max_val > 0: - scale_factor = 0.9 / max_val - scaled_rx_data = raw_rx_data * scale_factor - return scaled_rx_data - - -# def calculate_precise_delay(rx_signal, ref_signal, fs, upsample_factor=100, debug_plot:bool = False): -# """ -# Calculates delay using coarse correlation zero-padded FFT interpolation -# """ -# corr = scipy.signal.correlate(rx_signal, ref_signal, mode='full') -# lags = scipy.signal.correlation_lags(len(rx_signal), len(ref_signal), mode='full') - -# # Find coarse peak -# mag = np.abs(corr) -# coarse_idx = np.argmax(mag) - -# #Get small window -# radius = 16 -# start = max(0, coarse_idx - radius) -# end = min(len(corr), coarse_idx + radius) - -# window = corr[start:end] - -# #Zero padded interpolation -# window_fft = np.fft.fft(window) -# window_fft_shifted = np.fft.fftshift(window_fft) - -# #Zero pad -# n_original = len(window) -# n_padded = n_original * upsample_factor -# n_zeros = n_padded - n_original - -# zeros_half = np.zeros(n_zeros // 2, dtype=complex) -# fft_padded_shifted = np.concatenate([ -# zeros_half, -# window_fft_shifted, -# zeros_half -# ]) - -# #IFFT -# fft_padded_ready = np.fft.ifftshift(fft_padded_shifted) -# window_upsampled= np.fft.ifft(fft_padded_ready) * upsample_factor - -# #Find precise peak -# upsampled_mag = np.abs(window_upsampled) -# peak_upsampled_idx = np.argmax(upsampled_mag) - -# #Calculate total delay -# fractional_offset = peak_upsampled_idx / upsample_factor -# total_idx = start + fractional_offset - -# #Convert to time -# zero_lag_index = np.where(lags == 0)[0][0] -# final_lag_samples = total_idx - zero_lag_index - -# if debug_plot: -# plt.figure(figsize=(12, 5)) - -# # Plot 1: The full coarse correlation -# plt.subplot(1, 2, 1) -# plt.plot(lags, mag) -# plt.axvline(x=lags[coarse_idx], color='r', linestyle='--', alpha=0.7) -# plt.title("Full Coarse Correlation") -# plt.xlabel("Lags (Samples)") -# plt.ylabel("Magnitude") - -# # Plot 2: The zoomed-in interpolated peak -# plt.subplot(1, 2, 2) -# # Create a sub-sample x-axis for the upsampled window -# upsampled_lags = np.linspace(lags[start], lags[end-1], len(upsampled_mag)) -# plt.plot(upsampled_lags, upsampled_mag, label="Interpolated Curve") - -# plt.plot(lags[start:end], mag[start:end], 'ko', label="Coarse Samples") - -# precise_lag_val = lags[start] + fractional_offset -# plt.axvline(x=precise_lag_val, color='r', linestyle='--', label=f"True Peak: {precise_lag_val:.2f}") - -# plt.title(f"Interpolated Peak (100x)") -# plt.xlabel("Lags (Samples)") -# plt.legend() -# plt.tight_layout() -# plt.show() - - -# return final_lag_samples / fs - -def calculate_precise_delay(rx_signal, ref_signal, fs, debug_plot=False): - """ - Calculates sub-sample delay using precise Parabolic Interpolation. - """ - corr = scipy.signal.correlate(rx_signal, ref_signal, mode='full') - lags = scipy.signal.correlation_lags(len(rx_signal), len(ref_signal), mode='full') - - mag = np.abs(corr) - peak_idx = np.argmax(mag) - - # Parabolic Interpolation Formula - if peak_idx > 0 and peak_idx < len(mag) - 1: - alpha = mag[peak_idx - 1] - beta = mag[peak_idx] - gamma = mag[peak_idx + 1] - - # Calculate the fractional shift (-0.5 to +0.5 samples) - fractional_shift = 0.5 * (alpha - gamma) / (alpha - 2*beta + gamma) - else: - fractional_shift = 0.0 - - precise_lag = lags[peak_idx] + fractional_shift - - if debug_plot: - plt.figure(figsize=(8, 5)) - - # Plot 10 samples on either side of the peak - start = max(0, peak_idx - 10) - end = min(len(mag), peak_idx + 10) - - plt.plot(lags[start:end], mag[start:end], 'ko-', label="Coarse Samples") - plt.axvline(x=precise_lag, color='r', linestyle='--', label=f"Parabolic Peak: {precise_lag:.2f}") - - plt.title("Parabolic Interpolation Peak") - plt.xlabel("Lags (Samples)") - plt.ylabel("Correlation Magnitude") - plt.legend() - plt.grid(True, alpha=0.3) - plt.show() - - return precise_lag / fs - if __name__ == "__main__": main() diff --git a/scripts/experiment_scripts/unpack_experiment.py b/scripts/experiment_scripts/unpack_experiment.py index 966f4f46..1d19d187 100644 --- a/scripts/experiment_scripts/unpack_experiment.py +++ b/scripts/experiment_scripts/unpack_experiment.py @@ -22,12 +22,14 @@ REF_DELAY_DATA_PATH = "./metadata/ref_delay_calc.json" PERFORMANCE_DATA_PATH = "./data_files/ofdm_performance.json" USRP_CONFIG_PATH = "./configs/usrp_settings.yaml" -UNPACKED_DATA_CSV = "./experiments/unpacked_data3/rx3_channel1.csv" +#----- MODIFY ------ +# directory to where the unpacked data will be saved to. Specifiy the name of the csv in path + no files will automatically be created you have to make those manually +UNPACKED_DATA_CSV = "./experiments/unpacked_data3/rx3_channel1.csv" # ./experiments/[EXPERIMENT NAME]/[UNPACKED NAME.csv] - - -data_dir = Path("/home/guoyixu/OFDM_Sense/EXPERIMENTS/synthetic_trilateration2/trilat1_rx3_x0cm_y61_5cm_archive/channel1") +#Path to the raw data that needs to be unpacked +data_dir = Path("/home/guoyixu/OFDM_Sense/EXPERIMENTS/synthetic_trilateration2/trilat1_rx3_x0cm_y61_5cm_archive/channel1") #PATH TO RAW DATA TO BE UNPACKED +# ----------------- for dat_file in data_dir.glob("*.dat"): print(dat_file) diff --git a/scripts/trilateration.py b/scripts/trilateration.py index 4e6c0910..2a21f294 100644 --- a/scripts/trilateration.py +++ b/scripts/trilateration.py @@ -11,57 +11,37 @@ RX_COORDS = np.array([ [0.0, 0.0], # ANchor - [0.4699, 0.08255], # RX 1 - [0.53975, 0.14605], # RX 2 - [0.5461, 0.3048] # rx 3 + [0.0, 0.0], # RX 2 + [0.0, 0.0] # RX 3 ]) -TX_TRUE = np.array([0.565, 0.906]) +TX_TRUE = np.array([0.0, 0.0]) def load_tdoa_from_csvs(): - base_dir = "./experiments/multilat_data1" + base_dir = "./experiments/verification_data" - df_rx2_ch0 = pd.read_csv(os.path.join(base_dir, "rx1_channel0.csv")) - df_rx2_ch1 = pd.read_csv(os.path.join(base_dir, "rx1_channel1.csv")) + df_rx2_ch0 = pd.read_csv(os.path.join(base_dir, "rx2_channel0.csv")) + df_rx2_ch1 = pd.read_csv(os.path.join(base_dir, "rx2_channel1.csv")) - df_rx3_ch0 = pd.read_csv(os.path.join(base_dir, "rx2_channel0.csv")) - df_rx3_ch1 = pd.read_csv(os.path.join(base_dir, "rx2_channel1.csv")) - - df_rx4_ch0 = pd.read_csv(os.path.join(base_dir, "rx3_channel0.csv")) - df_rx4_ch1 = pd.read_csv(os.path.join(base_dir, "rx3_channel1.csv")) + df_rx3_ch0 = pd.read_csv(os.path.join(base_dir, "rx3_channel0.csv")) + df_rx3_ch1 = pd.read_csv(os.path.join(base_dir, "rx3_channel1.csv")) col_name = 'delay0' - raw_anchor_t1_ns = df_rx2_ch1[col_name].values # Ch 1 raw_rover_t1_ns = df_rx2_ch0[col_name].values # Ch 0 raw_anchor_t2_ns = df_rx3_ch1[col_name].values # Ch 1 raw_rover_t2_ns = df_rx3_ch0[col_name].values # Ch 0 - raw_anchor_t3_ns = df_rx4_ch0[col_name].values # Ch 1 - raw_rover_t3_ns = df_rx4_ch1[col_name].values # Ch 0 # calc tdoa - raw_dt_1_ns = raw_rover_t1_ns - raw_anchor_t1_ns - raw_dt_2_ns = raw_rover_t2_ns - raw_anchor_t2_ns - raw_dt_3_ns = raw_rover_t3_ns - raw_anchor_t3_ns - - # correct hardware delays - HARDWARE_BIAS_NS = 3.463 - bias_corrected_dt_1 = raw_dt_1_ns - HARDWARE_BIAS_NS - bias_corrected_dt_2 = raw_dt_2_ns - HARDWARE_BIAS_NS - bias_corrected_dt_3 = raw_dt_3_ns - HARDWARE_BIAS_NS - - # correct SDR clock slip - dt_1_ns = (bias_corrected_dt_1 + 5.0) % 10.0 - 5.0 - dt_2_ns = (bias_corrected_dt_2 + 5.0) % 10.0 - 5.0 - dt_3_ns = (bias_corrected_dt_3 + 5.0) % 10.0 - 5.0 - - # convert to seconds - dt_1_sec = dt_1_ns * 1e-9 - dt_2_sec = dt_2_ns * 1e-9 - dt_3_sec = dt_3_ns * 1e-9 - - return np.column_stack((dt_1_sec, dt_2_sec, dt_3_sec)) + raw_dt_1_ns = (raw_rover_t1_ns - raw_anchor_t1_ns) * 1e9 + raw_dt_2_ns = (raw_rover_t2_ns - raw_anchor_t2_ns) * 1e9 + + HARDWARE_BIAS_NS = 3.461 + dt_1_sec = (raw_dt_1_ns - HARDWARE_BIAS_NS) * 1e-9 + dt_2_sec = (raw_dt_2_ns - HARDWARE_BIAS_NS) * 1e-9 + + return np.column_stack((dt_1_sec, dt_2_sec)) @@ -90,7 +70,7 @@ def main(): method='lm' ) - if result.success: + if result.cost < 1e-6: est_x, est_y = result.x estimated_positions.append([est_x, est_y]) error_m = np.sqrt((est_x - TX_TRUE[0])**2 + (est_y - TX_TRUE[1])**2) @@ -99,6 +79,10 @@ def main(): estimated_positions = np.array(estimated_positions) errors = np.array(errors) + if len(estimated_positions) == 0: + print("No succesfful solves") + return + centroid_x, centroid_y = np.mean(estimated_positions, axis=0) mean_error = np.mean(errors) centroid_error = np.sqrt((centroid_x - TX_TRUE[0])**2 + (centroid_y - TX_TRUE[1])**2) diff --git a/src/ofdm/channel/delay.py b/src/ofdm/channel/delay.py index 01e9645d..d1422230 100644 --- a/src/ofdm/channel/delay.py +++ b/src/ofdm/channel/delay.py @@ -1,14 +1,118 @@ import numpy as np - - +import scipy +import scipy.signal def matched_filter_calc(rx_iq:np.ndarray, ref_iq:np.ndarray, fs:float) -> np.ndarray: """ - Calculated the delay through matched filter delay estimation + Calculate the delay through matched filter delay estimation + + Args: + rx_iq: Recieved IQ data + ref_iq: Reference IQ data + fs: Sampling Frequency + Returns: + z: Complex corss-correlation output + lags: Time lag corresponding to each element of z, in seconds """ z = np.correlate(rx_iq, ref_iq, mode="full") - lags = np.arange(-len(rx_iq) + 1, len(ref_iq)) / fs + return z, lags + +def zero_padded_upsample(raw_data:np.ndarray, scale_factor:int = 100)->np.ndarray: + """ + Upsamples raw data based on the scale factor for sub sampling matched filter delay estimation + + Args: + raw_data: Raw pilot symbol data to be upsampled + scale_factor: Multiplication factor for umsampling i.e. to upsample from 100MHz to 10Ghz use 100 - return z, lags \ No newline at end of file + Returns: + Upsampled np.ndarray data + """ + n_data = len(raw_data) + n_padded = n_data * scale_factor + + freq = np.fft.fftshift(np.fft.fft(raw_data)) + n_total_zeros = n_padded - n_data + zeros_one_side = np.zeros(n_total_zeros // 2) + freq_zero_padded_shifted = np.fft.ifftshift(np.concatenate([zeros_one_side, freq, zeros_one_side])) + return np.fft.ifft(freq_zero_padded_shifted) * scale_factor + +def scale_rx_signal(raw_rx_data:np.ndarray)->np.ndarray: + """ + Scales an array to +- 0.9 + """ + max_val = np.max(np.abs(raw_rx_data)) + if max_val > 0: + return raw_rx_data * (0.9) / max_val + return raw_rx_data + +def _correlate(rx_signal, ref_signal): + corr = scipy.signal.correlate(rx_signal, ref_signal, mode='full') + lags = scipy.signal.correlation_lags(len(rx_signal), len(ref_signal), mode='full') + peak_idx = np.argmax(np.abs(corr)) + return corr, lags, peak_idx + +def calculate_sub_sample_delay_parabolic(rx_signal:np.ndarray, ref_signal:np.ndarray, fs:float)->float: + """ + Calculates sub-sample delay using Parabolic Interpolation + + Correlates rx_signal against ref_signal, then upsamples using parabolic interpolation. + + Args: + rx_signal: Recieved IQ data + ref_signal: Known reference signal (e.g. TX pilot symbol) + fs: Sampling Frequency in HZ + + Returns: + Estimated delay in seconds + """ + corr, lags, peak_idx = _correlate(rx_signal, ref_signal) + corr_mag = np.abs(corr) + + if peak_idx > 0 and peak_idx < len(corr) - 1: + alpha = corr_mag[peak_idx - 1] + beta = corr_mag[peak_idx] + gamma = corr_mag[peak_idx + 1] + fractional_shift = 0.5 * (alpha - gamma) / (alpha - 2*beta + gamma) + else: + fractional_shift = 0.0 + + precise_lag = lags[peak_idx] + fractional_shift + return precise_lag / fs + + +def calculate_sub_sample_delay_zp(rx_signal:np.ndarray, ref_signal:np.ndarray, fs:float, upsample_factor:int = 100, search_radius:int = 16)->float: + """ + Calculates sub sample delay using zero-padded FFT interpolation + + Correlates rx_signal against ref_signal, then upsamples a window around the correlation peak via zero-padding + in the frequency domain to achieve sub-sample timing resolution + + Args: + rx_signal: Recieved IQ samples + ref_signal: Reference IQ samples (e.g. known TX pilot symbol) + fs: Sampling frequency in HZ + upsample_factor: Interpolation factor applied to the correlation window + search_radius: Nuber of samples on each side of the coarse peak to search + + Returns: + Estimated delay in seconds + """ + corr, lags, peak_idx = _correlate(rx_signal, ref_signal) + + start = max(0, peak_idx - search_radius) + end = min(len(corr), peak_idx + search_radius) + search_window = corr[start:end] + + window_upsampled = zero_padded_upsample(search_window, scale_factor=upsample_factor) + upsampled_peak_idx = np.argmax(window_upsampled) + + fractional_offset = upsampled_peak_idx / upsample_factor + final_idx = start + fractional_offset + + zero_lag_index = np.where(lags == 0)[0][0] + final_lag_samples = final_idx - zero_lag_index + return final_lag_samples / fs + diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..f3ac0e58 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,43 @@ +import numpy as np +import pytest + +from ofdm.config import OFDMConfig +from ofdm.utils.generator import DataGenerator +from ofdm.core.preamble import generate_pilot_symbol, generate_pilot_vals +from ofdm.core.payload import generate_ofdm_data_symbol +from ofdm.core.waveform import create_time_domain_symbol + +@pytest.fixture(scope="session") +def cfg(): + return OFDMConfig(seed=42) + +@pytest.fixture(scope="session") +def active_bins(cfg): + return np.unique(np.concatenate([cfg.data_carriers], cfg.pilot_carriers)) + +@pytest.fixture(scope="session") +def pilot_values(cfg): + return generate_pilot_vals(config=cfg) + +@pytest.fixture(scope="session") +def pilot_symbol_freq(cfg): + return generate_pilot_symbol(config=cfg) + +@pytest.fixture(scope="session") +def data_symbol_freq(cfg, pilot_vals): + gen = DataGenerator(config=cfg, seed = 0) + iq_tx, bits_tx = gen.generate_random_payload() + symbol = generate_ofdm_data_symbol(config=cfg, iq_data=iq_tx, pilot_data=pilot_vals) + return symbol, iq_tx, bits_tx + +@pytest.fixture(scope="session") +def pilot_symbol_time(cfg, pilot_symbol_freq): + return create_time_domain_symbol(pilot_symbol_freq, cp_len=cfg.CP_LEN) + +@pytest.fixture(scope="session") +def data_symbol_time(cfg, data_symbol_freq): + return create_time_domain_symbol(data_symbol_freq, cp_len=cfg.CP_LEN) + +@pytest.fixture +def generator(cfg): + return DataGenerator(config=cfg, seed=0) \ No newline at end of file diff --git a/tests/test_delay.py b/tests/test_delay.py index a294f8b5..be3b9065 100644 --- a/tests/test_delay.py +++ b/tests/test_delay.py @@ -1,25 +1,133 @@ import numpy as np -from ofdm.channel.delay import matched_filter_calc +from ofdm.channel.delay import matched_filter_calc, zero_padded_upsample, scale_rx_signal, calculate_sub_sample_delay_parabolic, calculate_sub_sample_delay_zp from ofdm.config import OFDMConfig import pytest def test_matched_filter_output_length(): rx = np.random.randn(100) + 1j * np.random.randn(100) ref = np.random.randn(100) + 1j * np.random.randn(100) - z, lags = matched_filter_calc(rx_iq=rx, ref_iq=ref, -fs=100e6) + z, lags = matched_filter_calc(rx_iq=rx, ref_iq=ref, fs=100e6) assert len(z) == len(rx) + len(ref) - 1 def test_matched_filter_lags_length(): rx = np.random.randn(100) + 1j * np.random.randn(100) ref = np.random.randn(100) + 1j * np.random.randn(100) - z, lags = matched_filter_calc(rx_iq=rx, ref_iq=ref, -fs=100e6) + z, lags = matched_filter_calc(rx_iq=rx, ref_iq=ref, fs=100e6) assert len(lags) == len(z) def test_matched_filter_peak_at_zero_delay(): signal = np.random.randn(100) + 1j * np.random.randn(100) - z, lags = matched_filter_calc(rx_iq=signal, ref_iq=signal, -fs=100e6) + z, lags = matched_filter_calc(rx_iq=signal, ref_iq=signal, fs=100e6) peak_lag = lags[np.argmax(np.abs(z))] - assert peak_lag == 0.0 \ No newline at end of file + assert peak_lag == 0.0 + + +class TestUpsample: + def test_output_length(self): + x = np.ones(10, dtype=complex) + out = zero_padded_upsample(x, scale_factor=4) + assert len(out) == 40 + + def test_scale_factor_one_unchanged(self): + x = np.random.randn(64) + 1j * np.random.randn(64) + out = zero_padded_upsample(x, scale_factor=1) + np.testing.assert_allclose(np.abs(out), np.abs(x), atol=1e-10) + + def test_pure_tone_preserved(self): + N = 64 + f = 5 + t = np.arange(N) + x = np.exp(1j * 2 * np.pi * f * t / N) + out = zero_padded_upsample(x, scale_factor=4) + assert len(out) == N * 4 + +class TestScaleRxSignal: + def test_max_value_is_09(self): + x = np.array([0.5, 1.0, -2.0, 0.3], dtype=float) + out = scale_rx_signal(x) + assert np.max(np.abs(out)) == pytest.approx(0.9) + + def test_zero_signal_unchanged(self): + x = np.zeros(10) + out = scale_rx_signal(x) + np.testing.assert_array_equal(out, x) + +class TestCalculateSubSampleDelayParabolic: + + def test_zero_delay(self): + ref = np.random.randn(256) + 1j*np.random.randn(256) + delay = calculate_sub_sample_delay_parabolic(ref, ref, fs=100e6) + assert abs(delay) < 1 / 100e6 + + def test_known_integer_delay(self): + ref = np.zeros(256, dtype=complex) + ref[10:20] = 1.0 + rx = np.zeros(256, dtype=complex) + rx[20:30] = 1.0 + delay = calculate_sub_sample_delay_parabolic(rx_signal=rx, ref_signal=ref, fs=100e6) + expected = 10 / 100e6 + np.testing.assert_allclose(delay, expected, atol=2/100e6) + + def test_returns_float(self): + ref = np.zeros(256, dtype=complex) + delay = calculate_sub_sample_delay_parabolic(ref, ref, fs=100e6) + assert isinstance(delay, float) + + def test_sub_sample_delay(self): + fs = 100e6 + true_delay_samples = 10.3 + + t = np.arange(256) / fs + freq = 1e6 + ref = np.sin(2 * np.pi * freq * t).astype(complex) + N = len(ref) + freqs = np.fft.fftfreq(N, d=1/fs) + rx = np.fft.ifft(np.fft.fft(ref) * np.exp(-1j * 2 * np.pi * freqs * true_delay_samples / fs)) + delay = calculate_sub_sample_delay_parabolic(rx_signal=rx, ref_signal=ref, fs=fs) + expected = true_delay_samples / fs + np.testing.assert_allclose(delay, expected, atol=0.1/fs) + +class TestCalculateSubSampleDelayZeroPadded: + + def test_zero_delay(self): + ref = np.random.randn(256) + 1j * np.random.randn(256) + delay = calculate_sub_sample_delay_zp(ref, ref, fs=100e6) + assert abs(delay) < 1 / 100e6 + + def test_known_integer_delay(self): + ref = np.zeros(256, dtype=complex) + ref[10:20] = 1.0 + rx = np.zeros(256, dtype=complex) + rx[20:30] = 1.0 + delay = calculate_sub_sample_delay_zp(rx_signal=rx, ref_signal=ref, fs=100e6) + expected = 10 / 100e6 + np.testing.assert_allclose(delay, expected, atol=2/100e6) + + def test_upsample_factor_affects_precision(self): + # higher upsample factor should give equal or better accuracy + ref = np.zeros(256, dtype=complex) + ref[10:20] = 1.0 + rx = np.zeros(256, dtype=complex) + rx[20:30] = 1.0 + expected = 10 / 100e6 + + delay_low = calculate_sub_sample_delay_zp(rx, ref, fs=100e6, upsample_factor=10) + delay_high = calculate_sub_sample_delay_zp(rx, ref, fs=100e6, upsample_factor=100) + + err_low = abs(delay_low - expected) + err_high = abs(delay_high - expected) + assert err_high <= err_low + + def test_sub_sample_delay(self): + fs = 100e6 + true_delay_samples = 10.3 + + t = np.arange(256) / fs + freq = 1e6 + ref = np.sin(2 * np.pi * freq * t).astype(complex) + N = len(ref) + freqs = np.fft.fftfreq(N, d=1/fs) + rx = np.fft.ifft(np.fft.fft(ref) * np.exp(-1j * 2 * np.pi * freqs * true_delay_samples / fs)) + delay = calculate_sub_sample_delay_zp(rx_signal=rx, ref_signal=ref, fs=fs) + expected = true_delay_samples / fs + np.testing.assert_allclose(delay, expected, atol=0.1/fs) \ No newline at end of file