# pylint: disable=too-many-lines, too-many-function-args, redefined-outer-name
"""Diagnostic functions for ArviZ."""
import warnings
from collections.abc import Sequence
import numpy as np
import packaging
import pandas as pd
import scipy
from scipy import stats
from ..data import convert_to_dataset
from ..utils import Numba, _numba_var, _stack, _var_names
from .density_utils import histogram as _histogram
from .stats_utils import _circular_standard_deviation, _sqrt
from .stats_utils import autocov as _autocov
from .stats_utils import not_valid as _not_valid
from .stats_utils import quantile as _quantile
from .stats_utils import stats_variance_2d as svar
from .stats_utils import wrap_xarray_ufunc as _wrap_xarray_ufunc
__all__ = ["bfmi", "ess", "rhat", "mcse"]
[docs]
def bfmi(data):
    r"""Calculate the estimated Bayesian fraction of missing information (BFMI).

    BFMI quantifies how well momentum resampling matches the marginal energy
    distribution. For more information on BFMI, see
    https://arxiv.org/pdf/1604.00695v1.pdf. The current advice is that values
    smaller than 0.3 indicate poor sampling. However, this threshold is
    provisional and may change. See
    `pystan_workflow <http://mc-stan.org/users/documentation/case-studies/pystan_workflow.html>`_
    for more information.

    Parameters
    ----------
    data : obj
        Any object that can be converted to an :class:`arviz.InferenceData` object.
        Refer to documentation of :func:`arviz.convert_to_dataset` for details.
        If InferenceData, energy variable needs to be found.
        A NumPy array is treated as the energy values themselves.

    Returns
    -------
    z : array
        The Bayesian fraction of missing information of the model and trace.
        One element per chain in the trace.

    Raises
    ------
    TypeError
        If the converted ``sample_stats`` group has no ``energy`` variable.

    See Also
    --------
    plot_energy : Plot energy transition distribution and marginal energy
                  distribution in HMC algorithms.

    Examples
    --------
    Compute the BFMI of an InferenceData object

    .. ipython::

        In [1]: import arviz as az
           ...: data = az.load_arviz_data('radon')
           ...: az.bfmi(data)

    """
    if not isinstance(data, np.ndarray):
        sample_stats = convert_to_dataset(data, group="sample_stats")
        if not hasattr(sample_stats, "energy"):
            raise TypeError("Energy variable was not found.")
        # Put chains on the first axis before handing the values to ``_bfmi``.
        data = sample_stats.energy.transpose("chain", "draw")
    return _bfmi(data)
