Write a program to evaluate the Bayesian belief network for fish in Example 3 of the textbook, using the same information on $P(a_j), P(b_j), P(x_l|a_i), P(x_l|b_j), P(c_k|x_l), and P(d_m|x_l)$.
Test your program on the cases given in that same Example in the textbook. Then, apply your program to the following cases – present the results in your report, and state any assumptions you make. You should also try/report other 'queries' (see examples in the figures below).
Hint: to solve this problem, you must remember what is $P(A, B | C)$ in terms of $P(A, B)$ and $P(C)$ – then, you must also remember how to find, say for the same example, $P(A,B)$ from $P(A,B,C)$ and $P(C)$ also from $P(A,B,C)$.
Figure from Pattern Classification Book by David G. Stork, Peter E. Hart, and Richard O. Duda
import numpy as np
from IPython.display import display, Math
According to the figure above, we have to set up our Bayesian belief network:
A = ("winter", "spring", "summer", "autumn")
B = ("north_atlantic", "south_atlantic")
X = ("salmon", "sea_bass")
C = ("light", "medium", "dark")
D = ("wide", "thin")
p_a = [.25, .25, .25, .25]
p_b = [.6, .4]
p_x_ab = [
[
[.5, .5],
[.7, .3]
],
[
[.6, .4],
[.8, .2]
],
[
[.4, .6],
[.1, .9]
],
[
[.2, .8],
[.3, .7]
]
]
p_c_x = [
[.6, .2, .2],
[.2, .3, .5]
]
p_d_x = [
[.3, .7],
[.6, .4]
]
I implemented Bayesian belief network using graph data strcture. So there are two classes here:
Node
BeliefNet
class Node():
def __init__(self, var, names, probs, parents=None):
self.var = var
self.names = names
self.vars = [self.var + str(i+1) for i in range(len(self.names))]
self.probs = probs
self.parents = parents
self.current = None
self.order = -1
where:
So, for instance, $$ P(a,b|c) = \frac{P(a,b,c)}{P(c)} $$ $$ P(x|c,b) = \frac{P(a,b,x,c,d)}{P(c,b)} $$
We assume that all random variables are i.i.d. (independent and identically distributed). Simply put, they are statistical indepedent. Thus, we can calculate a joint probability with a product of all probabilities. If a variable is influenced by other variables, it is a conditional probability. Otherwise, it is just a simple probability.
For instance, based on the belief net, we get: $$ P(a,b,x,c,d) = P(a) \cdot P(b) \cdot P(x|a,b) \cdot P(c|x) \cdot P(d|x) $$
Let $\Theta$ be all variables in a belief net. So, $P(\Theta)$ is the full joint distribution over all the variables. We can compute a probability distribution of a specific variable $x$ by summing the full joint distribution over all variables other than $x$ as follows: $$ P(x) = \sum_{\Theta - \{x\}}P(\Theta) $$
For example, $$ P(a,b,x,c) = \sum_{d}P(a,b,x,c,d) $$ $$ P(a,b,x) = \sum_{c,d}P(a,b,x,c,d) $$ $$ P(a,b) = \sum_{x,c,d}P(a,b,x,c,d) $$ $$ P(a) = \sum_{b,x,c,d} P(a,b,x,c,d) $$
$$ P(d) = \sum_{a,b,x,c} P(a,b,x,c,d) $$class BeliefNet():
def __init__(self):
self.nodes = []
self.order = 0
self.orders = {}
self.var_names = {}
self.node_var_nums = {}
def add_node(self, node):
node.order = self.order
self.nodes.append(node)
self.orders[node.var] = node.order
self.node_var_nums[node.var] = len(node.names)
self.order += 1
for n, v in zip(node.names, node.vars):
if n in self.var_names:
raise KeyError("Duplicate variable names")
self.var_names[n] = v
"""
PROBABILITY FUNCTIONS
"""
def joint_prob(self, query):
""" P(a,b,x,c,d) """
if isinstance(query, list):
V = self.query_to_vars(query)
else:
V, _ = self.extract_query(query)
V = self.sort_vars(V)
cp = 1
for i, v in enumerate(V):
cur = self.find_node(v)
idx = cur.vars.index(V[i])
if cur.parents is None:
prob = cur.probs[idx]
else:
parent_indices = []
prob = cur.probs
for parent in cur.parents:
parent_indices.append(parent.current)
prob = prob[parent.current]
prob = prob[idx]
cur.current = idx
cp *= prob
return cp
def marginal_prob(self, query):
""" P(d) = sum_{a,b,x,c} P(a,b,x,c,d) """
if isinstance(query, list):
V = self.query_to_vars(query)
else:
V, _ = self.extract_query(query)
VARS = ["a", "b", "x", "c", "d"]
for v in V:
VARS.remove(v[0])
combinations = []
for v in VARS:
combinations.append(np.arange(1, self.node_var_nums[v]+1, 1))
prob = 0
if combinations:
comb_nums = np.array(
np.meshgrid(*combinations)
).T.reshape(-1, len(combinations))
for comb in comb_nums:
in_vars = V.copy()
for v, c in zip(VARS, comb):
in_vars.append(v + str(c))
in_vars = self.sort_vars(in_vars)
prob += self.joint_prob(in_vars)
else:
prob = self.joint_prob(V)
return prob
def cond_prob(self, query):
""" P(x|e) = P(x,e) / P(e) """
of, given = self.extract_query(query)
V = of + given
numerator = self.marginal_prob(V)
denominator = self.marginal_prob(given)
prob = numerator / denominator
return prob
def prob(self, query):
if "|" in query: # conditional prob
return self.cond_prob(query)
else: # joint prob and marginal prob
return self.marginal_prob(query)
"""
UTILITY FUNCTIONS
"""
def extract_query(self, query):
query = query.replace(" ", "")
split_queries = query.split("|")
of = split_queries[0]
given = ""
if len(split_queries) == 2:
given = split_queries[1]
of = of.split(",")
given = given.split(",")
if not isinstance(of, list):
of = [of]
of = self.query_to_vars(of)
given = self.query_to_vars(given)
return of, given
def sort_vars(self, V):
sorted_V = [''] * len(self.nodes)
for v in V:
var_name = v[0]
sorted_V[self.orders[var_name]] = v
return sorted_V
def query_to_vars(self, Q):
V = []
for q in Q:
if q in self.var_names:
for node in self.nodes:
try:
V.append(node.var + str(node.names.index(q)+1))
except ValueError:
pass
else:
V.append(q)
return V
def find_node(self, v):
for node in self.nodes:
if v in node.vars or v in node.names:
return node
return None
Now, create an instance of our Bayesian belief net.
n1 = Node("a", A, p_a)
n2 = Node("b", B, p_b)
n3 = Node("x", X, p_x_ab, [n1, n2])
n4 = Node("c", C, p_c_x, [n3])
n5 = Node("d", D, p_d_x, [n3])
net = BeliefNet()
net.add_node(n1)
net.add_node(n2)
net.add_node(n3)
net.add_node(n4)
net.add_node(n5)
print("P(x1,b2,c1)", net.marginal_prob("salmon,south_atlantic,light"))
print("P(x1,b2,c1)", net.marginal_prob("x1,b2,c1"))
print("P(x1,b2,c1)", net.marginal_prob(["x1", "b2", "c1"]))
print("P(c1,b2)", net.marginal_prob(["c1", "b2"]))
print("P(light,south_atlantic)", net.marginal_prob(["light","south_atlantic"]))
Q = ["sea_bass", "thin", "north_atlantic", "summer", "dark"]
V = net.query_to_vars(Q)
print(Q, "->", V)
print("P(['a3', 'b1', 'x2', 'c3', 'd1'])",
net.joint_prob(['a3', 'b1', 'x2', 'c3', 'd1']))
print("P(a3,b1,x2,c3,d1)", net.joint_prob("a3,b1,x2,c3,d1"))
print("P(a3,b1,x2,c3,d1)", net.marginal_prob("a3,b1,x2,c3,d1"))
print("P(salmon|light,south_atlantic)", net.cond_prob("salmon|light,south_atlantic"))
print("P(x1|c1,b2)", net.cond_prob("x1|c1,b2"))
print("P(x2|c1,b2)", net.cond_prob("x2|c1,b2"))
P(x1,b2,c1) 0.11399999999999999 P(x1,b2,c1) 0.11399999999999999 P(x1,b2,c1) 0.11399999999999999 P(c1,b2) 0.156 P(light,south_atlantic) 0.156 ['sea_bass', 'thin', 'north_atlantic', 'summer', 'dark'] -> ['x2', 'd2', 'b1', 'a3', 'c3'] P(['a3', 'b1', 'x2', 'c3', 'd1']) 0.027 P(a3,b1,x2,c3,d1) 0.027 P(a3,b1,x2,c3,d1) 0.027 P(salmon|light,south_atlantic) 0.7307692307692307 P(x1|c1,b2) 0.7307692307692307 P(x2|c1,b2) 0.2692307692307693
We can achieve the same results as above by using a wrapper function named prob
which acts as a gateway for other functions. It redirects its input to the corresponding function. If our query is for conditional probability, it will be forwarded to the cond_prob
function. Otherwise, it will be sent to the marginal_prob
function (It can serve for both joint probability and marginal probability).
print("P(x1,b2,c1)", net.prob("salmon,south_atlantic,light"))
print("P(x1,b2,c1)", net.prob("x1,b2,c1"))
print("P(x1,b2,c1)", net.prob(["x1", "b2", "c1"]))
print("P(['a3', 'b1', 'x2', 'c3', 'd1'])", net.prob(['a3', 'b1', 'x2', 'c3', 'd1']))
print("P(a3,b1,x2,c3,d1)", net.prob("a3,b1,x2,c3,d1"))
print("P(a3,b1,x2,c3,d1)", net.prob("a3,b1,x2,c3,d1"))
print("P(salmon|light,south_atlantic)", net.prob("salmon|light,south_atlantic"))
print("P(x1|c1,b2)", net.prob("x1|c1,b2"))
print("P(x2|c1,b2)", net.prob("x2|c1,b2"))
P(x1,b2,c1) 0.11399999999999999 P(x1,b2,c1) 0.11399999999999999 P(x1,b2,c1) 0.11399999999999999 P(['a3', 'b1', 'x2', 'c3', 'd1']) 0.027 P(a3,b1,x2,c3,d1) 0.027 P(a3,b1,x2,c3,d1) 0.027 P(salmon|light,south_atlantic) 0.7307692307692307 P(x1|c1,b2) 0.7307692307692307 P(x2|c1,b2) 0.2692307692307693
It uses net.prob
under the hood.
def P(expr):
ans = net.prob(expr)
text = expr.replace("_", "\_")
display(Math(f'P({text}) = {ans:.4f}'))
P("summer,north_atlantic,sea_bass,dark,thin")
P("x1,b2,c1")
P("c1,b2,x2")
P("south_atlantic,light")
P("x1|b2,c1")
P("x2|b2,c1")
P("sea_bass|light,thin,north_atlantic,summer")
P("x2|c1,d2,b1,a3")
P("winter|north_atlantic,light,thin")
P("a1|north_atlantic,light,thin")
P("a1|d2,c1,b1")
P("winter|d2,c1,b1")
P("spring|north_atlantic,light,thin")
P("a2|north_atlantic,light,thin")
P("a2|d2,c1,b1")
P("spring|d2,c1,b1")
P("summer|north_atlantic,light,thin")
P("a3|north_atlantic,light,thin")
P("a3|d2,c1,b1")
P("summer|d2,c1,b1")
P("autumn|north_atlantic,light,thin")
P("a4|north_atlantic,light,thin")
P("a4|d2,c1,b1")
P("autumn|d2,c1,b1")
ans = net.prob("a1|d2,c1,b1") + net.prob("a2|d2,c1,b1") + \
net.prob("a3|d2,c1,b1") + net.prob("a4|d2,c1,b1")
display(Math("\sum_{i=1}^{4}P(a_i | d_2, c_1, b_1) =" + f"{ans}"))
P("north_atlantic|medium,thin,summer")
P("b1|c2,d2,a3")