import numpy as np
from numpy.polynomial import Chebyshev as Ch
from scipy.linalg import cho_factor, cho_solve
from scipy.optimize import minimize
import psoap
from psoap import constants as C
from psoap import matrix_functions
from psoap.data import lredshift
try:
import celerite
from celerite import terms
except ImportError:
print("If you want to use the fast 1D (SB1 or ST1 models), please install celerite")
try:
import george
from george import kernels
except ImportError:
print("If you want to use the fast GP solver (SB2, ST2, or ST3 models) please install george")
def predict_f(lwl_known, fl_known, sigma_known, lwl_predict, amp_f, l_f, mu_GP=1.0):
    '''Predict the GP mean and covariance of the flux at new wavelengths.

    Assumes all inputs are 1D arrays.

    Args:
        lwl_known (np.array): known (log-)wavelengths
        fl_known (np.array): fluxes at ``lwl_known``
        sigma_known (np.array): per-pixel noise at ``lwl_known``
        lwl_predict (np.array): prediction (log-)wavelengths
        amp_f (float): amplitude of the GP
        l_f (float): length scale of the GP
        mu_GP (float): mean of the GP

    Returns:
        (np.array, np.array): the predictive mean vector and covariance matrix.
    '''
    # determine V11, V12, V21, and V22
    M = len(lwl_known)
    V11 = np.empty((M, M), dtype=np.float64)
    matrix_functions.fill_V11_f(V11, lwl_known, amp_f, l_f)
    # Add the per-pixel noise to the diagonal of the data covariance.
    V11 = V11 + sigma_known**2 * np.eye(M)

    # BUGFIX: was `len(wl_predict)`, an undefined name (NameError).
    N = len(lwl_predict)
    V12 = np.empty((M, N), dtype=np.float64)
    matrix_functions.fill_V12_f(V12, lwl_known, lwl_predict, amp_f, l_f)

    V22 = np.empty((N, N), dtype=np.float64)
    # V22 is the covariance between the prediction wavelengths
    # The routine to fill V11 is the same as V22
    matrix_functions.fill_V11_f(V22, lwl_predict, amp_f, l_f)

    # Find V11^{-1} via Cholesky factorization
    factor, flag = cho_factor(V11)

    mu = mu_GP + np.dot(V12.T, cho_solve((factor, flag), (fl_known - mu_GP)))
    Sigma = V22 - np.dot(V12.T, cho_solve((factor, flag), V12))

    return (mu, Sigma)
def predict_python(wl_known, fl_known, sigma_known, wl_predict, amp_f, l_f, mu_GP=1.0):
    '''Predict the GP mean and covariance at ``wl_predict`` (pure-Python path).

    ``wl_known`` are known wavelengths; ``wl_predict`` are the prediction
    wavelengths.
    '''
    # Build the three covariance blocks: data-data, data-prediction,
    # and prediction-prediction.
    V11 = get_V11(wl_known, sigma_known, amp_f, l_f)
    V12 = get_V12(wl_known, wl_predict, amp_f, l_f)
    V22 = get_V22(wl_predict, amp_f, l_f)

    # Cholesky-factor V11 once so its inverse can be applied twice.
    cho = cho_factor(V11)

    residual = fl_known - mu_GP
    mu = mu_GP + np.dot(V12.T, cho_solve(cho, residual))
    Sigma = V22 - np.dot(V12.T, cho_solve(cho, V12))
    return (mu, Sigma)
