finished code, beginning writeup

This commit is contained in:
aj 2020-12-31 19:30:39 +00:00
parent 7f4775fc9c
commit 534b42f4ef
13 changed files with 2923 additions and 71 deletions

2
.gitignore vendored
View File

@ -4,7 +4,7 @@ __pycache__/
*$py.class
*.pdf
*~
*~*
# C extensions
*.so

File diff suppressed because one or more lines are too long

127
markov.py
View File

@ -1,11 +1,9 @@
from dataclasses import dataclass, field
from typing import List
import numpy as np
from numpy import log as ln
from maths import gaussian
class MarkovModel:
"""Describes a single training iteration including likelihoods and reestimation params"""
def __init__(self, states: list, observations: list = list(), state_transitions: list = list()):
self.observations = observations
@ -36,15 +34,29 @@ class MarkovModel:
return self.get_other_state_index(state_in - 1) + 1
def populate(self):
"""Calculate all likelihoods and both P(O|model)'s"""
self.populate_forward()
self.calculate_p_obs_forward()
self.populate_backward()
self.calculate_p_obs_backward()
self.populate_occupation()
return self
@property
def observation_likelihood(self):
"""abstraction for getting P(O|model) for future calculations (occupation/transition)"""
return self.p_obs_forward
####################################
# Likelihoods
####################################
def populate_forward(self):
"""Populate forward likelihoods for all states/times"""
for t, observation in enumerate(self.observations): # iterate through observations (time)
for state_index, state in enumerate(self.states):
for state_index, state in enumerate(self.states): # both states at each step
state_number = state_index + 1 # for easier reading (arrays 0-indexed, numbers start at 1)
@ -62,28 +74,29 @@ class MarkovModel:
self.forward[state_index, t] = (this_to_this + other_to_this) * gaussian(observation, state.mean, state.std_dev)
@property
def observation_likelihood(self):
"""abstraction for getting p(obs|model) for future calculations (occupation/transition)"""
return self.p_obs_forward
return self.forward
def calculate_p_obs_forward(self):
"""Calculate, store and return P(O|model) going forwards"""
sum = 0
for state_index, final_likelihood in enumerate(self.forward[:, -1]):
for state_index, final_likelihood in enumerate(self.forward[:, -1]):
sum += final_likelihood * self.state_transitions[state_index + 1, -1] # get exit prob from state transitions
self.p_obs_forward = sum
return sum
def populate_backward(self):
"""Populate backward likelihoods for all states/times"""
# initialise from exit probabilities
# initialise with exit probabilities
self.backward[:, -1] = self.state_transitions[1:len(self.states) + 1, -1]
# below iterator skips first observation (will be used when finalising P(O|model)) then reverses list [::-1]
for t, observation in list(enumerate(self.observations[1:]))[::-1]: # iterate backwards through observations (time)
# print(t, observation)
for state_index, state in enumerate(self.states):
for state_index in range(len(self.states)):
state_number = state_index + 1 # for easier reading (arrays 0-indexed, numbers start at 1)
@ -95,30 +108,45 @@ class MarkovModel:
# observation for transitions from the other state
other_state_gaussian = gaussian(observation, self.states[other_index].mean, self.states[other_index].std_dev)
# beta * a * b
this_from_this = self.backward[state_index, t + 1] * self.state_transitions[state_number, state_number] * this_state_gaussian
other_from_this = self.backward[other_index, t + 1] * self.state_transitions[other_number, state_number] * other_state_gaussian
# a * b * beta
this_from_this = self.state_transitions[state_number, state_number] * this_state_gaussian * self.backward[state_index, t + 1]
other_from_this = self.state_transitions[state_number, other_number] * other_state_gaussian * self.backward[other_index, t + 1]
self.backward[state_index, t] = (this_from_this + other_from_this)
self.backward[state_index, t] = this_from_this + other_from_this
return self.backward
def calculate_p_obs_backward(self):
"""Calculate, store and return P(O|model) going backwards"""
sum = 0
for state_index, initial_likelihood in enumerate(self.backward[:, 0]):
# pi * b * beta
sum += self.state_transitions[0, state_index + 1] * gaussian(self.observations[0], self.states[state_index].mean, self.states[state_index].std_dev) * initial_likelihood
pi = self.state_transitions[0, state_index + 1]
b = gaussian(self.observations[0], self.states[state_index].mean, self.states[state_index].std_dev)
beta = initial_likelihood
sum += pi * b * beta
self.p_obs_backward = sum
return sum
def populate_occupation(self):
for t, observation in enumerate(self.observations): # iterate through observations (time)
for state_index, state in enumerate(self.states):
"""Populate occupation likelihoods for all states/times"""
for t in range(len(self.observations)): # iterate through observations (time)
for state_index in range(len(self.states)):
forward_backward = self.forward[state_index, t] * self.backward[state_index, t]
self.occupation[state_index, t] = forward_backward / self.observation_likelihood
return self.occupation
def transition_likelihood(self, from_index, to_index, t):
"""Get specific transition likelihood given state index either side and the timestep"""
#from_index = i, from equations in the notes
#to_index = j, from equations in the notes
if t == 0:
print("no transition likelihood for t == 0")
@ -129,31 +157,62 @@ class MarkovModel:
return (forward * transition * emission * backward) / self.observation_likelihood
def baum_welch_state_transitions(self):
####################################
# Baum-Welch Re-estimations
####################################
new_transitions = np.zeros((len(self.states), len(self.states)))
def reestimated_state_transitions(self):
"""Re-estimate state transitions using Baum-Welch training (Not on mark scheme)"""
length = len(self.states)
new_transitions = np.zeros((length, length))
# i
for from_index, from_state in enumerate(self.states):
for from_index in range(length):
# j
for to_index, to_state in enumerate(self.states):
transition_sum = 0
for t in range(1, len(self.observations)):
transition_sum += self.transition_likelihood(from_index, to_index, t)
for to_index in range(length):
occupation_sum = 0
for t in range(0, len(self.observations)):
occupation_sum = self.occupation[to_index, t]
# numerator iterates from t = 1 (when 0 indexing, 2 in the notes)
transition_sum = sum(self.transition_likelihood(from_index, to_index, t) for t in range(1, len(self.observations)))
occupation_sum = sum(self.occupation[from_index, t] for t in range(0, len(self.observations)))
new_transitions[from_index, to_index] = transition_sum / occupation_sum
return new_transitions
def reestimated_state_mean(self, state_index):
"""Re-estimate the gaussian mean for a state using occupation likelihoods, baum-welch"""
numerator = 0 # sum over observations( occupation * observation )
denominator = 0 # sum over observations( occupation )
for t, observation in enumerate(self.observations): # iterate through observations (time)
occupation_likelihood = self.occupation[state_index, t]
numerator += occupation_likelihood * observation
denominator += occupation_likelihood
return numerator / denominator
def reestimated_mean(self):
"""Get all re-estimated gaussian means using occupation likelihoods"""
return [self.reestimated_state_mean(idx) for idx in range(len(self.states))]
# child object to replace normal prob/likeli operations with log prob operations (normal prob for debugging)
class LogMarkovModel(MarkovModel):
def reestimated_state_variance(self, state_index):
"""Re-estimate the gaussian variance for a state using occupation likelihoods, baum-welch"""
numerator = 0 # sum over observations( occupation * (observation - mean)^2 )
denominator = 0 # sum over observations( occupation )
for t, observation in enumerate(self.observations): # iterate through observations (time)
def log_state_transitions(self):
self.state_transitions = ln(self.state_transitions)
occupation_likelihood = self.occupation[state_index, t]
numerator += occupation_likelihood * pow(observation - self.states[state_index].mean, 2)
denominator += occupation_likelihood
return numerator / denominator
def reestimated_variance(self):
"""Get all re-estimated gaussian variances using occupation likelihoods"""
return [self.reestimated_state_variance(idx) for idx in range(len(self.states))]

