"""
samplermcmc.proposal
"""
import numpy as np
import sys
from .tools import get_autocorr_len, gauss_ll
try:
from hyperkde import HyperKDE
except ImportError:
HyperKDE = None
def init_logger():
    """Build and return a default logger writing to stdout at INFO level.

    ``logging`` is reloaded so that a previously configured root logger
    (e.g. in a notebook session) does not prevent ``basicConfig`` from
    taking effect.

    Returns:
        logging.Logger: the (root) logger instance.
    """
    from importlib import reload  # Not needed in Python 2
    import logging

    reload(logging)
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    return logging.getLogger()
class Proposal:
    """A proposal function. We use a class to store meta-data and ease
    the customization.
    """

    def __init__(self, names, logger=None, sort_name=None, sort_range=None, **kwargs):
        """Default initialization.

        Number of parameters and their names are set here.

        Args:
            names: iterable of parameter names.
            logger: logging object; a default stdout logger is built if None.
            sort_name: name of the parameter(s) used to sort parameter blocks.
            sort_range: (1st index used to reshape, first axis size).
            **kwargs: may contain booleans ``ps_slow`` / ``ps_fast`` to
                restrict the proposal to the slow or fast parameter subset.

        Raises:
            ValueError: if both ``ps_slow`` and ``ps_fast`` are True.
        """
        self.names = np.array(names)
        self.dim = len(names)
        self.name = "default"
        self.sort_name = sort_name
        if sort_name is not None:
            # indices (in names) of the parameters driving the sort
            self.sort_sel = np.where(self.names == sort_name)[0]
            self.sort_range = sort_range
        if logger is None:
            logger = init_logger()
        self.logger = logger
        self.ps_slow = kwargs.get("ps_slow", False)
        self.ps_fast = kwargs.get("ps_fast", False)
        if self.ps_slow and self.ps_fast:
            raise ValueError('Only one of ps_slow and ps_fast can be set to True')

    def chn_mask(self, chain):
        """Return the index mask selecting the parameters this proposal acts on."""
        if self.ps_slow:
            return chain.slow_mask
        if self.ps_fast:
            return chain.fast_mask
        return list(range(self.dim))

    def get_cov(self, chain):
        """Return covariance SVD decomposition (cov, U, S) for the active subset."""
        if self.ps_slow:
            return chain.cov_slow, chain.U_slow, chain.S_slow
        if self.ps_fast:
            return chain.cov_fast, chain.U_fast, chain.S_fast
        return chain.cov, chain.U, chain.S

    def sort_by(self, x):
        """Reorder the trailing blocks of x by ascending sort-parameter value.

        The tail ``x[sort_range[0]:]`` is reshaped to ``(-1, sort_range[1])``
        and its rows are permuted by the argsort of ``x[sort_sel]``.
        No-op when ``sort_name`` is not set.
        """
        if self.sort_name is None:
            return x
        short_a = np.argsort(x[self.sort_sel])
        reshaped = x[self.sort_range[0] :].reshape(-1, self.sort_range[1])
        reshaped = reshaped[short_a, :]
        x = np.hstack([x[0 : self.sort_range[0]], reshaped.flatten()])
        return x

    def history(self):
        """Return proposal-specific diagnostics (empty by default)."""
        return dict()

    def Lx(self, chain):
        """Return the current log-likelihood of the chain for the active subset."""
        if self.ps_slow:
            return chain.xslowll
        if self.ps_fast:
            return chain.xfastll
        return chain.Lx

    def log_l(self, chain, x):
        """Evaluate the log-likelihood at point x (subset-aware)."""
        x = x.copy()
        if self.ps_slow:
            # We return the full loglike at a different slow point so
            # this can get really slow.
            return chain._log_l(x, chain.split(chain.Mx)[1],
                                **chain.slow_fun(x))
        if self.ps_fast:
            return chain._log_l(chain.split(chain.Mx)[0], x,
                                **chain.extra)
        return chain.log_l(x)

    def get_sample(self, chain, ind):
        """Return the chain sample at index ``ind``, restricted to the active subset."""
        smpl = chain.get_sample(ind)
        if self.ps_slow:
            return chain.split(smpl)[0]
        if self.ps_fast:
            return chain.split(smpl)[1]
        return smpl