def predict_f_g(lwl_f, lwl_g, fl_fg, sigma_fg, lwl_f_predict, lwl_g_predict, mu_f, amp_f, l_f, mu_g, amp_g, l_g, get_Sigma=True):
    '''
    Given that f + g is the flux that we're modeling, jointly predict the components.

    Args:
        lwl_f, lwl_g (np.array): data (log-)wavelengths for components f and g
        fl_fg (np.array): observed summed flux
        sigma_fg (np.array): per-pixel noise of the summed flux
        lwl_f_predict, lwl_g_predict (np.array): prediction wavelengths
        mu_f, amp_f, l_f (float): GP mean, amplitude, length for f
        mu_g, amp_g, l_g (float): GP mean, amplitude, length for g
        get_Sigma (bool): also compute and return the joint covariance

    Returns:
        mu (and Sigma if ``get_Sigma``) of the stacked (f, g) prediction.
    '''
    # Assert that wl_f and wl_g are the same length
    assert len(lwl_f) == len(lwl_g), "Input wavelengths must be the same length."
    n_pix = len(lwl_f)

    assert len(lwl_f_predict) == len(lwl_g_predict), "Prediction wavelengths must be the same length."
    n_pix_predict = len(lwl_f_predict)

    # Convert mu constants into vectors
    mu_f = mu_f * np.ones(n_pix_predict)
    mu_g = mu_g * np.ones(n_pix_predict)
    # Cat these into a single vector
    mu_cat = np.hstack((mu_f, mu_g))

    # Create the matrices for the input data.
    # BUGFIX: np.float was removed from NumPy (>= 1.24); use np.float64.
    V11_f = np.empty((n_pix, n_pix), dtype=np.float64)
    V11_g = np.empty((n_pix, n_pix), dtype=np.float64)
    matrix_functions.fill_V11_f(V11_f, lwl_f, amp_f, l_f)
    matrix_functions.fill_V11_f(V11_g, lwl_g, amp_g, l_g)
    B = V11_f + V11_g
    B[np.diag_indices_from(B)] += sigma_fg**2
    factor, flag = cho_factor(B)

    # Now create separate matrices for the prediction
    V11_f_predict = np.empty((n_pix_predict, n_pix_predict), dtype=np.float64)
    V11_g_predict = np.empty((n_pix_predict, n_pix_predict), dtype=np.float64)
    matrix_functions.fill_V11_f(V11_f_predict, lwl_f_predict, amp_f, l_f)
    matrix_functions.fill_V11_f(V11_g_predict, lwl_g_predict, amp_g, l_g)

    # The joint prediction covariance is block-diagonal in (f, g).
    zeros = np.zeros((n_pix_predict, n_pix_predict))
    A = np.vstack((np.hstack([V11_f_predict, zeros]), np.hstack([zeros, V11_g_predict])))

    # C is now the cross-matrices between the predicted wavelengths and the data wavelengths
    V12_f = np.empty((n_pix_predict, n_pix), dtype=np.float64)
    V12_g = np.empty((n_pix_predict, n_pix), dtype=np.float64)
    matrix_functions.fill_V12_f(V12_f, lwl_f_predict, lwl_f, amp_f, l_f)
    matrix_functions.fill_V12_f(V12_g, lwl_g_predict, lwl_g, amp_g, l_g)
    C = np.vstack((V12_f, V12_g))

    # the 1.0 signifies that mu_f + mu_g = mu_fg = 1
    mu = mu_cat + np.dot(C, cho_solve((factor, flag), fl_fg - 1.0))

    if get_Sigma:
        Sigma = A - np.dot(C, cho_solve((factor, flag), C.T))
        return mu, Sigma
    else:
        return mu
def predict_f_g_sum(lwl_f, lwl_g, fl_fg, sigma_fg, lwl_f_predict, lwl_g_predict, mu_fg, amp_f, l_f, amp_g, l_g):
    '''
    Given that f + g is the flux that we're modeling, predict the summed
    flux (f + g) at the prediction wavelengths.

    Returns:
        (np.array, np.array): predictive mean and covariance of the sum.
    '''
    # Assert that wl_f and wl_g are the same length
    assert len(lwl_f) == len(lwl_g), "Input wavelengths must be the same length."

    M = len(lwl_f_predict)
    N = len(lwl_f)

    # BUGFIX: np.float was removed from NumPy (>= 1.24); use np.float64.
    V11_f = np.empty((M, M), dtype=np.float64)
    V11_g = np.empty((M, M), dtype=np.float64)
    matrix_functions.fill_V11_f(V11_f, lwl_f_predict, amp_f, l_f)
    matrix_functions.fill_V11_f(V11_g, lwl_g_predict, amp_g, l_g)
    V11 = V11_f + V11_g
    # Small nugget term to keep the prediction covariance well-conditioned.
    V11[np.diag_indices_from(V11)] += 1e-8

    V12_f = np.empty((M, N), dtype=np.float64)
    V12_g = np.empty((M, N), dtype=np.float64)
    matrix_functions.fill_V12_f(V12_f, lwl_f_predict, lwl_f, amp_f, l_f)
    matrix_functions.fill_V12_f(V12_g, lwl_g_predict, lwl_g, amp_g, l_g)
    V12 = V12_f + V12_g

    V22_f = np.empty((N, N), dtype=np.float64)
    V22_g = np.empty((N, N), dtype=np.float64)
    # It's a square matrix, so we can just reuse fill_V11_f
    matrix_functions.fill_V11_f(V22_f, lwl_f, amp_f, l_f)
    matrix_functions.fill_V11_f(V22_g, lwl_g, amp_g, l_g)
    V22 = V22_f + V22_g
    V22[np.diag_indices_from(V22)] += sigma_fg**2

    factor, flag = cho_factor(V22)

    mu = mu_fg + np.dot(V12, cho_solve((factor, flag), (fl_fg - 1.0)))
    Sigma = V11 - np.dot(V12, cho_solve((factor, flag), V12.T))

    return mu, Sigma
