"""Functions performing various calculations."""
import numpy as np
import pandas as pd
from functools import reduce
from typing import List, Union
import sympy
import numbers
from .C import *
import petab
__all__ = ['calculate_residuals', 'calculate_residuals_for_table',
'get_symbolic_noise_formulas', 'evaluate_noise_formula',
'calculate_chi2', 'calculate_chi2_for_table_from_residuals',
'calculate_llh', 'calculate_llh_for_table', 'calculate_single_llh']
[docs]def calculate_residuals(
measurement_dfs: Union[List[pd.DataFrame], pd.DataFrame],
simulation_dfs: Union[List[pd.DataFrame], pd.DataFrame],
observable_dfs: Union[List[pd.DataFrame], pd.DataFrame],
parameter_dfs: Union[List[pd.DataFrame], pd.DataFrame],
normalize: bool = True,
scale: bool = True
) -> List[pd.DataFrame]:
"""Calculate residuals.
Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.
normalize:
Whether to normalize residuals by the noise standard deviation
terms.
scale:
Whether to calculate residuals of scaled values.
Returns:
List of DataFrames in the same structure as `measurement_dfs`
with a field `residual` instead of measurement.
"""
# convenience
if isinstance(measurement_dfs, pd.DataFrame):
measurement_dfs = [measurement_dfs]
if isinstance(simulation_dfs, pd.DataFrame):
simulation_dfs = [simulation_dfs]
if isinstance(observable_dfs, pd.DataFrame):
observable_dfs = [observable_dfs]
if isinstance(parameter_dfs, pd.DataFrame):
parameter_dfs = [parameter_dfs]
# iterate over data frames
residual_dfs = []
for (measurement_df, simulation_df, observable_df, parameter_df) in zip(
measurement_dfs, simulation_dfs, observable_dfs, parameter_dfs):
residual_df = calculate_residuals_for_table(
measurement_df, simulation_df, observable_df, parameter_df,
normalize, scale)
residual_dfs.append(residual_df)
return residual_dfs
[docs]def calculate_residuals_for_table(
measurement_df: pd.DataFrame,
simulation_df: pd.DataFrame,
observable_df: pd.DataFrame,
parameter_df: pd.DataFrame,
normalize: bool = True,
scale: bool = True
) -> pd.DataFrame:
"""
Calculate residuals for a single measurement table.
For the arguments, see `calculate_residuals`.
"""
# create residual df as copy of measurement df, change column
residual_df = measurement_df.copy(deep=True).rename(
columns={MEASUREMENT: RESIDUAL})
# matching columns
compared_cols = set(MEASUREMENT_DF_COLS)
compared_cols -= {MEASUREMENT}
compared_cols &= set(measurement_df.columns)
compared_cols &= set(simulation_df.columns)
# compute noise formulas for observables
noise_formulas = get_symbolic_noise_formulas(observable_df)
# iterate over measurements, find corresponding simulations
for irow, row in measurement_df.iterrows():
measurement = row[MEASUREMENT]
# look up in simulation df
masks = [(simulation_df[col] == row[col]) | petab.is_empty(row[col])
for col in compared_cols]
mask = reduce(lambda x, y: x & y, masks)
simulation = simulation_df.loc[mask][SIMULATION].iloc[0]
if scale:
# apply scaling
observable = observable_df.loc[row[OBSERVABLE_ID]]
trafo = observable.get(OBSERVABLE_TRANSFORMATION, LIN)
simulation = petab.scale(simulation, trafo)
measurement = petab.scale(measurement, trafo)
# non-normalized residual is just the difference
residual = simulation - measurement
noise_value = 1
if normalize:
# look up noise standard deviation
noise_value = evaluate_noise_formula(
row, noise_formulas, parameter_df, simulation)
residual /= noise_value
# fill in value
residual_df.loc[irow, RESIDUAL] = residual
return residual_df
[docs]def calculate_chi2(
measurement_dfs: Union[List[pd.DataFrame], pd.DataFrame],
simulation_dfs: Union[List[pd.DataFrame], pd.DataFrame],
observable_dfs: Union[List[pd.DataFrame], pd.DataFrame],
parameter_dfs: Union[List[pd.DataFrame], pd.DataFrame],
normalize: bool = True,
scale: bool = True
) -> float:
"""Calculate the chi2 value.
Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.
normalize:
Whether to normalize residuals by the noise standard deviation
terms.
scale:
Whether to calculate residuals of scaled values.
Returns:
The aggregated chi2 value.
"""
residual_dfs = calculate_residuals(
measurement_dfs, simulation_dfs, observable_dfs, parameter_dfs,
normalize, scale)
chi2s = [calculate_chi2_for_table_from_residuals(df)
for df in residual_dfs]
chi2 = sum(chi2s)
return chi2
[docs]def calculate_chi2_for_table_from_residuals(
residual_df: pd.DataFrame) -> float:
"""Compute chi2 value for a single residual table."""
return (np.array(residual_df[RESIDUAL])**2).sum()
[docs]def calculate_llh(
measurement_dfs: Union[List[pd.DataFrame], pd.DataFrame],
simulation_dfs: Union[List[pd.DataFrame], pd.DataFrame],
observable_dfs: Union[List[pd.DataFrame], pd.DataFrame],
parameter_dfs: Union[List[pd.DataFrame], pd.DataFrame],
) -> float:
"""Calculate total log likelihood.
Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.
Returns:
The log-likelihood.
"""
# convenience
if isinstance(measurement_dfs, pd.DataFrame):
measurement_dfs = [measurement_dfs]
if isinstance(simulation_dfs, pd.DataFrame):
simulation_dfs = [simulation_dfs]
if isinstance(observable_dfs, pd.DataFrame):
observable_dfs = [observable_dfs]
if isinstance(parameter_dfs, pd.DataFrame):
parameter_dfs = [parameter_dfs]
# iterate over data frames
llhs = []
for (measurement_df, simulation_df, observable_df, parameter_df) in zip(
measurement_dfs, simulation_dfs, observable_dfs, parameter_dfs):
_llh = calculate_llh_for_table(
measurement_df, simulation_df, observable_df, parameter_df)
llhs.append(_llh)
llh = sum(llhs)
return llh
[docs]def calculate_llh_for_table(
measurement_df: pd.DataFrame,
simulation_df: pd.DataFrame,
observable_df: pd.DataFrame,
parameter_df: pd.DataFrame) -> float:
"""Calculate log-likelihood for one set of tables. For the arguments, see
`calculate_llh`."""
llhs = []
# matching columns
compared_cols = set(MEASUREMENT_DF_COLS)
compared_cols -= {MEASUREMENT}
compared_cols &= set(measurement_df.columns)
compared_cols &= set(simulation_df.columns)
# compute noise formulas for observables
noise_formulas = get_symbolic_noise_formulas(observable_df)
# iterate over measurements, find corresponding simulations
for irow, row in measurement_df.iterrows():
measurement = row[MEASUREMENT]
# look up in simulation df
masks = [(simulation_df[col] == row[col]) | petab.is_empty(row[col])
for col in compared_cols]
mask = reduce(lambda x, y: x & y, masks)
simulation = simulation_df.loc[mask][SIMULATION].iloc[0]
observable = observable_df.loc[row[OBSERVABLE_ID]]
# get scale
scale = observable.get(OBSERVABLE_TRANSFORMATION, LIN)
# get noise standard deviation
noise_value = evaluate_noise_formula(
row, noise_formulas, parameter_df, petab.scale(simulation, scale))
# get noise distribution
noise_distribution = observable.get(NOISE_DISTRIBUTION, NORMAL)
llh = calculate_single_llh(
measurement, simulation, scale, noise_distribution, noise_value)
llhs.append(llh)
llh = sum(llhs)
return llh
[docs]def calculate_single_llh(
measurement: float,
simulation: float,
scale: str,
noise_distribution: str,
noise_value: float) -> float:
"""Calculate a single log likelihood.
Arguments:
measurement: The measurement value.
simulation: The simulated value.
scale: The scale on which the noise model is to be applied.
noise_distribution: The noise distribution.
noise_value: The considered noise models possess a single noise
parameter, e.g. the normal standard deviation.
Returns:
The computed likelihood for the given values.
"""
# short-hand
m, s, sigma = measurement, simulation, noise_value
pi, log, log10 = np.pi, np.log, np.log10
# go over the possible cases
if noise_distribution == NORMAL and scale == LIN:
nllh = 0.5*log(2*pi*sigma**2) + 0.5*((s-m)/sigma)**2
elif noise_distribution == NORMAL and scale == LOG:
nllh = 0.5*log(2*pi*sigma**2*m**2) + 0.5*((log(s)-log(m))/sigma)**2
elif noise_distribution == NORMAL and scale == LOG10:
nllh = 0.5*log(2*pi*sigma**2*m**2*log(10)**2) + \
0.5*((log10(s)-log10(m))/sigma)**2
elif noise_distribution == LAPLACE and scale == LIN:
nllh = log(2*sigma) + abs((s-m)/sigma)
elif noise_distribution == LAPLACE and scale == LOG:
nllh = log(2*sigma*m) + abs((log(s)-log(m))/sigma)
elif noise_distribution == LAPLACE and scale == LOG10:
nllh = log(2*sigma*m*log(10)) + abs((log10(s)-log10(m))/sigma)
else:
raise NotImplementedError(
"Unsupported combination of noise_distribution and scale "
f"specified: {noise_distribution}, {scale}.")
llh = - nllh
return llh