class Slice(Proposal):
    """A slice sampler.

    Any point accepted by the slice procedure is returned with status 1
    (no further Metropolis-Hastings decision needed).
    """

    def __init__(self, names, get_cov=None, get_dir=None, **kwargs):
        """Set scaling probability and factor.

        Args:
            names: parameter names.
            get_cov: optional callable(chain) -> (cov, U, S) overriding
                the default covariance accessor.
            get_dir: optional callable(x, U, S) overriding the default
                direction generator.
        """
        super().__init__(names, **kwargs)
        self.p1 = [0.95, 10.0]  # (prob threshold, large-jump scale)
        self.p2 = [0.9, 0.2]    # (prob threshold, small-jump scale)
        self.jump = [0.2, 3]    # prob, sigma of single-coordinate jumps
        self.max_cnt = 200      # max expansions/shrinks of the slice bracket
        self.name = "slice"
        self.cnt = []           # history of iteration counts per call
        self.get_cov = super().get_cov if get_cov is None else get_cov
        self.get_direction = self.get_direction_ if get_dir is None else get_dir

    def get_direction_(self, x, U, S):
        """Design the direction in which we move.

        Draws a random vector in the eigen-basis (U, S) scaled by the
        standard 2.38/sqrt(d) factor, occasionally rescaled up/down, and
        occasionally collapsed onto a single random coordinate.
        """
        prob = np.random.rand()
        if prob > self.p1[0]:
            scale = self.p1[1]
        elif prob > self.p2[0]:
            scale = self.p2[1]
        else:
            scale = 1.0
        d = len(x)
        cd = scale * 2.38 / np.sqrt(d)
        dx = cd * np.dot(np.random.randn(d) * np.sqrt(S), U.T)
        if prob <= self.jump[0]:  # jump in the coordinate direction
            ix = np.random.choice(d)  # choosing the coordinate
            # choose ~3 sigma jump in that coordinate direction
            del_x = self.jump[1] * dx[ix]
            dx = np.zeros(d)
            dx[ix] = del_x
        return dx

    def slice(self, x, chain=None, **kwargs):
        """Return actual sample.

        Args:
            x: the current position
            chain: all additional runtime data needed to propose a new point.

        Returns:
            y: the new position
            status: 0 (do MH decision), 1 (accept) or -1 (reject)
            logy: the log-likelihood at y position if already computed.
            qxy: 0 for symmetric proposal distribution
        """
        _, U, S = self.get_cov(chain)
        dx = self.get_direction(x, U, S)
        # slice level: tempered log-like minus an exponential deviate
        logLxT = self.Lx(chain) * chain.beta
        logu = logLxT - np.random.exponential(1.0)
        xr = np.copy(x)
        for i in range(self.max_cnt):  # going "right" until below slice
            xr = xr + dx
            loglik = self.log_l(chain, xr) * chain.beta
            if loglik <= logu:
                break
        self.nl_cnt = i
        xl = np.copy(x)
        for i in range(self.nl_cnt, self.max_cnt):  # going "left"
            xl = xl - dx
            loglik = self.log_l(chain, xl) * chain.beta
            if loglik <= logu:
                break
        self.nl_cnt = i
        # choose a point along that direction, shrinking the bracket
        for i in range(self.nl_cnt, self.max_cnt):
            al1 = np.random.random()
            xp = xl + al1 * (xr - xl)
            loglik = self.log_l(chain, xp) * chain.beta
            if loglik > logu:
                pars = xp
                break
            else:
                # shrink the bracket on whichever side xp falls
                sgn = np.dot((xp - x), (xr - xl))
                if sgn < 0:
                    xl = xp
                else:
                    xr = xp
        self.nl_cnt = i
        self.cnt.append(i)
        if self.nl_cnt >= self.max_cnt - 1 or not np.isfinite(loglik):
            # iteration budget exhausted (or bad likelihood): stay at x
            return x, 0, self.Lx(chain), 0
        pars = self.sort_by(pars)
        return pars, 1, loglik / chain.beta, 0

    def history(self):
        """Return the recorded per-call slice iteration counts."""
        return dict({"slice_cnt": self.cnt})