def predict_f_g_h(lwl_f, lwl_g, lwl_h, fl_fgh, sigma_fgh, lwl_f_predict, lwl_g_predict, lwl_h_predict, mu_f, mu_g, mu_h, amp_f, l_f, amp_g, l_g, amp_h, l_h):
    '''
    Given that f + g + h is the flux that we're modeling, jointly predict the components.

    Returns:
        (np.array, np.array): the stacked (f, g, h) predictive mean and covariance.
    '''
    # Assert that wl_f and wl_g are the same length
    assert len(lwl_f) == len(lwl_g), "Input wavelengths must be the same length."
    assert len(lwl_f) == len(lwl_h), "Input wavelengths must be the same length."
    n_pix = len(lwl_f)

    assert len(lwl_f_predict) == len(lwl_g_predict), "Prediction wavelengths must be the same length."
    assert len(lwl_f_predict) == len(lwl_h_predict), "Prediction wavelengths must be the same length."
    n_pix_predict = len(lwl_f_predict)

    # Convert mu constants into vectors
    mu_f = mu_f * np.ones(n_pix_predict)
    mu_g = mu_g * np.ones(n_pix_predict)
    mu_h = mu_h * np.ones(n_pix_predict)
    # Cat these into a single vector
    mu_cat = np.hstack((mu_f, mu_g, mu_h))

    # Covariance of the summed data with itself.
    # BUGFIX: np.float was removed from NumPy (>= 1.24); use np.float64.
    V11_f = np.empty((n_pix, n_pix), dtype=np.float64)
    V11_g = np.empty((n_pix, n_pix), dtype=np.float64)
    V11_h = np.empty((n_pix, n_pix), dtype=np.float64)
    matrix_functions.fill_V11_f(V11_f, lwl_f, amp_f, l_f)
    matrix_functions.fill_V11_f(V11_g, lwl_g, amp_g, l_g)
    matrix_functions.fill_V11_f(V11_h, lwl_h, amp_h, l_h)
    B = V11_f + V11_g + V11_h
    B[np.diag_indices_from(B)] += sigma_fgh**2
    factor, flag = cho_factor(B)

    # Now create separate matrices for the prediction
    V11_f_predict = np.empty((n_pix_predict, n_pix_predict), dtype=np.float64)
    V11_g_predict = np.empty((n_pix_predict, n_pix_predict), dtype=np.float64)
    V11_h_predict = np.empty((n_pix_predict, n_pix_predict), dtype=np.float64)
    # Fill the prediction matrices
    matrix_functions.fill_V11_f(V11_f_predict, lwl_f_predict, amp_f, l_f)
    matrix_functions.fill_V11_f(V11_g_predict, lwl_g_predict, amp_g, l_g)
    matrix_functions.fill_V11_f(V11_h_predict, lwl_h_predict, amp_h, l_h)

    # The joint prediction covariance is block-diagonal in (f, g, h).
    zeros = np.zeros((n_pix_predict, n_pix_predict))
    A = np.vstack((np.hstack([V11_f_predict, zeros, zeros]), np.hstack([zeros, V11_g_predict, zeros]), np.hstack([zeros, zeros, V11_h_predict])))

    # Cross-matrices between the predicted wavelengths and the data wavelengths.
    V12_f = np.empty((n_pix_predict, n_pix), dtype=np.float64)
    V12_g = np.empty((n_pix_predict, n_pix), dtype=np.float64)
    V12_h = np.empty((n_pix_predict, n_pix), dtype=np.float64)
    matrix_functions.fill_V12_f(V12_f, lwl_f_predict, lwl_f, amp_f, l_f)
    matrix_functions.fill_V12_f(V12_g, lwl_g_predict, lwl_g, amp_g, l_g)
    matrix_functions.fill_V12_f(V12_h, lwl_h_predict, lwl_h, amp_h, l_h)
    C = np.vstack((V12_f, V12_g, V12_h))

    # the 1.0 signifies that mu_f + mu_g + mu_h = mu_fgh = 1
    mu = mu_cat + np.dot(C, cho_solve((factor, flag), fl_fgh - 1.0))
    Sigma = A - np.dot(C, cho_solve((factor, flag), C.T))

    return mu, Sigma
def predict_f_g_h_sum(lwl_f, lwl_g, lwl_h, fl_fgh, sigma_fgh, lwl_f_predict, lwl_g_predict, lwl_h_predict, mu_fgh, amp_f, l_f, amp_g, l_g, amp_h, l_h):
    '''
    Given that f + g + h is the flux that we're modeling, predict the joint sum.

    Returns:
        (np.array, np.array): predictive mean and covariance of the sum.
    '''
    # Assert that wl_f and wl_g are the same length
    assert len(lwl_f) == len(lwl_g), "Input wavelengths must be the same length."

    M = len(lwl_f_predict)
    N = len(lwl_f)

    # BUGFIX: np.float was removed from NumPy (>= 1.24); use np.float64.
    V11_f = np.empty((M, M), dtype=np.float64)
    V11_g = np.empty((M, M), dtype=np.float64)
    V11_h = np.empty((M, M), dtype=np.float64)
    matrix_functions.fill_V11_f(V11_f, lwl_f_predict, amp_f, l_f)
    matrix_functions.fill_V11_f(V11_g, lwl_g_predict, amp_g, l_g)
    matrix_functions.fill_V11_f(V11_h, lwl_h_predict, amp_h, l_h)
    V11 = V11_f + V11_g + V11_h
    # V11[np.diag_indices_from(V11)] += 1e-5 # small nugget term

    V12_f = np.empty((M, N), dtype=np.float64)
    V12_g = np.empty((M, N), dtype=np.float64)
    V12_h = np.empty((M, N), dtype=np.float64)
    matrix_functions.fill_V12_f(V12_f, lwl_f_predict, lwl_f, amp_f, l_f)
    matrix_functions.fill_V12_f(V12_g, lwl_g_predict, lwl_g, amp_g, l_g)
    matrix_functions.fill_V12_f(V12_h, lwl_h_predict, lwl_h, amp_h, l_h)
    V12 = V12_f + V12_g + V12_h

    V22_f = np.empty((N, N), dtype=np.float64)
    V22_g = np.empty((N, N), dtype=np.float64)
    V22_h = np.empty((N, N), dtype=np.float64)
    # It's a square matrix, so we can just reuse fill_V11_f
    matrix_functions.fill_V11_f(V22_f, lwl_f, amp_f, l_f)
    matrix_functions.fill_V11_f(V22_g, lwl_g, amp_g, l_g)
    matrix_functions.fill_V11_f(V22_h, lwl_h, amp_h, l_h)
    V22 = V22_f + V22_g + V22_h
    V22[np.diag_indices_from(V22)] += sigma_fgh**2

    factor, flag = cho_factor(V22)

    # BUGFIX: was np.dot(V12.T, ...). V12 has shape (M, N) and the solve
    # yields a length-N vector, so the transpose cannot be multiplied;
    # the untransposed form matches predict_f_g_sum.
    mu = mu_fgh + np.dot(V12, cho_solve((factor, flag), (fl_fgh - mu_fgh)))
    Sigma = V11 - np.dot(V12, cho_solve((factor, flag), V12.T))

    return mu, Sigma