[docs]
def ess(
    data,
    *,
    var_names=None,
    method="bulk",
    relative=False,
    prob=None,
    dask_kwargs=None,
):
    r"""Calculate estimate of the effective sample size (ess).

    Parameters
    ----------
    data : obj
        Any object that can be converted to an :class:`arviz.InferenceData` object.
        Refer to documentation of :func:`arviz.convert_to_dataset` for details.
        For ndarray: shape = (chain, draw).
        For n-dimensional ndarray transform first to dataset with
        :func:`arviz.convert_to_dataset`.
    var_names : str or list of str
        Names of variables to include in the return value Dataset.
    method : str, optional, default "bulk"
        Select ess method. Valid methods are:

        - "bulk"
        - "tail"     # prob, optional
        - "quantile" # prob
        - "mean" (old ess)
        - "sd"
        - "median"
        - "mad" (median absolute deviation)
        - "z_scale"
        - "folded"
        - "identity"
        - "local"    # prob, pair of (lower, upper) bounds

    relative : bool
        Return relative ess
        ``ress = ess / n``
    prob : float, or tuple of two floats, optional
        probability value for "tail", "quantile" or "local" ess functions.
    dask_kwargs : dict, optional
        Dask related kwargs passed to :func:`~arviz.wrap_xarray_ufunc`.

    Returns
    -------
    xarray.Dataset
        Return the effective sample size, :math:`\hat{N}_{eff}`.
        For ndarray input the value computed by the selected method is
        returned directly.

    Raises
    ------
    TypeError
        If ``method`` is not a valid method, if ``prob`` is missing for the
        "quantile" or "local" methods, or if ``data`` is an ndarray with more
        than two dimensions.

    Notes
    -----
    The basic ess (:math:`N_{\mathit{eff}}`) diagnostic is computed by:

    .. math:: \hat{N}_{\mathit{eff}} = \frac{MN}{\hat{\tau}}

    .. math:: \hat{\tau} = -1 + 2 \sum_{t'=0}^K \hat{P}_{t'}

    where :math:`M` is the number of chains, :math:`N` the number of draws,
    :math:`\hat{\rho}_t` is the estimated autocorrelation at lag :math:`t`, and
    :math:`K` is the last integer for which :math:`\hat{P}_{K} = \hat{\rho}_{2K} +
    \hat{\rho}_{2K+1}` is still positive.

    The current implementation is similar to Stan, which uses Geyer's initial monotone sequence
    criterion (Geyer, 1992; Geyer, 2011).

    References
    ----------
    * Vehtari et al. (2021). Rank-normalization, folding, and
      localization: An improved Rhat for assessing convergence of
      MCMC. Bayesian analysis, 16(2):667-718.
    * https://mc-stan.org/docs/reference-manual/analysis.html#effective-sample-size.section
    * Gelman et al. BDA3 (2013) Formula 11.8

    See Also
    --------
    arviz.rhat : Compute estimate of rank normalized splitR-hat for a set of traces.
    arviz.mcse : Calculate Markov Chain Standard Error statistic.
    plot_ess : Plot quantile, local or evolution of effective sample sizes (ESS).
    arviz.summary : Create a data frame with summary statistics.

    Examples
    --------
    Calculate the effective_sample_size using the default arguments:

    .. ipython::

        In [1]: import arviz as az
           ...: data = az.load_arviz_data('non_centered_eight')
           ...: az.ess(data)

    Calculate the ress of some of the variables

    .. ipython::

        In [1]: az.ess(data, relative=True, var_names=["mu", "theta_t"])

    Calculate the ess using the "tail" method, leaving the `prob` argument at its default
    value.

    .. ipython::

        In [1]: az.ess(data, method="tail")

    """
    methods = {
        "bulk": _ess_bulk,
        "tail": _ess_tail,
        "quantile": _ess_quantile,
        "mean": _ess_mean,
        "sd": _ess_sd,
        "median": _ess_median,
        "mad": _ess_mad,
        "z_scale": _ess_z_scale,
        "folded": _ess_folded,
        "identity": _ess_identity,
        "local": _ess_local,
    }

    if method not in methods:
        raise TypeError(f"ess method {method} not found. Valid methods are:\n{', '.join(methods)}")
    ess_func = methods[method]

    # Both "quantile" and "local" take ``prob`` as a required argument; fail
    # early with an explicit message instead of a missing-argument error raised
    # from inside the selected function.
    if method in ("quantile", "local") and prob is None:
        raise TypeError("Quantile (prob) information needs to be defined.")

    if isinstance(data, np.ndarray):
        data = np.atleast_2d(data)
        if len(data.shape) < 3:
            if prob is not None:
                return ess_func(  # pylint: disable=unexpected-keyword-arg
                    data, prob=prob, relative=relative
                )

            return ess_func(data, relative=relative)

        msg = (
            "Only uni-dimensional ndarray variables are supported."
            " Please transform first to dataset with `az.convert_to_dataset`."
        )
        raise TypeError(msg)

    dataset = convert_to_dataset(data, group="posterior")
    var_names = _var_names(var_names, dataset)

    dataset = dataset if var_names is None else dataset[var_names]

    ufunc_kwargs = {"ravel": False}
    # ``prob`` is only forwarded when given, since not every method accepts it.
    func_kwargs = {"relative": relative} if prob is None else {"prob": prob, "relative": relative}
    return _wrap_xarray_ufunc(
        ess_func,
        dataset,
        ufunc_kwargs=ufunc_kwargs,
        func_kwargs=func_kwargs,
        dask_kwargs=dask_kwargs,
    )
[docs]
def rhat(data, *, var_names=None, method="rank", dask_kwargs=None):
    r"""Compute estimate of rank normalized splitR-hat for a set of traces.

    The rank normalized R-hat diagnostic tests for lack of convergence by comparing the variance
    between multiple chains to the variance within each chain. If convergence has been achieved,
    the between-chain and within-chain variances should be identical. To be most effective in
    detecting evidence for nonconvergence, each chain should have been initialized to starting
    values that are dispersed relative to the target distribution.

    Parameters
    ----------
    data : obj
        Any object that can be converted to an :class:`arviz.InferenceData` object.
        Refer to documentation of :func:`arviz.convert_to_dataset` for details.
        At least 2 posterior chains are needed to compute this diagnostic of one or more
        stochastic parameters.
        For ndarray: shape = (chain, draw).
        For n-dimensional ndarray transform first to dataset with ``az.convert_to_dataset``.
    var_names : list
        Names of variables to include in the rhat report
    method : str
        Select R-hat method. Valid methods are:

        - "rank"        # recommended by Vehtari et al. (2021)
        - "split"
        - "folded"
        - "z_scale"
        - "identity"

    dask_kwargs : dict, optional
        Dask related kwargs passed to :func:`~arviz.wrap_xarray_ufunc`.

    Returns
    -------
    xarray.Dataset
        Returns dataset of the potential scale reduction factors, :math:`\hat{R}`.
        For ndarray input the value computed by the selected method is
        returned directly.

    Raises
    ------
    TypeError
        If ``method`` is not a valid method or ``data`` is an ndarray with
        more than two dimensions.

    See Also
    --------
    ess : Calculate estimate of the effective sample size (ess).
    mcse : Calculate Markov Chain Standard Error statistic.
    plot_forest : Forest plot to compare HDI intervals from a number of distributions.

    Notes
    -----
    The diagnostic is computed by:

    .. math:: \hat{R} = \sqrt{\frac{\hat{V}}{W}}

    where :math:`W` is the within-chain variance and :math:`\hat{V}` is the posterior variance
    estimate for the pooled rank-traces. This is the potential scale reduction factor, which
    converges to unity when each of the traces is a sample from the target posterior. Values
    greater than one indicate that one or more chains have not yet converged.

    Rank values are calculated over all the chains with ``scipy.stats.rankdata``.
    Each chain is split in two and normalized with the z-transform following
    Vehtari et al. (2021).

    References
    ----------
    * Vehtari et al. (2021). Rank-normalization, folding, and
      localization: An improved Rhat for assessing convergence of
      MCMC. Bayesian analysis, 16(2):667-718.
    * Gelman et al. BDA3 (2013)
    * Brooks and Gelman (1998)
    * Gelman and Rubin (1992)

    Examples
    --------
    Calculate the R-hat using the default arguments:

    .. ipython::

        In [1]: import arviz as az
           ...: data = az.load_arviz_data("non_centered_eight")
           ...: az.rhat(data)

    Calculate the R-hat of some variables using the folded method:

    .. ipython::

        In [1]: az.rhat(data, var_names=["mu", "theta_t"], method="folded")

    """
    methods = {
        "rank": _rhat_rank,
        "split": _rhat_split,
        "folded": _rhat_folded,
        "z_scale": _rhat_z_scale,
        "identity": _rhat_identity,
    }
    if method not in methods:
        raise TypeError(
            f"R-hat method {method} not found. Valid methods are:\n{', '.join(methods)}"
        )
    rhat_func = methods[method]

    # Plain arrays are evaluated directly, without going through xarray.
    if isinstance(data, np.ndarray):
        data = np.atleast_2d(data)
        if data.ndim >= 3:
            raise TypeError(
                "Only uni-dimensional ndarray variables are supported."
                " Please transform first to dataset with `az.convert_to_dataset`."
            )
        return rhat_func(data)

    dataset = convert_to_dataset(data, group="posterior")
    var_names = _var_names(var_names, dataset)
    if var_names is not None:
        dataset = dataset[var_names]

    return _wrap_xarray_ufunc(
        rhat_func,
        dataset,
        ufunc_kwargs={"ravel": False},
        func_kwargs={},
        dask_kwargs=dask_kwargs,
    )