class SCAM(Proposal):
    """A Single Component Adaptive Metropolis sampler."""

    def __init__(self, names, get_cov=None, **kwargs):
        """Set scaling probability and factor.

        Args:
            names: parameter names.
            get_cov: optional callable(chain) -> (cov, U, S) overriding
                the default covariance accessor.
        """
        super().__init__(names, **kwargs)
        self.p1 = (0.97, 10.0)  # (prob threshold, large scale)
        self.p2 = (0.90, 0.2)   # (prob threshold, small scale)
        self.addNoise = False
        self.name = "SCAM"
        self.get_cov = super().get_cov if get_cov is None else get_cov
        self.extra_fact = kwargs.get("extra_fact", 1.)

    def SCAM(self, x, chain=None, **kwargs):
        """Return actual sample by jumping in one component direction.

        Args:
            x: the current position
            chain: all additional runtime data needed to propose a new point.

        Returns:
            y: the new position
            status: 0 (do MH decision), 1 (accept) or -1 (reject)
            logy: the log-likelihood at y position if already computed.
            qxy: 0 for symmetric proposal distribution
        """
        prob = np.random.rand()
        if prob > self.p1[0]:  # occasional large/small jump
            scale = self.p1[1]
        elif prob > self.p2[0]:
            scale = self.p2[1]
        else:
            scale = 1.0
        d = len(x)
        _, U, S = self.get_cov(chain)
        cd = 2.38 * scale / np.sqrt(d)  # standard adaptive-Metropolis factor
        # pick a single random eigen-direction
        ind = np.unique(np.random.randint(0, self.dim, 1))
        x_new = x + np.random.randn() * cd * np.sqrt(S[ind]) * U[:, ind].flatten()
        x_new = self.sort_by(x_new)
        if self.addNoise:
            dx = 0.05 * np.random.normal(0.0, 1.0e-6 / self.dim, self.dim)
            x_new = 0.95 * x_new + dx
        return x_new, 0, None, 0
class ReMHA(Proposal):
    """A Regional Metropolis Hastings Algorithm sampler."""

    def __init__(self, names, **kwargs):
        """Set scaling probability and factor."""
        super().__init__(names, **kwargs)
        self.name = "ReMHA"

    def ReMHA(self, x, chain=None, **kwargs):
        """Return actual sample drawn from one of the chain's modes.

        Args:
            x: the current position
            chain: all additional runtime data needed to propose a new point.

        Returns:
            y: the new position
            status: 0 (do MH decision), 1 (accept) or -1 (reject)
            logy: the log-likelihood at y position if already computed.
            qxy: 0 for symmetric proposal distribution
        """
        d = len(x)
        if not chain.modes:
            chain.update_modes()
        # chain.modes holds (mean, cov, weight) tuples, one per mode
        means = np.array([md[0] for md in chain.modes])
        covs = np.array([md[1] for md in chain.modes])
        weights = np.array([md[2] for md in chain.modes])
        # draw randomly from mode iy, weighted by mode importance
        iy = np.random.choice(len(weights), p=weights)
        mn_y = np.array(means[iy, :])
        # 2.38**2 / d: optimal random-walk scaling of the mode covariance
        cv_y = 2.38**2 * np.array(covs[iy, :, :]) / d
        y = np.random.multivariate_normal(mn_y, cv_y)
        y = self.sort_by(y)
        return y, 0, None, 0