def lnlike_f(V11, wl_f, fl, sigma, amp_f, l_f, mu_GP=1.):
    """Calculate the log-likelihood for a single-lined spectrum.

    This function takes a pre-allocated array and fills out the covariance matrices and evaluates the likelihood function for a single-lined spectrum, assuming a squared-exponential kernel (does not ``celerite``).

    Args:
        V11 (numpy 2D array): pre-allocated (N, N) covariance workspace
        wl_f (numpy 1D array): wavelengths of the spectrum
        fl (numpy 1D array): fluxes of the spectrum
        sigma (numpy 1D array): per-pixel noise of the spectrum
        amp_f (float) : amplitude of GP
        l_f (float) : length scale of GP
        mu_GP (float) : mean of GP

    Returns:
        float: The log-likelihood value
    """
    # Negative hyperparameters are unphysical; reject them outright.
    if amp_f < 0.0 or l_f < 0.0:
        return -np.inf

    # Fill the matrix using fast cython routine.
    matrix_functions.fill_V11_f(V11, wl_f, amp_f, l_f)
    V11[np.diag_indices_from(V11)] += sigma**2

    try:
        factor, flag = cho_factor(V11)
    # BUGFIX: np.linalg.linalg was removed in NumPy 2.0; use np.linalg.
    except np.linalg.LinAlgError:
        return -np.inf

    # log|V11| from the Cholesky factor's diagonal.
    logdet = np.sum(2 * np.log((np.diag(factor))))
    return -0.5 * (np.dot((fl - mu_GP).T, cho_solve((factor, flag), (fl - mu_GP))) + logdet)
def lnlike_f_g(V11, wl_f, wl_g, fl, sigma, amp_f, l_f, amp_g, l_g, mu_GP=1.):
    '''
    Calculate the log-likelihood for a double-lined spectrum.

    V11 is a matrix to be allocated.
    wl_known, fl_known, and sigma_known are flattened 1D arrays.

    Returns:
        float: the log-likelihood value (``-np.inf`` for unphysical
        hyperparameters or a non-positive-definite covariance).
    '''
    # Negative hyperparameters are unphysical; reject them outright.
    if amp_f < 0.0 or l_f < 0.0 or amp_g < 0.0 or l_g < 0.0:
        return -np.inf

    # Fill the matrix using fast cython routine.
    matrix_functions.fill_V11_f_g(V11, wl_f, wl_g, amp_f, l_f, amp_g, l_g)
    V11[np.diag_indices_from(V11)] += sigma**2

    try:
        # Overwrite the workspace in place to avoid an extra copy.
        factor, flag = cho_factor(V11, overwrite_a=True, lower=False, check_finite=False)
    # BUGFIX: np.linalg.linalg was removed in NumPy 2.0; use np.linalg.
    except np.linalg.LinAlgError:
        return -np.inf

    # log|V11| from the Cholesky factor's diagonal.
    logdet = np.sum(2 * np.log((np.diag(factor))))
    return -0.5 * (np.dot((fl - mu_GP).T, cho_solve((factor, flag), (fl - mu_GP))) + logdet)
def lnlike_f_g_h(V11, wl_f, wl_g, wl_h, fl, sigma, amp_f, l_f, amp_g, l_g, amp_h, l_h, mu_GP=1.):
    '''
    Calculate the log-likelihood for a triple-lined spectrum.

    V11 is a matrix to be allocated.
    wl_known, fl_known, and sigma_known are flattened 1D arrays.

    Returns:
        float: the log-likelihood value (``-np.inf`` for unphysical
        hyperparameters or a non-positive-definite covariance).
    '''
    # Negative hyperparameters are unphysical; reject them outright.
    if amp_f < 0.0 or l_f < 0.0 or amp_g < 0.0 or l_g < 0.0 or amp_h < 0.0 or l_h < 0.0:
        return -np.inf

    # Fill the matrix using fast cython routine.
    matrix_functions.fill_V11_f_g_h(V11, wl_f, wl_g, wl_h, amp_f, l_f, amp_g, l_g, amp_h, l_h)
    V11[np.diag_indices_from(V11)] += sigma**2

    try:
        factor, flag = cho_factor(V11)
    # BUGFIX: np.linalg.linalg was removed in NumPy 2.0; use np.linalg.
    except np.linalg.LinAlgError:
        return -np.inf

    # log|V11| from the Cholesky factor's diagonal.
    logdet = np.sum(2 * np.log((np.diag(factor))))
    return -0.5 * (np.dot((fl - mu_GP).T, cho_solve((factor, flag), (fl - mu_GP))) + logdet)