[docs]
def mcse(data, *, var_names=None, method="mean", prob=None, dask_kwargs=None):
    """Calculate Markov Chain Standard Error statistic.

    Parameters
    ----------
    data : obj
        Any object that can be converted to an :class:`arviz.InferenceData` object
        Refer to documentation of :func:`arviz.convert_to_dataset` for details
        For ndarray: shape = (chain, draw).
        For n-dimensional ndarray transform first to dataset with ``az.convert_to_dataset``.
    var_names : list
        Names of variables to include in the mcse report
    method : str
        Select mcse method. Valid methods are:

        - "mean"
        - "sd"
        - "median"
        - "quantile"

    prob : float
        Quantile information. Required when ``method="quantile"``.
    dask_kwargs : dict, optional
        Dask related kwargs passed to :func:`~arviz.wrap_xarray_ufunc`.

    Returns
    -------
    xarray.Dataset
        Return the mcse dataset. For ndarray input the value computed by the
        selected method is returned directly.

    Raises
    ------
    TypeError
        If ``method`` is not a valid method, if ``prob`` is missing for the
        "quantile" method, or if ``data`` is an ndarray with more than two
        dimensions.

    See Also
    --------
    ess : Calculate estimate of the effective sample size (ess).
    summary : Create a data frame with summary statistics.
    plot_mcse : Plot quantile or local Monte Carlo Standard Error.

    Examples
    --------
    Calculate the Markov Chain Standard Error using the default arguments:

    .. ipython::

        In [1]: import arviz as az
           ...: data = az.load_arviz_data("non_centered_eight")
           ...: az.mcse(data)

    Calculate the Markov Chain Standard Error using the quantile method:

    .. ipython::

        In [1]: az.mcse(data, method="quantile", prob=0.7)

    """
    methods = {
        "mean": _mcse_mean,
        "sd": _mcse_sd,
        "median": _mcse_median,
        "quantile": _mcse_quantile,
    }
    if method not in methods:
        # Joined outside the f-string: backslashes are not allowed inside
        # f-string expressions on older Python versions.
        valid_methods = "\n ".join(methods)
        raise TypeError(f"mcse method {method} not found. Valid methods are:\n{valid_methods}")
    mcse_func = methods[method]

    if prob is None and method == "quantile":
        raise TypeError("Quantile (prob) information needs to be defined.")

    # Plain arrays are evaluated directly, without going through xarray.
    if isinstance(data, np.ndarray):
        data = np.atleast_2d(data)
        if data.ndim >= 3:
            raise TypeError(
                "Only uni-dimensional ndarray variables are supported."
                " Please transform first to dataset with `az.convert_to_dataset`."
            )
        if prob is None:
            return mcse_func(data)
        return mcse_func(data, prob=prob)  # pylint: disable=unexpected-keyword-arg

    dataset = convert_to_dataset(data, group="posterior")
    var_names = _var_names(var_names, dataset)
    if var_names is not None:
        dataset = dataset[var_names]

    # ``prob`` is only forwarded when given, since not every method accepts it.
    func_kwargs = {"prob": prob} if prob is not None else {}
    return _wrap_xarray_ufunc(
        mcse_func,
        dataset,
        ufunc_kwargs={"ravel": False},
        func_kwargs=func_kwargs,
        dask_kwargs=dask_kwargs,
    )
