super(HamiltonianMCda, self).__init__(model=model, grad_log_pdf=grad_log_pdf,
discretize_time=discretize_time)
def sample(self, theta0, num_adapt, num_samples, Lambda, epsilon=None):
Method to return samples using Hamiltonian Monte Carlo
Parameters
----------
theta0: A 1d array type object or a row matrix of shape 1 X d
or d X 1.(Will be converted to d X 1)
Vector representing values of parameter theta, the starting
state in markov chain.
num_adapt: int
The number of interations to run the adaptation of epsilon,
after Madapt iterations adaptation will be stopped
num_samples: int
Number of samples to be generated
Lambda: int or float
Target trajectory length, epsilon * number of steps(L),
where L is the number of steps taken per HMC iteration,
and epsilon is step size for splitting time method.
epsilon: float , defaults to None
The step size for descrete time method
If None, then will be choosen suitably
Returns
-------
list: A list of numpy array type objects containing samples
Examples
---------
>>> from pgmpy.inference import HamiltonianMCda as HMCda
>>> from pgmpy.inference import JointGaussianDistribution as JGD, GradLogPDFGaussian as GLPG
>>> from pgmpy.inference import LeapFrog
>>> import numpy as np
>>> mean = np.array([1, 1])
>>> cov_matrix = np.array([[1, 0.7], [0.7, 3]])
>>> model = JGD(mean, cov_matrix)
>>> sampler = HMCda(model=model, grad_log_pdf=GLPG, discretize_time=LeapFrog)
>>> samples = sampler.sample(np.array([[1], [1]]), num_adapt=10000,
... num_samples = 10000, Lambda=2, epsilon=None)
>>> samples_array = np.concatenate(samples, axis=1)
>>> np.cov(samples_array)
array([[ 0.98432155, 0.66517394],
[ 0.66517394, 2.95449533]])
// TODO: Proper parameterization
if isinstance(theta0, (np.matrix, np.ndarray, list, tuple, set, frozenset)):
theta0 = np.array(theta0).flatten()
theta0 = np.reshape(theta0, (len(theta0), 1))
else:
raise TypeError("theta should be a 1d array type object")
if epsilon is None:
epsilon = self._find_reasonable_epsilon(theta0)
if num_adapt <= 1: // Return samples genrated using Simple HMC algorithm
return HamiltonianMC.sample(self, theta0, num_samples, epsilon)
mu = np.log(10.0 * epsilon) // freely chosen point, after each iteration xt(/theta) is shrunk towards it
// log(10 * epsilon) large values to save computation
epsilon_bar = 1.0
hbar = 0.0
gamma = 0.05 // free parameter that controls the amount of shrinkage towards mu
t0 = 10.0 // free parameter that stabilizes the initial iterations of the algorithm
kappa = 0.75
// See equation (6) section 3.2.1 for details
samples = [theta0.copy()]
theta_m = theta0.copy()
for i in range(1, num_samples):
// Genrating sample
// Resampling momentum
momentum0 = np.reshape(np.random.normal(0, 1, len(theta0)), theta0.shape)
// theta_m here will be the previous sampled value of theta
theta_bar, momentum_bar = theta_m.copy(), momentum0.copy()
// Number of steps L to run discretize time algorithm
lsteps = int(max(1, round(Lambda / epsilon, 0)))
for _ in range(lsteps):
// Taking L steps in time
theta_bar, momentum_bar = self.discretize_time(self.grad_log_pdf, self.model, theta_bar.copy(),
momentum_bar.copy(), epsilon).discretize_time()
acceptance_prob = self._acceptance_prob(theta_m.copy(), theta_bar.copy(), momentum0, momentum_bar)
// Metropolis acceptance probability
alpha = min(1, acceptance_prob)
// Accept or reject the new proposed value of theta, i.e theta_bar
if np.random.rand() < alpha:
theta_m = theta_bar.copy()