# Assemble lnlikelihood functions for the different models
# Dispatch table: model name -> log-likelihood routine. SB1/ST1 are
# single-lined, SB2/ST2 double-lined, ST3 triple-lined.
lnlike = {"SB1": lnlike_f, "SB2": lnlike_f_g, "ST1": lnlike_f, "ST2": lnlike_f_g, "ST3": lnlike_f_g_h}
# Alternatively, use george to do the likelihood calculations
def lnlike_f_g_george(lwl_f, lwl_g, fl, sigma, amp_f, l_f, amp_g, l_g, mu_GP=1.):
    '''
    Evaluate the joint likelihood for *f* and *g* using George.
    '''
    # Stack the two (already-flattened) log-wavelength vectors into a 2D
    # coordinate array, one dimension per stellar component.
    coords = np.vstack((lwl_f, lwl_g)).T

    # One squared-exponential kernel per component, each acting on its own
    # axis of the 2D input.
    # might also want to "block" the kernel to limit it to some velocity range
    k_primary = amp_f * kernels.ExpSquaredKernel(l_f, ndim=2, axes=0)
    k_secondary = amp_g * kernels.ExpSquaredKernel(l_g, ndim=2, axes=1)

    # instantiate the GP and evaluate the kernel for the prior
    gp = george.GP(k_primary + k_secondary)
    gp.compute(coords, sigma)

    # evaluate the likelihood for the data
    return gp.log_likelihood(fl)
def optimize_GP_f(wl_known, fl_known, sigma_known, amp_f, l_f, mu_GP=1.0):
    '''
    Optimize the GP hyperparameters for the given slice of data. Amp and lv are starting guesses.

    Args:
        wl_known (np.array): wavelengths of the data
        fl_known (np.array): fluxes of the data
        sigma_known (np.array): per-pixel noise of the data
        amp_f (float): starting guess for the GP amplitude
        l_f (float): starting guess for the GP length scale
        mu_GP (float): mean of the GP

    Returns:
        np.array: the optimized ``[amp_f, l_f]``.
    '''
    N = len(wl_known)
    # Workspace reused by every likelihood evaluation.
    V11 = np.empty((N, N), dtype=np.float64)

    def func(x):
        # Negative log-likelihood, since scipy `minimize` minimizes.
        try:
            a, l = x
            return -lnlike_f(V11, wl_known, fl_known, sigma_known, a, l, mu_GP)
        # BUGFIX: np.linalg.linalg was removed in NumPy 2.0; use np.linalg.
        except np.linalg.LinAlgError:
            return np.inf

    ans = minimize(func, np.array([amp_f, l_f]), method="Nelder-Mead")
    return ans["x"]
def optimize_epoch_velocity_f(lwl_epoch, fl_epoch, sigma_epoch, lwl_fixed, fl_fixed, sigma_fixed, gp):
    '''
    Optimize the wavelengths of the chosen epoch relative to the fixed wavelengths. Identify the velocity required to redshift the chosen epoch.

    Args:
        lwl_epoch, fl_epoch, sigma_epoch: the epoch to be shifted
        lwl_fixed, fl_fixed, sigma_fixed: the reference epoch(s)
        gp: a celerite GP whose parameter vector is (log_sigma, log_rho)

    Returns:
        float: the best-fit velocity (km/s).

    Raises:
        C.ChunkError: if the optimizer fails to converge.
    '''
    fl = np.concatenate((fl_epoch, fl_fixed)).flatten()
    sigma = np.concatenate((sigma_epoch, sigma_fixed)).flatten()

    def func(p):
        # Negative log-likelihood of the merged spectrum at trial parameters p.
        try:
            v, log_sigma, log_rho = p
            if v < -200 or v > 200 or log_sigma < -3 or log_sigma > -2 or log_rho < -9 or log_rho > -8:
                # BUGFIX: this returned -np.inf, which a *minimizer* treats as
                # the best possible value, driving the fit out of bounds.
                # Out-of-bounds trials must be penalized with +inf.
                return np.inf

            # Doppler shift the input wl_epoch
            lwl_shift = lredshift(lwl_epoch, v)

            # Reconcatenate spectra into 1D array and sort
            lwl = np.concatenate((lwl_shift, lwl_fixed)).flatten()
            indsort = np.argsort(lwl)

            # Set the par vectors
            gp.set_parameter_vector(p[1:])

            # compute GP on new wl grid
            gp.compute(lwl[indsort], yerr=sigma[indsort])
            return -gp.log_likelihood(fl[indsort])
        # BUGFIX: np.linalg.linalg was removed in NumPy 2.0; use np.linalg.
        except np.linalg.LinAlgError:
            return np.inf

    # bound as -200 to 200 km/s
    p0 = np.concatenate((np.array([0.0]), gp.get_parameter_vector()))
    ans = minimize(func, p0, method="Nelder-Mead")

    # The velocity returned is the amount that was required to redshift wl_epoch to line up with wl_fixed.
    if ans["success"]:
        print("Success found with", ans["x"])
        return ans["x"][0]
    else:
        print(ans)
        raise C.ChunkError("Unable to optimize velocity for epoch.")