def ks_summary(pareto_tail_indices):
    """Display a summary of Pareto tail indices.

    The indices are counted in the bins delimited by 0.5, 0.7 and 1, and the
    counts and percentages are returned as a table. A warning is emitted when
    no index falls beyond the first bin, or beyond the first two bins.

    Parameters
    ----------
    pareto_tail_indices : array
        Pareto tail indices.

    Returns
    -------
    df_k : dataframe
        Dataframe containing k diagnostic values.
    """
    bin_edges = [-np.inf, 0.5, 0.7, 1, np.inf]
    # The two branches differ only in how the bin edges are handed to ``_histogram``.
    if Numba.numba_flag:
        kcounts, *_ = _histogram(pareto_tail_indices, np.asarray(bin_edges))
    else:
        kcounts, *_ = _histogram(pareto_tail_indices, bins=bin_edges)
    kprop = kcounts / len(pareto_tail_indices) * 100

    summary_table = pd.DataFrame(
        {"_": ["(good)", "(ok)", "(bad)", "(very bad)"], "Count": kcounts, "Pct": kprop}
    )
    row_labels = {0: "(-Inf, 0.5]", 1: " (0.5, 0.7]", 2: " (0.7, 1]", 3: " (1, Inf)"}
    df_k = summary_table.rename(index=row_labels)

    if np.sum(kcounts[1:]) == 0:
        warnings.warn("All Pareto k estimates are good (k < 0.5)")
    elif np.sum(kcounts[2:]) == 0:
        warnings.warn("All Pareto k estimates are ok (k < 0.7)")

    return df_k
def _bfmi(energy):
    r"""Calculate the estimated Bayesian fraction of missing information (BFMI).

    BFMI quantifies how well momentum resampling matches the marginal energy distribution. For more
    information on BFMI, see https://arxiv.org/pdf/1604.00695v1.pdf. The current advice is that
    values smaller than 0.3 indicate poor sampling. However, this threshold is provisional and may
    change. See http://mc-stan.org/users/documentation/case-studies/pystan_workflow.html for more
    information.

    The value is the mean squared difference between consecutive energies along
    axis 1, divided by the variance (``ddof=1``) of the energies along axis 1.

    Parameters
    ----------
    energy : NumPy array
        Should be extracted from a gradient based sampler, such as in Stan or PyMC3. Typically,
        after converting a trace or fit to InferenceData, the energy will be in
        `data.sample_stats.energy`.

    Returns
    -------
    z : array
        The Bayesian fraction of missing information of the model and trace. One element per
        chain in the trace.
    """
    energy_mat = np.atleast_2d(energy)
    squared_steps = np.square(np.diff(energy_mat, axis=1))
    num = squared_steps.mean(axis=1)  # pylint: disable=no-member
    if energy_mat.ndim != 2:
        den = np.var(energy, axis=1, ddof=1)
    else:
        # 2D input goes through the numba-aware variance dispatcher.
        den = _numba_var(svar, np.var, energy_mat, axis=1, ddof=1)
    return num / den
def _backtransform_ranks(arr, c=3 / 8):  # pylint: disable=invalid-name
    """Backtransformation of ranks.

    Each rank ``r`` is mapped to ``(r - c) / (n - 2 * c + 1)``, where ``n`` is
    the total number of elements in ``arr``.

    Parameters
    ----------
    arr : np.ndarray
        Ranks array
    c : float
        Fractional offset. Defaults to c = 3/8 as recommended by Blom (1958).

    Returns
    -------
    np.ndarray
        Array with the same shape as ``arr``.

    References
    ----------
    Blom, G. (1958). Statistical Estimates and Transformed Beta-Variables. Wiley; New York.
    """
    ranks = np.asarray(arr)
    denominator = ranks.size - 2 * c + 1
    return (ranks - c) / denominator
def _z_scale(ary):
    """Calculate z_scale.

    The values are ranked (ties get the average rank), the ranks are mapped to
    fractional values with ``_backtransform_ranks`` and then passed through the
    standard normal percent point function.

    Parameters
    ----------
    ary : np.ndarray

    Returns
    -------
    np.ndarray
        Array of z-values with the same shape as ``ary``.
    """
    # Import the submodule explicitly: a bare ``import packaging`` only exposes
    # ``packaging.version`` if something else already imported it.
    from packaging.version import (  # pylint: disable=import-outside-toplevel
        parse as _parse_version,
    )

    ary = np.asarray(ary)
    if _parse_version(scipy.__version__) < _parse_version("1.10.0.dev0"):
        rank = stats.rankdata(ary, method="average")
    else:
        # ``nan_policy`` is only passed for scipy >= 1.10
        rank = stats.rankdata(  # pylint: disable=unexpected-keyword-arg
            ary, method="average", nan_policy="omit"
        )
    rank = _backtransform_ranks(rank)
    z = stats.norm.ppf(rank)
    z = z.reshape(ary.shape)
    return z
def _split_chains(ary):
    """Split and stack chains.

    The first and the last ``n_draw // 2`` draws of every chain (axis 1) are
    taken as separate pieces and combined with ``_stack``. Input with fewer
    than two dimensions is promoted to 2D first.
    """
    ary = np.asarray(ary)
    if ary.ndim <= 1:
        ary = np.atleast_2d(ary)
    _, n_draw = ary.shape
    half = n_draw // 2
    first_half = ary[:, :half]
    second_half = ary[:, -half:]
    return _stack(first_half, second_half)