class DE(Proposal):
    """A differential evolution sampler."""

    def __init__(self, names, DE_skip=1000, **kwargs):
        """Set scaling probability and factor.

        Args:
            names: parameter names.
            DE_skip: number of initial chain samples to skip (burn-in).
        """
        super().__init__(names, **kwargs)
        self.DE_skip = DE_skip
        self.p = (0.5, 1.0)  # (prob threshold, fixed jump scale)
        self.pc = 0.7        # DE_all draws from the last 30% of each chain
        self.name = "DE"

    def DE(self, x, chain=None, **kwargs):
        """Return actual sample using differential evolution of a given chain.

        Args:
            x: the current position
            chain: all additional runtime data needed to propose a new point.

        Returns:
            y: the new position
            status: 0 (do MH decision), 1 (accept) or -1 (reject)
            logy: the log-likelihood at y position if already computed.
            qxy: 0 for symmetric proposal distribution
        """
        burn = self.DE_skip
        chain_size = chain.get_size()
        if chain_size <= burn + 5:
            # not enough post-burn-in samples yet: reject
            return x, -1, None, 0
        mm = np.random.randint(burn, chain_size)
        nn = np.random.randint(burn, chain_size)
        while mm == nn:  # ensure two distinct samples
            nn = np.random.randint(burn, chain_size)
        scale = self.get_scale(chain)
        dx = self.get_sample(chain, mm) - self.get_sample(chain, nn)
        x_new = x + dx * scale
        x_new = self.sort_by(x_new)
        return x_new, 0, None, 0

    def get_scale(self, chain):
        """Draw the DE jump scale: fixed value with prob 1-p[0], else a
        random fraction of 2.4/sqrt(dim), tempered by anneal/beta."""
        prob = np.random.rand()
        # choose scale of the jump
        if prob > self.p[0]:
            scale = self.p[1]
        else:
            scale = (
                np.random.rand()
                * 2.4
                / np.sqrt(self.dim)
                * np.sqrt(chain.anneal / chain.beta)
            )
        return scale

    def DE_all(self, x, chains=None, chain=None, **kwargs):
        """Return actual sample using differential evolution of all chains.

        Args:
            x: the current position
            chains, chain: all additional runtime data needed to propose a new point.

        Returns:
            y: the new position
            status: 0 (do MH decision), 1 (accept) or -1 (reject)
            logy: the log-likelihood at y position if already computed.
            qxy: 0 for symmetric proposal distribution
        """
        burn = self.DE_skip
        ci1, ci2 = np.random.randint(0, len(chains), size=2)  # mixing all chains
        chain_size1 = chains[ci1].get_size()
        chain_size2 = chains[ci2].get_size()
        if chain_size1 <= burn + 5 or chain_size2 <= burn + 5:
            return x, -1, None, 0
        # we will use last 30% of each chain
        imin1 = max(burn, int(chain_size1 * self.pc))
        imin2 = max(burn, int(chain_size2 * self.pc))
        mm = np.random.randint(imin1, chain_size1)
        nn = np.random.randint(imin2, chain_size2)
        scale = self.get_scale(chain)
        dx = chains[ci1].get_sample(mm) - chains[ci2].get_sample(nn)
        x_new = x + dx * scale
        x_new = self.sort_by(x_new)
        return x_new, 0, None, 0
class AdaptiveKDE(Proposal):
    """Adaptive KDE proposal based on the hyperkde library."""

    def __init__(self, names, **kwargs):
        """Check that hyperkde is installed."""
        if HyperKDE is None:
            # best-effort warning only: the proposal stays constructible,
            # and kde_jump keeps rejecting until chain.kde is provided
            print("ImportError: hyperkde is not installed.")
        super().__init__(names, **kwargs)
        self.name = "adaptKDE"

    def kde_jump(self, x, chain, ims=0, **kwargs):
        """Return new sample drawn from the KDE distribution.

        Rejects (status -1) as long as ``chain.kde`` has not been built.
        Returns (x_new, 0, None, qxy) otherwise, with qxy the proposal
        probability ratio reported by the KDE.
        """
        if chain.kde is None:
            return x, -1, None, 0
        x_new, qxy = chain.kde.draw_from_random_hyp_kde(x)
        return x_new, 0, None, qxy
class Prior(Proposal):
    """A simple sampler based on priors."""

    def __init__(self, names, prior, **kwargs):
        """Set prior range.

        Args:
            prior: array-like of shape (dim, 2) giving [low, high] per parameter.
        """
        super().__init__(names, **kwargs)
        self.prior = np.array(prior)
        self.name = "prior"

    def sample_prior(self, x, chain=None, **kwargs):
        """Return actual sample.

        Args:
            x: the current position
            chain: all additional runtime data needed to propose a new point.

        Returns:
            y: the new position
            status: 0 (do MH decision), 1 (accept) or -1 (reject)
            logy: the log-likelihood at y position if already computed.
            qxy: 0 for symmetric proposal distribution
        """
        sz = np.shape(self.prior)[0]
        # uniform draw inside each [low, high] interval (vectorized)
        y = np.random.random(sz) * (self.prior[:, 1] - self.prior[:, 0]) + self.prior[:, 0]
        y = self.sort_by(y)
        return y, 0, None, 0