def determine_all_velocities(chunk, log_sigma, log_rho, mu_GP=1.0):
    '''Find the velocity shift of every epoch in ``chunk`` relative to the first.'''
    # Matern-3/2 celerite kernel with a fixed unit mean.
    matern = terms.Matern32Term(log_sigma=log_sigma, log_rho=log_rho)
    gp = celerite.GP(matern, mean=1.0, fit_mean=False)

    # The first epoch serves as the fixed reference frame.
    lwl_fixed = chunk.lwl[0]
    fl_fixed = chunk.fl[0]
    sigma_fixed = chunk.sigma[0]

    velocities = np.empty(chunk.n_epochs, dtype=np.float64)
    velocities[0] = 0.0

    # Align each remaining epoch against the reference; failed epochs get 0.
    for idx in range(1, chunk.n_epochs):
        try:
            velocities[idx] = optimize_epoch_velocity_f(
                chunk.lwl[idx], chunk.fl[idx], chunk.sigma[idx],
                lwl_fixed, fl_fixed, sigma_fixed, gp)
        except C.ChunkError:
            print("Unable to optimize velocity for epoch {:}".format(chunk.date1D[idx]))
            velocities[idx] = 0.0

    return velocities
# uses smart inverse from Celerite
def optimize_calibration_ST1(lwl0, lwl1, lwl_cal, fl_cal, fl_fixed, gp, A, C, mu_GP=1.0, order=1):
    '''
    Determine the calibration parameters for this epoch of observations.

    lwl0, lwl1: set the points for the Chebyshev.

    This is a more general method than optimize_calibration_static, since it allows arbitrary covariance matrices, which should be used when there is orbital motion.

    lwl_cal: the wavelengths corresponding to the epoch we want to calibrate
    fl_cal: the fluxes corresponding to the epoch we want to calibrate
    fl_fixed: the remaining epochs of data to calibrate in reference to.
    gp: the celerite GP
    order: the degree polynomial to use. order = 1 is a line, order = 2 is a line + parabola

    Assumes that covariance matrices are appropriately filled out.
    '''
    # Design matrix of Chebyshev basis polynomials evaluated at lwl_cal:
    # row i of T holds T_i over the calibration wavelengths.
    T = np.array([Ch([0] * i + [1], domain=[lwl0, lwl1])(lwl_cal)
                  for i in range(order + 1)])
    D = fl_cal[:, np.newaxis] * T.T

    # Condition on the fixed epochs; the celerite GP supplies B^{-1}
    # through gp.apply_inverse.
    residual = fl_fixed.flatten() - mu_GP
    fl_prime = mu_GP + np.dot(C, gp.apply_inverse(residual))
    C_prime = A - np.dot(C, gp.apply_inverse(C.T))

    # Factor {C'} once and solve the generalized least-squares problem
    # for the Chebyshev coefficients.
    CP_cho = cho_factor(C_prime)
    lhs = np.dot(D.T, cho_solve(CP_cho, D))
    rhs = np.dot(D.T, cho_solve(CP_cho, fl_prime))

    # X = [c0, c1, ...] — the calibration coefficients.
    X = cho_solve(cho_factor(lhs), rhs)

    # Apply the correction to the calibration epoch.
    fl_cor = np.dot(D, X)

    # Return both the corrected flux and the coefficients, in case we want
    # to log them or apply the correction later.
    return fl_cor, X