def _z_fold(ary):
    """Fold and z-scale values.

    Folding takes the absolute deviation of every value from the median of the
    whole array; the folded values are then passed to ``_z_scale``.
    """
    values = np.asarray(ary)
    folded = abs(values - np.median(values))
    return _z_scale(folded)
def _rhat(ary):
    """Compute the rhat for a 2d array.

    Parameters
    ----------
    ary : array-like
        Values converted to a float array whose two axes are unpacked as
        (chains, draws).

    Returns
    -------
    float
        ``sqrt((B / W + n - 1) / n)`` where ``B`` is the between-chain
        variance, ``W`` the mean within-chain variance and ``n`` the number of
        draws per chain. ``np.nan`` when ``_not_valid`` rejects the input.
    """
    ary = np.asarray(ary, dtype=float)
    if _not_valid(ary, check_shape=False):
        return np.nan
    _, num_samples = ary.shape

    # Calculate chain mean
    chain_mean = np.mean(ary, axis=1)
    # Calculate chain variance
    chain_var = _numba_var(svar, np.var, ary, axis=1, ddof=1)
    # Calculate between-chain variance
    between_chain_variance = num_samples * _numba_var(svar, np.var, chain_mean, axis=None, ddof=1)
    # Calculate within-chain variance
    within_chain_variance = np.mean(chain_var)
    # Ratio of the marginal posterior variance estimate to the within-chain variance
    rhat_value = np.sqrt(
        (between_chain_variance / within_chain_variance + num_samples - 1) / (num_samples)
    )
    return rhat_value
def _rhat_rank(ary):
    """Compute the rank normalized rhat for 2d array.

    The result is the larger of the rhat of the z-scaled split chains and the
    rhat of the z-scaled folded split chains. ``np.nan`` is returned when
    ``_not_valid`` rejects the input (``min_draws=4``, ``min_chains=2``).

    Computation follows https://arxiv.org/abs/1903.08008
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 2}):
        return np.nan
    halves = _split_chains(ary)
    bulk = _rhat(_z_scale(halves))
    # Fold around the median of the split draws
    folded_halves = abs(halves - np.median(halves))
    tail = _rhat(_z_scale(folded_halves))
    return max(bulk, tail)
def _rhat_folded(ary):
    """Calculate split-Rhat for folded z-values.

    Returns ``np.nan`` when ``_not_valid`` rejects the input
    (``min_draws=4``, ``min_chains=2``).
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 2}):
        return np.nan
    folded_z = _z_fold(_split_chains(ary))
    return _rhat(folded_z)
def _rhat_z_scale(ary):
    """Calculate split-Rhat for z-scaled values.

    Returns ``np.nan`` when ``_not_valid`` rejects the input
    (``min_draws=4``, ``min_chains=2``).
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 2}):
        return np.nan
    z_scaled = _z_scale(_split_chains(ary))
    return _rhat(z_scaled)
def _rhat_split(ary):
    """Calculate split-Rhat, i.e. rhat of the split chains without rank normalization.

    Returns ``np.nan`` when ``_not_valid`` rejects the input
    (``min_draws=4``, ``min_chains=2``).
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 2}):
        return np.nan
    halves = _split_chains(ary)
    return _rhat(halves)
def _rhat_identity(ary):
    """Calculate rhat on the values as given, without splitting or rescaling.

    Returns ``np.nan`` when ``_not_valid`` rejects the input
    (``min_draws=4``, ``min_chains=2``).
    """
    values = np.asarray(ary)
    if _not_valid(values, shape_kwargs={"min_draws": 4, "min_chains": 2}):
        return np.nan
    return _rhat(values)
