diff --git a/CHANGELOG-unreleased.md b/CHANGELOG-unreleased.md index 7f1f2eeab..15b66dca7 100644 --- a/CHANGELOG-unreleased.md +++ b/CHANGELOG-unreleased.md @@ -14,6 +14,8 @@ the released changes. - `pintk` Diff/Unc calculation now uses post-fit uncertainties. ### Added - Plot whitened DM residuals in pintk. +- Time-domain solar wind GP noise components: ridge, squared-exponential, Matérn, and quasi-periodic kernels +- Regression tests for noise design-matrix caching, including multi-basis coverage (red, DMGP, SWGP, and chromatic GP) ### Fixed - `WidebandTOAFitter` raises a warning if the model has correlated errors (It used to give wrong results before). - Fixed bug where "include_bipm" flag was being ignored when loading Fermi TOAs with weights, now defaults to using EPHEM, CLOCK and PLANET_SHAPIRO from the timing model diff --git a/src/pint/fitter.py b/src/pint/fitter.py index 47ccc17a7..f9abbbb19 100644 --- a/src/pint/fitter.py +++ b/src/pint/fitter.py @@ -94,7 +94,7 @@ ) from pint.residuals import Residuals, WidebandTOAResiduals from pint.toa import TOAs -from pint.utils import FTest, normalize_designmatrix +from pint.utils import FTest, get_phiinv, normalize_designmatrix __all__ = [ "Fitter", @@ -1341,7 +1341,9 @@ def step(self) -> np.ndarray: else: M, params, units = self.model.full_designmatrix(self.fitter.toas) M, norm = normalize_designmatrix(M, params) - phiinv = 1 / self.model.full_basis_weight(self.fitter.toas) / norm**2 + phi = self.model.full_basis_weight(self.fitter.toas) + phiinv = get_phiinv(phi) + phiinv = (phiinv / norm).T / norm # normalize the phi^-1 matrix Nvec = ( self.model.scaled_toa_uncertainty(self.fitter.toas).to_value(u.s) ** 2 ) @@ -1902,7 +1904,9 @@ def fit_toas( else: M, params, units = self.model.full_designmatrix(self.toas) M, norm = normalize_designmatrix(M, params) - phiinv = 1 / self.model.full_basis_weight(self.toas) / norm**2 + phi = self.model.full_basis_weight(self.toas) + phiinv = get_phiinv(phi) + phiinv = (phiinv / norm).T / norm # normalize the phi^-1 matrix Nvec = self.model.scaled_toa_uncertainty(self.toas).to(u.s).value ** 2 mtcm, mtcy = get_gls_mtcm_mtcy(phiinv, Nvec, M, residuals) @@ -2205,13 +2209,19 @@ def fit_toas( phi = self.model.noise_model_basis_weight(self.toas) phiinv = np.zeros(M.shape[1]) if Mn is not None and phi is not None: - phiinv = np.concatenate((phiinv, 1 / phi)) + n_tm = M.shape[1] new_d_matrix = combine_design_matrices_by_param(d_matrix, Mn) M, params, units = ( new_d_matrix.matrix, new_d_matrix.derivative_params, new_d_matrix.param_units, ) + if np.ndim(phi) == 1: + phiinv = np.zeros(M.shape[1]) + phiinv[n_tm:] = 1 / phi + else: + phiinv = np.zeros((M.shape[1], M.shape[1])) + phiinv[n_tm:, n_tm:] = get_phiinv(phi) ntmpar = self.model.ntmpar @@ -2228,11 +2238,17 @@ def fit_toas( mtcy = np.dot(cm.T, residuals) # mtcm, mtcy = get_gls_mtcm_mtcy_fullcov(cov, M, residuals) else: - phiinv /= norm**2 + if np.ndim(phiinv) == 1: + phiinv /= norm**2 + else: + phiinv = (phiinv / norm).T / norm Nvec = self.scaled_all_sigma() ** 2 cinv = 1 / Nvec mtcm = np.dot(M.T, cinv[:, None] * M) - mtcm += np.diag(phiinv) + if np.ndim(phiinv) == 1: + mtcm += np.diag(phiinv) + else: + mtcm += phiinv mtcy = np.dot(M.T, cinv * residuals) # mtcm, mtcy = get_gls_mtcm_mtcy(phiinv, Nvec, M, residuals) @@ -2249,7 +2265,11 @@ def fit_toas( if full_cov: chi2 = np.dot(newres, scipy.linalg.cho_solve(cf, newres)) else: - chi2 = np.dot(newres, cinv * newres) + np.dot(xhat, phiinv * xhat) + if np.ndim(phiinv) == 1: + prior_chi2 = np.dot(xhat, phiinv * xhat) + else: + prior_chi2 = np.dot(xhat, phiinv @ xhat) + chi2 = np.dot(newres, cinv * newres) + prior_chi2 # compute absolute estimates, normalized errors, covariance matrix dpars = xhat / norm @@ -2631,7 +2651,10 @@ def get_gls_mtcm_mtcy( """ cinv = 1 / Nvec mtcmplain = np.dot(M.T, cinv[:, None] * M) - mtcm = mtcmplain + np.diag(phiinv) + if np.ndim(phiinv) == 1: + mtcm = mtcmplain + np.diag(phiinv) + else: + mtcm = mtcmplain + phiinv mtcy = np.dot(M.T, cinv * residuals) return (mtcm, mtcy) if not return_mtcmplain else (mtcm, mtcy, mtcmplain) diff --git a/src/pint/models/__init__.py b/src/pint/models/__init__.py index d13ceec8c..9c2af2c7a 100644 --- a/src/pint/models/__init__.py +++ b/src/pint/models/__init__.py @@ -49,6 +49,7 @@ PLRedNoise, ScaleToaError, ScaleDmError, + TimeDomainSWNoise, ) from pint.models.phase_offset import PhaseOffset from pint.models.piecewise import PiecewiseSpindown diff --git a/src/pint/models/model_builder.py b/src/pint/models/model_builder.py index 84a8f4c13..15d8148a6 100644 --- a/src/pint/models/model_builder.py +++ b/src/pint/models/model_builder.py @@ -274,6 +274,13 @@ def _validate_components(self): if v.category == "pulsar_system": # The pulsar system will be selected by parameter BINARY continue + elif self.all_components.components[superset].category == v.category: + # Same-category components are mutually exclusive alternatives. + # A superset component in the same category is selected over a + # subset when its extra parameters appear in the parfile, so + # there is no ambiguity. (E.g. SolarWindProxyRegression vs + # SolarWindDispersion, discriminated by SWPRBETA1.) + continue else: raise ComponentConflict(m) diff --git a/src/pint/models/noise_model.py b/src/pint/models/noise_model.py index 6db4c8da0..01a151e6e 100644 --- a/src/pint/models/noise_model.py +++ b/src/pint/models/noise_model.py @@ -6,10 +6,18 @@ import astropy.units as u import numpy as np +from scipy import interpolate from loguru import logger as log from pint import DMconst, dmu -from pint.models.parameter import Parameter, floatParameter, intParameter, maskParameter +from pint.models.parameter import ( + Parameter, + floatParameter, + intParameter, + maskParameter, + prefixParameter, + strParameter, +) from pint.models.timing_model import Component from pint.toa import TOAs @@ -76,6 +84,65 @@ def get_wideband_noise_basis(self, toas): return np.vstack((M_toa, M_dm)) +def project_basis_covariance(U: np.ndarray, Phi: np.ndarray) -> np.ndarray: + """Project basis-space covariance to data-space covariance.""" + if np.ndim(Phi) == 1: + return np.dot(U * Phi[None, :], U.T) + return np.dot(U, np.dot(Phi, U.T)) + + +def get_tdb_seconds(tbl) -> np.ndarray: + """Return TOA TDB times in seconds as float64.""" + return np.asarray((tbl["tdbld"].quantity * u.day).to(u.s).value, dtype=np.float64) + + +def _add_tdsw_node_component(model, node, index=None): + """Add one TDSWNODE_ prefix parameter to a time-domain SW noise component.""" + dct = model.get_prefix_mapping_component("TDSWNODE_") + if index is None: + available = [ + idx + for idx, par_name in dct.items() + if getattr(model, par_name).value is None + ] + if len(available) > 0: + index = int(np.min(available)) + else: + index = np.max(list(dct.keys())) + 1 + i = f"{int(index):04d}" + + if isinstance(node, u.quantity.Quantity): + node = node.to_value(u.day) + + if int(index) in dct: + par = getattr(model, dct[int(index)]) + if par.value is not None: + raise ValueError( + f"Index '{index}' is already in use in this model. Please choose another" + ) + par.value = node + else: + model.add_param( + prefixParameter( + name=f"TDSWNODE_{i}", + units="day", + value=node, + description="Interpolation node for time-domain SW noise basis (MJD).", + parameter_type="float", + convert_tcb2tdb=False, + ) + ) + model.setup() + + node_map = model.get_prefix_mapping_component("TDSWNODE_") + nset = sum( + getattr(model, node_name).value is not None for _, node_name in node_map.items() + ) + if nset >= 2: + model.validate() + return index + + class ScaleToaError(WhiteNoiseComponent): """Correct the reported TOA uncertainties. The corrections account for imperfections in the TOA measurement and pulse jitter. @@ -432,7 +499,7 @@ def get_noise_basis(self, toas: TOAs) -> np.ndarray: A quantization matrix maps TOAs to observing epochs. """ tbl = toas.table - t = (tbl["tdbld"].quantity * u.day).to(u.s).value + t = get_tdb_seconds(tbl) ecorrs = self.get_ecorrs() umats = [] for ec in ecorrs: @@ -458,7 +525,7 @@ def get_noise_weights(self, toas: TOAs, nweights: int = None) -> np.ndarray: """ ecorrs = self.get_ecorrs() if nweights is None: - ts = (toas.table["tdbld"].quantity * u.day).to(u.s).value + ts = get_tdb_seconds(toas.table) nweights = [ get_ecorr_nweights(ts[ec.select_toa_mask(toas)]) for ec in ecorrs ] @@ -598,7 +665,7 @@ def get_time_frequencies(self, toas: TOAs) -> Tuple[np.ndarray, np.ndarray]: """Return the frequencies of the noise model""" tbl = toas.table - t = (tbl["tdbld"].quantity * u.day).to(u.s).value + t = get_tdb_seconds(tbl) T = ( np.max(t) - np.min(t) if self.TNDMTSPAN.quantity is None @@ -653,7 +720,7 @@ def pl_dm_basis_weight_pair(self, toas: TOAs) -> Tuple[np.ndarray, np.ndarray]: def pl_dm_cov_matrix(self, toas: TOAs) -> np.ndarray: Fmat, phi = self.pl_dm_basis_weight_pair(toas) - return np.dot(Fmat * phi[None, :], Fmat.T) + return project_basis_covariance(Fmat, phi) class PLSWNoise(CorrelatedNoiseComponent): @@ -763,7 +830,7 @@ def get_time_frequencies(self, toas: TOAs) -> np.ndarray: """Return the frequencies of the noise model""" tbl = toas.table - t = (tbl["tdbld"].quantity * u.day).to(u.s).value + t = get_tdb_seconds(tbl) T = np.max(t) - np.min(t) (_, _, n_lin, n_log, f_min_ratio) = self.get_plc_vals() @@ -817,7 +884,7 @@ def pl_sw_basis_weight_pair(self, toas: TOAs) -> Tuple[np.ndarray, np.ndarray]: def pl_sw_cov_matrix(self, toas: TOAs) -> np.ndarray: Fmat, phi = self.pl_sw_basis_weight_pair(toas) - return np.dot(Fmat * phi[None, :], Fmat.T) + return project_basis_covariance(Fmat, phi) class PLChromNoise(CorrelatedNoiseComponent): @@ -942,7 +1009,7 @@ def get_time_frequencies(self, toas: TOAs) -> np.ndarray: """Return the frequencies of the noise model""" tbl = toas.table - t = (tbl["tdbld"].quantity * u.day).to(u.s).value + t = get_tdb_seconds(tbl) T = ( np.max(t) - np.min(t) if self.TNCHROMTSPAN.quantity is None @@ -998,7 +1065,7 @@ def pl_chrom_basis_weight_pair(self, toas: TOAs) -> np.ndarray: def pl_chrom_cov_matrix(self, toas: TOAs) -> np.ndarray: Fmat, phi = self.pl_chrom_basis_weight_pair(toas) - return np.dot(Fmat * phi[None, :], Fmat.T) + return project_basis_covariance(Fmat, phi) class PLRedNoise(CorrelatedNoiseComponent): @@ -1141,7 +1208,7 @@ def get_time_frequencies(self, toas: TOAs) -> np.ndarray: """Return the frequencies of the noise model""" tbl = toas.table - t = (tbl["tdbld"].quantity * u.day).to(u.s).value + t = get_tdb_seconds(tbl) T = ( np.max(t) - np.min(t) if self.TNREDTSPAN.quantity is None @@ -1190,7 +1257,479 @@ def pl_rn_basis_weight_pair(self, toas: TOAs) -> Tuple[np.ndarray, np.ndarray]: def pl_rn_cov_matrix(self, toas: TOAs) -> np.ndarray: Fmat, phi = self.pl_rn_basis_weight_pair(toas) - return np.dot(Fmat * phi[None, :], Fmat.T) + return project_basis_covariance(Fmat, phi) + + +class TimeDomainSWNoise(NoiseComponent): + """Time-domain solar wind noise model with a selectable GP kernel. + + Solar wind electron number density fluctuations produce dispersive delays + that vary in time. This component models those fluctuations as a Gaussian + Process (GP) in the time domain, using a linear interpolation basis + (controlled by ``TDSWDT`` or explicit ``TDSWNODE_*`` parameters) and a + kernel covariance function selected via ``TDSWKERNEL``. + + The basis matrix projects the GP onto each TOA using a solar-wind geometry + factor so that the effective delay at frequency :math:`f` is + + .. math:: + + \\delta t(f) = \\frac{\\mathrm{DM}_{\\odot}(t)}{f^2} \\cdot K_{\\mathrm{DM}} + + where :math:`\\mathrm{DM}_{\\odot}(t)` is the line-of-sight integral of the + solar wind electron density evaluated at each epoch by the parent + ``SolarWindDispersion`` component. + + Kernel definitions + ------------------ + Let :math:`\\tau = |t_i - t_j|` (in days at the interpolation nodes) and + :math:`\\sigma = 10^{\\mathtt{TDSWLOGSIG}}`, + :math:`\\ell = 10^{\\mathtt{TDSWLOGELL}}` (days). + + **ridge** (white-noise / diagonal) + + .. math:: + + K(t_i, t_j) = \\sigma^2 \\,\\delta_{ij} + + Only ``TDSWLOGSIG`` is required. + + **sqexp** (squared-exponential / RBF) + + .. math:: + + K(\\tau) = \\sigma^2 \\exp\\!\\left(-\\frac{\\tau^2}{2\\ell^2}\\right) + + Requires ``TDSWLOGSIG``, ``TDSWLOGELL``. + + **matern** (Matérn with half-integer smoothness :math:`\\nu`) + + For :math:`\\nu = 1/2`: + + .. math:: + + K(\\tau) = \\sigma^2 \\exp\\!\\left(-\\frac{\\tau}{\\ell}\\right) + + For :math:`\\nu = 3/2`: + + .. math:: + + K(\\tau) = \\sigma^2 \\left(1 + \\frac{\\sqrt{3}\\,\\tau}{\\ell}\\right) + \\exp\\!\\left(-\\frac{\\sqrt{3}\\,\\tau}{\\ell}\\right) + + For :math:`\\nu = 5/2`: + + .. math:: + + K(\\tau) = \\sigma^2 \\left(1 + \\frac{\\sqrt{5}\\,\\tau}{\\ell} + + \\frac{5\\tau^2}{3\\ell^2}\\right) + \\exp\\!\\left(-\\frac{\\sqrt{5}\\,\\tau}{\\ell}\\right) + + Requires ``TDSWLOGSIG``, ``TDSWLOGELL``. ``TDSWNU`` (default 1.5) selects + :math:`\\nu \\in \\{0.5, 1.5, 2.5\\}`. + + **quasi_periodic** (squared-exponential × periodic envelope) + + Let :math:`\\Gamma_p = 10^{\\mathtt{TDSWLOGGAMP}}` and + :math:`P = 10^{\\mathtt{TDSWLOGP}}` (years): + + .. math:: + + K(\\tau) = \\sigma^2 + \\exp\\!\\left(-\\frac{\\tau^2}{2\\ell^2}\\right) + \\exp\\!\\left(-\\Gamma_p \\sin^2\\!\\frac{\\pi\\tau}{P}\\right) + + Requires ``TDSWLOGSIG``, ``TDSWLOGELL``, ``TDSWLOGGAMP``, ``TDSWLOGP``. + + The kernel is evaluated at the interpolation nodes and the resulting weight + matrix is projected back onto the TOA residuals via the linear interpolation + basis, yielding the full :math:`N_{\\mathrm{TOA}} \\times N_{\\mathrm{TOA}}` + covariance contribution. + + Parameters supported: + + .. paramtable:: + :class: pint.models.noise_model.TimeDomainSWNoise + + Notes + ----- + * ``TimeDomainSWNoise`` requires a ``SolarWindDispersion`` component in the + timing model so that the solar wind geometry factor is available. + * The interpolation basis is built from either a uniform grid with spacing + ``TDSWDT`` (days) or an explicit set of ``TDSWNODE_NNNN`` parameters + (MJD). The two modes are mutually exclusive. + * ``register = False`` — attach this component explicitly via + :meth:`~pint.models.timing_model.TimingModel.add_component`. + + Examples + -------- + Add a time-domain solar wind GP with a Matérn-3/2 kernel to an existing + timing model, using a 14-day interpolation grid: + + >>> from pint.models.timing_model import Component + >>> from pint.models.noise_model import TimeDomainSWNoise + >>> all_components = Component.component_types + >>> if "SolarWindDispersion" not in model.components: + ... sw_det = all_components["SolarWindDispersion"]() + ... model.add_component(sw_det, validate=False) + ... model["NE_SW"].quantity = 4.0 + ... model["SWM"] = 1 + ... model["SWP"] = 2.0 + >>> sw_comp = TimeDomainSWNoise() + >>> model.add_component(sw_comp, validate=False) + >>> model["TDSWKERNEL"].value = "matern" + >>> model["TDSWLOGSIG"].value = -8.0 + >>> model["TDSWLOGELL"].value = 1.5 + >>> model["TDSWNU"].value = 1.5 + >>> model["TDSWDT"].value = 14.0 + >>> model["TDSWINTERP_KIND"].value = "linear" + >>> model.validate() + + Notes + ----- + The above example will appear in the par file as:: + + TDSWKERNEL matern + TDSWDT 14.0 + TDSWLOGSIG -8.0 + TDSWLOGELL 1.5 + TDSWNU 1.5 + TDSWINTERP_KIND linear + + To use explicit interpolation nodes instead of a uniform grid, set + ``TDSWNODE_NNNN`` parameters (MJD) via :meth:`add_tdsw_node_component`. + Kernel-specific parameters must be configured **before** adding nodes because + :meth:`add_tdsw_node_component` calls ``validate()`` internally once two or + more nodes are present:: + + >>> sw_comp = TimeDomainSWNoise() + >>> model.add_component(sw_comp, validate=False) + >>> model["TDSWKERNEL"].value = "ridge" + >>> model["TDSWLOGSIG"].value = -7.0 + >>> for i, mjd in enumerate(node_mjd_array, start=1): + ... sw_comp.add_tdsw_node_component(mjd, index=i) + >>> model.validate() + + References + ---------- + Stochastic solar wind modeling is introduced in PTA literature by Hazboun et al. 2022. + Time-domain Gaussian processes are introduced to the PTA literature in Hazboun et al. 2026. + - Hazboun et al. 2022 [1]_ + - Hazboun et al. 2026 [2]_ + + .. [1] https://iopscience.iop.org/article/10.3847/1538-4357/ac5829 + .. [2] https://iopscience.iop.org/article/10.3847/1538-4357/ae4ee0 + """ + + register = False + category = "SW_noise" + + introduces_correlated_errors = True + introduces_dm_errors = True + is_time_correlated = True + + #: Mapping from kernel name to required and optional parameter names. + KERNEL_PARAMS: dict = { + "ridge": {"required": ["TDSWLOGSIG"], "optional": []}, + "sqexp": {"required": ["TDSWLOGSIG", "TDSWLOGELL"], "optional": []}, + "matern": { + "required": ["TDSWLOGSIG", "TDSWLOGELL"], + "optional": ["TDSWNU"], + }, + "quasi_periodic": { + "required": ["TDSWLOGSIG", "TDSWLOGELL", "TDSWLOGGAMP", "TDSWLOGP"], + "optional": [], + }, + } + + ALLOWED_KERNELS: frozenset = frozenset(KERNEL_PARAMS) + + def __init__(self): + super().__init__() + + self.add_param( + strParameter( + name="TDSWKERNEL", + value="ridge", + description=( + "Kernel for time-domain SW noise GP. " + "Allowed values: 'ridge', 'sqexp', 'matern', 'quasi_periodic'." + ), + ) + ) + + self.add_param( + floatParameter( + name="TDSWDT", + units="day", + aliases=[], + value=30.0, + description="Linear interpolation time step for time-domain SW noise.", + convert_tcb2tdb=False, + ) + ) + + self.add_param( + floatParameter( + name="TDSWLOGSIG", + units="s", + aliases=[], + description="Log10 amplitude of time-domain SW noise kernel.", + convert_tcb2tdb=False, + ) + ) + + self.add_param( + floatParameter( + name="TDSWLOGELL", + units="", + aliases=[], + description=( + "Log10 characteristic length scale for sqexp / matern / " + "quasi_periodic time-domain SW noise (days)." + ), + convert_tcb2tdb=False, + ) + ) + + self.add_param( + floatParameter( + name="TDSWNU", + units="", + aliases=[], + value=1.5, + description="Matern smoothness parameter (supported: 0.5, 1.5, 2.5).", + convert_tcb2tdb=False, + ) + ) + + self.add_param( + floatParameter( + name="TDSWLOGGAMP", + units="", + aliases=[], + description="Log10 mixing parameter for quasi-periodic time-domain SW noise.", + convert_tcb2tdb=False, + ) + ) + + self.add_param( + floatParameter( + name="TDSWLOGP", + units="", + aliases=[], + description="Log10 periodicity of quasi-periodic time-domain SW noise (years).", + convert_tcb2tdb=False, + ) + ) + + self.add_param( + strParameter( + name="TDSWINTERP_KIND", + value="linear", + description="Interpolation kind passed to scipy.interpolate.interp1d.", + ) + ) + + self.add_param( + prefixParameter( + name="TDSWNODE_0001", + units="day", + value=None, + description="Interpolation node for time-domain SW noise basis (MJD).", + parameter_type="float", + convert_tcb2tdb=False, + ) + ) + + self.covariance_matrix_funcs += [self.sw_cov_matrix] + self.basis_funcs += [self.sw_basis_weight_pair] + + def add_tdsw_node_component(self, node, index=None): + """Add a TDSWNODE component. + + Parameters + ---------- + node : float or astropy.units.Quantity + Interpolation node in MJD (days). + index : int or None + Integer index label for the node. If None, increments max index by 1. + """ + return _add_tdsw_node_component(self, node=node, index=index) + + # ------------------------------------------------------------------ + # Validation + # ------------------------------------------------------------------ + + def validate(self): + super().validate() + + kernel = self.TDSWKERNEL.value + if kernel not in self.ALLOWED_KERNELS: + raise ValueError( + f"TimeDomainSWNoise TDSWKERNEL must be one of " + f"{sorted(self.ALLOWED_KERNELS)}, got '{kernel}'." + ) + + # Check that all required parameters for this kernel are set. + required = self.KERNEL_PARAMS[kernel]["required"] + missing = [p for p in required if getattr(self, p).value is None] + if missing: + raise ValueError( + f"TimeDomainSWNoise with kernel='{kernel}' requires " + f"parameter(s) {missing} to be set." + ) + + if kernel == "matern" and self.TDSWNU.value not in (0.5, 1.5, 2.5): + raise ValueError("TimeDomainSWNoise TDSWNU must be one of {0.5, 1.5, 2.5}.") + + allowed_kinds = { + "linear", + "nearest", + "nearest-up", + "zero", + "slinear", + "quadratic", + "cubic", + "previous", + "next", + } + if self.TDSWINTERP_KIND.value not in allowed_kinds: + raise ValueError( + f"TimeDomainSWNoise TDSWINTERP_KIND must be one of {sorted(allowed_kinds)}." + ) + + node_map = self.get_prefix_mapping_component("TDSWNODE_") + node_values = [] + for _, node_name in node_map.items(): + node_val = getattr(self, node_name).value + if node_val is not None: + node_values.append(float(node_val)) + + dt_val = self.TDSWDT.value + has_nodes = len(node_values) > 0 + has_nondefault_dt = dt_val is not None and dt_val != 30.0 + + if has_nodes and has_nondefault_dt: + raise ValueError( + "TimeDomainSWNoise requires exactly one interpolation mode: " + "set TDSWDT or set TDSWNODE_ parameters, but not both." + ) + + if 0 < len(node_values) < 2: + raise ValueError( + "TimeDomainSWNoise requires at least 2 TDSWNODE_ values when using " + "node-based interpolation. Set >=2 TDSWNODE_ parameters." + ) + + if len(node_values) >= 2: + nodes = np.asarray(node_values, dtype=float) + if not np.all(np.isfinite(nodes)): + raise ValueError("TimeDomainSWNoise TDSWNODE_ values must be finite.") + if len(np.unique(nodes)) != len(nodes): + raise ValueError("TimeDomainSWNoise TDSWNODE_ values must be unique.") + else: + if dt_val is not None and dt_val <= 0: + raise ValueError( + "TimeDomainSWNoise TDSWDT must be set to a positive value." + ) + + def _has_nodes(self) -> bool: + """Return True if any TDSWNODE_ parameter is set.""" + node_map = self.get_prefix_mapping_component("TDSWNODE_") + return any( + getattr(self, node_name).value is not None + for _, node_name in node_map.items() + ) + + def _get_nodes(self, toas: TOAs) -> np.ndarray: + """Return sorted interpolation nodes (MJD) from TDSWNODE_ parameters.""" + node_map = self.get_prefix_mapping_component("TDSWNODE_") + nodes = [] + for _, node_name in node_map.items(): + node_par = getattr(self, node_name) + if node_par.value is not None: + nodes.append(float(node_par.value)) + + if len(nodes) >= 2: + return np.array(sorted(nodes), dtype=float) + + raise ValueError( + "TimeDomainSWNoise node interpolation requires at least 2 TDSWNODE_ values." + ) + + def _get_basis_and_nodes(self, toas: TOAs): + """Return ``(Umat, nodes)`` from the linear interpolation basis.""" + tbl = toas.table + t = (tbl["tdbld"].quantity * u.day).to(u.s).value + interp_kind = self.TDSWINTERP_KIND.value + if self._has_nodes(): + nodes_in = self._get_nodes(toas) + Umat, nodes = make_interpolation_basis(t, nodes=nodes_in, kind=interp_kind) + else: + dt = 30.0 if self.TDSWDT.value is None else self.TDSWDT.value + Umat, nodes = make_interpolation_basis(t, dt=dt, kind=interp_kind) + return Umat, nodes + + # ------------------------------------------------------------------ + # Public noise interface + # ------------------------------------------------------------------ + + def get_noise_basis(self, toas: TOAs) -> np.ndarray: + """Return chromatic linear interpolation matrix for time-domain SW noise.""" + freqs = self._parent.barycentric_radio_freq(toas).to(u.MHz) + Umat, _ = self._get_basis_and_nodes(toas) + # Solar wind geometry from pint.models.solar_wind_dispersion.SolarWindDispersion. + # This is the SW DM contribution if n_earth = 1 cm^-3; the GP scales it. + solar_wind_geometry = self._parent.solar_wind_geometry(toas) + dt_DM = (solar_wind_geometry * DMconst / (freqs**2)).value + return Umat * dt_DM[:, None] + + def get_noise_weights(self, toas: TOAs) -> np.ndarray: + """Return GP prior weights for the selected kernel. + + The kernel is controlled by ``TDSWKERNEL``: + + * **ridge** + :math:`K(t_i, t_j) = \\sigma^2 \\delta(t_i - t_j)` + * **sqexp** + :math:`K(t_i, t_j) = \\sigma^2 \\exp\\!\\left(-\\frac{(t_i-t_j)^2}{2\\ell^2}\\right)` + * **matern** + Matern kernel with smoothness :math:`\\nu \\in \\{0.5, 1.5, 2.5\\}` + * **quasi_periodic** + :math:`K_{SE}(t_i,t_j) \\cdot \\exp\\!\\left(-\\Gamma_p \\sin^2\\!\\frac{\\pi(t_i-t_j)}{p}\\right)` + """ + _, nodes = self._get_basis_and_nodes(toas) + kernel = self.TDSWKERNEL.value + log10_sigma = self.TDSWLOGSIG.value + + if kernel == "ridge": + return ridge_kernel(nodes, log10_sigma) + elif kernel == "sqexp": + return se_kernel(nodes, log10_sigma, self.TDSWLOGELL.value) + elif kernel == "matern": + return matern_kernel( + nodes, log10_sigma, self.TDSWLOGELL.value, self.TDSWNU.value + ) + elif kernel == "quasi_periodic": + return periodic_kernel( + nodes, + log10_sigma, + self.TDSWLOGELL.value, + self.TDSWLOGGAMP.value, + self.TDSWLOGP.value, + ) + else: # unreachable after validate() + raise ValueError(f"TimeDomainSWNoise: unknown TDSWKERNEL '{kernel}'") + + def sw_basis_weight_pair(self, toas: TOAs) -> Tuple[np.ndarray, np.ndarray]: + """Return ``(basis, weights)`` for the time-domain SW noise GP.""" + return (self.get_noise_basis(toas), self.get_noise_weights(toas)) + + def sw_cov_matrix(self, toas: TOAs) -> np.ndarray: + """Return the covariance matrix for the time-domain SW noise GP.""" + Umat, phi = self.sw_basis_weight_pair(toas) + return project_basis_covariance(Umat, phi) def get_ecorr_epochs(toas_table: np.ndarray, dt: float = 1, nmin: int = 2) -> List[int]: @@ -1333,7 +1872,60 @@ def _get_loglin_freqs(logmode_, f_min_, n_log, n_lin, T): # Log + linear: nlog log-freqs + nmodes linear-freqs freqs = _get_loglin_freqs(logmode, f_min, nlog, nmodes, Tspan) - return freqs + return np.asarray(freqs, dtype=np.float64) + + +def make_interpolation_basis( + toas, + nodes=None, + dt=None, + kind="linear", +): + """ + Construct an interpolation basis for the given TOAs and interpolation parameters using + scipy.interpolate.interp1d. + + :param toas: array-like + Vector of time series (TOAs) in seconds. + :param nodes: array-like, optional + Vector of interpolation nodes in MJD. If None, nodes are generated from dt. + :param dt: float, optional + Time step in days for generating interpolation nodes if `nodes` is None. + :param kind: str, optional + Interpolation kind passed to scipy.interpolate.interp1d. Default is "linear". + See scipy.interpolate.interp1d documentation for allowed values. + + :return: + M : the achromatic interpolation design matrix of shape (len(toas), n_nodes). + nodes : the interpolation nodes in MJD corresponding to the columns of M. + """ + if nodes is None: + if dt is None: + raise ValueError( + "Must provide either nodes or dt for linear interpolation basis." + ) + t_min, t_max = np.min(toas) / 86400, np.max(toas) / 86400 + nodes = np.arange(t_min, t_max + dt, dt) + nodes = nodes * 86400 # MJD to seconds + basis = np.identity(len(nodes)) + interp = interpolate.interp1d( + nodes, + basis, + kind=kind, + axis=0, + bounds_error=False, + fill_value=0.0, + assume_sorted=True, + ) + M = interp(toas) + # only return non-zero columns for rank reduction + idx = M.sum(axis=0) != 0 + if not np.any(idx): + raise RuntimeError( + "Interpolation basis has no support in the TOA range. Perhaps check units." + ) + + return M[:, idx], nodes[idx] def create_fourier_design_matrix(t, f) -> np.ndarray: @@ -1378,8 +1970,80 @@ def powerlaw( :param f_low_cut: Minimum frequency to include [Hz] :return: Power spectral density """ + f = np.asarray(f, dtype=np.float64) f_low_cut = f_low_cut if f_low_cut is not None else np.min(f) above_fl = np.array(f >= f_low_cut, dtype=float) fyr = (1 / u.year).to_value(u.Hz) return A**2 / 12.0 / np.pi**2 * fyr ** (gamma - 3) * f ** (-gamma) * above_fl + + +def periodic_kernel(nodes, log10_sigma=-7, log10_ell=2, log10_gam_p=0, log10_p=0): + """Quasi-periodic kernel""" + nodes = np.asarray(nodes, dtype=np.float64) + r = np.abs(nodes[None, :] - nodes[:, None]) + + # convert units to seconds + sigma = 10**log10_sigma + l = 10**log10_ell * 86400 + p = 10**log10_p * 3.16e7 + gam_p = 10**log10_gam_p + d = np.eye(r.shape[0]) * (sigma / 50000) ** 2 + K = sigma**2 * np.exp(-(r**2) / 2 / l**2 - gam_p * np.sin(np.pi * r / p) ** 2) + d + return K + + +def se_kernel(nodes, log10_sigma=-7, log10_ell=2): + """ + Squared-exponential kernel + 50000 is a regularization term to prevent numerical instabilities. + """ + nodes = np.asarray(nodes, dtype=np.float64) + r = np.abs(nodes[None, :] - nodes[:, None]) + + # Convert everything into seconds + l = 10**log10_ell * 86400 + sigma = 10**log10_sigma + d = np.eye(r.shape[0]) * (sigma / 50000) ** 2 + K = sigma**2 * np.exp(-(r**2) / 2 / l**2) + d + return K + + +def matern_kernel(nodes, log10_sigma=-7, log10_ell=2, nu=1.5): + """Matern kernel. + + Supports nu values in {0.5, 1.5, 2.5}. + 50000 is a regularization term to prevent numerical instabilities. + + """ + if nu not in (0.5, 1.5, 2.5): + raise ValueError("matern_kernel currently supports nu in {0.5, 1.5, 2.5}.") + + nodes = np.asarray(nodes, dtype=np.float64) + r = np.abs(nodes[None, :] - nodes[:, None]) + + l = 10**log10_ell * 86400 + sigma = 10**log10_sigma + + rr = r / l + if nu == 0.5: + k = np.exp(-rr) + elif nu == 1.5: + c = np.sqrt(3.0) + k = (1.0 + c * rr) * np.exp(-c * rr) + else: # nu == 2.5 + c = np.sqrt(5.0) + k = (1.0 + c * rr + (5.0 / 3.0) * rr**2) * np.exp(-c * rr) + + d = np.eye(r.shape[0]) * (sigma / 50000) ** 2 + return sigma**2 * k + d + + +def ridge_kernel(nodes, log10_sigma=-7): + """Ridge kernel""" + nodes = np.asarray(nodes, dtype=np.float64) + r = np.abs(nodes[None, :] - nodes[:, None]) + + # Convert to seconds + sigma = 10 ** (log10_sigma * 2) + return np.eye(r.shape[0]) * sigma diff --git a/src/pint/models/solar_wind_dispersion.py b/src/pint/models/solar_wind_dispersion.py index ff8147117..1b010eff6 100644 --- a/src/pint/models/solar_wind_dispersion.py +++ b/src/pint/models/solar_wind_dispersion.py @@ -1250,3 +1250,413 @@ def set_ne_sws(self, ne_sws): getattr(self, SWXDM_mapping[ii]).quantity = ( self.conjunction_solar_wind_geometry(p) * ne_sws[j] ).to(u.pc / u.cm**3) + + +class SolarWindProxyRegression(SolarWindDispersion): + """Solar-wind dispersion model with a proxy time-series regression term. + + Subclasses :class:`SolarWindDispersion` and adds a linear regression term + on an external proxy observable (e.g. F10.7 cm radio flux or sunspot + number) to capture the long-term solar-cycle amplitude modulation. The + effective electron density used to compute the DM is: + + .. math:: + + n_E(t) = \\mathrm{NE\\_SW}(t) + \\sum_k \\mathrm{BETA1}_k \\cdot x_k(t - \\mathrm{LAG1}_k) + + where :math:`x_k(t)` is the k-th proxy normalised to zero mean and unit + variance, and the solar-wind DM follows as usual: + + .. math:: + + \\mathrm{DM}_{\\mathrm{SW}}(t) = n_E(t) \\cdot S(t) + + with :math:`S(t)` the path-length geometry factor inherited from the + parent class. ``NE_SW`` retains its physical meaning as the time-averaged + electron density at 1 AU; the proxy slope ``BETA1`` captures the + solar-cycle amplitude in the same cm^-3 units. + + The proxy centring (zero mean) ensures the proxy column is orthogonal to + :math:`S(t)` in the design matrix, preventing degeneracy between ``NE_SW`` + and ``BETA1``. + + Proxy data must be loaded at runtime via :meth:`set_proxy` before any + model evaluation. + + Parameters supported: + + .. paramtable:: + :class: pint.models.solar_wind_dispersion.SolarWindProxyRegression + + Notes + ----- + Multiple proxy series are supported by adding further + ``SWPRBETA1_k`` / ``SWPRLAG1_k`` parameter pairs (k = 2, 3, + ...) and calling ``set_proxy(..., index=k)``. + + The proxy lag derivative is computed by central finite differences with a + step of 0.1 days. + + References + ---------- + - Edwards et al. 2006, MNRAS, 372, 1549 + - Hazboun et al. 2022, ApJ, 929, 39 + """ + + register = False + category = "solar_wind" + + def __init__(self): + super().__init__() + + # Proxy slope: modulation amplitude in cm^-3 per unit of normalised proxy k. + # NE_SW (inherited) serves as the physical intercept; no separate BETA0 is + # needed, and adding one would be perfectly degenerate with NE_SW. + self.add_param( + prefixParameter( + name="SWPRBETA1", + units="cm^-3", + value=0.0, + description="Solar wind proxy regression slope for normalised proxy 1", + unit_template=lambda n: "cm^-3", + description_template=lambda n: ( + f"Solar wind proxy regression slope for normalised proxy {n}" + ), + type_match="float", + tcb2tdb_scale_factor=(const.c * DMconst), + ) + ) + # Optional time lag applied to proxy k before interpolation to TOA epochs. + self.add_param( + prefixParameter( + name="SWPRLAG1", + units="day", + value=0.0, + description="Time lag for proxy time series 1 [days]", + unit_template=lambda n: "day", + description_template=lambda n: ( + f"Time lag for proxy time series {n} [days]" + ), + type_match="float", + tcb2tdb_scale_factor=u.Quantity(1), + ) + ) + + # Internal proxy storage: int index -> dict with keys mjd, vals, + # raw_mean, raw_std. + self._proxy_data = {} + + def set_proxy(self, proxy_mjd, proxy_vals, index=1, smooth_days=0, normalize=True): + """Load a proxy time series into the model. + + The proxy is optionally smoothed with a boxcar filter. If + ``normalize=True``, the mean and std are computed and stored for + later normalization of the interpolated proxy to zero mean / unit + variance, so that ``NE_SW`` retains its physical meaning as the mean + n_E over the time span and ``BETA1`` quantifies the modulation + amplitude in cm^-3. + + Parameters + ---------- + proxy_mjd : array_like + MJD of proxy sample times. + proxy_vals : array_like + Proxy values (e.g. F10.7 cm flux in sfu, sunspot number). + index : int + Which proxy slot to populate. Must match the index of the + corresponding ``SWPRBETA1`` parameter. + smooth_days : int + Boxcar filter width in days applied before normalization. Zero + means no smoothing; 81 is standard for the F10.7 cm 81-day mean. + normalize : bool + If True (default), the proxy is normalized to zero mean and unit + variance at interpolation time using statistics computed from + all proxy samples. The mean and std are stored for potential + conversion back to physical coupling units. + """ + from scipy.ndimage import uniform_filter1d + + proxy_mjd = np.asarray(proxy_mjd, dtype=float) + proxy_vals = np.asarray(proxy_vals, dtype=float) + + if smooth_days > 1: + proxy_vals = uniform_filter1d( + proxy_vals, size=int(smooth_days), mode="nearest" + ) + + if normalize: + raw_mean = float(np.mean(proxy_vals)) + raw_std = float(np.std(proxy_vals)) + if raw_std == 0.0: + raw_std = 1.0 + else: + raw_mean, raw_std = 0.0, 1.0 + + # Store the RAW (pre-normalized) proxy for interpolation. + # Normalization is applied at interpolation time (in _get_proxy_at_toas). + self._proxy_data[index] = { + "mjd": proxy_mjd, + "vals": proxy_vals, # <-- stored raw, not normalized + "raw_mean": raw_mean, + "raw_std": raw_std, + } + + def _get_proxy_at_toas(self, toas, index, lag_days=0.0): + """Interpolate and normalize stored proxy *index* to TOA epochs. + + The raw proxy is interpolated to TOA times, then normalized using the + mean and std computed when set_proxy() was called. + + Parameters + ---------- + toas : pint.toa.TOAs + index : int + lag_days : float + Shift the proxy backward by this many days, i.e. evaluate + x(t - lag). + + Returns + ------- + numpy.ndarray + Shape (n_toa,), dimensionless normalised proxy values. + """ + if index not in self._proxy_data: + raise ValueError( + f"Proxy index {index} not loaded. " + f"Call set_proxy(index={index}) first." + ) + data = self._proxy_data[index] + toas_mjd = np.asarray(toas.table["tdbld"].data, dtype=float) + + # Interpolate raw proxy to TOA times. + proxy_at_toas = np.interp( + toas_mjd - lag_days, + data["mjd"], + data["vals"], + left=data["vals"][0], + right=data["vals"][-1], + ) + + # Normalize using stored mean/std. + raw_mean = data["raw_mean"] + raw_std = data["raw_std"] + if raw_std != 0.0: + return (proxy_at_toas - raw_mean) / raw_std + else: + return proxy_at_toas + + def _proxy_ne(self, toas): + """Return the total proxy contribution to n_E [cm^-3]. + + Computes the sum over all proxy slots: + sum_k BETA1_k * x_k(t - LAG1_k). + """ + BETA1_mapping = self.get_prefix_mapping_component("SWPRBETA1") + LAG1_mapping = self.get_prefix_mapping_component("SWPRLAG1") + ne_proxy = np.zeros(len(toas)) * u.cm**-3 + for k in sorted(BETA1_mapping.keys()): + beta1 = getattr(self, BETA1_mapping[k]).quantity + lag_days = ( + getattr(self, LAG1_mapping[k]).quantity.to_value(u.day) + if k in LAG1_mapping + else 0.0 + ) + xp = self._get_proxy_at_toas(toas, index=k, lag_days=lag_days) + ne_proxy = ne_proxy + beta1 * xp + return ne_proxy + + def solar_wind_dm(self, toas): + """Return the solar-wind DM [pc cm^-3] including the proxy regression term. + + Overrides the parent implementation to add the proxy contribution: + + .. math:: + + n_E(t) = \\mathrm{NE\\_SW}(t) + \\sum_k \\mathrm{BETA1}_k \\cdot x_k(t - \\mathrm{LAG1}_k) + + ``NE_SW(t)`` is the same Taylor-expanded electron density as in + :meth:`SolarWindDispersion.solar_wind_dm`. + """ + ne_sw_terms = self.get_NE_SW_terms() + + if len(ne_sw_terms) == 1: + ne_sw = self.NE_SW.quantity * np.ones(len(toas)) + else: + if any(t.value != 0 for t in ne_sw_terms[1:]): + SWEPOCH = self.SWEPOCH.value + if SWEPOCH is None: + raise ValueError( + f"SWEPOCH not set but some NE_SW derivatives are not zero: {ne_sw_terms}" + ) + dt = (toas["tdbld"] - SWEPOCH) * u.day + dt_value = dt.to_value(u.yr) + else: + dt_value = np.zeros(len(toas), dtype=np.longdouble) + ne_sw = ( + pint.utils.taylor_horner(dt_value, [d.value for d in ne_sw_terms]) + * self.NE_SW.units + ) + + ne_total = ne_sw + self._proxy_ne(toas) + + if np.all(ne_total.value == 0): + return np.zeros(len(toas)) * u.pc / u.cm**3 + + return (ne_total * self.solar_wind_geometry(toas)).to(u.pc / u.cm**3) + + def d_dm_d_swprbeta(self, toas, param_name, acc_delay=None): + """Derivative of DM_SW with respect to BETA1_k. + + dDM/dBETA1_k = x_k(t - LAG1_k) * S(t) + """ + par = getattr(self, param_name) + k = par.index + LAG1_mapping = self.get_prefix_mapping_component("SWPRLAG1") + lag_days = ( + getattr(self, LAG1_mapping[k]).quantity.to_value(u.day) + if k in LAG1_mapping + else 0.0 + ) + xp = self._get_proxy_at_toas(toas, index=k, lag_days=lag_days) + return (self.solar_wind_geometry(toas) * xp).to(u.pc / u.cm**3 / par.units) + + def d_delay_d_swprbeta(self, toas, param_name, acc_delay=None): + """Derivative of delay with respect to BETA1_k. + + Uses the chain rule: d(delay)/d(BETA1) = (DMconst/freq^2) * d(DM)/d(BETA1) + """ + try: + bfreq = self._parent.barycentric_radio_freq(toas) + except AttributeError: + from astropy import log + log.warning("Using topocentric frequency for dedispersion!") + bfreq = toas.table["freq"].quantity + + # Get the DM derivative directly + d_dm_d_beta = self.d_dm_d_swprbeta(toas, param_name) + + # Apply chain rule: delay = DMconst * DM / freq^2 + return DMconst * d_dm_d_beta / bfreq**2.0 + + def d_dm_d_swprlag(self, toas, param_name, acc_delay=None): + """Derivative of DM_SW with respect to LAG1_k via central finite differences. + + The chain rule gives: + + .. math:: + + \\frac{\\partial \\mathrm{DM}}{\\partial \\mathrm{LAG}_k} = + \\mathrm{BETA1}_k \\cdot \\frac{\\partial x_k(t - \\mathrm{LAG}_k)}{\\partial \\mathrm{LAG}_k} + \\cdot S(t) + + The proxy derivative is approximated as: + + .. math:: + + \\frac{\\partial x_k}{\\partial \\mathrm{LAG}} \\approx + \\frac{x_k(t - \\mathrm{LAG} - h) - x_k(t - \\mathrm{LAG} + h)}{2h} + + with h = 0.1 days. + """ + par = getattr(self, param_name) + k = par.index + BETA1_mapping = self.get_prefix_mapping_component("SWPRBETA1") + beta1 = ( + getattr(self, BETA1_mapping[k]).quantity + if k in BETA1_mapping + else 0.0 * u.cm**-3 + ) + lag_days = par.quantity.to_value(u.day) + + h = 0.1 # finite-difference step in days + xp_hi = self._get_proxy_at_toas(toas, index=k, lag_days=lag_days + h) + xp_lo = self._get_proxy_at_toas(toas, index=k, lag_days=lag_days - h) + dxp_dlag = (xp_hi - xp_lo) / (2.0 * h) + + return ( + beta1 * dxp_dlag / u.day * self.solar_wind_geometry(toas) + ).to(u.pc / u.cm**3 / par.units) + + def d_delay_d_swprlag(self, toas, param_name, acc_delay=None): + """Derivative of delay with respect to LAG1_k. + + Uses the chain rule: d(delay)/d(LAG1) = (DMconst/freq^2) * d(DM)/d(LAG1) + """ + try: + bfreq = self._parent.barycentric_radio_freq(toas) + except AttributeError: + from astropy import log + log.warning("Using topocentric frequency for dedispersion!") + bfreq = toas.table["freq"].quantity + + # Get the DM derivative directly + d_dm_d_lag = self.d_dm_d_swprlag(toas, param_name) + + # Apply chain rule: delay = DMconst * DM / freq^2 + return DMconst * d_dm_d_lag / bfreq**2.0 + + def d_dm_d_swp(self, toas, param_name, acc_delay=None): + """Derivative of DM_SW with respect to SWP. + + Overrides the parent to include the proxy contribution in n_E: + + dDM/dSWP = [NE_SW(t) + proxy_ne(t)] * dS(t)/dSWP + """ + d_geom_dp = self.d_solar_wind_geometry_d_swp(toas, param_name) + ne_sw_terms = self.get_NE_SW_terms() + if len(ne_sw_terms) == 1: + ne_sw = self.NE_SW.quantity * np.ones(len(toas)) + else: + if any(t.value != 0 for t in ne_sw_terms[1:]): + SWEPOCH = self.SWEPOCH.value or 0 + dt_value = ((toas["tdbld"] - SWEPOCH) * u.day).to_value(u.yr) + else: + dt_value = np.zeros(len(toas), dtype=np.longdouble) + ne_sw = ( + pint.utils.taylor_horner(dt_value, [d.value for d in ne_sw_terms]) + * self.NE_SW.units + ) + ne_total = ne_sw + self._proxy_ne(toas) + return (ne_total * d_geom_dp).to(u.pc / u.cm**3) + + def setup(self): + super().setup() + + # Register derivatives for the proxy slope and lag parameters. + # Directly check for SWPRBETA1, SWPRBETA2, ... parameters + for param_name in self.params: + if param_name.startswith("SWPRBETA"): + self.register_dm_deriv_funcs(self.d_dm_d_swprbeta, param_name) + self.register_deriv_funcs(self.d_delay_d_swprbeta, param_name) + elif param_name.startswith("SWPRLAG"): + self.register_dm_deriv_funcs(self.d_dm_d_swprlag, param_name) + self.register_deriv_funcs(self.d_delay_d_swprlag, param_name) + + # Override the inherited SWP derivative registration so that it uses + # the overridden d_dm_d_swp that accounts for the proxy contribution. + self.register_dm_deriv_funcs(self.d_dm_d_swp, "SWP") + + def validate(self): + super().validate() + BETA1_mapping = self.get_prefix_mapping_component("SWPRBETA1") + for k in sorted(BETA1_mapping.keys()): + if ( + getattr(self, BETA1_mapping[k]).quantity.value != 0.0 + and k not in self._proxy_data + ): + warn( + f"SWPRBETA1 index {k} is non-zero but no proxy data has " + f"been loaded for that index. Call set_proxy(index={k}) before " + "evaluating the model." + ) + + def print_par(self, format="pint"): + result = super().print_par(format=format) + BETA1_mapping = self.get_prefix_mapping_component("SWPRBETA1") + LAG1_mapping = self.get_prefix_mapping_component("SWPRLAG1") + for k in sorted(BETA1_mapping.keys()): + result += getattr(self, BETA1_mapping[k]).as_parfile_line(format=format) + if k in LAG1_mapping: + result += getattr(self, LAG1_mapping[k]).as_parfile_line(format=format) + return result + diff --git a/src/pint/models/timing_model.py b/src/pint/models/timing_model.py index f07e089de..b93fc559e 100644 --- a/src/pint/models/timing_model.py +++ b/src/pint/models/timing_model.py @@ -1739,7 +1739,7 @@ def toa_covariance_matrix(self, toas: TOAs) -> np.ndarray: N = np.diag(self.scaled_toa_uncertainty(toas).to_value(u.s) ** 2) U = self.noise_model_designmatrix(toas) Phi = self.noise_model_basis_weight(toas) - return N + np.dot(U * Phi[None, :], U.T) if U is not None else N + return N + self._project_basis_covariance(U, Phi) if U is not None else N def dm_covariance_matrix(self, toas: TOAs) -> np.ndarray: """Get the DM covariance matrix for the noise model. The matrix elements have @@ -1774,7 +1774,31 @@ def wideband_covariance_matrix(self, toas: TOAs) -> np.ndarray: N = np.diag(self.scaled_wideband_uncertainty(toas) ** 2) U = self.noise_model_wideband_designmatrix(toas) Phi = self.noise_model_basis_weight(toas) - return N + np.dot(U * Phi[None, :], U.T) if U is not None else N + return N + self._project_basis_covariance(U, Phi) if U is not None else N + + @staticmethod + def _project_basis_covariance(U: np.ndarray, Phi: np.ndarray) -> np.ndarray: + """Project basis-space covariance to data-space covariance. + + Supports both diagonal basis covariance (1D vector of variances) and + full basis covariance (2D matrix). + """ + if np.ndim(Phi) == 1: + return np.dot(U * Phi[None, :], U.T) + return np.dot(U, np.dot(Phi, U.T)) + + @staticmethod + def _block_diag(matrices: List[np.ndarray]) -> np.ndarray: + """Construct a dense block-diagonal matrix from square blocks.""" + size = sum(mat.shape[0] for mat in matrices) + dtype = np.result_type(*[mat.dtype for mat in matrices]) + out = np.zeros((size, size), dtype=dtype) + start = 0 + for mat in matrices: + stop = start + mat.shape[0] + out[start:stop, start:stop] = mat + start = stop + return out def scaled_toa_uncertainty(self, toas: TOAs) -> u.Quantity: """Get the scaled TOA data uncertainties noise models. @@ -1841,12 +1865,34 @@ def scaled_wideband_uncertainty(self, toas: TOAs) -> np.ndarray: derr = self.scaled_dm_uncertainty(toas).to_value(pint.dmu) return np.hstack((terr, derr)) + def _noise_designmatrix_cache_key(self, toas: TOAs) -> Tuple[int, int, Tuple]: + """Build a cache key for noise design-matrix evaluation.""" + noise_params = self.get_params_of_component_type("NoiseComponent") + noise_state = [] + for pname in noise_params: + par = getattr(self, pname) + noise_state.append( + ( + pname, + repr(getattr(par, "quantity", None)), + repr(getattr(par, "key", None)), + repr(getattr(par, "key_value", None)), + ) + ) + return (id(toas), len(toas), tuple(noise_state)) + def noise_model_designmatrix(self, toas: TOAs) -> np.ndarray: """Returns the joint design/basis matrix for all noise components.""" if len(self.basis_funcs) == 0: return None + key = self._noise_designmatrix_cache_key(toas) + cache = getattr(self, "_noise_designmatrix_cache", None) + if cache is not None and cache.get("key") == key: + return cache["U"] result = [nf(toas)[0] for nf in self.basis_funcs] - return np.hstack(result) + U = np.hstack(result) + self._noise_designmatrix_cache = {"key": key, "U": U} + return U def noise_model_dm_designmatrix(self, toas: TOAs) -> np.ndarray: """Returns the design/basis matrix for all noise components for wideband DMs. @@ -1920,14 +1966,32 @@ def full_wideband_designmatrix( return (M, par, M_units, Md_units) def noise_model_basis_weight(self, toas: TOAs) -> np.ndarray: - """Returns the joint weight vector for all noise components.""" + """Returns the joint basis covariance for all noise components. + + Returns a 1D vector when all components provide diagonal basis-space + covariance, otherwise returns a 2D block-diagonal covariance matrix. + """ if len(self.basis_funcs) == 0: return None + key = self._noise_designmatrix_cache_key(toas) + cache = getattr(self, "_noise_basis_weight_cache", None) + if cache is not None and cache.get("key") == key: + return cache["Phi"] + result = [nf(toas)[1] for nf in self.basis_funcs] - return np.hstack(list(result)) + has_matrix = any(np.ndim(phi) == 2 for phi in result) + if not has_matrix: + Phi = np.hstack(list(result)) + self._noise_basis_weight_cache = {"key": key, "Phi": Phi} + return Phi + + blocks = [np.diag(phi) if np.ndim(phi) == 1 else phi for phi in result] + Phi = self._block_diag(blocks) + self._noise_basis_weight_cache = {"key": key, "Phi": Phi} + return Phi def full_basis_weight(self, toas: TOAs) -> np.ndarray: - """Returns the joint weight vector for all timing and noise components. + """Returns the joint basis covariance for timing and noise components. The weights of the timing model parameters are set to be a large constant, representing an uninformative prior.""" noise_params = self.get_params_of_component_type("NoiseComponent") @@ -1938,8 +2002,12 @@ def full_basis_weight(self, toas: TOAs) -> np.ndarray: # The number 1e40 is chosen to be consistent with ENTERPRISE. phi_tm = np.ones(npar_tm) * 1e40 phi_nm = self.noise_model_basis_weight(toas) + if phi_nm is None: + return phi_tm + if np.ndim(phi_nm) == 1: + return np.hstack((phi_tm, phi_nm)) - return np.hstack((phi_tm, phi_nm)) if phi_nm is not None else phi_tm + return self._block_diag([np.diag(phi_tm), phi_nm]) def noise_model_dimensions(self, toas: TOAs) -> Dict[str, Tuple[int, int]]: """Number of basis functions for each noise model component. @@ -1960,7 +2028,7 @@ def noise_model_dimensions(self, toas: TOAs) -> Dict[str, Tuple[int, int]]: bfs = nc.basis_funcs if len(bfs) == 0: continue - nbf = sum(len(bf(toas)[1]) for bf in bfs) + nbf = sum(bf(toas)[1].shape[0] for bf in bfs) result[nc.category] = (ntot, nbf) ntot += nbf diff --git a/src/pint/residuals.py b/src/pint/residuals.py index ddafd6cd5..ea6856608 100644 --- a/src/pint/residuals.py +++ b/src/pint/residuals.py @@ -30,6 +30,7 @@ weighted_mean, woodbury_dot, anderson_darling, + get_phiinv, ) __all__ = [ @@ -595,8 +596,8 @@ def calc_whitened_resids(self) -> u.Quantity: if self.model.has_correlated_errors: U = self.model.noise_model_designmatrix(self.toas) - Phidiag = self.model.noise_model_basis_weight(self.toas) - return whiten_residuals(r, errs, U, Phidiag) + Phi = self.model.noise_model_basis_weight(self.toas) + return whiten_residuals(r, errs, U, Phi) else: return r / errs @@ -657,13 +658,21 @@ def _calc_gls_chi2(self, lognorm: bool = False) -> float: s = self.time_resids.to_value(u.s) Ndiag = self.get_data_error().to_value(u.s) ** 2 U = self.model.noise_model_designmatrix(self.toas) - Phidiag = self.model.noise_model_basis_weight(self.toas) + Phi = self.model.noise_model_basis_weight(self.toas) if "PHOFF" not in self.model.free_params: U = np.append(U, np.ones((len(self.toas), 1)), axis=1) - Phidiag = np.append(Phidiag, [1e40]) + if np.ndim(Phi) == 1: + Phi = np.append(Phi, [1e40]) + else: + phi_aug = np.zeros( + (Phi.shape[0] + 1, Phi.shape[1] + 1), dtype=Phi.dtype + ) + phi_aug[:-1, :-1] = Phi + phi_aug[-1, -1] = 1e40 + Phi = phi_aug - chi2, logdet_C = woodbury_dot(Ndiag, U, Phidiag, s, s) + chi2, logdet_C = woodbury_dot(Ndiag, U, Phi, s, s) return (chi2, logdet_C / 2) if lognorm else chi2 @@ -1392,8 +1401,8 @@ def calc_wideband_whitened_resids(self) -> np.ndarray: if self.model.has_correlated_errors: U = self.model.noise_model_wideband_designmatrix(self.toas) - Phidiag = self.model.noise_model_basis_weight(self.toas) - return whiten_residuals(r, errs, U, Phidiag) + Phi = self.model.noise_model_basis_weight(self.toas) + return whiten_residuals(r, errs, U, Phi) else: return r / errs @@ -1442,10 +1451,10 @@ def whitened_resids_adtest(self) -> Tuple[float, float]: def whiten_residuals( - y: np.ndarray, yerr: np.ndarray, U: np.ndarray, Phidiag: np.ndarray + y: np.ndarray, yerr: np.ndarray, U: np.ndarray, Phi: np.ndarray ) -> np.ndarray: """Whiten an array of residuals y given measurement uncertainties yerr, - noise basis matrix U, and noise weights Phidiag. + noise basis matrix U, and basis covariance Phi. Similar computation is done by the following methods: `pint.fitter.GLSFitter.fit_toas()`, `pint.fitter.WidebandTOAFitter.fit_toas()`, `pint.fitter.GLSState.step()`, @@ -1454,7 +1463,11 @@ def whiten_residuals( Ninv_U = U / Ndiag[:, None] UT_Ninv_U = U.T @ Ninv_U UT_Ninv_r = y @ Ninv_U - Sigmainv = UT_Ninv_U + np.diag(1 / Phidiag) + if np.ndim(Phi) == 1: + Phiinv = np.diag(1 / Phi) + else: + Phiinv = get_phiinv(Phi) + Sigmainv = UT_Ninv_U + Phiinv Sigmainv_cf = cho_factor(Sigmainv) ahat = cho_solve(Sigmainv_cf, UT_Ninv_r) diff --git a/src/pint/simulation.py b/src/pint/simulation.py index 97de3d3e1..401f99801 100644 --- a/src/pint/simulation.py +++ b/src/pint/simulation.py @@ -178,12 +178,16 @@ def make_fake_toas( if add_correlated_noise: U = model.noise_model_designmatrix(tsim) b = model.noise_model_basis_weight(tsim) - a = np.random.normal(size=len(b)) - delays += (U @ (b**0.5 * a)) << u.s + if np.ndim(b) == 1: + a = np.random.normal(size=len(b)) + coeffs = b**0.5 * a + else: + coeffs = np.random.multivariate_normal(np.zeros(b.shape[0]), b) + delays += (U @ coeffs) << u.s if tsim.wideband: Ud = model.noise_model_dm_designmatrix(tsim) - dms += (Ud @ (b**0.5 * a)) << pint.dmu + dms += (Ud @ coeffs) << pint.dmu tsim.adjust_TOAs(time.TimeDelta(delays)) diff --git a/src/pint/utils.py b/src/pint/utils.py index db4223d15..b648a0599 100644 --- a/src/pint/utils.py +++ b/src/pint/utils.py @@ -3044,6 +3044,28 @@ def bayesian_information_criterion( ) +def get_phiinv(phi: np.ndarray) -> np.ndarray: + """Invert the phi matrix in a stable way. + We relegate phi to double precision before inverting because we don't care about the precision of the noise parameters, + and this allows us to use the more stable double precision Cholesky decomposition for inversion. + Uses Cholesky-based inversion for SPD covariance matrices, with fallback to + direct inversion when Cholesky fails. + """ + if np.ndim(phi) == 1: + return 1 / phi + arr = np.asarray(phi) + if arr.dtype == np.longdouble: + arr = arr.astype(np.float64) + elif np.issubdtype(arr.dtype, np.clongdouble): + arr = arr.astype(np.complex128) + try: + cf = scipy.linalg.cho_factor(arr, lower=True) + phiinv = scipy.linalg.cho_solve(cf, np.eye(arr.shape[0], dtype=arr.dtype)) + except scipy.linalg.LinAlgError: + phiinv = np.linalg.inv(arr) + return phiinv + + def sherman_morrison_dot( Ndiag: np.ndarray, v: np.ndarray, w: float, x: np.ndarray, y: np.ndarray ) -> Tuple[float, float]: @@ -3095,14 +3117,14 @@ def sherman_morrison_dot( def woodbury_dot( - Ndiag: np.ndarray, U: np.ndarray, Phidiag: np.ndarray, x: np.ndarray, y: np.ndarray + Ndiag: np.ndarray, U: np.ndarray, Phi: np.ndarray, x: np.ndarray, y: np.ndarray ) -> Tuple[float, float]: """ Compute an inner product of the form (x| C^-1 |y) where C = N + U Phi U^T , - N and Phi are diagonal matrices, using the Woodbury + N is diagonal and Phi can be diagonal or full, using the Woodbury identity C^-1 = N^-1 - N^-1 - N^-1 U Sigma^-1 U^T N^-1 where @@ -3117,8 +3139,8 @@ def woodbury_dot( Diagonal elements of the diagonal matrix N U: array-like A matrix that represents a rank-n update to N - Phidiag: array-like - Weights associated with the rank-n update + Phi: array-like + Basis-space covariance associated with the rank-n update x: array-like Vector 1 for the inner product y: array-like @@ -3135,13 +3157,19 @@ def woodbury_dot( x_Ninv_y = np.sum(x * y / Ndiag) x_Ninv_U = (x / Ndiag) @ U y_Ninv_U = (y / Ndiag) @ U - Sigma = np.diag(1 / Phidiag) + (U.T / Ndiag) @ U + if np.ndim(Phi) == 1: + Phiinv = np.diag(1 / Phi) + logdet_Phi = np.sum(np.log(Phi)) + else: + Phiinv = get_phiinv(Phi) + _, logdet_Phi = np.linalg.slogdet(Phi.astype(float)) + + Sigma = Phiinv + (U.T / Ndiag) @ U Sigma_cf = cho_factor(Sigma) x_Cinv_y = x_Ninv_y - x_Ninv_U @ cho_solve(Sigma_cf, y_Ninv_U) logdet_N = np.sum(np.log(Ndiag)) - logdet_Phi = np.sum(np.log(Phidiag)) _, logdet_Sigma = np.linalg.slogdet(Sigma.astype(float)) logdet_C = logdet_N + logdet_Phi + logdet_Sigma diff --git a/tests/test_noise_designmatrix_cache_regression.py b/tests/test_noise_designmatrix_cache_regression.py new file mode 100644 index 000000000..d9da69c09 --- /dev/null +++ b/tests/test_noise_designmatrix_cache_regression.py @@ -0,0 +1,158 @@ +import numpy as np + +from pint.config import examplefile +from pint.models.timing_model import Component +from pint.models import get_model_and_toas + + +def _model_and_toas(): + parfile = examplefile("B1855+09_NANOGrav_9yv1.gls.par") + timfile = examplefile("B1855+09_NANOGrav_9yv1.tim") + return get_model_and_toas(parfile, timfile) + + +def _multi_basis_model_and_toas(): + model, toas = _model_and_toas() + all_components = Component.component_types + + model.add_component(all_components["PLDMNoise"](), validate=False) + model["TNDMAMP"].quantity = -13 + model["TNDMGAM"].quantity = 1.2 + model["TNDMC"].value = 8 + + model.add_component(all_components["PLSWNoise"](), validate=False) + model["TNSWAMP"].quantity = -12 + model["TNSWGAM"].quantity = -2.0 + model["TNSWC"].value = 8 + + model.add_component(all_components["PLChromNoise"](), validate=False) + model["TNCHROMAMP"].quantity = -14 + model["TNCHROMGAM"].quantity = 1.2 + model["TNCHROMC"].value = 8 + + model.add_component(all_components["ChromaticCM"](), validate=False) + model["TNCHROMIDX"].value = 4 + + model.validate() + return model, toas + + +def test_noise_designmatrix_cache_hit_avoids_recomputing_basis(monkeypatch): + model, toas = _model_and_toas() + component = model.components["PLRedNoise"] + + calls = {"n": 0} + original = component.get_noise_basis + + def wrapped_get_noise_basis(current_toas): + calls["n"] += 1 + return original(current_toas) + + monkeypatch.setattr(component, "get_noise_basis", wrapped_get_noise_basis) + + first = model.noise_model_designmatrix(toas) + second = model.noise_model_designmatrix(toas) + + assert first is second + assert calls["n"] == 1 + + +def test_noise_designmatrix_cache_invalidates_on_noise_parameter_change(): + model, toas = _model_and_toas() + + first = model.noise_model_designmatrix(toas) + + old_value = int(model.TNREDC.value) if model.TNREDC.value is not None else 30 + model.TNREDC.value = old_value + 1 + model.setup() + model.validate() + + second = model.noise_model_designmatrix(toas) + third = model.noise_model_designmatrix(toas) + + assert second is third + assert second.shape[1] == first.shape[1] + 2 + assert not np.array_equal(first, second) + + +def test_noise_designmatrix_cache_multi_basis_hit_and_invalidation(monkeypatch): + model, toas = _multi_basis_model_and_toas() + + component_names = ["PLRedNoise", "PLDMNoise", "PLSWNoise", "PLChromNoise"] + calls = {name: 0 for name in component_names} + + originals = { + name: model.components[name].get_noise_basis for name in component_names + } + + for name in component_names: + component = model.components[name] + original = originals[name] + + def make_wrapper(component_name, original_func): + def wrapped_get_noise_basis(current_toas): + calls[component_name] += 1 + return original_func(current_toas) + + return wrapped_get_noise_basis + + monkeypatch.setattr(component, "get_noise_basis", make_wrapper(name, original)) + + first = model.noise_model_designmatrix(toas) + second = model.noise_model_designmatrix(toas) + + assert first is second + for name in component_names: + assert calls[name] == 1 + + model.TNCHROMC.value = int(model.TNCHROMC.value) + 1 + model.setup() + model.validate() + + third = model.noise_model_designmatrix(toas) + fourth = model.noise_model_designmatrix(toas) + + assert third is fourth + assert calls["PLRedNoise"] == 2 + assert calls["PLDMNoise"] == 2 + assert calls["PLSWNoise"] == 2 + assert calls["PLChromNoise"] == 2 + assert third.shape[1] == first.shape[1] + 2 + + +def test_noise_basis_weight_cache_hit_avoids_recomputing_weights(monkeypatch): + model, toas = _model_and_toas() + component = model.components["PLRedNoise"] + + calls = {"n": 0} + original = component.get_noise_weights + + def wrapped_get_noise_weights(current_toas): + calls["n"] += 1 + return original(current_toas) + + monkeypatch.setattr(component, "get_noise_weights", wrapped_get_noise_weights) + + first = model.noise_model_basis_weight(toas) + second = model.noise_model_basis_weight(toas) + + assert first is second + assert calls["n"] == 1 + + +def test_noise_basis_weight_cache_invalidates_on_noise_parameter_change(): + model, toas = _model_and_toas() + + first = model.noise_model_basis_weight(toas) + + old_value = int(model.TNREDC.value) if model.TNREDC.value is not None else 30 + model.TNREDC.value = old_value + 1 + model.setup() + model.validate() + + second = model.noise_model_basis_weight(toas) + third = model.noise_model_basis_weight(toas) + + assert second is third + assert second.shape[0] == first.shape[0] + 2 + assert not np.array_equal(first, second) diff --git a/tests/test_noise_model_dtype_regression.py b/tests/test_noise_model_dtype_regression.py new file mode 100644 index 000000000..15e7ddf69 --- /dev/null +++ b/tests/test_noise_model_dtype_regression.py @@ -0,0 +1,39 @@ +import numpy as np + +from pint.models.noise_model import ( + create_fourier_design_matrix, + get_rednoise_freqs, + matern_kernel, + periodic_kernel, + powerlaw, + ridge_kernel, + se_kernel, +) + + +def test_rednoise_freqs_from_longdouble_times_are_float64(): + t = np.array([0.0, 10.0, 20.0], dtype=np.longdouble) + freqs = get_rednoise_freqs(t, nmodes=4, Tspan=np.max(t) - np.min(t)) + assert np.asarray(freqs).dtype == np.float64 + + +def test_powerlaw_from_longdouble_freqs_is_float64(): + freqs = np.array([0.1, 0.2, 0.3], dtype=np.longdouble) + weights = powerlaw(freqs, A=1e-15, gamma=4.0) + assert np.asarray(weights).dtype == np.float64 + + +def test_fourier_design_matrix_dtype_remains_float64(): + t = np.array([1.0, 2.0, 3.0], dtype=np.longdouble) + freqs = np.array([0.1, 0.2], dtype=np.longdouble) + design = create_fourier_design_matrix(t, freqs) + assert design.dtype == np.float64 + + +def test_time_domain_kernel_weights_from_longdouble_nodes_are_float64(): + nodes = np.array([1.0, 2.0, 4.0], dtype=np.longdouble) + + assert ridge_kernel(nodes).dtype == np.float64 + assert se_kernel(nodes).dtype == np.float64 + assert matern_kernel(nodes).dtype == np.float64 + assert periodic_kernel(nodes).dtype == np.float64 diff --git a/tests/test_noise_models.py b/tests/test_noise_models.py index 3831dc3ff..7baf67d5c 100644 --- a/tests/test_noise_models.py +++ b/tests/test_noise_models.py @@ -7,6 +7,10 @@ from pint.models import get_model_and_toas, get_model from pint.models.timing_model import Component from pint.models.noise_model import NoiseComponent +from pint.models.noise_model import ( + TimeDomainSWNoise, + project_basis_covariance, +) from pint.simulation import make_fake_toas_uniform from io import StringIO @@ -58,6 +62,47 @@ def add_chrom_noise_to_model(model): model.validate() +def _base_model_and_toas(): + """Load a clean real dataset used for integration-style noise component tests.""" + parfile = examplefile("B1855+09_NANOGrav_9yv1.gls.par") + timfile = examplefile("B1855+09_NANOGrav_9yv1.tim") + return get_model_and_toas(parfile, timfile) + + +def _add_time_domain_sw_component(model, kernel): + """Attach a TimeDomainSWNoise component configured for the given kernel. + + Parameters + ---------- + kernel : str + One of ``'ridge'``, ``'sqexp'``, ``'matern'``, ``'quasi_periodic'``. + """ + component = TimeDomainSWNoise() + model.add_component(component, validate=False) + + model["TDSWKERNEL"].value = kernel + model["TDSWDT"].value = 14.0 + model["TDSWINTERP_KIND"].value = "linear" + model["TDSWLOGSIG"].value = -7.0 + + if kernel == "ridge": + pass # only TDSWLOGSIG is required + elif kernel == "sqexp": + model["TDSWLOGELL"].value = 1.2 + elif kernel == "matern": + model["TDSWLOGELL"].value = 1.0 + model["TDSWNU"].value = 1.5 + elif kernel == "quasi_periodic": + model["TDSWLOGELL"].value = 1.1 + model["TDSWLOGGAMP"].value = -0.2 + model["TDSWLOGP"].value = 1.5 + else: + raise ValueError(f"Unsupported time-domain SW kernel: {kernel}") + + model.validate() + return component + + @pytest.fixture(scope="module") def model_and_toas(): parfile = examplefile("B1855+09_NANOGrav_9yv1.gls.par") @@ -148,6 +193,40 @@ def test_noise_basis_weights_funcs(model_and_toas, component_label): assert np.allclose(basis_, basis) and np.allclose(weights, weights_) +@pytest.mark.parametrize("kernel", ["ridge", "sqexp", "matern", "quasi_periodic"]) +def test_noise_weights_sign_time_domain_sw_integration(kernel): + """Integration test: each time-domain SW kernel should produce non-negative weights. + + This extends the existing ``test_noise_weights_sign`` coverage to the unified + TimeDomainSWNoise component with each supported kernel. + """ + + model, toas = _base_model_and_toas() + component = _add_time_domain_sw_component(model, kernel) + + basis, weights = component.basis_funcs[0](toas) + + assert basis.shape == (len(toas), len(weights)) + assert np.all(weights >= 0) + + +@pytest.mark.parametrize("kernel", ["ridge", "sqexp", "matern", "quasi_periodic"]) +def test_time_domain_sw_covariance_matches_basis_weights(kernel): + """Integration test: covariance must equal basis*weights*basis^T for each kernel. + + Mirrors ``test_covariance_matrix_relation`` for the unified TimeDomainSWNoise. + """ + + model, toas = _base_model_and_toas() + component = _add_time_domain_sw_component(model, kernel) + + basis, weights = component.basis_funcs[0](toas) + cov = component.covariance_matrix_funcs[0](toas) + cov_from_basis = project_basis_covariance(basis, weights) + + assert np.allclose(cov, cov_from_basis) + + def test_white_noise_model_derivs(): par = """ ELAT 1.3 1 @@ -179,3 +258,245 @@ def test_white_noise_model_derivs(): assert np.isclose( dq2[-1], sigma[-1] * m.EQUAD2.quantity * (m.EFAC2.quantity / sigma[-1]) ** 2 ) + + +# --------------------------------------------------------------------------- +# TimeDomainSWNoise – kernel parameter validation tests +# --------------------------------------------------------------------------- + + +def test_time_domain_sw_invalid_kernel_rejected(): + """TimeDomainSWNoise should raise ValueError for an unknown kernel name.""" + model, _ = _base_model_and_toas() + component = TimeDomainSWNoise() + model.add_component(component, validate=False) + model["TDSWKERNEL"].value = "bogus_kernel" + model["TDSWLOGSIG"].value = -7.0 + with pytest.raises(ValueError, match="TDSWKERNEL"): + model.validate() + + +@pytest.mark.parametrize( + "kernel, missing_param", + [ + ("sqexp", "TDSWLOGELL"), + ("matern", "TDSWLOGELL"), + ("quasi_periodic", "TDSWLOGELL"), + ], +) +def test_time_domain_sw_missing_required_param(kernel, missing_param): + """TimeDomainSWNoise validate() should raise when a kernel-required param is absent.""" + model, _ = _base_model_and_toas() + component = TimeDomainSWNoise() + model.add_component(component, validate=False) + model["TDSWKERNEL"].value = kernel + model["TDSWLOGSIG"].value = -7.0 + # deliberately do NOT set missing_param (or extra required params for qp) + # For quasi_periodic we need TDSWLOGELL at minimum to fail – leave it None. + with pytest.raises(ValueError, match=missing_param): + model.validate() + + +# --------------------------------------------------------------------------- +# TimeDomainSWNoise – node-based interpolation tests +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("kernel", ["ridge", "sqexp", "matern", "quasi_periodic"]) +def test_time_domain_sw_node_based_interpolation(kernel): + """Node-based TDSWNODE_ interpolation: basis/weights/cov consistent for all kernels. + + Kernel-specific parameters must be set *before* adding nodes because + ``_add_tdsw_node_component`` calls ``component.validate()`` internally as + soon as two or more nodes are present. + """ + model, toas = _base_model_and_toas() + component = TimeDomainSWNoise() + model.add_component(component, validate=False) + + t_mjd = toas.get_mjds().value + step = (t_mjd.max() - t_mjd.min()) / 20 + nodes = np.arange(t_mjd.min() - step, t_mjd.max() + step, step) + + # Set ALL kernel parameters before adding any nodes. + # _add_tdsw_node_component calls component.validate() once nset >= 2, so + # missing required params would raise inside the loop otherwise. + component.TDSWKERNEL.value = kernel + component.TDSWLOGSIG.value = -7.0 + component.TDSWINTERP_KIND.value = "linear" + if kernel == "sqexp": + component.TDSWLOGELL.value = 1.2 + elif kernel == "matern": + component.TDSWLOGELL.value = 1.0 + component.TDSWNU.value = 1.5 + elif kernel == "quasi_periodic": + component.TDSWLOGELL.value = 1.1 + component.TDSWLOGGAMP.value = -0.2 + component.TDSWLOGP.value = 1.5 + + for i, node in enumerate(nodes): + component.add_tdsw_node_component(float(node), index=i + 1) + + basis, weights = component.basis_funcs[0](toas) + cov = component.covariance_matrix_funcs[0](toas) + cov_from_basis = project_basis_covariance(basis, weights) + + assert basis.shape == (len(toas), len(weights)) + assert np.all(weights >= 0) + assert np.allclose(cov, cov_from_basis) + + +# --------------------------------------------------------------------------- +# TimeDomainSWNoise – GLS fitter integration +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("kernel", ["ridge", "sqexp", "matern", "quasi_periodic"]) +def test_time_domain_sw_gls_fitter_runs(kernel): + """GLSFitter must instantiate and produce a finite chi-squared for each kernel. + + A full convergent fit is not required (real data may trigger numerical + step failures), but residuals and chi-squared must be computable after at + most two iterations. + """ + from pint import fitter + + model, toas = _base_model_and_toas() + _add_time_domain_sw_component(model, kernel) + + f = fitter.GLSFitter(toas, model) + try: + f.fit_toas(maxiter=2) + except Exception: + pass # numerical issues are acceptable; we only check chi2 below + + chi2 = f.resids.chi2 + assert np.isfinite(chi2), f"chi2 is not finite for kernel '{kernel}'" + + +# --------------------------------------------------------------------------- +# TimeDomainSWNoise – validation edge cases +# --------------------------------------------------------------------------- + + +def test_time_domain_sw_invalid_interp_kind_rejected(): + """An unsupported TDSWINTERP_KIND value must raise ValueError.""" + model, _ = _base_model_and_toas() + component = TimeDomainSWNoise() + model.add_component(component, validate=False) + model["TDSWKERNEL"].value = "ridge" + model["TDSWLOGSIG"].value = -7.0 + model["TDSWDT"].value = 30.0 + model["TDSWINTERP_KIND"].value = "not_a_kind" + with pytest.raises(ValueError, match="TDSWINTERP_KIND"): + model.validate() + + +@pytest.mark.parametrize("bad_nu", [0.0, 1.0, 2.0, 3.0]) +def test_time_domain_sw_invalid_matern_nu_rejected(bad_nu): + """TDSWNU values outside {0.5, 1.5, 2.5} must raise ValueError for matern kernel.""" + model, _ = _base_model_and_toas() + component = TimeDomainSWNoise() + model.add_component(component, validate=False) + model["TDSWKERNEL"].value = "matern" + model["TDSWLOGSIG"].value = -7.0 + model["TDSWLOGELL"].value = 1.0 + model["TDSWDT"].value = 30.0 + model["TDSWNU"].value = bad_nu + with pytest.raises(ValueError, match="TDSWNU"): + model.validate() + + +def test_time_domain_sw_conflicting_dt_and_nodes_rejected(): + """Setting both a non-default TDSWDT and TDSWNODE_ parameters must raise ValueError. + + ``_add_tdsw_node_component`` calls ``component.validate()`` internally once + two nodes are present, so the exception fires on the second ``add_tdsw_node_component`` + call rather than on an explicit ``model.validate()``. + """ + model, _ = _base_model_and_toas() + component = TimeDomainSWNoise() + model.add_component(component, validate=False) + component.TDSWKERNEL.value = "ridge" + component.TDSWLOGSIG.value = -7.0 + component.TDSWDT.value = 14.0 # non-default: conflicts with nodes + component.add_tdsw_node_component(55000.0, index=1) # nset=1, no validate yet + # Second node addition triggers internal validate() -> must raise. + with pytest.raises(ValueError, match="interpolation mode"): + component.add_tdsw_node_component(55200.0, index=2) + + +def test_time_domain_sw_single_node_rejected(): + """Exactly one TDSWNODE_ value (fewer than the required 2) must raise ValueError.""" + model, _ = _base_model_and_toas() + component = TimeDomainSWNoise() + model.add_component(component, validate=False) + model["TDSWKERNEL"].value = "ridge" + model["TDSWLOGSIG"].value = -7.0 + component.add_tdsw_node_component(55000.0, index=1) + with pytest.raises(ValueError, match="at least 2"): + model.validate() + + +def test_time_domain_sw_duplicate_nodes_rejected(): + """Duplicate TDSWNODE_ values must raise ValueError. + + The second ``add_tdsw_node_component`` call triggers internal validation + (nset >= 2) which should detect the duplicate and raise. + """ + model, _ = _base_model_and_toas() + component = TimeDomainSWNoise() + model.add_component(component, validate=False) + component.TDSWKERNEL.value = "ridge" + component.TDSWLOGSIG.value = -7.0 + component.add_tdsw_node_component(55000.0, index=1) # nset=1, no validate yet + # Second addition with same MJD triggers internal validate() -> must raise. + with pytest.raises(ValueError, match="unique"): + component.add_tdsw_node_component(55000.0, index=2) + + +def test_time_domain_sw_negative_dt_rejected(): + """A non-positive TDSWDT must raise ValueError.""" + model, _ = _base_model_and_toas() + component = TimeDomainSWNoise() + model.add_component(component, validate=False) + model["TDSWKERNEL"].value = "ridge" + model["TDSWLOGSIG"].value = -7.0 + model["TDSWDT"].value = -5.0 + with pytest.raises(ValueError, match="TDSWDT"): + model.validate() + + +# --------------------------------------------------------------------------- +# TimeDomainSWNoise – par-file serialisation roundtrip +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("kernel", ["ridge", "sqexp", "matern", "quasi_periodic"]) +def test_time_domain_sw_parfile_serialises_params(kernel): + """All TimeDomainSWNoise parameters appear in the serialised par string. + + ``TimeDomainSWNoise`` has ``register = False`` so ``get_model()`` will not + reconstruct it from a par file. This test therefore validates that the + parameters are *written* correctly rather than testing a full read-back + roundtrip. + """ + import re + + model, _ = _base_model_and_toas() + _add_time_domain_sw_component(model, kernel) + + par_str = model.as_parfile() + + assert re.search( + rf"TDSWKERNEL\s+{re.escape(kernel)}", par_str + ), f"Expected 'TDSWKERNEL {kernel}' in par string" + assert "TDSWLOGSIG" in par_str + assert "TDSWDT" in par_str + if kernel in ("sqexp", "matern", "quasi_periodic"): + assert "TDSWLOGELL" in par_str + if kernel == "matern": + assert "TDSWNU" in par_str + if kernel == "quasi_periodic": + assert "TDSWLOGGAMP" in par_str + assert "TDSWLOGP" in par_str diff --git a/tests/test_solar_wind.py b/tests/test_solar_wind.py index 70b57f539..5a4c0bb06 100644 --- a/tests/test_solar_wind.py +++ b/tests/test_solar_wind.py @@ -2,6 +2,7 @@ import os import copy +import warnings from io import StringIO import pytest import numpy as np @@ -12,7 +13,7 @@ from pint.models import get_model, get_model_and_toas from pint.fitter import Fitter from pint.simulation import make_fake_toas_uniform -from pint.models.solar_wind_dispersion import SolarWindDispersionX +from pint.models.solar_wind_dispersion import SolarWindDispersionX, SolarWindProxyRegression import pint.utils from pinttestdata import datadir @@ -456,3 +457,105 @@ def test_expression(): )[-1], (m.NE_SW.quantity + dt * m.NE_SW1.quantity), ) + +# --------------------------------------------------------------------------- +# SolarWindProxyRegression tests +# --------------------------------------------------------------------------- + +# Par string shared by proxy tests. ELAT != 0 so S(t) is not degenerate. +_PROXY_PAR = """ +PSR J1234+5678 +F0 1 +DM 10 +ELAT 10 +ELONG 0 +PEPOCH 54000 +""" + +_TSTART = 54000 +_TEND = 54000 + 365.25 * 2 # two years + +_NE_SW_TRUE = 7.9 # cm^-3 +_BETA1_TRUE = 2.1 # cm^-3 (per unit normalised proxy) + + +def _make_proxy(toas): + """Return (proxy_mjd, proxy_vals) — a sinusoidal solar-cycle proxy.""" + mjds = np.linspace(_TSTART - 100, _TEND + 100, 2000) + vals = 5.0 + 3.0 * np.sin(2 * np.pi * (mjds - _TSTART) / (11 * 365.25)) + return mjds, vals + + +def _proxy_model(ne_sw=_NE_SW_TRUE, beta1=_BETA1_TRUE, swm=0): + """Build a SolarWindProxyRegression model with one proxy loaded.""" + base = get_model(StringIO(_PROXY_PAR)) + comp = SolarWindProxyRegression() + base.add_component(comp) + base.NE_SW.value = ne_sw + base.SWPRBETA1.value = beta1 + base.SWM.value = swm + toas = make_fake_toas_uniform(_TSTART, _TEND, 50, base, obs="gbt") + proxy_mjd, proxy_vals = _make_proxy(toas) + base.components["SolarWindProxyRegression"].set_proxy(proxy_mjd, proxy_vals) + return base, toas, proxy_mjd, proxy_vals + + +def test_sw_proxy_instantiation(): + """Component can be programmatically added and has the expected parameters.""" + model = get_model(StringIO(_PROXY_PAR)) + model.add_component(SolarWindProxyRegression()) + assert "SolarWindProxyRegression" in model.components + assert hasattr(model, "NE_SW") + assert hasattr(model, "SWPRBETA1") + assert hasattr(model, "SWPRLAG1") + + +def test_sw_proxy_dm_zero_when_ne_zero(): + """DM is zero when both NE_SW and BETA1 are zero.""" + model, toas, _, _ = _proxy_model(ne_sw=0.0, beta1=0.0) + dm = model.components["SolarWindProxyRegression"].solar_wind_dm(toas) + assert np.all(dm.value == 0.0) + + +def test_sw_proxy_dm_positive(): + """DM values are positive when NE_SW > 0 with a loaded proxy.""" + model, toas, _, _ = _proxy_model() + dm = model.components["SolarWindProxyRegression"].solar_wind_dm(toas) + assert np.all(dm.value > 0) + + +def test_sw_proxy_d_dm_d_beta1_units(): + """d_dm_d_beta1 has units pc cm^-3 / cm^-3 = pc and correct shape.""" + model, toas, _, _ = _proxy_model() + comp = model.components["SolarWindProxyRegression"] + + d = comp.d_dm_d_swprbeta(toas, "SWPRBETA1") + assert d.shape == (len(toas),) + assert d.unit.is_equivalent(u.pc / u.cm**3 / (u.cm**-3)) + + +def test_sw_proxy_d_dm_d_lag1_shape(): + """d_dm_d_lag1 has the right shape.""" + model, toas, _, _ = _proxy_model() + # Give the lag a non-trivial value for a meaningful derivative + model.SWPRLAG1.value = 5.0 + comp = model.components["SolarWindProxyRegression"] + d = comp.d_dm_d_swprlag(toas, "SWPRLAG1") + assert d.shape == (len(toas),) + + +def test_sw_proxy_print_par_includes_proxy_params(): + """print_par output includes SWPRBETA1 and SWPRLAG1 lines.""" + model, _, _, _ = _proxy_model() + par_text = model.components["SolarWindProxyRegression"].print_par() + assert "SWPRBETA1" in par_text + assert "SWPRLAG1" in par_text + assert "NE_SW" in par_text + + +def test_sw_proxy_delay_positive(): + """solar_wind_delay is positive (delays are positive for dispersion).""" + model, toas, _, _ = _proxy_model() + delay = model.components["SolarWindProxyRegression"].solar_wind_delay(toas) + assert np.all(delay.to_value(u.s) > 0) +