def optimize_calibration(lwl0, lwl1, lwl_cal, fl_cal, fl_fixed, A, B, C, order=1, mu_GP=1.0):
    '''
    Determine the calibration parameters for this epoch of observations. This is a more general method than :py:meth:`psoap.covariance.optimize_calibration_static`, since it allows arbitrary covariance matrices, which should be used when there is orbital motion. Assumes that covariance matrices are appropriately filled out.

    Args:
        lwl0 (float) : left side evaluation point for Chebyshev
        lwl1 (float) : right side evaluation point for Chebyshev
        lwl_cal (np.array): the wavelengths corresponding to the epoch we want to calibrate
        fl_cal (np.array): the fluxes corresponding to the epoch we want to calibrate
        fl_fixed (np.array): the remaining epochs of data to calibrate in reference to.
        A (2D np.array) : matrix_functions.fill_V11_f(A, lwl_cal, amp, l_f) with sigma_cal already added to the diagonal
        B (2D np.array) : matrix_functions.fill_V11_f(B, lwl_fixed, amp, l_f) with sigma_fixed already added to the diagonal
        C (2D np.array): matrix_functions.fill_V12_f(C, lwl_cal, lwl_fixed, amp, l_f) cross matrix (with no sigma added, since these are independent measurements).
        order (int): the degree polynomial to use. order = 1 is a line, order = 2 is a line + parabola
        mu_GP (float): the mean of the GP

    Returns:
        (np.array, np.array): a tuple of two data products. The first is the ``fl_cal`` vector, now calibrated. The second is the array of the Chebyshev coefficients, in case one wants to re-evaluate the calibration polynomials.
    '''
    # basically, assume that A, B, and C are already filled out.
    # the only thing this routine needs to do is fill out the Q matrix

    # Get a clean set of the Chebyshev polynomials evaluated on the input wavelengths
    T = []
    for i in range(0, order + 1):
        coeff = [0 for j in range(i)] + [1]
        Chtemp = Ch(coeff, domain=[lwl0, lwl1])
        Ttemp = Chtemp(lwl_cal)
        T += [Ttemp]
    T = np.array(T)

    D = fl_cal[:, np.newaxis] * T.T

    # Solve for the calibration coefficients c0, c1, ...
    # Find B^{-1}, fl_prime, and C_prime
    try:
        B_cho = cho_factor(B)
    # BUGFIX: np.linalg.linalg was removed in NumPy 2.0; use np.linalg.
    except np.linalg.LinAlgError:
        print("Failed to solve matrix inverse. Calibration not valid.")
        raise

    fl_prime = mu_GP + np.dot(C, cho_solve(B_cho, (fl_fixed.flatten() - mu_GP)))
    C_prime = A - np.dot(C, cho_solve(B_cho, C.T))

    # Find {C^\prime}^{-1}
    CP_cho = cho_factor(C_prime)

    # Invert the least squares problem
    left = np.dot(D.T, cho_solve(CP_cho, D))
    right = np.dot(D.T, cho_solve(CP_cho, fl_prime))
    left_cho = cho_factor(left)

    # the coefficents, X = [c0, c1]
    X = cho_solve(left_cho, right)

    # Apply the correction
    fl_cor = np.dot(D, X)

    # Return both the corrected flux and the coefficients, in case we want to log them,
    # or apply the correction later.
    return fl_cor, X
def optimize_calibration_static(wl0, wl1, wl_cal, fl_cal, sigma_cal, wl_fixed, fl_fixed, sigma_fixed, amp, l_f, order=1, mu_GP=1.0):
    '''
    Determine the calibration parameters for this epoch of observations. Assumes all wl, fl arrays are 1D, and that the relative velocities between all epochs are zero.

    Args:
        wl0 (float) : left wl point to evaluate the Chebyshev
        wl1 (float) : right wl point to evaluate the Chebyshev
        wl_cal (np.array) : the wavelengths of the epoch to calibrate
        fl_cal (np.array) : the fluxes of the epoch to calibrate
        sigma_cal (np.array): the sigmas of the epoch to calibrate
        wl_fixed (np.array) : the 1D (flattened) array of the reference wavelengths
        fl_fixed (np.array) : the 1D (flattened) array of the reference fluxes
        sigma_fixed (np.array) : the 1D (flattened) array of the reference sigmas
        amp (float): the GP amplitude
        l_f (float): the GP length
        order (int): the Chebyshev order to use
        mu_GP (optional): the mean of the GP to assume.

    Returns:
        (np.array, np.array): a tuple of two data products. The first is the ``fl_cal`` vector, now calibrated. The second is the array of the Chebyshev coefficients, in case one wants to re-evaluate the calibration polynomials.
    '''
    N_A = len(wl_cal)
    A = np.empty((N_A, N_A), dtype=np.float64)

    N_B = len(wl_fixed)
    B = np.empty((N_B, N_B), dtype=np.float64)
    C = np.empty((N_A, N_B), dtype=np.float64)

    matrix_functions.fill_V11_f(A, wl_cal, amp, l_f)
    matrix_functions.fill_V11_f(B, wl_fixed, amp, l_f)
    matrix_functions.fill_V12_f(C, wl_cal, wl_fixed, amp, l_f)

    # Add in sigmas
    A[np.diag_indices_from(A)] += sigma_cal**2
    B[np.diag_indices_from(B)] += sigma_fixed**2

    # Get a clean set of the Chebyshev polynomials evaluated on the input wavelengths
    T = []
    for i in range(0, order + 1):
        coeff = [0 for j in range(i)] + [1]
        Chtemp = Ch(coeff, domain=[wl0, wl1])
        Ttemp = Chtemp(wl_cal)
        T += [Ttemp]
    T = np.array(T)

    D = fl_cal[:, np.newaxis] * T.T

    # Solve for the calibration coefficients c0, c1
    # Find B^{-1}, fl_prime, and C_prime
    try:
        B_cho = cho_factor(B)
    # BUGFIX: np.linalg.linalg was removed in NumPy 2.0; use np.linalg.
    except np.linalg.LinAlgError:
        print("Failed to solve matrix inverse. Calibration not valid.")
        raise

    fl_prime = mu_GP + np.dot(C, cho_solve(B_cho, (fl_fixed.flatten() - mu_GP)))
    C_prime = A - np.dot(C, cho_solve(B_cho, C.T))

    # Find {C^\prime}^{-1}
    CP_cho = cho_factor(C_prime)

    # Invert the least squares problem
    left = np.dot(D.T, cho_solve(CP_cho, D))
    right = np.dot(D.T, cho_solve(CP_cho, fl_prime))
    left_cho = cho_factor(left)

    # the coefficents, X = [c0, c1]
    X = cho_solve(left_cho, right)

    # Apply the correction
    fl_cor = np.dot(D, X)

    return fl_cor, X
