Source code for paintbox.utils.CvD_utils

""" Calculates the M/L for CvD models. """
import os
import glob

import numpy as np
import astropy.units as u
from astropy.table import Table, vstack
from astropy.io import fits
from tqdm import tqdm
from spectres import spectres
from scipy.ndimage.filters import gaussian_filter1d
from paintbox.sed import ParametricModel, PaintboxBase
from paintbox.operators import Constrain

from .disp2vel import disp2vel

__all__ = ["CvD18"]

np.seterr(divide='ignore', invalid='ignore')

[docs]class CvD18(PaintboxBase): """ Utility class for the use of CvD models. This class provides a convenience `paintbox` interface to CvD models, producing a callable object that can be used together with other classes to produce more detailed SED models. The CvD model files require pre-processing for use with `paintbox`, which may require an initial overhead of ~one minute, but the processed files can be reused for several files / runs if necessary it models are stored in disk. Parameters ---------- wave: ndarray, astropy.units.Quantity Wavelength array of the model. libpath: str Path to the locations where CvD18 models are stored. sigma: float Velocity dispersion of the output in km/s. Default is 100 km/s (the minimum velocity dispersion allowed). store: str Directory where processed models are stored for reuse. If models are not found within store, models in the libpath are processed for use, and stored in this directory. elements: list Chemical abundances to be included in the model. Available elements are 'Ba', 'C', 'Ca', 'Co', 'Cr', 'Cu', 'Eu', 'Fe', 'K', 'Mg', 'Mn', 'N', 'Na', 'Ni', 'Si', 'Sr', 'T', 'Ti', 'V', 'a/Fe', 'as/Fe'. Default is to use all elements except T (temperature of hot star) and 'a/Fe'. norm: bool Normalize the flux of the processed models to the median. Default is True. """ def __init__(self, wave=None, libpath=None, sigma=100, store=None, elements=None, norm=True): self.sigma = sigma self.store = store self.elements = ["C", "N", "Na", "Mg", "Si", "Ca", "Ti", "Fe", "K", "Cr", "Mn", "Ba", "Ni", "Co", "Eu", "Sr", "V", "Cu", "a/Fe"] if elements is None else elements self.libpath = libpath self._all_elements = ['Ba', 'C', 'Ca', 'Co', 'Cr', 'Cu', 'Eu', 'Fe', 'K', 'Mg', 'Mn', 'N', 'Na', 'Ni', 'Si', 'Sr', 'T', 'Ti', 'V', 'a/Fe', 'as/Fe'] self.norm = norm if os.path.isfile(self.store): self.load_templates() else: self.set_wave(wave) self.process_ssps() self.process_respfun() if self.store is not None: self.save_templates() self.build_model()
[docs] def set_wave(self, wave): if wave is None: ssp_files = glob.glob(os.path.join(self.libpath, "VCJ*.s100")) wave = np.loadtxt(ssp_files[0], usecols=(0,)) if hasattr(wave, "unit"): self.wave = wave.to(u.Angstrom).value else: self.wave = wave #Assumes units are Angstrom assert self.wave.min() >= 3501, "Minimum wavelength is 3501 Angstrom" assert self.wave.max() <= 25000, "Maximum wavelength is 25000 Angstrom" assert self.sigma >= 100, "Minumum velocity dispersion for models is " \ "100 km/s." return
[docs] def build_model(self): """ Build model with paintbox SED methods. """ ssp = ParametricModel(self.wave, self.params, self.templates) self._limits = {} for p in self.params.colnames: vmin = self.params[p].data.min() vmax = self.params[p].data.max() self._limits[p] = (vmin, vmax) self._response_functions = {} for element in self.elements: rf = ParametricModel(self.wave, self.rfpars[element], self.rfs[ element]) self.response_functions[element] = rf ssp = ssp * rf vmin = rf.params[element].data.min() vmax = rf.params[element].data.max() self._limits[element] = (vmin, vmax) if len(self.elements) > 0: # Update limits in case response functions # are used. for p in ["Age", "Z"]: vmin = rf.params[p].data.min() vmax = rf.params[p].data.max() self._limits[p] = (vmin, vmax) self._interpolator = Constrain(ssp) self._parnames = self._interpolator.parnames self._nparams = len(self.parnames)
[docs] def __call__(self, theta): """ Returns a model for a given set of parameters theta. """ return self._interpolator(theta)
[docs] def process_ssps(self): """ Process SSP models. """ ssp_files = glob.glob(os.path.join(self.libpath, "VCJ*.s100")) if len(ssp_files) == 0: raise ValueError(f"Stellar populations not found in libpath: " f"{self.libpath}") nimf = 16 imfs = 0.5 + np.arange(nimf) / 5 x2s, x1s= np.stack(np.meshgrid(imfs, imfs)).reshape(2, -1) velscale = int(self.sigma / 4) kernel_sigma = np.sqrt(self.sigma ** 2 - 100 ** 2) / velscale ssps, params = [], [] for fname in tqdm(ssp_files, desc="Processing SSP files"): spec = os.path.split(fname)[1] T = float(spec.split("_")[3][1:]) Z = float(spec.split("_")[4][1:-8].replace("p", "+").replace( "m", "-")) for i, (x1, x2) in enumerate(zip(x1s, x2s)): params.append(Table([[Z], [T], [x1], [x2]], names=["Z", "Age", "x1", "x2"])) data = np.loadtxt(fname) w = data[:,0] if self.sigma > 100: wvel = disp2vel(w, velscale) ssp = data[:, 1:].T if self.sigma <= 100: newssp = spectres(self.wave, w, ssp, fill=0, verbose=False) else: ssp_rebin = spectres(wvel, w, ssp, fill=0, verbose=False) ssp_broad = gaussian_filter1d(ssp_rebin, kernel_sigma, mode="constant", cval=0.0) newssp = spectres(self.wave, wvel, ssp_broad, fill=0, verbose=False) ssps.append(newssp) self.params = vstack(params) self.templates = np.vstack(ssps) self.fluxnorm = np.median(self.templates, axis=1) if self.norm else 1. self.templates /= self.fluxnorm[:, np.newaxis] return
[docs] def process_respfun(self): """ Prepare response functions from CvD models. """ self.rf_infiles = glob.glob(os.path.join(self.libpath, "atlas_ssp*.s100")) # Read one spectrum to get name of columns with open(self.rf_infiles[0]) as f: header = f.readline().replace("#", "") fields = [_.strip() for _ in header.split(",")] fields[fields.index("C+")] = "C+0.15" fields[fields.index("C-")] = "C-0.15" fields[fields.index("T+")] = "T+50" fields[fields.index("T-")] = "T-50" fields = ["{}0.3".format(_) if _.endswith("+") else _ for _ in fields ] fields = ["{}0.3".format(_) if _.endswith("-") else _ for _ in fields] elements = set([_.split("+")[0].split("-")[0] for _ in fields if any(c in _ for c in ["+", "-"])]) signal = ["+", "-"] velscale = int(self.sigma / 4) kernel_sigma = np.sqrt(self.sigma**2 - 100**2) / velscale rfsout = dict([(element, []) for element in elements]) parsout = dict([(element, []) for element in elements]) desc = "Preparing response functions" for i, fname in enumerate(tqdm(self.rf_infiles, desc=desc)): spec = os.path.split(fname)[1] T = float(spec.split("_")[2][1:]) Z = float(spec.split("_")[3].split(".abun")[0][1:].replace( "p", "+").replace("m", "-")) data = np.loadtxt(fname) w = data[:, 0] data = data.T if self.sigma > 100: wvel = disp2vel(w, velscale) rebin = spectres(wvel, w, data, fill=0, verbose=False) broad = gaussian_filter1d(rebin, kernel_sigma, mode="constant", cval=0.0) data = spectres(self.wave, wvel, broad, fill=0, verbose=False).T else: data = spectres(self.wave, w, data, fill=0, verbose=False).T fsun = data[:, 1] for element in elements: # Adding solar response p = Table([[Z], [T], [0.]], names=["Z", "Age", element]) rfsout[element].append(np.ones(len(self.wave))) parsout[element].append(p) # Adding non-solar responses for sign in signal: name = "{}{}".format(element, sign) cols = [(i,f) for i, f in enumerate(fields) if f.startswith(name)] for i, col in cols: val = float("{}1".format(sign)) * \ float(col.split(sign)[1]) t = Table([[Z], [T], [val]], names=["Z", "Age", element]) parsout[element].append(t) rf = data[:, i] / fsun rfsout[element].append(rf) self.rfs = dict([(e, np.array(rfsout[e])) for e in elements]) self.rfpars = dict([(e, vstack(parsout[e])) for e in elements]) return
[docs] def save_templates(self): hdu0 = fits.PrimaryHDU() hdu1 = fits.BinTableHDU(Table([self.wave], names=["wave"])) hdu1.header["EXTNAME"] = "WAVE" hdu2 = fits.ImageHDU(self.templates * self.fluxnorm[:, None]) hdu2.header["EXTNAME"] = "DATA.SSPS" params = Table(self.params) hdu3 = fits.BinTableHDU(params) hdu3.header["EXTNAME"] = "PARS.SSPS" hdulist = fits.HDUList([hdu0, hdu1, hdu2, hdu3]) for element in self._all_elements: hdudata = fits.ImageHDU(self.rfs[element]) hdudata.header["EXTNAME"] = f"DATA.{element}" hdulist.append(hdudata) hdutable = fits.BinTableHDU(self.rfpars[element]) hdutable.header["EXTNAME"] = f"PARS.{element}" hdulist.append(hdutable) hdulist.writeto(self.store, overwrite=True)
[docs] def load_templates(self): hdulist = fits.open(self.store) nhdus = len(hdulist) hdunum = np.arange(1, nhdus) hdunames = [hdulist[i].header["EXTNAME"] for i in hdunum] hdudict = dict(zip(hdunames, hdunum)) self.wave = Table.read(self.store, hdu=hdudict["WAVE"])["wave"].data self.params = Table.read(self.store, hdu=hdudict["PARS.SSPS"]) self.templates = hdulist[hdudict["DATA.SSPS"]].data self.rfs = {} self.rfpars = {} for e in self._all_elements: self.rfs[e] = hdulist[hdudict[f"DATA.{e}"]].data self.rfpars[e] = Table.read(self.store, hdu=hdudict[f"PARS.{e}"]) self.fluxnorm = np.median(self.templates, axis=1) if self.norm else 1. self.templates /= self.fluxnorm[:, np.newaxis] return
@property def limits(self): """ Lower and upper limits of the model parameters. """ return self._limits @property def response_functions(self): """ Dictionary with all response functions in model. """ return self._response_functions