Finding the optimal maintenance policy via Markov Decision Process and Policy iteration algorithm (dynamic programming)

Bechir Trabelsi
16 min read · Sep 17, 2023


Introduction:

Solving optimal maintenance policies using Markov Decision Processes (MDPs) and dynamic programming is a powerful approach in the field of operations research and maintenance engineering. This methodology leverages the principles of stochastic processes and decision theory to determine the most cost-effective strategies for maintaining systems subject to wear and deterioration.

In this context, a system’s condition is often modeled as a Markov process, where transitions between different states (representing various levels of degradation or health) occur with probabilities that depend solely on the current state. The objective is to find a maintenance policy that minimizes long-term expected costs or maximizes expected rewards over time.

Dynamic programming comes into play as a key technique for solving MDPs. It involves breaking down a complex decision-making problem into smaller, more manageable subproblems. By recursively calculating the value function (representing the expected cumulative rewards) for each state in the system, dynamic programming enables the determination of an optimal policy.

The outcome of this approach is a set of decision rules that dictate when and how maintenance actions should be performed based on the observed state of the system. These policies are designed to balance the costs of maintenance against the benefits of extended system lifespan and reliability.

In this article, we’ll be making heavy use of the amazing book Mastering Reinforcement Learning with Python by Enes Bilgin. We’ll be borrowing and reusing some of their code for generic functions that apply to any problem, and building on their excellent mathematical explanations for our specific problem. Please check it out if you’re interested in RL.

Problem definition

The first step is to define the rules of our problem.

Imagine a scenario where a machine’s performance is contingent on its health status. The machine can be in one of four health states: ‘good’, ‘fair’, ‘poor’, or ‘failed’. The likelihood of transitioning between these states depends on the current health state.

Additionally, the machine operates over a span of seven days, from Monday (‘Mon’) to Sunday (‘Sun’). At the end of each day, a maintenance decision must be made: whether to perform maintenance or not. Maintenance obviously has a cost.

In addition, if the machine is in a poor or failed state, it will fail to reach a certain production quota, which we model as an extra cost incurred in these states.

The Machine’s Condition

The machine’s condition is represented by a discrete state space, denoted by a tuple (health, day, maintenance). ‘health’ indicates the machine’s current health state, ‘day’ denotes the current day of operation, and ‘maintenance’ is a binary value indicating whether maintenance was performed (1) or not (0).

The health states are categorized as follows:

  • ‘good’: Represents an optimally functioning machine.
  • ‘fair’: Indicates a machine with some minor performance issues.
  • ‘poor’: Signifies a machine with notable performance degradation.
  • ‘failed’: Represents a machine that has completely malfunctioned.

We define the initial probability distribution over the health states as follows:
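
P(good) = 0.7, P(fair) = 0.2, P(poor) = 0.05, P(failed) = 0.05

(these are the same values as p_health in the code below)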

Next, we define the transition probabilities between the health states from one day to the next:
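
            good   fair   poor   failed
good        0.6    0.2    0.1    0.1
fair        0.0    0.8    0.1    0.1
poor        0.0    0.0    0.7    0.3
failed      0.0    0.0    0.0    1.0

(rows are the current health state, columns the next day’s health state; these are the same probabilities used later in the step method)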

Now we can create a simulation environment for the machine as per the dynamics described above. We will use OpenAI’s Gym library, a popular framework designed exactly for this purpose.

import gym
import numpy as np

class machine(gym.Env):
    def __init__(self):
        self.machine_health = ['good', 'fair', 'poor', 'failed']
        self.p_health = [0.7, 0.2, 0.05, 0.05]  # initial health distribution
        self.days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        self.maintenance_cost = 30
        self.failure_cost = 300  # cost associated with being in the failed state
        self.poor_cost = 100     # cost associated with being in the poor state
        self.fair_cost = 50      # cost associated with being in the fair state
        self.net_revenue = 7
        self.action_space = [0, 1]  # 1 = perform maintenance, 0 = do nothing
        self.state_space = [(h, d, i) for h in self.machine_health
                            for d in self.days for i in self.action_space]

Next, we need to define a method that calculates the next state and the reward, along with the relevant quantities, given the current state and action:

def get_next_state_reward(self, state, action):
    health, day, maintenance = state
    # Use a much higher cost to discourage applying maintenance to a machine in good health
    maintenance_cost = self.maintenance_cost
    if health == 'good' and action == 1:
        maintenance_cost = 150
    if action == 1:
        health = 'good'
        maintenance = 0

    # Define the production output and the health-related cost based on health
    output = 0
    health_cost = 0
    if health == 'good':
        output = 50
    elif health == 'fair':
        output = 30
        health_cost = self.fair_cost
    elif health == 'poor':
        output = -10
        health_cost = self.poor_cost
    elif health == 'failed':
        output = -50
        health_cost = self.failure_cost

    # Calculate the reward and move to the next day
    reward = (output * self.net_revenue) - health_cost - action * maintenance_cost
    day = self.days[self.days.index(day) + 1]
    next_state = (health, day, maintenance)
    return next_state, reward

Now, we’ll need to define a method that returns all possible transitions and rewards for a given state and action pair using the get_next_state_reward method, together with the corresponding probabilities.

def get_transition_prob(self, state, action):
    next_s_r_prob = {}
    for ix, _ in enumerate(self.machine_health):
        next_s, reward = self.get_next_state_reward(state, action)
        prob = self.p_health[ix]
        if (next_s, reward) not in next_s_r_prob:
            next_s_r_prob[next_s, reward] = prob
        else:
            next_s_r_prob[next_s, reward] += prob
    return next_s_r_prob

Policy evaluation

In the context of our problem, the objective is to find the best strategy (policy) for maintaining the machine’s health over time. To achieve this, it’s crucial to be able to evaluate the effectiveness of a given policy. This process, known as policy evaluation, enables us to assess how well a particular maintenance strategy performs.

Iterative policy evaluation is a technique used to estimate the value of each state under a fixed policy. It involves iteratively updating the value estimates based on the expected returns obtained from different state-action pairs according to the given policy. This approach allows us to refine our understanding of how valuable each state is in the context of the chosen policy.

To elaborate further, we’ll start by examining the existing policy employed by the owner of the machine, referred to as the “base policy.” This policy likely outlines specific actions to take in different states, aiming to optimize the machine’s performance and minimize costs.

By employing iterative policy evaluation, we can compute the value of each state within the framework of this base policy. This process involves repeatedly estimating the expected return from a state and updating the value estimate accordingly. This iterative approach allows us to progressively refine our understanding of the effectiveness of the policy.

Ultimately, the goal is to determine which policy yields the highest expected returns and, consequently, represents the most favorable maintenance strategy for the machine. This evaluation process lays the foundation for subsequent steps in the decision-making process, such as policy improvement and optimization.

Next, let’s write a function to define a basic policy we can test. Let’s say that each day you choose randomly, with a probability p, whether to do maintenance or not. We can translate this as follows:

def base_policy(states, prob_1, prob_0):
    policy = {}
    for s in states:
        health, day, maintenance = s
        prob_a = {}
        if health == 'good':
            prob_a[0] = 1
        else:
            prob_a[1] = prob_1  # action 1 corresponds to maintenance
            prob_a[0] = prob_0  # action 0 corresponds to no maintenance
        policy[s] = prob_a
    return policy

What is happening here? We pass the function a set of states and the probabilities of performing maintenance or not. The function then iterates over the states, checks the health, and assigns the action probabilities: if health is good there is no need to do maintenance, so prob_a[0] = 1; otherwise we do maintenance with a probability of prob_1.
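
For example, a quick illustration of the dictionaries this produces (using two hand-picked states):

base_policy([('good', 'Mon', 0), ('poor', 'Mon', 0)], prob_1=0.5, prob_0=0.5)
# {('good', 'Mon', 0): {0: 1}, ('poor', 'Mon', 0): {1: 0.5, 0: 0.5}}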

Next, we define a function that calculates the expected update for a given state under the corresponding policy for that state, meaning we compute the following term for a given state s:
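
vπ(s) = Σ_a π(a|s) Σ_{s′, r} p(s′, r | s, a) [ r + γ vπ(s′) ]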

def expected_update(env, v, s, prob_a, gamma):
    expected_value = 0
    for a in prob_a:
        prob_next_s_r = env.get_transition_prob(s, a)
        for next_s, r in prob_next_s_r:
            expected_value += prob_a[a] \
                              * prob_next_s_r[next_s, r] \
                              * (r + gamma * v[next_s])
    return expected_value

Next we define a policy evaluation function that performs expected updates for all states until the state values converge or it reaches a maximum number of iterations.

def policy_evaluation(env, policy, max_iter=100, v=None, eps=0.001, gamma=1):
    if not v:
        v = {s: 0 for s in env.state_space}
    k = 0
    while True:
        max_delta = 0
        for s in v:
            if not env.is_terminal(s):
                v_old = v[s]
                prob_a = policy[s]
                v[s] = expected_update(env, v, s, prob_a, gamma)
                max_delta = max(max_delta, abs(v[s] - v_old))
        k += 1
        if max_delta < eps:
            print("Converged in", k, "iterations.")
            break
        elif k == max_iter:
            print("Terminating after", k, "iterations.")
            break
    return v

Here’s how it operates:

a) The `policy_evaluation` function takes an environment object, which, in our case, will be an instance of the `machine` class.

b) It evaluates the specified policy, which is represented as a dictionary mapping states to action probabilities.

c) Unless an initialization is provided, all state values are initially set to 0. The state values for terminal states (those corresponding to Sundays in this context) remain unchanged since we don’t anticipate any further rewards beyond that point.

d) An epsilon value is defined to serve as a convergence threshold. If, in a given round, the maximum change in state values is below this threshold, the evaluation process concludes.

e) Given that this is an episodic task with a finite number of steps, we default the discount factor gamma to 1.

f) The function returns the state values, which will be useful for subsequent steps.

Now, let’s evaluate the base policy using this function. First, we create a `machine` object from the class defined earlier, then we define 3 base policies and see what happens when we start on Monday with poor health (see the sketch after the list below):

  • policy 1 : we’ll say each day if the state is not good you perform maintenance with a probability of 50%
  • policy 2: you never perform maintenance regardless of health state
  • policy 3: You always perform maintenance

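Here is a minimal sketch of this comparison, assuming the full machine class (including the is_terminal method added in the next section) and the functions defined above:

env = machine()

policy_1 = base_policy(env.state_space, prob_1=0.5, prob_0=0.5)  # maintain with 50% probability when not good
policy_2 = base_policy(env.state_space, prob_1=0.0, prob_0=1.0)  # never maintain
policy_3 = base_policy(env.state_space, prob_1=1.0, prob_0=0.0)  # always maintain when not good

for name, policy in [('policy 1', policy_1), ('policy 2', policy_2), ('policy 3', policy_3)]:
    v = policy_evaluation(env, policy)
    print(name, '-> value of (poor, Mon, 0):', v[('poor', 'Mon', 0)])
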
So, as you can see, in this specific scenario, performing maintenance whenever health is not good gives the best result of the three policies.

Simulating the environment:

Now, to be able to actually run simulations, we’ll have to add some methods to our machine class:

  1. Reset Method:
    The `reset` method initializes the environment to a default state. In this case, it sets the health to ‘good’, the day to Monday, and the action to 0, representing no maintenance. It then returns a tuple representing the state.
  def reset(self):
      self.health = 'good'
      self.day = "Mon"
      self.action = 0
      state = (self.health, self.day, self.action)
      return state

2. Terminal State Check Method:
The `is_terminal` method checks if a given state is terminal. A state is considered terminal if it corresponds to Sunday, the last day in the predefined list of days.

  def is_terminal(self, state):
      health, day, maintenance = state
      if day == "Sun" or self.days.index(day) == len(self.days) - 1:
          return True
      else:
          return False

3. Step Method:
The `step` method simulates a time step in the environment. It takes the current state, applies an action, and returns the updated state, immediate reward, a flag indicating whether the episode is complete, and additional information.
Inside the method:
— It assumes the input ‘state’ contains information about the initial health, day, and maintenance.
— It uses a predefined transition matrix to probabilistically determine the next health state based on the current health state.
— It then updates the health variable with the newly generated health state.
— The method uses the `get_next_state_reward` function (which is assumed to be defined elsewhere) to calculate the next state and reward based on the current state and chosen action.
— It checks if the next state is terminal using the `is_terminal` method.
— Finally, it returns the updated state, reward, terminal flag, and a dictionary of additional information.

def step(self, action, state):
    # 'state' is the current state at the start of the day
    init_health, day, maintenance = state
    print('initial_state', init_health, day, maintenance)

    # Daily degradation dynamics: rows are the current health state, columns the next
    transition_matrix = np.array([
        [0.6, 0.2, 0.1, 0.1],   # good
        [0.0, 0.8, 0.1, 0.1],   # fair
        [0.0, 0.0, 0.7, 0.3],   # poor
        [0.0, 0.0, 0.0, 1.0]    # failed
    ])

    # self.machine_health is ordered as ['good', 'fair', 'poor', 'failed']
    health_states = ['good', 'fair', 'poor', 'failed']
    init_health_index = health_states.index(init_health)

    # Sample the next health state according to the transition probabilities
    next_health_index = np.random.choice(len(health_states),
                                         p=transition_matrix[init_health_index])
    health = health_states[next_health_index]

    # Compute the next state and reward from the sampled health and the chosen action
    next_state, reward = self.get_next_state_reward((health, day, maintenance), action)

    done = self.is_terminal(next_state)
    info = {'health': health, 'action': action, 'reward': reward}

    return next_state, reward, done, info

4. Choosing an Action:
The `choose_action` function selects an action based on a given policy and the current state. It considers the probabilities associated with each action in the policy to make a random selection.

def choose_action(state, policy):
    prob_a = policy[state]
    action = np.random.choice(a=list(prob_a.keys()),
                              p=list(prob_a.values()))
    return action

5. Simulating a Policy:
The `simulate_policy` function executes a specified policy for a defined number of episodes. It operates as follows:
— Initializes a random seed for reproducibility.
— Creates an instance of the `machine` class to represent the environment.
— Tracks the rewards obtained during the episodes.
For each episode:
— Starts from the provided state.
— Iterates through time steps until the episode terminates, choosing actions based on the policy.
— Accumulates the rewards obtained.
— Calculates and prints the expected weekly profit based on the collected rewards.

def simulate_policy(initial_state, policy, n_episodes):
    np.random.seed(0)
    env = machine()
    rewards = []
    for i_episode in range(n_episodes):
        state = initial_state  # restart each episode from the provided state
        done = False
        ep_reward = 0
        while not done:
            action = choose_action(state, policy)
            state, reward, done, info = env.step(action, state)
            print('NEXT STATE:==>', state, reward, done, info)
            ep_reward += reward
        rewards.append(ep_reward)
    print("Expected weekly profit:", np.mean(rewards))

These functions collectively enable the simulation of the machine health problem under a specified policy, allowing you to evaluate and refine policies to find the one that is optimal for your specific scenario.

Let’s run a simulation for one episode and see what happens:
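
For instance, here is one possible call, starting from a poor machine on Monday under policy_3 defined in the sketch earlier:

simulate_policy(('poor', 'Mon', 0), policy_3, n_episodes=1)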

So here we started with a poor machine on Monday, thus the policy decided we needed to perform maintenance, hence action = 1 in the second line. From Tuesday until the end of the week the machine stayed in good health, so there was no need to perform any further maintenance.

Policy Iteration

Policy Iteration is a process that enables the comparison and iterative enhancement of policies. It consists of several steps:

  1. Policy Comparison and Improvement:

We begin by having two policies, denoted as π and π′, which we aim to compare. The policy π′ is considered to be as good as π if the state values produced by π′ are greater than or equal to those generated by π for all states in the state space S. In mathematical terms, this is expressed as:
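
vπ′(s) ≥ vπ(s), for all s ∈ S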

In essence, if the expected cumulative reward from any given state onwards is higher or equal under policy π′ compared to policy π, then we consider π′ to be as good as π. If this inequality is strictly true for at least one state, then π′ is considered to be a superior policy to π. This interpretation aligns with the notion that state-values reflect the expected rewards accumulated from a specific point onwards.

2. Policy Improvement Theorem:
To transition from policy π to a potentially superior policy π′, we invoke the policy improvement theorem (see http://incompleteideas.net/book/ebook/node42.html ).
The action-value function quantifies the expected cumulative reward of taking a specific action from a certain state. It plays a crucial role in policy improvement by guiding the selection of actions that lead to higher expected rewards. The iterative nature of policy improvement ensures that at each step, actions are chosen in a way that systematically enhances the policy.

The action-value function, denoted as qπ(s, a), represents the expected cumulative future reward under policy π when taking action a in the current state s, and then subsequently following policy π.
To clarify, qπ(s, a) accounts for a single instance where there is a deviation from the policy π in the current step, as action a is taken instead.
This concept is integral to policy improvement. The policy improvement theorem asserts that if it is more advantageous to initiate action a in state s and then adhere to policy π for the remainder of the states, rather than consistently following policy π throughout, then adopting action a every time in state s becomes a superior policy compared to π.
In simpler terms, when qπ(s, a) > vπ(s), it implies that by selecting action a when in state s and subsequently following policy π, we can enhance the policy. This observation is a fundamental underpinning of policy improvement. While the detailed proof of this theorem can be found in Sutton and Barto’s work (2018), the logic behind it is notably intuitive.
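
In equation form, the action-value function for our finite MDP can be written as:

qπ(s, a) = Σ_{s′, r} p(s′, r | s, a) [ r + γ vπ(s′) ]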

Suppose we have two policies, π and π′, and we find that for every state s, the action-value function qπ(s, π′(s)) is greater than or equal to the state-value function vπ(s), denoted as qπ(s, π′(s)) ≥ vπ(s). This signifies that policy π′ is at least as effective as policy π. To enhance a policy, all that’s required is to determine the action π′(s) that maximizes the action-value function qπ(s, a) for each state. In mathematical terms, this is expressed as π′(s) ≜ argmax_a qπ(s, a).
Before we conclude, it’s important to note that while we’ve discussed this policy improvement for deterministic policies, the same principle applies seamlessly to stochastic policies as well.
By combining policy evaluation and improvement, Policy Iteration allows us to iteratively refine policies until an optimal one is found. This process forms the cornerstone of reinforcement learning, enabling agents to learn effective strategies through a combination of evaluation and improvement steps.

Policy Iteration for optimal maintenance policy Problem

We have already established the code for policy evaluation and expected update. What remains is to incorporate the policy improvement step to obtain an optimal policy. Let’s get right into it.

We’ll start by implementing the policy improvement step as described earlier:

def policy_improvement(env, v, s, actions, gamma):
    prob_a = {}
    if not env.is_terminal(s):
        max_q = -np.inf
        best_a = None
        for a in actions:
            q_sa = expected_update(env, v, s, {a: 1}, gamma)
            if q_sa >= max_q:
                max_q = q_sa
                best_a = a
        prob_a[best_a] = 1
    else:
        max_q = 0
    return prob_a, max_q

This function looks for the action that yields the highest q-value for a given state using the value functions derived from the current policy. For terminal states, the q-value remains at 0.

Now, let’s consolidate everything into a policy iteration algorithm:

def policy_iteration(env, eps=0.1, gamma=1):
    np.random.seed(1)
    states = env.state_space
    actions = env.action_space
    # Start from a random deterministic policy
    policy = {s: {np.random.choice(actions): 1} for s in states}
    v = {s: 0 for s in states}
    while True:
        v = policy_evaluation(env, policy, v=v, eps=eps, gamma=gamma)
        old_policy = policy
        policy = {}
        for s in states:
            policy[s], _ = policy_improvement(env, v, s, actions, gamma)
        if old_policy == policy:
            break
    print("Optimal policy found!")
    return policy, v

This algorithm commences with a random policy. In each iteration, it executes policy evaluation and improvement. The process halts when the policy stabilizes, indicating that an optimal policy has been reached.

Okay, now we’re ready. Let’s try to find an optimal policy based on all the constraints we defined:
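
A minimal sketch of running it, using the machine environment and the functions defined above:

env = machine()
policy, v = policy_iteration(env)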

Looks like we found one after 7 iterations. Let’s examine it:
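
One simple way to inspect the resulting policy is to print the action chosen for each non-terminal state, for example:

for s, a in policy.items():
    if not env.is_terminal(s):
        print(s, '->', a)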

If you print it out, you will see that this is actually the “always do maintenance when the health state is not good” policy; we looked into this earlier and found that it gave us the maximum profit per week. If we change the environment parameters, we can expect to find other policies.

Conclusion

Dynamic Programming (DP) methods, such as policy iteration, offer a solid foundation for solving Markov Decision Processes (MDPs) and gaining a deep understanding of their mechanics. These techniques are notably more efficient than alternative approaches like direct search algorithms or linear programming methods. However, practical applications of DP face significant challenges:

Curse of Dimensionality

One of the chief limitations of DP lies in the requirement to iterate over the entire state space multiple times to reach an optimal policy. This involves storing the policy, state values, and action values for each state, which can quickly become unwieldy. Our toy problem has only 4 × 7 × 2 = 56 states, but in realistic scenarios the state space grows exponentially with the number of state variables, a phenomenon known as the “curse of dimensionality.” This poses a significant challenge when attempting to apply DP to real-world problems.

Need for a Complete Model of the Environment

In DP, we have relied on accurate knowledge of the transition probabilities within the environment for policy evaluation and policy iteration. However, this is often a luxury that eludes us in practical applications. Calculating these probabilities for every possible transition can be exceedingly complex, if not impossible. Instead, obtaining sample trajectories of transitions, whether from the real environment or from a simulation of it, is a more viable alternative.

The pivotal question then arises: how can we leverage these sample trajectories to learn (near) optimal policies? This is where Monte Carlo and temporal-difference methods come in. These concepts lie at the heart of many advanced reinforcement learning algorithms and provide a pathway to overcoming the limitations of DP in practical scenarios.