def cycle_calibration(wl, fl, sigma, amp_f, l_f, ncycles, order=1, limit_array=3, mu_GP=1.0, soften=1.0):
    '''
    Given a chunk of spectra, cycle n_cycles amongst all spectra and return the spectra with inferred calibration adjustments.

    order : what order of Chebyshev polynomials to use. 1st order = line.

    Only use `limit_array` number of spectra to save memory.
    '''
    # Domain endpoints for the Chebyshev polynomials.
    wl0 = np.min(wl)
    wl1 = np.max(wl)

    fl_out = np.copy(fl)

    # Soften the sigmas a little bit
    sigma = soften * sigma

    n_epochs = len(wl)

    for cycle in range(ncycles):
        for i in range(n_epochs):
            wl_tweak = wl[i]
            fl_tweak = fl_out[i]
            sigma_tweak = sigma[i]

            # Temporary arrays without the epoch we just chose
            wl_remain = np.delete(wl, i, axis=0)[0:limit_array]
            fl_remain = np.delete(fl_out, i, axis=0)[0:limit_array]
            sigma_remain = np.delete(sigma, i, axis=0)[0:limit_array]

            # optimize the calibration of "tweak" with respect to all other orders.
            # BUGFIX: previously called optimize_calibration, whose signature
            # (lwl0, lwl1, lwl_cal, fl_cal, fl_fixed, A, B, C, ...) cannot
            # accept these arguments (TypeError on order/mu_GP). The argument
            # list matches optimize_calibration_static.
            fl_cor, X = optimize_calibration_static(wl0, wl1, wl_tweak, fl_tweak, sigma_tweak, wl_remain.flatten(), fl_remain.flatten(), sigma_remain.flatten(), amp_f, l_f, order=order, mu_GP=mu_GP)

            # replace this epoch with the corrected fluxes
            fl_out[i] = fl_cor

    return fl_out
def cycle_calibration_chunk(chunk, amp_f, l_f, n_cycles, order=1, limit_array=3, mu_GP=1.0, soften=1.0):
    '''
    Do the calibration on a chunk at at time, incorporating the masks.
    '''
    # Figure out the min and max wavelengths to set the domain of the Chebyshevs
    wl0 = np.min(chunk.wl)
    wl1 = np.max(chunk.wl)

    # Temporary copy, so that we can do multiple cycle corrections.
    fl_out = np.copy(chunk.fl)

    # Soften the sigmas a little bit to prevent inversion errors.
    sigma = soften * chunk.sigma

    for cycle in range(n_cycles):
        for i in range(chunk.n_epochs):
            wl_tweak = chunk.wl[i]
            fl_tweak = fl_out[i]
            # BUGFIX: previously read chunk.sigma here, so the softened
            # sigmas computed above were never used and `soften` had no effect.
            sigma_tweak = sigma[i]
            mask_tweak = chunk.mask[i]

            # Temporary arrays without the epoch we just chose
            wl_remain = np.delete(chunk.wl, i, axis=0)[0:limit_array]
            fl_remain = np.delete(fl_out, i, axis=0)[0:limit_array]
            sigma_remain = np.delete(sigma, i, axis=0)[0:limit_array]
            mask_remain = np.delete(chunk.mask, i, axis=0)[0:limit_array]

            # optimize the calibration of "tweak" with respect to all other epochs.
            # BUGFIX: previously called optimize_calibration, whose signature
            # (lwl0, lwl1, lwl_cal, fl_cal, fl_fixed, A, B, C, ...) cannot
            # accept these arguments; they match optimize_calibration_static.
            fl_cor, X = optimize_calibration_static(wl0, wl1, wl_tweak[mask_tweak], fl_tweak[mask_tweak], sigma_tweak[mask_tweak], wl_remain[mask_remain], fl_remain[mask_remain], sigma_remain[mask_remain], amp_f, l_f, order=order, mu_GP=mu_GP)

            # since fl_cor may have actually have fewer pixels than originally, we can't just
            # stuff the corrected fluxes directly back into the array.
            # Instead, we need to re-evaluate the line on all the wavelengths
            # using the chebyshev coefficients, and apply this.
            # (BUGFIX: removed a stray `pass` that sat at the top of this loop.)
            T = []
            for k in range(0, order + 1):
                coeff = [0 for j in range(k)] + [1]
                Chtemp = Ch(coeff, domain=[wl0, wl1])
                Ttemp = Chtemp(wl_tweak)
                T += [Ttemp]
            T = np.array(T)

            D = fl_tweak[:, np.newaxis] * T.T

            # Apply the correction
            fl_cor = np.dot(D, X)

            # replace this epoch with the corrected fluxes
            fl_out[i] = fl_cor

    # Stuff the full set of corrected fluxes (masked points included) back into the chunk.
    chunk.fl[:] = fl_out