Source code for paintbox.ssp_utils

import os
import glob

import numpy as np
import astropy.units as u
from astropy.table import Table, vstack
from astropy.io import fits
from tqdm import tqdm
from spectres import spectres
from scipy.ndimage.filters import gaussian_filter1d

from paintbox.sed import PaintboxBase, ParametricModel
from paintbox.logspace_dispersion import disp2vel,  logspace_dispersion

__all__ = ["CvD18", "MILES"]

np.seterr(divide='ignore', invalid='ignore')

[docs]class CvD18(PaintboxBase): """ Utility class for the use of CvD models. This class provides a convenience `paintbox` interface to CvD models, producing a callable object that can be used together with other classes to produce more detailed SED models. The CvD model files require pre-processing for use with `paintbox`, which may require an initial overhead of ~one minute, but the processed files can be reused for several files / runs if necessary it models are stored in disk. Parameters ---------- wave: ndarray, astropy.units.Quantity Wavelength array of the model. libpath: str Path to the locations where CvD18 models are stored. sigma: float Velocity dispersion of the output in km/s. Default is 100 km/s (the minimum velocity dispersion allowed). store: str Directory where processed models are stored for reuse. If models are not found within store, models in the libpath are processed for use, and stored in this directory. elements: list Chemical abundances to be included in the model. Available elements are 'Ba', 'C', 'Ca', 'Co', 'Cr', 'Cu', 'Eu', 'Fe', 'K', 'Mg', 'Mn', 'N', 'Na', 'Ni', 'Si', 'Sr', 'T', 'Ti', 'V', 'a/Fe', 'as/Fe'. Default is to use all elements except T (temperature of hot star) and 'a/Fe'. norm: bool Normalize the flux of the processed models to the median. Default is True. """ def __init__(self, wave=None, libpath=None, sigma=100, store=None, elements=None, norm=True): self.sigma = sigma self.store = store self.elements = ["C", "N", "Na", "Mg", "Si", "Ca", "Ti", "Fe", "K", "Cr", "Mn", "Ba", "Ni", "Co", "Eu", "Sr", "V", "Cu", "a/Fe"] if elements is None else elements self.libpath = libpath self._all_elements = ['Ba', 'C', 'Ca', 'Co', 'Cr', 'Cu', 'Eu', 'Fe', 'K', 'Mg', 'Mn', 'N', 'Na', 'Ni', 'Si', 'Sr', 'T', 'Ti', 'V', 'a/Fe', 'as/Fe'] self.norm = norm if os.path.isfile(self.store): self.load_templates() else: self.set_wave(wave) self.process_ssps() self.process_respfun() if self.store is not None: self.save_templates() self.build_model()
[docs] def set_wave(self, wave): if wave is None: ssp_files = glob.glob(os.path.join(self.libpath, "VCJ*.s100")) wave = np.loadtxt(ssp_files[0], usecols=(0,)) if hasattr(wave, "unit"): self.wave = wave.to(u.Angstrom).value else: self.wave = wave #Assumes units are Angstrom assert self.wave.min() >= 3501, "Minimum wavelength is 3501 Angstrom" assert self.wave.max() <= 25000, "Maximum wavelength is 25000 Angstrom" assert self.sigma >= 100, "Minumum velocity dispersion for models is " \ "100 km/s." return
[docs] def build_model(self): """ Build model with paintbox SED methods. """ ssp = ParametricModel(self.wave, self.params, self.templates) self._limits = {} for p in self.params.colnames: vmin = self.params[p].data.min() vmax = self.params[p].data.max() self._limits[p] = (vmin, vmax) self._response_functions = {} for element in self.elements: rf = ParametricModel(self.wave, self.rfpars[element], self.rfs[ element]) self.response_functions[element] = rf ssp = ssp * rf vmin = rf.params[element].data.min() vmax = rf.params[element].data.max() self._limits[element] = (vmin, vmax) if len(self.elements) > 0: # Update limits in case response functions # are used. for p in ["Age", "Z"]: vmin = rf.params[p].data.min() vmax = rf.params[p].data.max() self._limits[p] = (vmin, vmax) self._interpolator = ssp.constrain_duplicates() self._parnames = self._interpolator.parnames self._nparams = len(self.parnames)
[docs] def __call__(self, theta): """ Returns a model for a given set of parameters theta. """ return self._interpolator(theta)
[docs] def process_ssps(self): """ Process SSP models. """ ssp_files = glob.glob(os.path.join(self.libpath, "VCJ*.s100")) if len(ssp_files) == 0: raise ValueError(f"Stellar populations not found in libpath: " f"{self.libpath}") nimf = 16 imfs = 0.5 + np.arange(nimf) / 5 x2s, x1s= np.stack(np.meshgrid(imfs, imfs)).reshape(2, -1) velscale = int(self.sigma / 4) kernel_sigma = np.sqrt(self.sigma ** 2 - 100 ** 2) / velscale ssps, params = [], [] for fname in tqdm(ssp_files, desc="Processing SSP files"): spec = os.path.split(fname)[1] T = float(spec.split("_")[3][1:]) Z = float(spec.split("_")[4][1:-8].replace("p", "+").replace( "m", "-")) for i, (x1, x2) in enumerate(zip(x1s, x2s)): params.append(Table([[Z], [T], [x1], [x2]], names=["Z", "Age", "x1", "x2"])) data = np.loadtxt(fname) w = data[:,0] if self.sigma > 100: wvel = disp2vel(w, velscale) ssp = data[:, 1:].T if self.sigma <= 100: newssp = spectres(self.wave, w, ssp, fill=0, verbose=False) else: ssp_rebin = spectres(wvel, w, ssp, fill=0, verbose=False) ssp_broad = gaussian_filter1d(ssp_rebin, kernel_sigma, mode="constant", cval=0.0) newssp = spectres(self.wave, wvel, ssp_broad, fill=0, verbose=False) ssps.append(newssp) self.params = vstack(params) self.templates = np.vstack(ssps) self.fluxnorm = np.median(self.templates, axis=1) if self.norm else 1. self.templates /= self.fluxnorm[:, np.newaxis] return
[docs] def process_respfun(self): """ Prepare response functions from CvD models. """ self.rf_infiles = glob.glob(os.path.join(self.libpath, "atlas_ssp*.s100")) # Read one spectrum to get name of columns with open(self.rf_infiles[0]) as f: header = f.readline().replace("#", "") fields = [_.strip() for _ in header.split(",")] fields[fields.index("C+")] = "C+0.15" fields[fields.index("C-")] = "C-0.15" fields[fields.index("T+")] = "T+50" fields[fields.index("T-")] = "T-50" fields = ["{}0.3".format(_) if _.endswith("+") else _ for _ in fields ] fields = ["{}0.3".format(_) if _.endswith("-") else _ for _ in fields] elements = set([_.split("+")[0].split("-")[0] for _ in fields if any(c in _ for c in ["+", "-"])]) signal = ["+", "-"] velscale = int(self.sigma / 4) kernel_sigma = np.sqrt(self.sigma**2 - 100**2) / velscale rfsout = dict([(element, []) for element in elements]) parsout = dict([(element, []) for element in elements]) desc = "Preparing response functions" for i, fname in enumerate(tqdm(self.rf_infiles, desc=desc)): spec = os.path.split(fname)[1] T = float(spec.split("_")[2][1:]) Z = float(spec.split("_")[3].split(".abun")[0][1:].replace( "p", "+").replace("m", "-")) data = np.loadtxt(fname) w = data[:, 0] data = data.T if self.sigma > 100: wvel = disp2vel(w, velscale) rebin = spectres(wvel, w, data, fill=0, verbose=False) broad = gaussian_filter1d(rebin, kernel_sigma, mode="constant", cval=0.0) data = spectres(self.wave, wvel, broad, fill=0, verbose=False).T else: data = spectres(self.wave, w, data, fill=0, verbose=False).T fsun = data[:, 1] for element in elements: # Adding solar response p = Table([[Z], [T], [0.]], names=["Z", "Age", element]) rfsout[element].append(np.ones(len(self.wave))) parsout[element].append(p) # Adding non-solar responses for sign in signal: name = "{}{}".format(element, sign) cols = [(i,f) for i, f in enumerate(fields) if f.startswith(name)] for i, col in cols: val = float("{}1".format(sign)) * \ float(col.split(sign)[1]) t = Table([[Z], [T], [val]], names=["Z", "Age", element]) parsout[element].append(t) rf = data[:, i] / fsun rfsout[element].append(rf) self.rfs = dict([(e, np.array(rfsout[e])) for e in elements]) self.rfpars = dict([(e, vstack(parsout[e])) for e in elements]) return
[docs] def save_templates(self): hdu0 = fits.PrimaryHDU() hdu1 = fits.BinTableHDU(Table([self.wave], names=["wave"])) hdu1.header["EXTNAME"] = "WAVE" hdu2 = fits.ImageHDU(self.templates * self.fluxnorm[:, None]) hdu2.header["EXTNAME"] = "DATA.SSPS" params = Table(self.params) hdu3 = fits.BinTableHDU(params) hdu3.header["EXTNAME"] = "PARS.SSPS" hdulist = fits.HDUList([hdu0, hdu1, hdu2, hdu3]) for element in self._all_elements: hdudata = fits.ImageHDU(self.rfs[element]) hdudata.header["EXTNAME"] = f"DATA.{element}" hdulist.append(hdudata) hdutable = fits.BinTableHDU(self.rfpars[element]) hdutable.header["EXTNAME"] = f"PARS.{element}" hdulist.append(hdutable) hdulist.writeto(self.store, overwrite=True)
[docs] def load_templates(self): hdulist = fits.open(self.store) nhdus = len(hdulist) hdunum = np.arange(1, nhdus) hdunames = [hdulist[i].header["EXTNAME"] for i in hdunum] hdudict = dict(zip(hdunames, hdunum)) self.wave = Table.read(self.store, hdu=hdudict["WAVE"])["wave"].data self.params = Table.read(self.store, hdu=hdudict["PARS.SSPS"]) self.templates = hdulist[hdudict["DATA.SSPS"]].data self.rfs = {} self.rfpars = {} for e in self._all_elements: self.rfs[e] = hdulist[hdudict[f"DATA.{e}"]].data self.rfpars[e] = Table.read(self.store, hdu=hdudict[f"PARS.{e}"]) self.fluxnorm = np.median(self.templates, axis=1) if self.norm else 1. self.templates /= self.fluxnorm[:, np.newaxis] return
@property def limits(self): """ Lower and upper limits of the model parameters. """ return self._limits @property def response_functions(self): """ Dictionary with all response functions in model. """ return self._response_functions
[docs]class MILES(PaintboxBase): """ Utility class to prepare MILES SSP models. More info about MILES models can be found at their website [1] Parameters ---------- spectral_range: str (default 'E") Define the spectral range (wavelength) used in the models. Options are: - E (EMILES): 1680-50000 Angstrom - M (MILES): 3540.5-7409.6 Angstrom - B (V99_Blue): 3855.50-4476.44 Angstrom - R (V99_Red): 4794.95-5465.09 Angstrom - C (CaT): 8348.85-8951.50 Angstrom imf_type: str (default "bi") Initial Mass Function (IMF) parametrization. Options are: - un (unimodal) - bi (bimodal) - ku (Kroupa universal) - kb (Kroupa revised) - ch (Chabrier) isochrones: str (default BasTI) Isochrones used for the SSP. Options are BasTI and Padova. [1]: http://research.iac.es/proyecto/miles/ """ def __init__(self, spectral_range=None, imf_type=None, isochrones=None, wave=None, libpath=None, resolution=None, store=None, norm=True): # Check spectral range self.spectral_range = "E" if spectral_range is None else spectral_range allowed_ranges = ["E", "M", "B", "R", "C"] msg = """Allowed spectral ranges are "E", "M", "B", "R" and "C". """ assert self.spectral_range in allowed_ranges, msg # Check IMF type self.imf_type = "bi" if imf_type is None else imf_type msg = """ Allowed IMF types are "un", "bi", "ku", "kb" and "ch" """ allowed_imfs = ["un", "bi", "ku", "kb", "ch"] assert self.imf_type in allowed_imfs, msg # Check isochones self.isochrones = "BasTI" if isochrones is None else isochrones msg = "Isochrones should be BasTI or Padova. " assert self.isochrones in ["BasTI", "Padova"], msg # Define allowed values for metallicity, ages and alpha-elements self._MHs_iso = {"Padova": [-2.32, -1.71, -1.31, -0.71, -0.40, 0.00, 0.22], "BasTI": [-2.27, -1.79, -1.49, -1.26, -0.96, -0.66, -0.35, -0.25, +0.06, 0.15, 0.26, 0.40]} self._ages_iso = { "Padova": [0.063, 0.071, 0.079, 0.089, 0.10, 0.11, 0.13, 0.14, 0.16, 0.18, 0.20, 0.22, 0.25, 0.28, 0.32, 0.35, 0.40, 0.45, 0.50, 0.56, 0.63, 0.71, 0.79, 0.89, 1.00, 1.12, 1.26, 1.41, 1.58, 1.78, 2.00, 2.24, 2.51, 2.82, 3.16, 3.55, 3.98, 4.47, 5.01, 5.62, 6.31, 7.08, 7.94, 8.91, 10.00, 11.22, 12.59, 14.13, 15.85, 17.78], "BasTI": [00.03, 00.04, 00.05, 00.06, 00.07, 00.08, 00.09, 00.10, 00.15, 00.20, 00.25, 00.30, 00.35, 00.40, 00.45, 00.50, 00.60, 00.70, 00.80, 00.90, 01.00, 01.25, 01.50, 01.75, 02.00, 02.25, 02.50, 02.75, 03.00, 03.25, 03.50, 03.75, 04.00, 04.50, 05.00, 05.50, 06.00, 06.50, 07.00, 07.50, 08.00, 08.50, 09.00, 09.50, 10.00, 10.50, 11.00, 11.50, 12.00, 12.50, 13.00, 13.50, 14.00]} self._alphaFes_iso = {"Padova": ["base"], "BasTI": ["base", 0, 0.4]} self.imf_slopes = {"un": [0.3, 0.5, 0.8, 1.3, 1.5, 1.8, 2.0, 2.3, 2.5, 2.8, 3.0, 3.3, 3.5], "bi": [0.3, 0.5, 0.8, 1.3, 1.5, 1.8, 2.0, 2.3, 2.5, 2.8, 3.0, 3.3, 3.5], "ku": [1.3], "kb": [1.3], "ch": [1.3]} # self.resolution = resolution # self.store = store self.libpath = libpath # self.norm = norm # if os.path.isfile(self.store): # self.load_templates() # else: # self.set_wave(wave) # self.process_ssps() # if self.store is not None: # self.save_templates() # self.build_model() @property def MHs(self): """ Defines allowed metallicities in the SSPs.""" return self._MHs_iso[self.isochrones] @property def ages(self): """ Defines allowed ages in the SSPs. """ return self._ages_iso[self.isochrones] @property def alphaFes(self): """ Defines allowed alpha/Fe in the SSPs.""" return self._alphaFes_iso[self.isochrones]
[docs] def get_filename(self, MH, age, alphaFe="base", imfslope=1.3): """ Retrieves the filename containing the spectrum of given ages, metallicities, alpha/Fe and IMF slope. Parameters ---------- MH: float Metallicity of the SSP. age: float Age of the SSP. alphaFe: float or string Alpha-element abundances. Defaults to "base" models. For BasTI isochrones, models can have alpha element abundances of 0 and +0.4. imfslope: float Slope of the IMF. Defaults to 1.3. """ msign = "p" if MH >= 0. else "m" azero = "0" if age < 10. else ""
# filename = f"{self.spectral_range}{self.imf_type}{imfslope:.2f}Z{0 # }{1:.2f}T{2}{3:02.4f}_iPp0.00_baseFe_linear""" \ # "_FWHM_2.51.fits".format(msign, abs(MH), azero, age) # return os.path.join(self.libpath, filename)