class Gibbs(Proposal):
    """
    Proposal with jump in a subset of parameters only.
    """

    def __init__(self, names, subset_names=None, cov=None, **kwargs):
        """
        Args:
            names: full list of parameter names.
            subset_names: list of parameters used for the jump.
            cov: optional initial covariance matrix of the subset;
                defaults to a diagonal matrix with 0.01**2 variance.
        """
        super().__init__(names, **kwargs)
        subset_names = [] if subset_names is None else subset_names
        # array of indices for params we want to vary
        self.ipars = [list(names).index(p) for p in subset_names]
        if not self.ipars:
            return
        # BUGFIX: a user-supplied cov used to be ignored (self.cov was
        # only assigned inside the "cov is None" branch, so passing a
        # covariance crashed on the shape check below).
        if cov is None:
            self.cov = np.diag(np.ones(len(subset_names)) * 0.01**2)
        else:
            self.cov = np.asarray(cov)
        assert len(subset_names) == self.cov.shape[0]
        U, S, _ = np.linalg.svd(self.cov)
        self.U = U
        self.S = S
        # internal samplers restricted to the subset; they use our
        # get_cov / get_direction so updates propagate automatically
        self._scam = SCAM(subset_names, get_cov=self.get_cov)
        self._slice = Slice(
            subset_names, get_dir=self.get_direction, get_cov=self.get_cov
        )
        self._slice.max_cnt = 20

    def update_cov(self, sub_chain):
        """Updates covariance of a subset of chain"""
        if not self.ipars:
            return
        sub_chain = sub_chain[:, self.ipars]
        al = get_autocorr_len(sub_chain, burn=0.25)
        istart = int(0.5 * len(sub_chain))  # skip 50% of the chain
        # thin the remaining samples by the autocorrelation length
        x_tot = np.array(sub_chain)[istart :: int(al)]
        cov = np.cov(x_tot.T)
        U, S, _ = np.linalg.svd(cov)
        self.U = U
        self.S = S

    def get_cov(self, chain=None):
        """Return covariance SVD decomposition."""
        return self.cov, self.U, self.S

    def SCAM(self, x):
        """A Single Component Adaptive Metropolis sampler (subset only)."""
        y = x.copy()
        if not self.ipars:
            return y, 0, None, 0
        x_new, status, logy, qxy = self._scam.SCAM(x[self.ipars])
        y[self.ipars] = x_new
        return y, status, logy, qxy

    def get_direction(self, x, U, S):
        """Design the direction in which we move (restricted to the subset)."""
        prob = np.random.rand()
        if prob > self._slice.p1[0]:
            scale = self._slice.p1[1]
        elif prob > self._slice.p2[0]:
            scale = self._slice.p2[1]
        else:
            scale = 1.0
        d = len(x[self.ipars])
        cd = scale * 2.38 / np.sqrt(d)
        dx = np.dot(U, np.random.randn(d) * cd * np.sqrt(S))
        if prob <= self._slice.jump[0]:  # jump in the coordinate direction
            ix = np.random.choice(d)  # choosing the coordinate
            # choose ~3 sigma jump in that coordinate direction
            del_x = self._slice.jump[1] * dx[ix]
            dx = np.zeros(d)
            dx[ix] = del_x
        # embed the subset direction into the full parameter space
        DX = np.zeros(len(x))
        DX[self.ipars] = dx
        return DX

    def slice(self, x, chain=None):
        """Return actual sample (slice move restricted to the subset)."""
        if not self.ipars:
            return x, 0, None, 0
        x_new, status, logy, qxy = self._slice.slice(x, chain=chain)
        return x_new, status, logy, qxy