def _ess(ary, relative=False):
    """Compute the effective sample size for a 2D array.

    Parameters
    ----------
    ary : array-like
        Values converted to a float array. Input with fewer than two
        dimensions is promoted to 2D; the two axes are unpacked as
        (n_chain, n_draw).
    relative : bool
        If True, return the effective sample size divided by the total number
        of draws.

    Returns
    -------
    float
        The (relative) effective sample size. ``np.nan`` when ``_not_valid``
        rejects the input or any stored autocorrelation estimate is NaN.
    """
    ary = np.asarray(ary, dtype=float)
    if _not_valid(ary, check_shape=False):
        return np.nan
    if (np.max(ary) - np.min(ary)) < np.finfo(float).resolution:  # pylint: disable=no-member
        # Numerically constant input: ess is the number of draws, hence the
        # relative ess (ess / n) is one.
        return 1.0 if relative else ary.size
    if len(ary.shape) < 2:
        ary = np.atleast_2d(ary)
    n_chain, n_draw = ary.shape
    acov = _autocov(ary, axis=1)
    chain_mean = ary.mean(axis=1)
    mean_var = np.mean(acov[:, 0]) * n_draw / (n_draw - 1.0)
    var_plus = mean_var * (n_draw - 1.0) / n_draw
    if n_chain > 1:
        var_plus += _numba_var(svar, np.var, chain_mean, axis=None, ddof=1)

    rho_hat_t = np.zeros(n_draw)
    rho_hat_even = 1.0
    rho_hat_t[0] = rho_hat_even
    rho_hat_odd = 1.0 - (mean_var - np.mean(acov[:, 1])) / var_plus
    rho_hat_t[1] = rho_hat_odd

    # Geyer's initial positive sequence
    t = 1
    while t < (n_draw - 3) and (rho_hat_even + rho_hat_odd) > 0.0:
        rho_hat_even = 1.0 - (mean_var - np.mean(acov[:, t + 1])) / var_plus
        rho_hat_odd = 1.0 - (mean_var - np.mean(acov[:, t + 2])) / var_plus
        # Only store the pair when its sum is non-negative
        if (rho_hat_even + rho_hat_odd) >= 0:
            rho_hat_t[t + 1] = rho_hat_even
            rho_hat_t[t + 2] = rho_hat_odd
        t += 2

    max_t = t - 2
    # improve estimation
    if rho_hat_even > 0:
        rho_hat_t[max_t + 1] = rho_hat_even
    # Geyer's initial monotone sequence
    t = 1
    while t <= max_t - 2:
        if (rho_hat_t[t + 1] + rho_hat_t[t + 2]) > (rho_hat_t[t - 1] + rho_hat_t[t]):
            rho_hat_t[t + 1] = (rho_hat_t[t - 1] + rho_hat_t[t]) / 2.0
            rho_hat_t[t + 2] = rho_hat_t[t + 1]
        t += 2

    ess = n_chain * n_draw
    tau_hat = -1.0 + 2.0 * np.sum(rho_hat_t[: max_t + 1]) + np.sum(rho_hat_t[max_t + 1 : max_t + 2])
    # Bound tau_hat from below, which caps ess at n * log10(n)
    tau_hat = max(tau_hat, 1 / np.log10(ess))
    ess = (1 if relative else ess) / tau_hat
    if np.isnan(rho_hat_t).any():
        ess = np.nan
    return ess
def _ess_bulk(ary, relative=False):
    """Compute the effective sample size for the bulk.

    The chains are split and z-scaled before ``_ess`` is applied. Returns
    ``np.nan`` when ``_not_valid`` rejects the input
    (``min_draws=4``, ``min_chains=1``).
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    z_scaled = _z_scale(_split_chains(ary))
    return _ess(z_scaled, relative=relative)
def _ess_tail(ary, prob=None, relative=False):
    """Compute the effective sample size for the tail.

    If `prob` defined, ess = min(qess(prob), qess(1-prob))

    Parameters
    ----------
    ary : array-like
        Draws to analyse.
    prob : float or pair of floats, optional
        A scalar ``p`` is expanded to ``(p, 1 - p)``; a pair (tuple, list or
        array) is used as ``(low, high)``. Defaults to ``(0.05, 0.95)``.
    relative : bool
        Forwarded to ``_ess_quantile``.

    Returns
    -------
    float
        The smaller of the two quantile ess values, or ``np.nan`` when
        ``_not_valid`` rejects the input (``min_draws=4``, ``min_chains=1``).
    """
    if prob is None:
        prob = (0.05, 0.95)
    elif not isinstance(prob, Sequence) and np.ndim(prob) == 0:
        # ``np.ndarray`` is not a ``Sequence``; the ndim check keeps array
        # pairs from being mistaken for a scalar probability.
        prob = (prob, 1 - prob)

    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan

    prob_low, prob_high = prob
    quantile_low_ess = _ess_quantile(ary, prob_low, relative=relative)
    quantile_high_ess = _ess_quantile(ary, prob_high, relative=relative)
    return min(quantile_low_ess, quantile_high_ess)
def _ess_mean(ary, relative=False):
    """Compute the effective sample size for the mean.

    ``_ess`` is applied to the split chains. Returns ``np.nan`` when
    ``_not_valid`` rejects the input (``min_draws=4``, ``min_chains=1``).
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    halves = _split_chains(ary)
    return _ess(halves, relative=relative)
def _ess_sd(ary, relative=False):
    """Compute the effective sample size for the sd.

    The result is the smaller of the ess of the split chains and the ess of
    their squares. Returns ``np.nan`` when ``_not_valid`` rejects the input
    (``min_draws=4``, ``min_chains=1``).
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    halves = _split_chains(ary)
    ess_values = _ess(halves, relative=relative)
    ess_squares = _ess(halves**2, relative=relative)
    return min(ess_values, ess_squares)
def _ess_quantile(ary, prob, relative=False):
    """Compute the effective sample size for a specific quantile.

    The ess is computed on the indicator ``ary <= quantile``, where the
    quantile is the one ``_quantile`` returns for ``prob``. Returns ``np.nan``
    when ``_not_valid`` rejects the input (``min_draws=4``, ``min_chains=1``)
    and raises ``TypeError`` when ``prob`` is None.
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    if prob is None:
        raise TypeError("Prob not defined.")
    (threshold,) = _quantile(ary, prob)
    below_threshold = ary <= threshold
    return _ess(_split_chains(below_threshold), relative=relative)
