Added POMDP-value-iteration (aimacode#929)
* Added POMDP value iteration

* Added plot_pomdp_utility function

* Added tests for pomdp-value-iteration

* Updated README.md

* Fixed notebook import

* Changed colors

* Added notebook sections for POMDP and pomdp_value_iteration

* Fixed notebook parsing error

* Replace pomdp.ipynb

* Updated README.md

* Fixed line endings

* Fixed line endings

* Fixed line endings

* Fixed line endings

* Removed numpy dependency

* Added docstrings

* Fix tests

* Added a test for pomdp_value_iteration

* Remove numpy dependencies from mdp.ipynb

* Added POMDP to mdp_apps.ipynb
ad71 authored and norvig committed Jul 11, 2018
1 parent 68327a8 commit 4f1eb25
Showing 7 changed files with 1,418 additions and 254 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -131,7 +131,7 @@ Here is a table of algorithms, the figure, name of the algorithm in the book and
| 16.9 | Information-Gathering-Agent | | | | |
| 17.4 | Value-Iteration | `value_iteration` | [`mdp.py`][mdp] | Done | Included |
| 17.7 | Policy-Iteration | `policy_iteration` | [`mdp.py`][mdp] | Done | Included |
| 17.9 | POMDP-Value-Iteration | | | | |
| 17.9 | POMDP-Value-Iteration | `pomdp_value_iteration` | [`mdp.py`][mdp] | Done | Included |
| 18.5 | Decision-Tree-Learning | `DecisionTreeLearner` | [`learning.py`][learning] | Done | Included |
| 18.8 | Cross-Validation | `cross_validation` | [`learning.py`][learning] | | |
| 18.11 | Decision-List-Learning | `DecisionListLearner` | [`learning.py`][learning]\* | | |
778 changes: 772 additions & 6 deletions mdp.ipynb

Large diffs are not rendered by default.

209 changes: 208 additions & 1 deletion mdp.py
@@ -9,6 +9,8 @@
from utils import argmax, vector_add, orientations, turn_right, turn_left

import random
import numpy as np
from collections import defaultdict


class MDP:
@@ -51,11 +53,13 @@ def __init__(self, init, actlist, terminals, transitions=None, reward=None, stat

def R(self, state):
"""Return a numeric reward for this state."""

return self.reward[state]

def T(self, state, action):
"""Transition model. From a state and an action, return a list
of (probability, result-state) pairs."""

if not self.transitions:
raise ValueError("Transition model is missing")
else:
@@ -65,6 +69,7 @@ def actions(self, state):
"""Return a list of actions that can be performed in this state. By default, a
fixed list of actions, except for terminal states. Override this
method if you need to specialize by state."""

if state in self.terminals:
return [None]
else:
@@ -106,7 +111,10 @@ def check_consistency(self):

class MDP2(MDP):

"""Inherits from MDP. Handles terminal states, and transitions to and from terminal states better."""
"""
Inherits from MDP. Handles terminal states, and transitions to and from terminal states better.
"""

def __init__(self, init, actlist, terminals, transitions, reward=None, gamma=0.9):
MDP.__init__(self, init, actlist, terminals, transitions, reward, gamma=gamma)

@@ -160,11 +168,13 @@ def T(self, state, action):

def go(self, state, direction):
"""Return the state that results from going in this direction."""

state1 = vector_add(state, direction)
return state1 if state1 in self.states else state

def to_grid(self, mapping):
"""Convert a mapping from (x, y) to v into a [[..., v, ...]] grid."""

return list(reversed([[mapping.get((x, y), None)
for x in range(self.cols)]
for y in range(self.rows)]))
@@ -190,6 +200,7 @@ def to_arrows(self, policy):

def value_iteration(mdp, epsilon=0.001):
"""Solving an MDP by value iteration. [Figure 17.4]"""

U1 = {s: 0 for s in mdp.states}
R, T, gamma = mdp.R, mdp.T, mdp.gamma
while True:
@@ -206,6 +217,7 @@ def value_iteration(mdp, epsilon=0.001):
def best_policy(mdp, U):
"""Given an MDP and a utility function U, determine the best policy,
as a mapping from state to action. (Equation 17.4)"""

pi = {}
for s in mdp.states:
pi[s] = argmax(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
@@ -214,13 +226,15 @@ def best_policy(mdp, U):

def expected_utility(a, s, U, mdp):
"""The expected utility of doing a in state s, according to the MDP and U."""

return sum(p*U[s1] for (p, s1) in mdp.T(s, a))

# ______________________________________________________________________________


def policy_iteration(mdp):
"""Solve an MDP by policy iteration [Figure 17.7]"""

U = {s: 0 for s in mdp.states}
pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}
while True:
@@ -238,13 +252,206 @@ def policy_iteration(mdp):
def policy_evaluation(pi, U, mdp, k=20):
"""Return an updated utility mapping U from each state in the MDP to its
utility, using an approximation (modified policy iteration)."""

R, T, gamma = mdp.R, mdp.T, mdp.gamma
for i in range(k):
for s in mdp.states:
U[s] = R(s) + gamma*sum(p*U[s1] for (p, s1) in T(s, pi[s]))
return U
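
A usage sketch (not part of this diff) on the module's 4x3 grid world, sequential_decision_environment, which the doctest at the end of the file also uses; value iteration followed by best_policy and policy iteration should both produce an optimal policy:

>>> U = value_iteration(sequential_decision_environment, epsilon=0.001)
>>> pi = best_policy(sequential_decision_environment, U)
>>> pi2 = policy_iteration(sequential_decision_environment)

Either policy can be rendered with sequential_decision_environment.to_arrows(pi).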


class POMDP(MDP):

"""A Partially Observable Markov Decision Process, defined by
a transition model P(s'|s,a), actions A(s), a reward function R(s),
and a sensor model P(e|s). We also keep track of a gamma value,
for use by algorithms. The transition and the sensor models
are defined as matrices. We also keep track of the possible states
and actions for each state. [page 659]."""

def __init__(self, actions, transitions=None, evidences=None, rewards=None, states=None, gamma=0.95):
"""Initialize variables of the pomdp"""

if not (0 < gamma <= 1):
raise ValueError('A POMDP must have 0 < gamma <= 1')

self.states = states
self.actions = actions

        # warn if the transition model is undefined
self.t_prob = transitions or {}
if not self.t_prob:
print('Warning: Transition model is undefined')

        # warn if the sensor model is undefined
self.e_prob = evidences or {}
if not self.e_prob:
print('Warning: Sensor model is undefined')

self.gamma = gamma
self.rewards = rewards

def remove_dominated_plans(self, input_values):
"""
Remove dominated plans.
This method finds all the lines contributing to the
upper surface and removes those which don't.
"""

values = [val for action in input_values for val in input_values[action]]
values.sort(key=lambda x: x[0], reverse=True)

best = [values[0]]
y1_max = max(val[1] for val in values)
tgt = values[0]
prev_b = 0
prev_ix = 0
while tgt[1] != y1_max:
min_b = 1
min_ix = 0
for i in range(prev_ix + 1, len(values)):
if values[i][0] - tgt[0] + tgt[1] - values[i][1] != 0:
trans_b = (values[i][0] - tgt[0]) / (values[i][0] - tgt[0] + tgt[1] - values[i][1])
if 0 <= trans_b <= 1 and trans_b > prev_b and trans_b < min_b:
min_b = trans_b
min_ix = i
prev_b = min_b
prev_ix = min_ix
tgt = values[min_ix]
best.append(tgt)

return self.generate_mapping(best, input_values)

def remove_dominated_plans_fast(self, input_values):
"""
Remove dominated plans using approximations.
Resamples the upper boundary at intervals of 100 and
finds the maximum values at these points.
"""

values = [val for action in input_values for val in input_values[action]]
values.sort(key=lambda x: x[0], reverse=True)

best = []
sr = 100
for i in range(sr + 1):
x = i / float(sr)
maximum = (values[0][1] - values[0][0]) * x + values[0][0]
tgt = values[0]
for value in values:
val = (value[1] - value[0]) * x + value[0]
if val > maximum:
maximum = val
tgt = value

if all(any(tgt != v) for v in best):
best.append(np.array(tgt))

return self.generate_mapping(best, input_values)

def generate_mapping(self, best, input_values):
"""Generate mappings after removing dominated plans"""

mapping = defaultdict(list)
for value in best:
for action in input_values:
if any(all(value == v) for v in input_values[action]):
mapping[action].append(value)

return mapping

    def max_difference(self, U1, U2):
        """Find the maximum difference between two utility mappings"""

        max_diff = 0
        for k in set(U1) | set(U2):
            # compare the summed utilities of each action's value vectors
            sum1 = sum(sum(element) for element in U1.get(k, []))
            sum2 = sum(sum(element) for element in U2.get(k, []))
            max_diff = max(max_diff, abs(sum1 - sum2))
        return max_diff
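
For intuition about the two pruning methods above (the numbers below are illustrative, not from this commit): a value pair [v0, v1] is a line over the belief x in [0, 1] with utility (v1 - v0)*x + v0, and a plan survives only if its line lies on the upper surface for some belief. Here plan 'c' is dominated everywhere and is dropped:

>>> tiny = POMDP(actions=('a', 'b', 'c'))
Warning: Transition model is undefined
Warning: Sensor model is undefined
>>> plans = {'a': [[0, 10]], 'b': [[4, 4]], 'c': [[1, 2]]}
>>> sorted(tiny.remove_dominated_plans_fast(plans).keys())
['a', 'b']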


class Matrix:
"""Matrix operations class"""

@staticmethod
def add(A, B):
"""Add two matrices A and B"""

res = []
for i in range(len(A)):
row = []
for j in range(len(A[0])):
row.append(A[i][j] + B[i][j])
res.append(row)
return res

@staticmethod
def scalar_multiply(a, B):
"""Multiply scalar a to matrix B"""

for i in range(len(B)):
for j in range(len(B[0])):
B[i][j] = a * B[i][j]
return B

@staticmethod
def multiply(A, B):
"""Multiply two matrices A and B element-wise"""

matrix = []
for i in range(len(B)):
row = []
for j in range(len(B[0])):
row.append(B[i][j] * A[j][i])
matrix.append(row)

return matrix

@staticmethod
def matmul(A, B):
"""Inner-product of two matrices"""

return [[sum(ele_a*ele_b for ele_a, ele_b in zip(row_a, col_b)) for col_b in list(zip(*B))] for row_a in A]

@staticmethod
def transpose(A):
"""Transpose a matrix"""

return [list(i) for i in zip(*A)]
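
A quick illustration of the helpers above (illustrative values): matmul is the ordinary matrix product, multiply combines B element-wise with the transpose of A, and scalar_multiply modifies its matrix argument in place.

>>> A = [[1, 2], [3, 4]]
>>> B = [[10, 20], [30, 40]]
>>> Matrix.matmul(A, B)
[[70, 100], [150, 220]]
>>> Matrix.multiply(A, B)
[[10, 60], [60, 160]]
>>> Matrix.transpose(A)
[[1, 3], [2, 4]]
>>> Matrix.add(A, B)
[[11, 22], [33, 44]]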


def pomdp_value_iteration(pomdp, epsilon=0.1):
"""Solving a POMDP by value iteration."""

    U = {'': [[0] * len(pomdp.states)]}
count = 0
while True:
count += 1
prev_U = U
values = [val for action in U for val in U[action]]
value_matxs = []
for i in values:
for j in values:
value_matxs.append([i, j])

U1 = defaultdict(list)
for action in pomdp.actions:
for u in value_matxs:
u1 = Matrix.matmul(Matrix.matmul(pomdp.t_prob[int(action)], Matrix.multiply(pomdp.e_prob[int(action)], Matrix.transpose(u))), [[1], [1]])
u1 = Matrix.add(Matrix.scalar_multiply(pomdp.gamma, Matrix.transpose(u1)), [pomdp.rewards[int(action)]])
U1[action].append(u1[0])

U = pomdp.remove_dominated_plans_fast(U1)
# replace with U = pomdp.remove_dominated_plans(U1) for accurate calculations

if count > 10:
if pomdp.max_difference(U, prev_U) < epsilon * (1 - pomdp.gamma) / pomdp.gamma:
return U
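
A minimal end-to-end sketch of pomdp_value_iteration (a hypothetical two-state problem; the probabilities and rewards are illustrative and not taken from this commit's tests). The layout follows what the code above expects: one transition matrix, one sensor matrix and one reward vector per action, with actions named by stringified indices so that int(action) can index them. Action '0' acts as a cheap, informative sensing step; action '1' pays off in state 0, is costly in state 1, and resets the state distribution to uniform:

>>> t_prob = [[[0.9, 0.1], [0.1, 0.9]], [[0.5, 0.5], [0.5, 0.5]]]
>>> e_prob = [[[0.8, 0.2], [0.3, 0.7]], [[0.5, 0.5], [0.5, 0.5]]]
>>> rewards = [[-1, -1], [5, -10]]
>>> pomdp = POMDP(actions=('0', '1'), transitions=t_prob, evidences=e_prob,
...               rewards=rewards, states=('0', '1'), gamma=0.95)
>>> utility = pomdp_value_iteration(pomdp, epsilon=0.1)

The returned utility maps each action to its undominated value vectors (alpha vectors) over the two states; the plot_pomdp_utility function added in this commit can be used to visualize the resulting upper surface.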


__doc__ += """
>>> pi = best_policy(sequential_decision_environment, value_iteration(sequential_decision_environment, .01))