10
markovlog.py Normal file
View File

@ -0,0 +1,10 @@
from numpy import log as ln
from maths import gaussian
from markov import MarkovModel
# child object to replace normal prob/likeli operations with log prob operations (normal prob for debugging)
class LogMarkovModel(MarkovModel):
def log_state_transitions(self):
self.state_transitions = ln(self.state_transitions)

255
notebook.py Normal file
View File

@ -0,0 +1,255 @@
# %%
#IMPORTS AND COMMON VARIABLES
import matplotlib.pyplot as plt
from matplotlib import cm
import numpy as np
from math import sqrt
from constants import *
from maths import gaussian
from markov import MarkovModel
from markovlog import LogMarkovModel
x = np.linspace(-4, 8, 120) # x values for figures
x_label = "Observation Space"
y_label = "Probability Density"
# %% [markdown]
# State Probability Functions (1)
# ===================
# %%
state_1_y = [gaussian(i, state1.mean, state1.std_dev) for i in x]
state_2_y = [gaussian(i, state2.mean, state2.std_dev) for i in x]
plt.plot(x, state_1_y, c='r', label="State 1")
plt.plot(x, state_2_y, c='b', label="State 2")
plt.legend()
plt.title("State Probability Density Functions")
plt.xlabel(x_label)
plt.ylabel(y_label)
plt.grid(linestyle="--")
plt.show()
# %% [markdown]
# Output Probability Densities (2)
# ==========
# %%
for obs in observations:
print(f'{obs} -> State 1: {gaussian(obs, state1.mean, state1.std_dev)}, State 2: {gaussian(obs, state2.mean, state2.std_dev)}')
# %%
state_1_y = [gaussian(i, state1.mean, state1.std_dev) for i in x]
state_2_y = [gaussian(i, state2.mean, state2.std_dev) for i in x]
plt.plot(x, state_1_y, c='r', label="State 1")
plt.plot(x, state_2_y, c='b', label="State 2")
plt.legend()
plt.title("State Probability Density Functions With Observations")
plt.xlabel(x_label)
plt.ylabel(y_label)
plt.grid(linestyle="--")
state1_pd = [gaussian(i, state1.mean, state1.std_dev) for i in observations]
state2_pd = [gaussian(i, state2.mean, state2.std_dev) for i in observations]
#############################################
# Observation Marks
#############################################
config = {
"s": 65,
"marker": 'x'
}
plt.scatter(observations, state1_pd, color=(0.5, 0, 0), **config)
plt.scatter(observations, state2_pd, color=(0, 0, 0.5), **config)
plt.show()
# %% [markdown]
# # Forward Procedure (3)
# %%
model = MarkovModel(states=[state1, state2], observations=observations, state_transitions=state_transition)
model.populate_forward()
print(model.forward)
forward = model.forward
model.calculate_p_obs_forward()
# %% [markdown]
# # Backward Procedure (4)
# %%
model = MarkovModel(states=[state1, state2], observations=observations, state_transitions=state_transition)
model.populate_backward()
print(model.backward)
backward = model.backward
model.calculate_p_obs_backward()
# %% [markdown]
# # Compare Forward/Backward Final
# %%
model = MarkovModel(states=[state1, state2], observations=observations, state_transitions=state_transition)
model.populate_forward()
model.populate_backward()
print("forward:", model.calculate_p_obs_forward())
print("backward:", model.calculate_p_obs_backward())
print("diff: ", model.p_obs_forward - model.p_obs_backward)
# %% [markdown]
# # Occupation Likelihoods (5)
# %%
model = MarkovModel(states=[state1, state2], observations=observations, state_transitions=state_transition).populate()
occupation = model.occupation
print(model.occupation)
# %% [markdown]
# # Re-estimate Mean & Variance (6)
# %%
model = MarkovModel(states=[state1, state2], observations=observations, state_transitions=state_transition).populate()
print("mean: ", [state1.mean, state2.mean])
print("variance: ", [state1.variance, state2.variance])
print()
print("mean: ", model.reestimated_mean())
print("variance: ", model.reestimated_variance())
# %% [markdown]
# New PDFs (7)
# ===================
# %%
model = MarkovModel(states=[state1, state2], observations=observations, state_transitions=state_transition).populate()
new_mean = model.reestimated_mean()
new_var = model.reestimated_variance()
new_std_dev = [sqrt(x) for x in new_var]
state_1_y = [gaussian(i, new_mean[0], new_std_dev[0]) for i in x]
state_2_y = [gaussian(i, new_mean[1], new_std_dev[1]) for i in x]
plt.plot(x, state_1_y, c='r', label="State 1")
plt.plot(x, state_2_y, c='b', label="State 2")
plt.legend()
plt.title("Re-estimated Probability Density Functions")
plt.xlabel(x_label)
plt.ylabel(y_label)
plt.grid(linestyle="--")
plt.show()
# %% [markdown]
# # Compare PDFs (7)
# %%
model = MarkovModel(states=[state1, state2], observations=observations, state_transitions=state_transition).populate()
new_mean = model.reestimated_mean()
new_var = model.reestimated_variance()
new_std_dev = [sqrt(x) for x in new_var]
#######################################
# Original
#######################################
state_1_y = [gaussian(i, state1.mean, state1.std_dev) for i in x]
state_2_y = [gaussian(i, state2.mean, state2.std_dev) for i in x]
plt.plot(x, state_1_y, '--', c='r', label="State 1", linewidth=1.0)
plt.plot(x, state_2_y, '--', c='b', label="State 2", linewidth=1.0)
#######################################
# Re-Estimated
#######################################
state_1_new_y = [gaussian(i, new_mean[0], new_std_dev[0]) for i in x]
state_2_new_y = [gaussian(i, new_mean[1], new_std_dev[1]) for i in x]
plt.plot(x, state_1_new_y, c='r', label="New State 1")
plt.plot(x, state_2_new_y, c='b', label="New State 2")
plt.legend()
plt.title("Re-estimated Probability Density Functions")
plt.xlabel(x_label)
plt.ylabel(y_label)
plt.grid(linestyle="--")
plt.show()
# %% [markdown]
# # Multiple Iterations
# %%
iterations = 5
mean = [state1.mean, state2.mean]
var = [state1.variance, state2.variance]
plt.plot(x, [gaussian(i, mean[0], sqrt(var[0])) for i in x], '--', c='r', linewidth=1.0)
plt.plot(x, [gaussian(i, mean[1], sqrt(var[1])) for i in x], '--', c='b', linewidth=1.0)
for i in range(iterations):
model = MarkovModel(states=[State(mean[0], var[0], state1.entry, state1.exit), State(mean[1], var[1], state2.entry, state2.exit)],
observations=observations,
state_transitions=state_transition)
model.populate()
mean = model.reestimated_mean()
var = model.reestimated_variance()
print(f"mean ({i}): ", mean)
print(f"var ({i}): ", var)
print()
state_1_y = [gaussian(i, mean[0], sqrt(var[0])) for i in x]
state_2_y = [gaussian(i, mean[1], sqrt(var[1])) for i in x]
style = '--'
linewidth = 1.0
if i == iterations - 1:
style = '-'
linewidth = 2.0
plt.plot(x, state_1_y, style, c='r', linewidth=linewidth)
plt.plot(x, state_2_y, style, c='b', linewidth=linewidth)
plt.title("Probability Density Function Iterations")
plt.xlabel(x_label)
plt.ylabel(y_label)
plt.grid(linestyle="--")
plt.show()
# %% [markdown]
# # Baum-Welch State Transition Re-estimations
# %%
model = MarkovModel(states=[state1, state2], observations=observations, state_transitions=state_transition).populate()
print(a_matrix)
model.reestimated_state_transitions()
# %%

BIN
report/StateTopology.odg Normal file

Binary file not shown.

BIN
report/StateTopology.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 145 KiB

View File

@ -0,0 +1,11 @@
@misc{towards-data-science-markov-intro,
author = {Rocca, Joseph},
howpublished = {Online},
month = feb,
organization = {Towards Data Science},
title = {Introduction to Markov chains},
url = {https://towardsdatascience.com/brief-introduction-to-markov-chains-2c8cab9c98ab},
urldate = {2020-12-31},
year = {2019}
}

File diff suppressed because it is too large Load Diff

BIN
report/res/pdfs-w-obs.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

BIN
report/res/pdfs.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

BIN
report/res/re-est-pdfs.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 45 KiB

View File

@ -11,7 +11,8 @@
"\n",
"from constants import *\n",
"from maths import gaussian\n",
"from markov import MarkovModel, LogMarkovModel"
"from markov import MarkovModel\n",
"from markovlog import LogMarkovModel"
]
},
{
@ -344,7 +345,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.4-final"
"version": "3.8.6-final"
}
},
"nbformat": 4,