diff --git a/src/pint/models/__init__.py b/src/pint/models/__init__.py index 86adaf524..c2b8fabb8 100644 --- a/src/pint/models/__init__.py +++ b/src/pint/models/__init__.py @@ -47,6 +47,9 @@ PLSWNoise, PLRedNoise, ScaleToaError, + RidgeDMNoise, + SqExpDMNoise, + QuasiPeriodicDMNoise, ) from pint.models.phase_offset import PhaseOffset from pint.models.piecewise import PiecewiseSpindown diff --git a/src/pint/models/noise_model.py b/src/pint/models/noise_model.py index 6313f5eff..bc314ec63 100644 --- a/src/pint/models/noise_model.py +++ b/src/pint/models/noise_model.py @@ -1115,6 +1115,377 @@ def pl_rn_cov_matrix(self, toas: TOAs) -> np.ndarray: return np.dot(Fmat * phi[None, :], Fmat.T) +class RidgeDMNoise(NoiseComponent): + """Ridge regression (white noise) time-domain kernel for the noise covariance matrix.""" + + register = True + category = "ridge_DM_noise" + + introduces_correlated_errors = True + introduces_dm_errors = True + is_time_correlated = True + + def __init__( + self, + ): + super().__init__() + + self.add_param( + floatParameter( + name="TNDMDT", + units="", + aliases=[], + description="Linear interpolation time step for time-domain DM noise.", + convert_tcb2tdb=False, + ) + ) + self.add_param( + floatParameter( + name="TNDMLOGSIG", + units="", + aliases=[], + description="Amplitude of time-domain DM noise" " ridge kernel.", + convert_tcb2tdb=False, + ) + ) + + self.covariance_matrix_funcs += [self.ridge_dm_cov_matrix] + self.basis_funcs += [self.ridge_dm_basis_weight_pair] + + def get_ridge_vals(self) -> Tuple[float, float, float]: + """ + Retrieve ridge regression and time-domain parameters + from the model, substituting defaults if unspecified. + """ + if self.TNDMDT.value is None: + log.warning( + "TNDMDT is not set, using default value of 30 days for RidgeDMNoise" + ) + dt = 30.0 + else: + dt = self.TNDMDT.value + + if self.TNDMLOGSIG.value is not None: + log10_sigma = self.TNDMLOGSIG.value + else: + raise ValueError("TNDMDT and TNDMSIG must be set for RidgeDMNoise") + + return log10_sigma, dt + + def get_noise_basis(self, toas: TOAs) -> np.ndarray: + """Return a chromatic linear interpolation matrix for RidgeDMNoise. + + See the documentation for ridge_dm_basis_weight_pair function for details.""" + tbl = toas.table + t = (tbl["tdbld"].quantity * u.day).to(u.s).value + + fref = 1400 * u.MHz + freqs = self._parent.barycentric_radio_freq(toas).to(u.MHz) + + _, dt = self.get_ridge_vals() + Umat, _ = linear_interp_basis(t, dt=dt * 86400) + D = (fref.value / freqs.value) ** 2.0 + + return Umat * D[:, None] + + def get_noise_weights(self, toas: TOAs) -> np.ndarray: + """Return ridge regression time domain DM noise weights. + + See the documentation for ridge_dm_basis_weight_pair for details.""" + tbl = toas.table + t = (tbl["tdbld"].quantity * u.day).to(u.s).value + + log10_sigma, dt = self.get_ridge_vals() + _, avetoas = linear_interp_basis(t, dt=dt * 86400) + + return ridge_kernel(avetoas, log10_sigma) + + def ridge_dm_basis_weight_pair(self, toas: TOAs) -> Tuple[np.ndarray, np.ndarray]: + """Return a chromatic linear interpolation basis and ridge dm noise weights. + + Uses chromatic linear interpolation basis in the time domain to model dispersive delays. + Time domain GPs have associated covariance functions which are priors over functions. + The ridge regression or white noise covariance function is a common choice for modeling + smooth functions. It is defined as: + .. math:: + K(t_i, t_j) = \\sigma^2 * \\delta(t_i - t_j) + where :math:`\\sigma` is the amplitude of the noise, and :math:`\\delta(t_i - t_j)` is a delta function. + + """ + return (self.get_noise_basis(toas), self.get_noise_weights(toas)) + + def ridge_dm_cov_matrix(self, toas: TOAs) -> np.ndarray: + Umat, phi = self.ridge_dm_basis_weight_pair(toas) + return np.dot(Umat * phi, Umat.T) + + +class SqExpDMNoise(NoiseComponent): + """Squared expoentential time-domain kernel for the noise covariance matrix.""" + + register = True + category = "sqexp_DM_noise" + + introduces_correlated_errors = True + introduces_dm_errors = True + is_time_correlated = True + + def __init__( + self, + ): + super().__init__() + + self.add_param( + floatParameter( + name="TNDMDT", + units="", + aliases=[], + description="Linear interpolation time step for time-domain noise.", + convert_tcb2tdb=False, + ) + ) + self.add_param( + floatParameter( + name="TNDMLOGSIG", + units="", + aliases=[], + description="Amplitude of time-domain DM noise" + " square exponential kernel.", + convert_tcb2tdb=False, + ) + ) + self.add_param( + floatParameter( + name="TNDMLOGELL", + units="", + aliases=[], + description="Charateristic length scale of square exponential" + " time-domain DM noise in days.", + convert_tcb2tdb=False, + ) + ) + + self.covariance_matrix_funcs += [self.sq_exp_dm_cov_matrix] + self.basis_funcs += [self.sq_exp_dm_basis_weight_pair] + + def get_sqexp_vals(self) -> Tuple[float, float, float]: + """ + Retrieve square exponential and time-domain parameters + from the model, substituting defaults if unspecified. + """ + if self.TNDMDT.value is None: + log.warning( + "TNDMDT is not set, using default value of 30 days for SqExpDMNoise" + ) + dt = 30.0 + else: + dt = self.TNDMDT.value + + if self.TNDMLOGELL.value is not None or self.TNDMLOGSIG.value is not None: + log10_sigma = self.TNDMLOGSIG.value + log10_ell = self.TNDMLOGELL.value + else: + raise ValueError("TNDMDT and TNDMSIG must be set for SqExpDMNoise") + + return log10_sigma, log10_ell, dt + + def get_noise_basis(self, toas: TOAs) -> np.ndarray: + """Return a Fourier design matrix for red noise. + + See the documentation for pl_rn_basis_weight_pair function for details.""" + tbl = toas.table + t = (tbl["tdbld"].quantity * u.day).to(u.s).value + + fref = 1400 * u.MHz + freqs = self._parent.barycentric_radio_freq(toas).to(u.MHz) + + _, _, dt = self.get_sqexp_vals() + Umat, _ = linear_interp_basis(t, dt=dt * 86400) + D = (fref.value / freqs.value) ** 2.0 + + return Umat * D[:, None] + + def get_noise_weights(self, toas: TOAs) -> np.ndarray: + """Return square exponential time domain DM noise weights. + + See the documentation for sq_exp_dm_basis_weight_pair for details.""" + tbl = toas.table + t = (tbl["tdbld"].quantity * u.day).to(u.s).value + + (log10_sigma, log10_ell, dt) = self.get_sqexp_vals() + _, avetoas = linear_interp_basis(t, dt=dt * 86400) + + return se_dm_kernel(avetoas, log10_sigma, log10_ell) + + def sq_exp_dm_basis_weight_pair(self, toas: TOAs) -> Tuple[np.ndarray, np.ndarray]: + """Return a chromatic linear interpolation basis and square exponential dm noise weights. + + Uses chromatic linear interpolation basis in the time domain to model dispersive delays. + Time domain GPs have associated covariance functions which are priors over functions. + The square exponential covariance function is a common choice for modeling + smooth functions. It is defined as: + .. math:: + K(t_i, t_j) = \\sigma^2 \\exp\\left(-\\frac{(t_i - t_j)^2}{2 \\ell^2}\\right) + where :math:`\\sigma` is the amplitude of the noise, and :math:`\\ell` is the characteristic + length scale of the noise. + + """ + return (self.get_noise_basis(toas), self.get_noise_weights(toas)) + + def sq_exp_dm_cov_matrix(self, toas: TOAs) -> np.ndarray: + Umat, phi = self.sq_exp_dm_basis_weight_pair(toas) + return np.dot(Umat * phi, Umat.T) + + +class QuasiPeriodicDMNoise(NoiseComponent): + """Quasi-periodic time-domain kernel for the noise covariance matrix.""" + + register = True + category = "qp_DM_noise" + + introduces_correlated_errors = True + introduces_dm_errors = True + is_time_correlated = True + + def __init__( + self, + ): + super().__init__() + + self.add_param( + floatParameter( + name="TNDMDT", + units="", + aliases=[], + description="Linear interpolation time step for time-domain noise.", + convert_tcb2tdb=False, + ) + ) + self.add_param( + floatParameter( + name="TNDMLOGSIG", + units="", + aliases=[], + description="Amplitude of time-domain DM noise" + " quasi-periodic kernel.", + convert_tcb2tdb=False, + ) + ) + self.add_param( + floatParameter( + name="TNDMLOGELL", + units="", + aliases=[], + description="Charateristic length scale of quasi-periodic" + " time-domain DM noise in days.", + convert_tcb2tdb=False, + ) + ) + self.add_param( + floatParameter( + name="TNDMLOGGAMP", + units="", + aliases=[], + description="Mixing parameter for quasi-periodic time-domain DM noise.", + convert_tcb2tdb=False, + ) + ) + self.add_param( + floatParameter( + name="TNDMLOGP", + units="", + aliases=[], + description="Periodicity of quasi-periodic time-domain DM noise in years.", + convert_tcb2tdb=False, + ) + ) + + self.covariance_matrix_funcs += [self.quasi_periodic_dm_cov_matrix] + self.basis_funcs += [self.quasi_periodic_dm_basis_weight_pair] + + def get_quasi_periodic_vals(self) -> Tuple[float, float, float]: + """ + Retrieve quasi-periodic and time-domain parameters + from the model, substituting defaults if unspecified. + """ + if self.TNDMDT.value is None: + log.warning( + "TNDMDT is not set, using default value of 30 days for QuasiPeriodicDMNoise" + ) + dt = 30.0 + else: + dt = self.TNDMDT.value + + if ( + self.TNDMLOGP.value is not None + or self.TNDMLOGSIG.value is not None + or self.TNDMLOGELL.value is not None + or self.TNDMLOGGAMP.value is not None + or self.TNDMLOGP.value is not None + ): + log10_sigma = self.TNDMLOGSIG.value + log10_ell = self.TNDMLOGELL.value + log10_gamma_p = self.TNDMLOGGAMP.value + log10_p = self.TNDMLOGP.value + else: + raise ValueError( + "TNDMDT, TNDMLOGSIG, TNDMLOGELL, TNDMLOGGAMP, and TNDMLOGP must be set for QuasiPeriodicDMNoise" + ) + + return log10_sigma, log10_ell, log10_gamma_p, log10_p, dt + + def get_noise_basis(self, toas: TOAs) -> np.ndarray: + """Return a linear interpolation matrix for QuasiPeriodicDMNoise. + + See the documentation for quasi_periodic_dm_basis_weight_pair function for details. + """ + tbl = toas.table + t = (tbl["tdbld"].quantity * u.day).to(u.s).value + + fref = 1400 * u.MHz + freqs = self._parent.barycentric_radio_freq(toas).to(u.MHz) + + _, _, dt = self.get_quasi_periodic_vals() + Umat, _ = linear_interp_basis(t, dt=dt * 86400) + D = (fref.value / freqs.value) ** 2.0 + + return Umat * D[:, None] + + def get_noise_weights(self, toas: TOAs) -> np.ndarray: + """Return square exponential time domain DM noise weights. + + See the documentation for sq_exp_dm_basis_weight_pair for details.""" + tbl = toas.table + t = (tbl["tdbld"].quantity * u.day).to(u.s).value + + (log10_sigma, log10_ell, log10_gamma_p, log10_p, dt) = ( + self.get_quasi_periodic_vals() + ) + _, avetoas = linear_interp_basis(t, dt=dt * 86400) + + return periodic_kernel(avetoas, log10_sigma, log10_ell, log10_gamma_p, log10_p) + + def quasi_periodic_dm_basis_weight_pair( + self, toas: TOAs + ) -> Tuple[np.ndarray, np.ndarray]: + """Return a chromatic linear interpolation basis and quasi-periodic dm noise weights. + + Uses chromatic linear interpolation basis in the time domain to model dispersive delays. + Time domain GPs have associated covariance functions which are priors over functions. + The periodic covariance function is a common choice for modeling + smooth functions. It is defined as: + .. math:: + K(t_i, t_j) = K_{SE}(t_i, t_j) * + \\exp\\left(-\\Gamma_p\\sin\\left(\\frac{\\pi(t_i - t_j)^2}{p}\\right)^2\\right) + where :math:`K_{SE}` is the square exponential kernel, and :math:`p` is the periodicity. + + """ + return (self.get_noise_basis(toas), self.get_noise_weights(toas)) + + def quasi_periodic_dm_cov_matrix(self, toas: TOAs) -> np.ndarray: + Umat, phi = self.quasi_periodic_dm_basis_weight_pair(toas) + return np.dot(Umat * phi, Umat.T) + + def get_ecorr_epochs(toas_table: np.ndarray, dt: float = 1, nmin: int = 2) -> List[int]: """Find only epochs with more than 1 TOA for applying ECORR.""" if len(toas_table) == 0: @@ -1301,3 +1672,194 @@ def powerlaw( fyr = (1 / u.year).to_value(u.Hz) return A**2 / 12.0 / np.pi**2 * fyr ** (gamma - 3) * f ** (-gamma) * above_fl + + +def linear_interp_basis(toas, dt=30 * 86400): + """Provides a basis for linear interpolation. + + :param toas: Pulsar TOAs in seconds + :param dt: Linear interpolation step size in seconds. + + :returns: Linear interpolation basis and nodes + """ + + # evenly spaced points + x = np.arange(toas.min(), toas.max() + dt, dt) + M = np.zeros((len(toas), len(x))) + + # make linear interpolation basis + for ii in range(len(x) - 1): + idx = np.logical_and(toas >= x[ii], toas <= x[ii + 1]) + M[idx, ii] = (toas[idx] - x[ii + 1]) / (x[ii] - x[ii + 1]) + M[idx, ii + 1] = (toas[idx] - x[ii]) / (x[ii + 1] - x[ii]) + + # only return non-zero columns + idx = M.sum(axis=0) != 0 + + return M[:, idx], x[idx] + + +def linear_interp_basis_dm(toas, freqs, dt=30 * 86400): + + # get linear interpolation basis in time + U, avetoas = linear_interp_basis(toas, dt=dt) + + # scale with radio frequency + Dm = (1400 / freqs) ** 2 + + return U * Dm[:, None], avetoas + + +def linear_interp_basis_chromatic(toas, freqs, dt=30 * 86400, idx=4): + """Linear interpolation basis in time with nu^-chi scaling""" + # get linear interpolation basis in time + U, avetoas = linear_interp_basis(toas, dt=dt) + + # scale with radio frequency + Dm = (1400 / freqs) ** idx + + return U * Dm[:, None], avetoas + + +def linear_interp_basis_freq(freqs, df=64): + """Linear interpolation in radio frequency""" + return linear_interp_basis(freqs, dt=df) + + +def ridge_kernel(avetoas, log10_sigma=-7): + """Ridge regression kernel""" + sigma = 10**log10_sigma + return sigma**2 * np.ones_like(avetoas) + + +def periodic_kernel(avetoas, log10_sigma=-7, log10_ell=2, log10_gam_p=0, log10_p=0): + """Quasi-periodic kernel for DM""" + r = np.abs(avetoas[None, :] - avetoas[:, None]) + + # convert units to seconds + sigma = 10**log10_sigma + l = 10**log10_ell * 86400 + p = 10**log10_p * 3.16e7 + gam_p = 10**log10_gam_p + d = np.eye(r.shape[0]) * (sigma / 500) ** 2 + K = sigma**2 * np.exp(-(r**2) / 2 / l**2 - gam_p * np.sin(np.pi * r / p) ** 2) + d + return K + + +def se_dm_kernel(avetoas, log10_sigma=-7, log10_ell=2): + """Squared-exponential kernel for DM""" + r = np.abs(avetoas[None, :] - avetoas[:, None]) + + # Convert everything into seconds + l = 10**log10_ell * 86400 + sigma = 10**log10_sigma + d = np.eye(r.shape[0]) * (sigma / 500) ** 2 + K = sigma**2 * np.exp(-(r**2) / 2 / l**2) + d + return K + + +def get_tf_quantization_matrix(toas, freqs, dt=30 * 86400, df=None, dm=False, dm_idx=2): + """ + Quantization matrix in time and radio frequency to cut down on the kernel + size. + """ + if df is None: + dfs = [(600, 1000), (1000, 1900), (1900, 3000), (3000, 5000)] + else: + fmin = freqs.min() + fmax = freqs.max() + fs = np.arange(fmin, fmax + df, df) + dfs = [(fs[ii], fs[ii + 1]) for ii in range(len(fs) - 1)] + + Us, avetoas, avefreqs, masks = [], [], [], [] + for rng in dfs: + mask = np.logical_and(freqs >= rng[0], freqs < rng[1]) + if any(mask): + masks.append(mask) + # the ecorr quantization matrix should be the same as what we want here + U, _ = create_ecorr_quantization_matrix(toas[mask], dt=dt, nmin=1) + avetoa = np.array([toas[mask][idx.astype(bool)].mean() for idx in U.T]) + avefreq = np.array([freqs[mask][idx.astype(bool)].mean() for idx in U.T]) + Us.append(U) + avetoas.append(avetoa) + avefreqs.append(avefreq) + + nc = np.sum(U.shape[1] for U in Us) + U = np.zeros((len(toas), nc)) + avetoas = np.concatenate(avetoas) + idx = np.argsort(avetoas) + avefreqs = np.concatenate(avefreqs) + nctot = 0 + for ct, mask in enumerate(masks): + Umat = Us[ct] + nn = Umat.shape[1] + U[mask, nctot : nn + nctot] = Umat + nctot += nn + + if dm: + weights = (1400 / freqs) ** dm_idx + else: + weights = np.ones_like(freqs) + + return U[:, idx] * weights[:, None], { + "avetoas": avetoas[idx], + "avefreqs": avefreqs[idx], + } + + +def tf_kernel( + labels, + log10_sigma=-7, + log10_ell=2, + log10_gam_p=0, + log10_p=0, + log10_ell2=4, + log10_alpha_wgt=0, +): + """ + The product of a quasi-periodic time kernel and + a rational-quadratic frequency kernel. + """ + avetoas = labels["avetoas"] + avefreqs = labels["avefreqs"] + + r = np.abs(avetoas[None, :] - avetoas[:, None]) + r2 = np.abs(avefreqs[None, :] - avefreqs[:, None]) + + # convert units to seconds + sigma = 10**log10_sigma + l = 10**log10_ell * 86400 + l2 = 10**log10_ell2 + p = 10**log10_p * 3.16e7 + gam_p = 10**log10_gam_p + alpha_wgt = 10**log10_alpha_wgt + + d = np.eye(r.shape[0]) * (sigma / 500) ** 2 + Kt = sigma**2 * np.exp(-(r**2) / 2 / l**2 - gam_p * np.sin(np.pi * r / p) ** 2) + Kv = (1 + r2**2 / 2 / alpha_wgt / l2**2) ** (-alpha_wgt) + + return Kt * Kv + d + + +def sf_kernel(labels, log10_sigma=-7, log10_ell=2, log10_ell2=4, log10_alpha_wgt=0): + """ + The product of a squared-exponential time kernel and + a rational-quadratic frequency kernel. + """ + avetoas = labels["avetoas"] + avefreqs = labels["avefreqs"] + + r = np.abs(avetoas[None, :] - avetoas[:, None]) + r2 = np.abs(avefreqs[None, :] - avefreqs[:, None]) + + # Convert everything into seconds + l = 10**log10_ell * 86400 + sigma = 10**log10_sigma + l2 = 10**log10_ell2 + alpha_wgt = 10**log10_alpha_wgt + + d = np.eye(r.shape[0]) * (sigma / 500) ** 2 + Kt = sigma**2 * np.exp(-(r**2) / 2 / l**2) + Kv = (1 + r2**2 / 2 / alpha_wgt / l2**2) ** (-alpha_wgt) + + return Kt * Kv + d diff --git a/tests/test_noise_models.py b/tests/test_noise_models.py index bcdcfa6f9..1c4bcd375 100644 --- a/tests/test_noise_models.py +++ b/tests/test_noise_models.py @@ -23,7 +23,7 @@ ] -def add_DM_noise_to_model(model): +def add_PL_DM_noise_to_model(model): all_components = Component.component_types model.add_component(all_components["PLDMNoise"](), validate=False) model["TNDMAMP"].quantity = -13 @@ -34,6 +34,15 @@ def add_DM_noise_to_model(model): model.validate() +def add_sq_exp_DM_noise_to_model(model): + all_components = Component.component_types + model.add_component(all_components["PLDMNoise"](), validate=False) + model["TNDMLOGSIG"].quantity = -13 + model["TNDMLOGELL"].quantity = 2.0 + model["TNDMDT"].value = 30 + model.validate() + + def add_SW_noise_to_model(model): all_components = Component.component_types model.add_component(all_components["PLSWNoise"](), validate=False) @@ -65,12 +74,21 @@ def model_and_toas(): parfile = examplefile("B1855+09_NANOGrav_9yv1.gls.par") timfile = examplefile("B1855+09_NANOGrav_9yv1.tim") model, toas = get_model_and_toas(parfile, timfile) - add_DM_noise_to_model(model) + add_PL_DM_noise_to_model(model) add_chrom_noise_to_model(model) add_SW_noise_to_model(model) return model, toas +@pytest.fixture() +def time_domain_model_and_toas(): + parfile = examplefile("B1855+09_NANOGrav_9yv1.gls.par") + timfile = examplefile("B1855+09_NANOGrav_9yv1.tim") + model, toas = get_model_and_toas(parfile, timfile) + add_sq_exp_DM_noise_to_model(model) + return model, toas + + @pytest.mark.parametrize("component_label", noise_component_labels) def test_introduces_correlated_errors(component_label): """All `NoiseComponent` classes should have a Boolean attribute `introduces_correlated_errors`."""