def _ess_local(ary, prob, relative=False):
    """Compute the effective sample size for a specific probability interval.

    The ess is computed on the indicator of the draws lying between the two
    quantiles ``_quantile`` returns for ``prob`` (bounds included). Returns
    ``np.nan`` when ``_not_valid`` rejects the input (``min_draws=4``,
    ``min_chains=1``), raises ``TypeError`` when ``prob`` is None and
    ``ValueError`` when it does not have exactly two elements.
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    if prob is None:
        raise TypeError("Prob not defined.")
    if len(prob) != 2:
        raise ValueError("Prob argument in ess local must be upper and lower bound")
    bounds = _quantile(ary, prob)
    lower, upper = bounds[0], bounds[1]
    inside_interval = (lower <= ary) & (ary <= upper)
    return _ess(_split_chains(inside_interval), relative=relative)
def _ess_z_scale(ary, relative=False):
    """Calculate ess for z-scale.

    The chains are split and z-scaled before ``_ess`` is applied. Returns
    ``np.nan`` when ``_not_valid`` rejects the input
    (``min_draws=4``, ``min_chains=1``).
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    z_scaled = _z_scale(_split_chains(ary))
    return _ess(z_scaled, relative=relative)
def _ess_folded(ary, relative=False):
    """Calculate split-ess for folded data.

    The split chains are folded and z-scaled with ``_z_fold`` before ``_ess``
    is applied. Returns ``np.nan`` when ``_not_valid`` rejects the input
    (``min_draws=4``, ``min_chains=1``).
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    folded_z = _z_fold(_split_chains(ary))
    return _ess(folded_z, relative=relative)
def _ess_median(ary, relative=False):
    """Calculate split-ess for median.

    Equivalent to the quantile ess at ``prob=0.5``. Returns ``np.nan`` when
    ``_not_valid`` rejects the input (``min_draws=4``, ``min_chains=1``).
    """
    values = np.asarray(ary)
    if _not_valid(values, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    return _ess_quantile(values, 0.5, relative=relative)
def _ess_mad(ary, relative=False):
    """Calculate split-ess for the median absolute deviation.

    The draws are replaced by the indicator of their absolute deviation from
    the median being at most the median of those deviations; the indicator is
    then split, z-scaled and passed to ``_ess``. Returns ``np.nan`` when
    ``_not_valid`` rejects the input (``min_draws=4``, ``min_chains=1``).
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    abs_deviation = abs(ary - np.median(ary))
    within_mad = abs_deviation <= np.median(abs_deviation)
    z_scaled = _z_scale(_split_chains(within_mad))
    return _ess(z_scaled, relative=relative)
