"""Transient chromatic events due to, e.g., profile change events in J1713+0747 or extreme scattering events (ESE)"""
from typing import List, Optional, Union
import numpy as np
import astropy.units as u
from astropy.time import Time
from pint.models.parameter import floatParameter, prefixParameter
from pint.models.timing_model import DelayComponent
from pint.toa import TOAs
[docs]class SimpleExponentialDip(DelayComponent):
"""Simple chromatic exponential dip model for events like the profile changes
in J1713+0747.
The dip is modeled as a logistic function multiplied by an exponential decay.
This is different from the tempo2 implementation where the exponential decay
is multiplied by a Heaviside step function. We cannot fit for the event epoch
while using the latter because it is not differentiable at the event epoch.
This implementation approaches the tempo2 version in the limit EXPDIPEPS -> 0.
The parameter names are also different between this implementation and tempo2.
The tempo2 parameter names have been added here as aliases so that tempo2 par files
can be read.
The explicit mathematical form of the dips is as follows.
.. math::
\\Delta_{\\text{dip}}(t)=-\\sum_{i}A_{i}\\left(\\frac{f}{f_{\\text{ref}}}\\right)^{\\gamma_{i}}\\left(\\frac{\\tau_{i}}{\\epsilon}\\right)^{\\epsilon/\\tau_{i}}\\left(\\frac{\\tau_{i}}{\\tau_{i}-\\epsilon}\\right)^{\\frac{\\tau_{i}-\\epsilon}{\\tau_{i}}}\\frac{\\exp\\left[-\\frac{t-T_{i}}{\\tau_{i}}\\right]}{1+\\exp\\left[-\\frac{t-T_{i}}{\\epsilon}\\right]}
An exponential dip event is normalized such that the EXPDIPAMP_ is its extremum
value. Note that the extremum occurs slightly after the event epoch.
Parameters supported:
.. paramtable::
:class: pint.models.transient_events.SimpleExponentialDip
"""
register = True
category = "simple_exp_dip"
def __init__(self):
super().__init__()
self.add_param(
floatParameter(
name=f"EXPDIPEPS",
units="day",
description="Chromatic exponential dip step timescale",
value=1e-3,
frozen=True,
tcb2tdb_scale_factor=1,
)
)
self.add_param(
floatParameter(
name=f"EXPDIPFREF",
units="MHz",
description="Chromatic exponential dip reference frequency",
value=1400,
frozen=True,
tcb2tdb_scale_factor=1,
)
)
self.add_exp_dip(None, 0, None, None, index=1, frozen=True)
self.delay_funcs_component += [self.expdip_delay]
[docs] def add_exp_dip(
self,
epoch: Union[float, u.Quantity, Time],
ampl: Union[float, u.Quantity],
gamma: Union[float, u.Quantity],
tau: Union[float, u.Quantity],
index: Optional[int] = None,
frozen: bool = True,
) -> int:
"""Add an exponential dip to the model."""
if index is None:
dct = self.get_prefix_mapping_component("EXPEP_")
index = np.max(list(dct.keys())) + 1
elif int(index) in self.get_prefix_mapping_component("EXPEP_"):
raise ValueError(
f"Index '{index}' is already in use in this model. Please choose another."
)
if isinstance(epoch, Time):
epoch = epoch.mjd
elif isinstance(epoch, u.Quantity):
epoch = epoch.value
self.add_param(
prefixParameter(
name=f"EXPDIPEP_{index}",
units="MJD",
description="Chromatic exponential dip epoch",
parameter_type="MJD",
time_scale="utc",
value=epoch,
frozen=frozen,
tcb2tdb_scale_factor=1,
prefix_aliases=["EXPEP_"],
)
)
if isinstance(ampl, u.Quantity):
ampl = ampl.to_value(u.s)
self.add_param(
prefixParameter(
name=f"EXPDIPAMP_{index}",
units="s",
value=ampl,
description="Chromatic exponential dip amplitude",
parameter_type="float",
frozen=frozen,
tcb2tdb_scale_factor=1,
prefix_aliases=["EXPPH_"],
)
)
if isinstance(gamma, u.Quantity):
gamma = gamma.to_value(u.s)
self.add_param(
prefixParameter(
name=f"EXPDIPIDX_{index}",
units="",
value=gamma,
description="Chromatic exponential dip index",
parameter_type="float",
frozen=frozen,
tcb2tdb_scale_factor=1,
prefix_aliases=["EXPINDEX_"],
)
)
if isinstance(tau, u.Quantity):
tau = tau.to_value(u.s)
self.add_param(
prefixParameter(
name=f"EXPDIPTAU_{index}",
units="day",
value=tau,
description="Chromatic exponential dip decay timescale",
parameter_type="float",
frozen=frozen,
tcb2tdb_scale_factor=1,
prefix_aliases=["EXPTAU_"],
)
)
self.setup()
self.validate()
return index
[docs] def remove_exp_dip(self, index: Union[float, int, List[int], np.ndarray]) -> None:
"""Removes all exp dip parameters associated with a given index/list of indices.
Parameters
----------
index : float, int, list, np.ndarray
Number or list/array of numbers corresponding to DMX indices to be removed from model.
"""
if isinstance(index, (int, float, np.int64)):
indices = [index]
elif isinstance(index, (list, set, np.ndarray)):
indices = index
else:
raise TypeError(
f"index must be a float, int, set, list, or array - not {type(index)}"
)
for index in indices:
index_rf = f"{int(index):d}"
for prefix in ["EXPDIPEP_", "EXPDIPAMP_", "EXPDIPTAU_", "EXPDIPIDX_"]:
self.remove_param(f"{prefix}{index_rf}")
self.validate()
[docs] def get_indices(self) -> np.ndarray:
"""Returns an array of integers corresponding to exp dip parameters.
Returns
-------
inds : np.ndarray
Array of exp dip indices in model.
"""
inds = [int(p.split("_")[-1]) for p in self.params if "EXPDIPEP_" in p]
return np.array(inds)
[docs] def setup(self) -> None:
super().setup()
# Get DMX mapping.
# Register the DMX derivatives
for prefix_par in self.get_params_of_type("prefixParameter"):
if prefix_par.startswith("EXPDIPEP_"):
self.register_deriv_funcs(self.d_delay_d_T, prefix_par)
elif prefix_par.startswith("EXPDIPAMP_"):
self.register_deriv_funcs(self.d_delay_d_A, prefix_par)
elif prefix_par.startswith("EXPDIPTAU_"):
self.register_deriv_funcs(self.d_delay_d_tau, prefix_par)
elif prefix_par.startswith("EXPDIPIDX_"):
self.register_deriv_funcs(self.d_delay_d_gamma, prefix_par)
[docs] def get_ffac(self, toas: TOAs) -> np.ndarray:
"""Compute f/fref where f is the observing frequency."""
f = self._parent.barycentric_radio_freq(toas)
fref = self.EXPDIPFREF.quantity
return (f / fref).to_value(u.dimensionless_unscaled)
[docs] def expdip_delay_term(
self, t_mjd: np.ndarray, ffac: np.ndarray, ii: int
) -> u.Quantity:
"""Compute the delay for a single exponential dip event."""
T = getattr(self, f"EXPDIPEP_{ii}").value
dt = (t_mjd - T) * u.day
A = getattr(self, f"EXPDIPAMP_{ii}").quantity
gamma = getattr(self, f"EXPDIPIDX_{ii}").quantity
tau = getattr(self, f"EXPDIPTAU_{ii}").quantity
eps = self.EXPDIPEPS.quantity
# Done this way to avoid overflow in exp
expfac = np.zeros(len(dt))
expfac[dt >= 0] = np.exp(-dt[dt >= 0] / tau) / (1 + np.exp(-dt[dt >= 0] / eps))
expfac[dt < 0] = np.exp(dt[dt < 0] * (tau - eps) / (tau * eps)) / (
1 + np.exp(dt[dt < 0] / eps)
)
return (
-A
* ffac**gamma
* (tau / eps) ** (eps / tau)
* (tau / (tau - eps)) ** ((tau - eps) / tau)
* expfac
)
[docs] def expdip_delay(self, toas: TOAs, acc_delay=None) -> u.Quantity:
"""Total exponential dip delay."""
indices = self.get_indices()
ffac = self.get_ffac(toas)
t_mjd = toas["tdbld"]
delay = np.zeros(len(toas)) * u.s
for ii in indices:
delay += self.expdip_delay_term(t_mjd, ffac, ii)
return delay
[docs] def d_delay_d_A(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity:
"""Derivative of delay w.r.t. exponential dip amplitude."""
ii = getattr(self, param).index
ffac = self.get_ffac(toas)
A = getattr(self, f"EXPDIPAMP_{ii}").quantity
return self.expdip_delay_term(toas["tdbld"], ffac, ii) / A
[docs] def d_delay_d_gamma(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity:
"""Derivative of delay w.r.t. exponential dip chromatic index."""
ii = getattr(self, param).index
ffac = self.get_ffac(toas)
return self.expdip_delay_term(toas["tdbld"], ffac, ii) * np.log(ffac)
[docs] def d_delay_d_tau(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity:
"""Derivative of delay w.r.t. exponential dip decay timescale."""
ii = getattr(self, param).index
ffac = self.get_ffac(toas)
t0_mjd = getattr(self, f"EXPDIPEP_{ii}").value
dt = (toas["tdbld"] - t0_mjd) * u.day
tau = getattr(self, f"EXPDIPTAU_{ii}").quantity
eps = self.EXPDIPEPS.quantity
return (
self.expdip_delay_term(toas["tdbld"], ffac, ii)
* (dt + eps * np.log(eps / (tau - eps)))
/ tau**2
)
[docs] def d_delay_d_T(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity:
"""Derivative of delay w.r.t. exponential dip epoch."""
ii = getattr(self, param).index
ffac = self.get_ffac(toas)
T = getattr(self, f"EXPDIPEP_{ii}").value
dt = (toas["tdbld"] - T) * u.day
tau = getattr(self, f"EXPDIPTAU_{ii}").quantity
eps = self.EXPDIPEPS.quantity
# Done this way to avoid overflow in exp
expfac1 = np.zeros(len(dt))
expfac1[dt >= 0] = np.exp(-dt[dt >= 0] / eps) / (1 + np.exp(-dt[dt >= 0] / eps))
expfac1[dt < 0] = 1 / (1 + np.exp(dt[dt < 0] / eps))
return self.expdip_delay_term(toas["tdbld"], ffac, ii) * (
(1 / tau) - (1 / eps) * expfac1
)
[docs]class ChromaticGaussianEvent(DelayComponent):
r"""Simple chromatic Gaussian model for extreme scattering events or other chromatic transient features
This phenomenological model is defined as a Gaussian in time multiplied by a power law in radio frequency with variable chromaticity.
The model parameters include the event epoch, sign, log10 amplitude, chromatic index, and log10 standard deviation of the Gaussian.
See Coles et al. 2015 (https://arxiv.org/abs/1506.07948) for more details at extreme scattering events.
See Reardon et al. 2023 (https://arxiv.org/abs/2306.16229) for an application of this model in PTAs,
but note a typo in Reardon et al., which lacks a negative sign in the exponential.
The explicit mathematical form of the Gaussian event is as follows.
.. math::
\Delta_{\text{Gaussian}}(t) = \sum_{i} \text{SIGN}_{i} \cdot 10^{\text{LOGAMP}_{i}}
\left(\frac{f}{f_{\text{ref}}}\right)^{-\text{CHROMIDX}_{i}}
\exp\left[-\frac{(t-\text{EPOCH}_{i})^2}{2\left(10^{\text{LOGSIG}_{i}}\right)^2}\right]
where :math:`\text{LOGAMP}_{i} = \log_{10}(A_i / \text{s})`,
:math:`\text{LOGSIG}_{i} = \log_{10}(\sigma_i / \text{days})`,
:math:`\text{EPOCH}_{i}` is in MJD, :math:`f_{\text{ref}} = \text{CHROMGAUSS_FREF}` in MHz,
and :math:`\text{SIGN}_{i} \in \{-1, +1\}`.
Parameters supported:
.. paramtable::
:class: pint.models.transient_events.ChromaticGaussianEvent
Examples
--------
Add a chromatic Gaussian event to an existing PINT timing model:
>>> import numpy as np
>>> from pint.models.transient_events import ChromaticGaussianEvent
>>> chrom_gauss = ChromaticGaussianEvent()
>>> model.add_component(chrom_gauss)
>>> chrom_comp = model.components['ChromaticGaussianEvent']
>>> chrom_comp.add_chromatic_gaussian_event(
... 55000, # EPOCH (MJD)
... np.log10(5e-6), # LOGAMP log10(s)
... 4, # CHROMIDX: chromatic index (f/fref)^-chromidx
... np.log10(300), # LOGSIGMA log10(days)
... 1, # SIGN: event sign
... index=1,
... force=True,
... )
Notes
-----
The above example will appear in the par file as such::
CHROMGAUSS_FREF 1400
CHROMGAUSS_EPOCH_1 55000.0 1
CHROMGAUSS_LOGAMP_1 -5.301 1
CHROMGAUSS_CHROMIDX_1 4.0 0
CHROMGAUSS_LOGSIG_1 2.477 1
CHROMGAUSS_SIGN_1 1 0
For an additional event replace the ``_1`` suffix with ``_2``.
The derivatives w.r.t SIGN and EPOCH can be quite large so it is recommended to fix SIGN and start from a good estimate of EPOCH.
"""
register = True
category = "chromatic_gaussian_event"
def __init__(self):
super().__init__()
self.add_param(
floatParameter(
name=f"CHROMGAUSS_FREF",
units="MHz",
description="Chromatic Gaussian event reference frequency",
value=1400,
frozen=True,
tcb2tdb_scale_factor=1,
)
)
# Dummy event so the model builder can discover this component
self.add_chromatic_gaussian_event(None, 0, 0, 0, 1, index=1, frozen=True)
# Register delay function once (it checks for events internally)
self.delay_funcs_component += [self.chrom_gauss_delay]
[docs] def add_chromatic_gaussian_event(
self,
epoch: Union[float, u.Quantity, Time],
log10amp: Union[float, u.Quantity],
chromidx: Union[float, u.Quantity],
log10sigma: Union[float, u.Quantity],
sign: Union[int, float, u.Quantity],
index: Optional[int] = 1,
frozen: bool = True,
force: bool = False,
) -> int:
"""
Add a chromatic Gaussian event to the model.
Use force = True to overwrite existing event at given index.
"""
if index is None:
dct = self.get_prefix_mapping_component("CHROMGAUSS_EPOCH_")
if dct:
index = np.max(list(dct.keys())) + 1
else:
index = 1 # Start at 1 if no events exist yet
elif int(index) in self.get_prefix_mapping_component("CHROMGAUSS_EPOCH_"):
if not force:
raise ValueError(
f"Index '{index}' is already in use in this model. Please choose another."
)
else:
# Remove existing event so we can re-add with new values
self.remove_chromatic_gaussian_event(int(index))
if isinstance(epoch, Time):
epoch = epoch.mjd
elif isinstance(epoch, u.Quantity):
epoch = epoch.value
self.add_param(
prefixParameter(
name=f"CHROMGAUSS_EPOCH_{index}",
units="MJD",
description="Chromatic Gaussian event epoch",
parameter_type="MJD",
time_scale="utc",
value=epoch,
frozen=frozen,
tcb2tdb_scale_factor=1,
)
)
self.add_param(
prefixParameter(
name=f"CHROMGAUSS_LOGAMP_{index}",
units="",
value=log10amp,
description="Log10 Chromatic Gaussian event amplitude (log10 in seconds)",
parameter_type="float",
frozen=frozen,
tcb2tdb_scale_factor=1,
)
)
self.add_param(
prefixParameter(
name=f"CHROMGAUSS_CHROMIDX_{index}",
units="",
value=chromidx,
description="Chromatic Gaussian event chromatic index",
parameter_type="float",
frozen=frozen,
tcb2tdb_scale_factor=1,
)
)
if isinstance(sign, u.Quantity):
sign = sign.value
sign = np.sign(sign) # get the actual sign
self.add_param(
prefixParameter(
name=f"CHROMGAUSS_SIGN_{index}",
units="",
value=sign,
description="Chromatic Gaussian event sign",
parameter_type="float",
frozen=True,
tcb2tdb_scale_factor=1,
)
)
self.add_param(
prefixParameter(
name=f"CHROMGAUSS_LOGSIG_{index}",
units="",
value=log10sigma,
description="Log10 Chromatic Gaussian event standard deviation (log10 in days)",
parameter_type="float",
frozen=frozen,
tcb2tdb_scale_factor=1,
)
)
self.setup()
self.validate()
return index
[docs] def remove_chromatic_gaussian_event(
self, index: Union[float, int, List[int], np.ndarray]
) -> None:
"""Removes all chromatic Gaussian event parameters associated with a given index/list of indices.
Parameters
----------
index : float, int, list, np.ndarray
Number or list/array of numbers corresponding to chromatic Gaussian event indices to be removed from model.
"""
if isinstance(index, (int, float, np.int64)):
indices = [index]
elif isinstance(index, (list, set, np.ndarray)):
indices = index
else:
raise TypeError(
f"index must be a float, int, set, list, or array - not {type(index)}"
)
for index in indices:
index_rf = f"{int(index):d}"
for prefix in [
"CHROMGAUSS_EPOCH_",
"CHROMGAUSS_LOGAMP_",
"CHROMGAUSS_LOGSIG_",
"CHROMGAUSS_CHROMIDX_",
"CHROMGAUSS_SIGN_",
]:
param_name = f"{prefix}{index_rf}"
if param_name in self.params:
self.remove_param(param_name)
self.setup()
self.validate()
# Also re-setup parent model if attached, so model-level derivative
# registry is cleaned up properly.
if hasattr(self, "_parent") and self._parent is not None:
self._parent.setup()
[docs] def get_indices(self) -> np.ndarray:
"""Returns an array of integers corresponding to chromatic Gaussian event parameters.
Returns
-------
inds : np.ndarray
Array of chromatic Gaussian event indices in model.
"""
inds = [int(p.split("_")[-1]) for p in self.params if "CHROMGAUSS_EPOCH_" in p]
return np.array(inds)
[docs] def setup(self) -> None:
super().setup()
# Get mapping.
# Register the ChromGauss derivatives
for prefix_par in self.get_params_of_type("prefixParameter"):
if prefix_par.startswith("CHROMGAUSS_EPOCH_"):
self.register_deriv_funcs(self.d_delay_d_epoch, prefix_par)
elif prefix_par.startswith("CHROMGAUSS_LOGAMP_"):
self.register_deriv_funcs(self.d_delay_d_log10amp, prefix_par)
elif prefix_par.startswith("CHROMGAUSS_LOGSIG_"):
self.register_deriv_funcs(self.d_delay_d_log10sigma, prefix_par)
elif prefix_par.startswith("CHROMGAUSS_CHROMIDX_"):
self.register_deriv_funcs(self.d_delay_d_chromidx, prefix_par)
elif prefix_par.startswith("CHROMGAUSS_SIGN_"):
self.register_deriv_funcs(self.d_delay_d_sign, prefix_par)
[docs] def get_ffac(self, toas: TOAs) -> np.ndarray:
"""Compute f/fref where f is the observing frequency."""
f = self._parent.barycentric_radio_freq(toas)
fref = self.CHROMGAUSS_FREF.quantity
return (f / fref).to_value(u.dimensionless_unscaled)
[docs] def chrom_gauss_delay_term(
self, t_mjd: np.ndarray, ffac: np.ndarray, ii: int
) -> u.Quantity:
"""Compute the delay for a single chromatic Gaussian event."""
T = getattr(self, f"CHROMGAUSS_EPOCH_{ii}").value
dt = (t_mjd - T) * u.day
log10Amp = getattr(self, f"CHROMGAUSS_LOGAMP_{ii}").value
chromidx = getattr(self, f"CHROMGAUSS_CHROMIDX_{ii}").value
log10sigma = getattr(self, f"CHROMGAUSS_LOGSIG_{ii}").value
sign = getattr(self, f"CHROMGAUSS_SIGN_{ii}").value
sigma = 10 ** (log10sigma) * u.day
return (
sign
* 10**log10Amp
* u.s
* np.exp(-0.5 * (dt.value) ** 2 / (sigma.value) ** 2)
* ffac ** (-chromidx)
)
[docs] def chrom_gauss_delay(self, toas: TOAs, acc_delay=None) -> u.Quantity:
"""Total delay across all chromatic Gaussian events."""
indices = self.get_indices()
ffac = self.get_ffac(toas)
t_mjd = toas["tdbld"]
delay = np.zeros(len(toas)) * u.s
for ii in indices:
delay += self.chrom_gauss_delay_term(t_mjd, ffac, ii)
return delay
[docs] def d_delay_d_log10amp(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity:
"""Derivative of delay w.r.t. chromatic Gaussian amplitude."""
ii = getattr(self, param).index
ffac = self.get_ffac(toas)
return self.chrom_gauss_delay_term(toas["tdbld"], ffac, ii) * np.log(10)
[docs] def d_delay_d_chromidx(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity:
"""Derivative of delay w.r.t. chromatic Gaussian index."""
ii = getattr(self, param).index
ffac = self.get_ffac(toas)
return self.chrom_gauss_delay_term(toas["tdbld"], ffac, ii) * np.log(ffac**-1)
[docs] def d_delay_d_log10sigma(
self, toas: TOAs, param: str, acc_delay=None
) -> u.Quantity:
"""Derivative of delay w.r.t. chromatic Gaussian st. deviation."""
ii = getattr(self, param).index
ffac = self.get_ffac(toas)
t0_mjd = getattr(self, f"CHROMGAUSS_EPOCH_{ii}").value
dt = (toas["tdbld"] - t0_mjd) * u.day
log10sigma = getattr(self, f"CHROMGAUSS_LOGSIG_{ii}").value
# sigma = 10 ** (log10sigma) * u.day
return (
self.chrom_gauss_delay_term(toas["tdbld"], ffac, ii)
* (dt.value) ** 2
/ (10 ** (2 * log10sigma))
* np.log(10)
)
[docs] def d_delay_d_epoch(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity:
"""Derivative of delay w.r.t. chromatic Gaussian epoch."""
ii = getattr(self, param).index
ffac = self.get_ffac(toas)
T = getattr(self, f"CHROMGAUSS_EPOCH_{ii}").value
dt = (toas["tdbld"] - T) * u.day
log10sigma = getattr(self, f"CHROMGAUSS_LOGSIG_{ii}").value
sigma_sq = 10 ** (2 * log10sigma) * u.day**2
return self.chrom_gauss_delay_term(toas["tdbld"], ffac, ii) * (dt / sigma_sq)
[docs] def d_delay_d_sign(self, toas: TOAs, param: str, acc_delay=None) -> u.Quantity:
"""Derivative of delay w.r.t. chromatic Gaussian sign."""
ii = getattr(self, param).index
ffac = self.get_ffac(toas)
# Derivative w.r.t. sign is just the delay term divided by sign
sign = getattr(self, f"CHROMGAUSS_SIGN_{ii}").value
return self.chrom_gauss_delay_term(toas["tdbld"], ffac, ii) / sign