238 lines
9.8 KiB
Python
238 lines
9.8 KiB
Python
import numpy as np
|
|
|
|
from maths import gaussian
|
|
|
|
class MarkovModel:
|
|
"""Describes a single training iteration including likelihoods and reestimation params"""
|
|
|
|
def __init__(self, states: list, observations: list = list(), state_transitions: list = list()):
|
|
self.observations = observations
|
|
self.state_transitions = state_transitions
|
|
# ^ use state number not state index, is padded by entry and exit probs
|
|
|
|
self.states = states
|
|
|
|
self.forward = np.zeros((len(states), len(observations)))
|
|
self.p_obs_forward = 0
|
|
|
|
self.backward = np.zeros((len(states), len(observations)))
|
|
self.p_obs_backward = 0
|
|
|
|
self.occupation = np.zeros((len(states), len(observations)))
|
|
|
|
def get_other_state_index(self, state_in):
|
|
"""For when state changes, get other index for retrieving state transitions (FOR 0 INDEXING)"""
|
|
if state_in == 0:
|
|
return 1
|
|
elif state_in == 1:
|
|
return 0
|
|
else:
|
|
print(f"invalid state index provided, ({state_in})")
|
|
|
|
def get_other_state_number(self, state_in):
|
|
"""For when state changes, get other number for retrieving state transitions (FOR 1 INDEXING)"""
|
|
return self.get_other_state_index(state_in - 1) + 1
|
|
|
|
def populate(self):
|
|
"""Calculate all likelihoods and both P(O|model)'s"""
|
|
|
|
self.populate_forward()
|
|
self.calculate_p_obs_forward()
|
|
self.populate_backward()
|
|
self.calculate_p_obs_backward()
|
|
self.populate_occupation()
|
|
return self
|
|
|
|
@property
|
|
def observation_likelihood(self):
|
|
"""abstraction for getting P(O|model) for future calculations (occupation/transition)"""
|
|
return self.p_obs_forward
|
|
|
|
####################################
|
|
# Likelihoods
|
|
####################################
|
|
|
|
def populate_forward(self):
|
|
"""Populate forward likelihoods for all states/times"""
|
|
|
|
for t, observation in enumerate(self.observations):
|
|
# iterate through observations (time)
|
|
|
|
for state_index, state in enumerate(self.states):
|
|
# both states at each step
|
|
|
|
state_number = state_index + 1
|
|
# ^ for easier reading (arrays 0-indexed, _number 1-indexed)
|
|
|
|
if t == 0: # calcualte initial, 0 = first row = initial
|
|
self.forward[state_index, t] = self.state_transitions[0, state_number] * gaussian(observation, state.mean, state.std_dev)
|
|
else:
|
|
# each state for each time has two paths leading to it,
|
|
# the same state (this) and the other state (other)
|
|
|
|
other_index = self.get_other_state_index(state_index)
|
|
other_number = other_index + 1 # for 1 indexing
|
|
|
|
# previous value * prob of changing from previous state to current
|
|
this_to_this = self.forward[state_index, t - 1] * self.state_transitions[state_number, state_number]
|
|
other_to_this = self.forward[other_index, t - 1] * self.state_transitions[other_number, state_number]
|
|
|
|
self.forward[state_index, t] = (this_to_this + other_to_this) * gaussian(observation, state.mean, state.std_dev)
|
|
|
|
return self.forward
|
|
|
|
def calculate_p_obs_forward(self):
|
|
"""Calculate, store and return P(O|model) going forwards"""
|
|
|
|
sum = 0
|
|
for state_index, final_likelihood in enumerate(self.forward[:, -1]):
|
|
sum += final_likelihood * self.state_transitions[state_index + 1, -1]
|
|
# get exit prob from state transitions ^
|
|
|
|
self.p_obs_forward = sum
|
|
return sum
|
|
|
|
def populate_backward(self):
|
|
"""Populate backward likelihoods for all states/times"""
|
|
|
|
# initialise with exit probabilities
|
|
self.backward[:, -1] = self.state_transitions[1:len(self.states) + 1, -1]
|
|
|
|
# below iterator skips first observation
|
|
# (will be used when finalising P(O|model))
|
|
# iterate backwards through observations (time) [::-1] <- reverses list
|
|
for t, observation in list(enumerate(self.observations[1:]))[::-1]:
|
|
|
|
# print(t, observation)
|
|
for state_index in range(len(self.states)):
|
|
|
|
state_number = state_index + 1
|
|
# ^ for easier reading (arrays 0-indexed, _number 1-indexed)
|
|
|
|
other_index = self.get_other_state_index(state_index)
|
|
other_number = other_index + 1 # for 1 indexing
|
|
|
|
# observation for transitions from the same state
|
|
this_state_gaussian = gaussian(observation, self.states[state_index].mean, self.states[state_index].std_dev)
|
|
# observation for transitions from the other state
|
|
other_state_gaussian = gaussian(observation, self.states[other_index].mean, self.states[other_index].std_dev)
|
|
|
|
# a * b * beta
|
|
this_from_this = self.state_transitions[state_number, state_number] * this_state_gaussian * self.backward[state_index, t + 1]
|
|
other_from_this = self.state_transitions[state_number, other_number] * other_state_gaussian * self.backward[other_index, t + 1]
|
|
|
|
self.backward[state_index, t] = this_from_this + other_from_this
|
|
|
|
return self.backward
|
|
|
|
def calculate_p_obs_backward(self):
|
|
"""Calculate, store and return P(O|model) going backwards"""
|
|
|
|
sum = 0
|
|
for state_index, initial_likelihood in enumerate(self.backward[:, 0]):
|
|
|
|
pi = self.state_transitions[0, state_index + 1]
|
|
b = gaussian(self.observations[0],
|
|
self.states[state_index].mean,
|
|
self.states[state_index].std_dev)
|
|
beta = initial_likelihood
|
|
|
|
sum += pi * b * beta
|
|
|
|
self.p_obs_backward = sum
|
|
return sum
|
|
|
|
def populate_occupation(self):
|
|
"""Populate occupation likelihoods for all states/times"""
|
|
|
|
for t in range(len(self.observations)):
|
|
# iterate through observations (time)
|
|
|
|
for state_index in range(len(self.states)):
|
|
|
|
forward_backward = self.forward[state_index, t] * self.backward[state_index, t]
|
|
self.occupation[state_index, t] = forward_backward / self.observation_likelihood
|
|
|
|
return self.occupation
|
|
|
|
def transition_likelihood(self, from_index, to_index, t):
|
|
"""Get specific transition likelihood given state index either side and the timestep"""
|
|
#from_index = i, from equations in the notes
|
|
#to_index = j, from equations in the notes
|
|
|
|
if t == 0:
|
|
print("no transition likelihood for t == 0")
|
|
|
|
forward = self.forward[from_index, t - 1]
|
|
transition = self.state_transitions[from_index + 1, to_index + 1]
|
|
emission = gaussian(self.observations[t],
|
|
self.states[to_index].mean,
|
|
self.states[to_index].std_dev)
|
|
backward = self.backward[to_index, t]
|
|
|
|
return (forward * transition * emission * backward) / self.observation_likelihood
|
|
|
|
####################################
|
|
# Baum-Welch Re-estimations
|
|
####################################
|
|
|
|
def reestimated_state_transitions(self):
|
|
"""Re-estimate state transitions using Baum-Welch training (Not on mark scheme)"""
|
|
|
|
length = len(self.states)
|
|
new_transitions = np.zeros((length, length))
|
|
|
|
# i
|
|
for from_index in range(length):
|
|
# j
|
|
for to_index in range(length):
|
|
|
|
# numerator iterates from t = 1 (when 0 indexing, 2 in the notes)
|
|
transition_sum = sum(self.transition_likelihood(from_index, to_index, t)
|
|
for t in range(1, len(self.observations)))
|
|
occupation_sum = sum(self.occupation[from_index, t]
|
|
for t in range(0, len(self.observations)))
|
|
|
|
new_transitions[from_index, to_index] = transition_sum / occupation_sum
|
|
|
|
return new_transitions
|
|
|
|
def reestimated_state_mean(self, state_index):
|
|
"""Re-estimate the gaussian mean for a state using occupation likelihoods, baum-welch"""
|
|
|
|
numerator = 0 # sum over observations( occupation * observation )
|
|
denominator = 0 # sum over observations( occupation )
|
|
for t, observation in enumerate(self.observations):
|
|
# iterate through observations (time)
|
|
|
|
occupation_likelihood = self.occupation[state_index, t]
|
|
|
|
numerator += occupation_likelihood * observation
|
|
denominator += occupation_likelihood
|
|
|
|
return numerator / denominator
|
|
|
|
def reestimated_mean(self):
|
|
"""Get all re-estimated gaussian means using occupation likelihoods"""
|
|
return [self.reestimated_state_mean(idx) for idx in range(len(self.states))]
|
|
|
|
|
|
def reestimated_state_variance(self, state_index):
|
|
"""Re-estimate the gaussian variance for a state using occupation likelihoods, baum-welch"""
|
|
|
|
numerator = 0 # sum over observations( occupation * (observation - mean)^2 )
|
|
denominator = 0 # sum over observations( occupation )
|
|
for t, observation in enumerate(self.observations):
|
|
# iterate through observations (time)
|
|
|
|
occupation_likelihood = self.occupation[state_index, t]
|
|
|
|
numerator += occupation_likelihood * pow(observation - self.states[state_index].mean, 2)
|
|
denominator += occupation_likelihood
|
|
|
|
return numerator / denominator
|
|
|
|
def reestimated_variance(self):
|
|
"""Get all re-estimated gaussian variances using occupation likelihoods"""
|
|
return [self.reestimated_state_variance(idx) for idx in range(len(self.states))]
|