def _ess_identity(ary, relative=False):
    """Calculate ess on the values as given, without splitting or rescaling.

    Returns ``np.nan`` when ``_not_valid`` rejects the input
    (``min_draws=4``, ``min_chains=1``).
    """
    values = np.asarray(ary)
    if _not_valid(values, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    return _ess(values, relative=relative)
def _mcse_mean(ary):
    """Compute the Markov Chain mean error.

    The value is the standard deviation (``ddof=1``) of all draws divided by
    the square root of the ess for the mean. Returns ``np.nan`` when
    ``_not_valid`` rejects the input (``min_draws=4``, ``min_chains=1``).
    """
    use_numba = Numba.numba_flag
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    ess = _ess_mean(ary)
    if not use_numba:
        sd = np.std(ary, ddof=1)
    else:
        # numba path: variance of the flattened draws, then square root
        sd = _sqrt(svar(np.ravel(ary), ddof=1), np.zeros(1))
    return sd / np.sqrt(ess)
def _mcse_sd(ary):
    """Compute the Markov Chain sd error.

    The value is the standard deviation (``ddof=1``) of all draws multiplied
    by ``sqrt(e * (1 - 1 / ess) ** (ess - 1) - 1)``, with ``ess`` the ess for
    the sd. Returns ``np.nan`` when ``_not_valid`` rejects the input
    (``min_draws=4``, ``min_chains=1``).
    """
    use_numba = Numba.numba_flag
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    ess = _ess_sd(ary)
    if not use_numba:
        sd = np.std(ary, ddof=1)
    else:
        # numba path: variance of the flattened draws, then square root
        sd = float(_sqrt(svar(np.ravel(ary), ddof=1), np.zeros(1)).item())
    fac_mcse_sd = np.sqrt(np.exp(1) * (1 - 1 / ess) ** (ess - 1) - 1)
    return sd * fac_mcse_sd
def _mcse_median(ary):
    """Compute the Markov Chain median error.

    This is the quantile error evaluated at ``prob=0.5``.
    """
    median_prob = 0.5
    return _mcse_quantile(ary, median_prob)
def _mcse_quantile(ary, prob):
    """Compute the Markov Chain quantile error at quantile=prob.

    Two probabilities (0.1586553 and 0.8413447, approximately the standard
    normal CDF at -1 and +1) are mapped through a Beta percent point function
    parametrized by the quantile ess. The results are turned into positions in
    the sorted draws, and half the distance between the two selected draws is
    returned. Returns ``np.nan`` when ``_not_valid`` rejects the input
    (``min_draws=4``, ``min_chains=1``).
    """
    ary = np.asarray(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        return np.nan
    ess = _ess_quantile(ary, prob)
    probability = [0.1586553, 0.8413447]
    with np.errstate(invalid="ignore"):
        ppf = stats.beta.ppf(probability, ess * prob + 1, ess * (1 - prob) + 1)
    sorted_ary = np.sort(ary.ravel())
    size = sorted_ary.size
    ppf_size = ppf * size - 1
    # nanmax / nanmin clamp the positions to the valid index range and fall
    # back to the first / last draw when the ppf value is NaN.
    lower_index = int(np.floor(np.nanmax((ppf_size[0], 0))))
    th1 = sorted_ary[lower_index]
    upper_index = int(np.ceil(np.nanmin((ppf_size[1], size - 1))))
    th2 = sorted_ary[upper_index]
    return (th2 - th1) / 2
def _mc_error(ary, batches=5, circular=False):
    """Calculate the simulation standard error, accounting for non-independent samples.

    The trace is divided into batches, and the standard deviation of the batch
    means is calculated.

    Parameters
    ----------
    ary : Numpy array
        An array containing MCMC samples
    batches : integer
        Number of batches
    circular : bool
        Whether to compute the error taking into account `ary` is a circular variable
        (in the range [-np.pi, np.pi]) or not. Defaults to False (i.e non-circular variables).

    Returns
    -------
    mc_error : float
        Simulation standard error. For input with more than one dimension, an
        array of errors with the shape of ``ary`` minus its first axis.
        ``np.nan`` when ``_not_valid`` rejects a 1D input.
    """
    _numba_flag = Numba.numba_flag
    if ary.ndim > 1:
        dims = np.shape(ary)
        trace = np.transpose([t.ravel() for t in ary])

        # Forward ``circular`` too, so multi-dimensional circular variables are
        # not silently treated as linear ones.
        return np.reshape([_mc_error(t, batches, circular) for t in trace], dims[1:])

    if _not_valid(ary, check_shape=False):
        return np.nan

    if batches == 1:
        # Single batch: plain standard error of the mean
        if circular:
            if _numba_flag:
                std = _circular_standard_deviation(ary, high=np.pi, low=-np.pi)
            else:
                std = stats.circstd(ary, high=np.pi, low=-np.pi)
        elif _numba_flag:
            std = float(_sqrt(svar(ary), np.zeros(1)).item())
        else:
            std = np.std(ary)
        return std / np.sqrt(len(ary))

    batched_traces = np.resize(ary, (batches, int(len(ary) / batches)))

    if circular:
        means = stats.circmean(batched_traces, high=np.pi, low=-np.pi, axis=1)
        if _numba_flag:
            std = _circular_standard_deviation(means, high=np.pi, low=-np.pi)
        else:
            std = stats.circstd(means, high=np.pi, low=-np.pi)
    else:
        means = np.mean(batched_traces, 1)
        std = _sqrt(svar(means), np.zeros(1)) if _numba_flag else np.std(means)

    return std / np.sqrt(batches)
def _multichain_statistics(ary, focus="mean"):
    """Calculate efficiently multichain statistics for summary.

    Parameters
    ----------
    ary : numpy.ndarray
    focus : select focus for the statistics. Default is mean.

    Returns
    -------
    tuple
        Order of return parameters is

        If focus equals "mean"
            - mcse_mean, mcse_sd, ess_bulk, ess_tail, r_hat

        Else if focus equals "median"
            - mcse_median, ess_median, ess_tail, r_hat

        Every element is ``np.nan`` when ``_not_valid`` rejects the input
        (``min_draws=4``, ``min_chains=1``).
    """
    ary = np.atleast_2d(ary)
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 1}):
        n_outputs = 5 if focus == "mean" else 4
        return (np.nan,) * n_outputs

    # Shared by ess bulk and the bulk part of r_hat
    z_split = _z_scale(_split_chains(ary))

    # ess tail
    quantile05, quantile95 = _quantile(ary, [0.05, 0.95])
    quantile05_ess = _ess(_split_chains(ary <= quantile05))
    quantile95_ess = _ess(_split_chains(ary <= quantile95))
    ess_tail_value = min(quantile05_ess, quantile95_ess)

    # r_hat is only computed when the stricter min_chains=2 check passes
    if _not_valid(ary, shape_kwargs={"min_draws": 4, "min_chains": 2}):
        rhat_value = np.nan
    else:
        rhat_bulk = _rhat(z_split)
        ary_folded = np.abs(ary - np.median(ary))
        rhat_tail = _rhat(_z_scale(_split_chains(ary_folded)))
        rhat_value = max(rhat_bulk, rhat_tail)

    if focus != "mean":
        # ess median
        ess_median_value = _ess_median(ary)
        # mcse_median
        mcse_median_value = _mcse_median(ary)
        return (
            mcse_median_value,
            ess_median_value,
            ess_tail_value,
            rhat_value,
        )

    # ess mean
    ess_mean_value = _ess_mean(ary)
    # ess sd
    ess_sd_value = _ess_sd(ary)
    # mcse_mean
    sd = np.std(ary, ddof=1)
    mcse_mean_value = sd / np.sqrt(ess_mean_value)
    # ess bulk
    ess_bulk_value = _ess(z_split)
    # mcse_sd
    fac_mcse_sd = np.sqrt(np.exp(1) * (1 - 1 / ess_sd_value) ** (ess_sd_value - 1) - 1)
    mcse_sd_value = sd * fac_mcse_sd
    return (
        mcse_mean_value,
        mcse_sd_value,
        ess_bulk_value,
        ess_tail_value,
        rhat_value,
    )