class HyperModelWhat(Proposal):
    """A proposal handling hyper models, where one parameter controls
    model selection.
    """

    def __init__(
        self,
        names,
        nindex=0,
        bins=None,
        model_names=None,
        update_step=1000,
        choose_model=np.random.random,
        **kwargs,
    ):
        """Set scaling probability and factor.

        Args:
            names: full list of parameter names.
            nindex: index for models jumps (int).
            bins: model selection range; [0, 0.5, 1] (the default) gives
                two models [0-0.5][0.5-1].
            model_names: list of parameter names for each model.
            update_step: when to update cov.
            choose_model: callable returning a random kappa in [0, 1].
        """
        # BUGFIX: the Proposal base class was never initialized (names,
        # logger, ps_slow/ps_fast were missing); do it first so the
        # attributes set below are not overwritten.
        super().__init__(names, **kwargs)
        self.name = "HM"
        assert nindex < len(names) - 1 and nindex >= 0
        self.kk = nindex
        # BUGFIX: as plain lists, bins[:-1] <= bins[1:] compared the two
        # slices lexicographically (a single bool), which is NOT an
        # element-wise sorted check; convert to an array first.
        bins = np.asarray([0, 0.5, 1] if bins is None else bins)
        assert bins[0] == 0 and bins[-1] == 1
        assert np.all(bins[:-1] <= bins[1:])  # check sorted
        self.bins = bins
        self.nmodel = len(bins) - 1
        model_names = [] if model_names is None else model_names
        assert len(model_names) == self.nmodel
        self.adapt = update_step
        self.choose_model = choose_model
        # initialize a Gibbs (SCAM+slice) sampler for each model
        self.gibbsc = [Gibbs(names, subset_names=mnames) for mnames in model_names]
        self.cnt = 1
        self.scam_jump = kwargs.get("scam_jump", True)
        self.slice_jump = kwargs.get("slice_jump", True)

    def check_update(self, chain, force=False):
        """Check if update of cov is needed, and do it."""
        if self.cnt % self.adapt == 0 or force:
            for j in range(len(self.gibbsc)):
                if force:
                    self.gibbsc[j].update_cov(chain.chn)
                else:
                    limits = [self.bins[j], self.bins[j + 1]]  # limits of kappa
                    # restrict to the samples belonging to model j
                    ind_sub = (chain.chn[:, self.kk] > limits[0]) & (
                        chain.chn[:, self.kk] <= limits[1]
                    )
                    if ind_sub.sum() > 100:  # need enough samples for a cov
                        self.gibbsc[j].update_cov(chain.chn[ind_sub, :])

    def model_selection(self, kappa, jump=True):
        """Random model selection.

        Returns:
            (model index, kappa): kappa is redrawn uniformly when jump is True.
        """
        # making jump in model: always uniform
        if jump:
            kappa = self.choose_model()
        # which model does kappa fall into?
        i = np.searchsorted(self.bins, kappa)
        return i - 1, kappa

    def HMJump(self, x, chain=None, **kwargs):
        """Jump from one model to another (uniform).

        Args:
            x: the current position
            chain: all additional runtime data needed to propose a new point.

        Returns:
            y: the new position
            status: 0 (do MH decision), 1 (accept) or -1 (reject)
            logy: the log-likelihood at y position if already computed.
            qxy: 0 for symmetric proposal distribution
        """
        self.cnt += 1
        y = x.copy()
        _, y[self.kk] = self.model_selection(x[self.kk], jump=True)
        return y, 0, None, 0

    def HMSCAM(self, x, chain=None, **kwargs):
        """Return actual sample by jumping in one component direction.

        Args:
            x: the current position
            chain: all additional runtime data needed to propose a new point.

        Returns:
            y: the new position
            status: 0 (do MH decision), 1 (accept) or -1 (reject)
            logy: the log-likelihood at y position if already computed.
            qxy: 0 for symmetric proposal distribution
        """
        self.cnt += 1
        self.check_update(chain)
        i, kappa = self.model_selection(x[self.kk], jump=self.scam_jump)
        y, stat, logy, qxy = self.gibbsc[i].SCAM(x)
        y[self.kk] = kappa
        return y, stat, None, qxy

    def HMslice(self, x, chain=None, **kwargs):
        """Return actual sample by a slice move within the selected model.

        Args:
            x: the current position
            chain: all additional runtime data needed to propose a new point.

        Returns:
            y: the new position
            status: 0 (do MH decision), 1 (accept) or -1 (reject)
            logy: the log-likelihood at y position if already computed.
            qxy: 0 for symmetric proposal distribution
        """
        self.cnt += 1
        self.check_update(chain)
        i, kappa = self.model_selection(x[self.kk], jump=self.slice_jump)
        y = x.copy()
        y[self.kk] = kappa
        y, stat, logy, qxy = self.gibbsc[i].slice(y, chain=chain)
        return y, stat, None, qxy
class CovProposal(Proposal):
    """
    Propose jumps out of the gaussian distribution defined by the
    covariance matrix and mode built from chain.

    Besides the standard arguments of the :attr:`Proposal`: class
    it accepts the following parameters:

    :param int adapt: frequency (in steps) of the covariance update.
    :param np.array ini_mode: initial mode of the proposal distribution.
        Default: null vector.
    :param float ini_cov: initial diagonal value of the covariance of
        the proposal dist. Default: diagonal 0.1 matrix.
    :param bool get_qxy: return the ratio of the proposal
        probabilities. Default: True.
    :return: proposed point, 0, None, qxy
    :rtype: list
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cnt = 0  # number of proposals made so far
        self.adapt = kwargs.get('adapt', 1000)
        self.dim = len(self.names)
        self.mean = kwargs.get('ini_mode', np.zeros(self.dim))[:self.dim]
        # NOTE(review): ini_cov is used as a scalar diagonal scale, not a
        # full matrix — confirm against callers.
        self.cov = np.diag(
            np.ones(self.dim) * kwargs.get('ini_cov', .1)
        )
        self.get_qxy = kwargs.get('get_qxy', True)
        # gaussian log-pdf callable at the current (mean, cov)
        self.lp = gauss_ll(self.mean, self.cov)

    def update_cov(self, chain):
        """
        Update the covariance matrix, mean and gaussian log-pdf from the chain.
        """
        if chain is None:
            return
        ims = chain.get_size()
        if ims > self.adapt:
            self.cov, _, _ = self.get_cov(chain)
            # keep only the last 25% of the chain (the code skips 75%,
            # despite the historical "skip first 25%" comment — verify intent)
            istart = int(0.75 * len(chain.chn[0:ims, :]))
            # TODO: using the mask is likely not very efficient.
            self.mean = np.mean(chain.chn[istart:ims, self.chn_mask(chain)], axis=0)
            self.lp = gauss_ll(self.mean, self.cov)

    def get_proposal(self, y, chain=None, chains=None):
        """Draw from the current gaussian; use update_cov=True in sampler options."""
        self.cnt += 1
        # refresh the gaussian on the first call and every `adapt` steps
        if (self.cnt % self.adapt == 0) or (self.cnt == 1):
            self.update_cov(chain)
        x = np.random.multivariate_normal(
            self.mean,
            self.cov
        )
        _qxy = self.lp(y) - self.lp(x) if self.get_qxy else 0
        return x, 0, None, _qxy
class ParSplitProposal(Proposal):
    """
    Parameter splitting (and multi trial) proposal.

    :param int ntrials: number of fast trials. Default 1000.
    :param function slowp: proposition for slow pars. Signature
        slowp(old_slow, chain=None, chains=None)
        returns: new_point, <ignored>, <ignored>, proposal_prob
        it will be accessible via the proposal `slow` method call.
    :param function fastp: proposition for fast pars. Signature
        fastp(old_fast, chain=None, chains=None, **extra)
        returns: new_point, <ignored>, <ignored>, <ignored>
        where extra are the parameters returned by the
        `slow_fun` in ChainMultiTrial.
        it is wrapped by the proposal `fast` method
        to generate `ntrial` fast points.
    :param function fastlq: proposition probability for fast pars. Signature
        fastlq(old_fast, chain=None, chains=None, **extra)
        returns the proposal probability (float)
        where extra are the parameters returned by the
        `slow_fun` in ChainMultiTrial.
        it will be accessible via the proposal `lq` method call.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.ntrials = kwargs.get('ntrials', 1000)
        self.slow = kwargs['slowp']   # slow-parameter proposal callable
        self._fast = kwargs['fastp']  # single fast-trial proposal callable
        self.lq = kwargs['fastlq']    # fast proposal probability callable

    def fast(self, x, chain=None, chains=None, **extra):
        """
        Returns `ntrials` fast points (first element of each fastp draw).
        """
        return [self._fast(x, chain=chain, chains=chains, **extra)[0]
                for _ in range(self.ntrials)]