codigo actualizado

marta 1 year ago
parent 3355e5e7a3
commit 2ec02b2a44
  1. 620

@ -0,0 +1,620 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
Created on Tue Feb 6 19:02:32 2024
@author: marta
import numpy as np
import plotly.graph_objects as go
from tqdm.notebook import tqdm
import as px
import matplotlib as mpl
mpl.rcParams['figure.dpi'] = 300
import matplotlib.pyplot as plt
import seaborn as sns
import os
from wand.image import Image as WImage
# sns.set(palette="husl",font_scale=1)
# %config InlineBackend.figure_format = 'retina'
import copy
#%load_ext line_profiler
#Define constants
#L = 2*np.pi # periodic domain size
# define boundaries of simulation box
x0 = 0
x1 = L
z0 = 0
z1 = L
# define reinforcement learning problem
N_states = 4 # number of states - one for each coarse-grained degree of vorticity
N_actions = 2 # number of actions - one for each coarse-grained swimming direction
# numerical parameters
dt = 0.0001 # timestep size
#Utility functions
def moving_average(a, n=3) :
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret[n - 1:] / n
# Runga-Kutta 4(5) integration for one step
# see
def DoPri45Step(f,t,x,h):
k1 = f(t,x)
k2 = f(t + 1./5*h, x + h*(1./5*k1) )
k3 = f(t + 3./10*h, x + h*(3./40*k1 + 9./40*k2) )
k4 = f(t + 4./5*h, x + h*(44./45*k1 - 56./15*k2 + 32./9*k3) )
k5 = f(t + 8./9*h, x + h*(19372./6561*k1 - 25360./2187*k2 + 64448./6561*k3 - 212./729*k4) )
k6 = f(t + h, x + h*(9017./3168*k1 - 355./33*k2 + 46732./5247*k3 + 49./176*k4 - 5103./18656*k5) )
v5 = 35./384*k1 + 500./1113*k3 + 125./192*k4 - 2187./6784*k5 + 11./84*k6
k7 = f(t + h, x + h*v5)
v4 = 5179./57600*k1 + 7571./16695*k3 + 393./640*k4 - 92097./339200*k5 + 187./2100*k6 + 1./40*k7;
return v4,v5
#Define useful data structures
#Define a dictionary of the possible states and their assigned indices
distance_states = ["ri", "rni"] #ri es rij<rct y rni es rij>rct
frecuency_states = ["wo", "wh"] #wo es w<wc y wh es w>wc
product_states = [(x,y) for x in distance_states for y in frecuency_states] # all possible states
state_lookup_table = {product_states[i]:i for i in range(len(product_states))} # returns index of given state
# print(product_states) # to view mapping
#Define an agent class for reinforcement learning
class Agent:
def __init__(self, Ns):
self.r = np.zeros(Ns) # reward for each stage
self.t = 0 # time
# calculate reward given from entering a new state after a selected action is undertaken
def calc_reward(self):
# enforce implementation by subclass
if self.__class__ == AbstractClass:
raise NotImplementedError
def update_state(self):
# enforce implementation by subclass
if self.__class__ == AbstractClass:
raise NotImplementedError
def take_random_action(self):
# enforce implementation by subclass
if self.__class__ == AbstractClass:
raise NotImplementedError
def take_greedy_action(self, Q):
# enforce implementation by subclass
if self.__class__ == AbstractClass:
raise NotImplementedError
#Define swimmer class derived from agent
class Swimmer(Agent):
def __init__(self, Ns, ni, sigma):
# call init for superclass
super().__init__(Ns) = ni
self.sigma = sigma
# local position within the periodic box. X = [x, z]^T with 0 <= x < 2 pi and 0 <= z < 2 pi
self.X = np.array([np.random.uniform(0, L), np.random.uniform(0, L), 0])
# absolute position. -inf. <= x_total < inf. and -inf. <= z_total < inf.
self.X_total = self.X
# particle orientation
self.theta = np.random.uniform(0, 2*np.pi) # polar angle theta in the x-z plane
self.p = np.array([np.cos(self.theta), np.sin(self.theta)]) # p = [px, pz]^T
# translational and rotational velocity
self.U = np.zeros(3, float)
self.W = np.array([0., 0., 1.]) #Velocidad angular aleatoria
#distancia entre el swimmer y el obstáculo
self.R=np.random.uniform(0, 2.5, 1)
# preferred swimming direction (equal to [1,0], [0,1], [-1,0], or [0,-1])
self.ka = np.array([0,1])
# history of local and global position. Only store information for this episode.
self.history_X = [self.X]
self.history_X_total = [self.X_total]
# local vorticity at the current location
_, _, self.w = tgv(self.X[0], self.X[1])
# update coarse-grained state
self.obstacles= self.generate_obstacles()
#distancia entre el swimmer y el obstáculo
#self.R = [np.linalg.norm(self.X[:2] - np.array([obstacle_x, obstacle_y])) for obstacle_x, obstacle_y in zip(self.obstacles[::2], self.obstacles[1::2])]
def generate_obstacles(self):
obstacles=[] #el numero de obstáculos será 10*10
cell_spacing= L/4
for i in range(4):
for j in range(4):
obstacle_x= i*cell_spacing
obstacle_y= j*cell_spacing
return obstacles
def interaction_with_obstacles(self,obstacles,kappa,alpha,beta,gamma,Pe,dt):
F= np.array([0.,0.,0.])
for i in range(len(obstacles)//2):
obstacle_position = np.array([obstacles[2*i],obstacles[2*i+1], 0])
r=self.X - obstacle_position
Re= (.5*self.sigma)**2*np.linalg.norm(self.W)/
#F de atracción
F_attr= gamma*(np.exp(-r_norm/kappa)/r_norm**2)*(kappa+r_norm)*r
#Fuerza total
xi=np.random.normal(0,1, size=2) #vector de números aleatorios generados a partir de una distribución normal estándar con dos componentes, xi creo que es un vector de ruido estocástico (modela el ruido térmico)
dr_therm = np.sqrt(2*self.sigma**2*dt/Pe)*xi
dr = F[:-1]*dt + dr_therm
#actualizamos la posición del spinner
self.X[:-1] += dr
self.U = np.array([dr[0]/dt, dr[1]/dt, 0])
all_dists = np.empty(len(obstacles)//2)
for i in range(len(obstacles)//2):
all_dists[i] = np.linalg.norm(self.X-np.array([obstacles[2*i],obstacles[2*i+1], 0]))
self.R = np.amin(all_dists)
#comprobamos que el spinner siga dentro del box periódico
def reinitialize(self):
self.X = np.array([np.random.uniform(0, L), np.random.uniform(0, L)])
self.X_total = self.X
self.theta = np.random.uniform(0, 2*np.pi) # polar angle theta in the x-z plane
self.p = np.array([np.cos(self.theta), np.sin(self.theta)]) # p = [px, pz]^T # orientación del nadador
self.U = np.zeros(2)
self.W = np.array([0, 0, 1])
self.ka = np.array([0,1])
self.history_X = [self.X]
self.history_X_total = [self.X_total]
self.R=np.random.uniform(0, 2.5)
self.t = 0
def update_kinematics(self, Φ, Ψ, D0 = 0, Dr = 0, int_method = "euler"): # Actualiza la posición y orientación del nadador según un método de integración especificado.
if int_method == "rk45":
y0 = np.concatenate((self.X,self.p))
_, v5 = DoPri45Step(self.calc_velocity_rk45,self.t,y0,dt)
y = y0 + dt*v5
self.X = y[:2]
self.p = y[2:]
dx = self.X - self.history_X[-1]
self.X_total = self.X_total + dx
# check if still in the periodic box
# ensure the vector p has unit length
self.p /= (self.p[0]**2 + self.p[1]**2)**(1/2)
# update polar angle
x = self.p[0]
yy = self.p[1]
self.theta = np.arctan2(yy,x) if yy >= 0 else (np.arctan2(yy,x) + 2*np.pi)
# store positions
elif int_method == "euler":
# calculate new translational and rotational velocity
self.calc_velocity(Φ, Ψ)
self.update_position(int_method, D0)
self.update_orientation(int_method, Dr)
raise Exception("Integration method must be 'Euler' or 'rk45'")
self.t = self.t + dt
def calc_velocity_rk45(self, t, y): #calcula la velocidad del nadador en un determinado tiempo 't' y estado 'y' utilizando Rk45
x = y[0]
z = y[1]
px = y[2]
pz = y[3]
ux, uz, self.w = tgv(x, z) #tgv proporciona velocidades de flujo en la posición (x,z), w es la vorticidad
#cálculo de las velocidades translacionales
U0 = ux + Φ*px #ux y uz son las velocidades del flujo
U1 = uz + Φ*pz
#cálculo de las velocidades rotacionales
ka_dot_p = self.ka[0]*px + self.ka[1]*pz #alineación del vector de nado preferido con la dirección del nadador
W0 = 1/2/Ψ*(self.ka[0] - ka_dot_p*px) + 1/2*pz*self.w
W1 = 1/2/Ψ*(self.ka[1] - ka_dot_p*pz) + 1/2*-px*self.w
return np.array([U0, U1, W0, W1])
def update_position(self, int_method, D0): #D0 representa la difusión
# use explicit euler to update
dx = dt*self.U
if D0 > 0: dx = dx + np.sqrt(2*D0*dt)*np.random.normal(size=2) #posible efecto de la difusión browniana
self.X = self.X + dx
self.X_total = self.X_total + dx
# check if still in the periodic box
# store positions
def update_orientation(self, int_method, Dr):
self.p = self.p + dt*self.W #W velocidad angular
# ensure the vector p has unit length
self.p /= (self.p[0]**2 + self.p[1]**2)**(1/2)
# if rotational diffusion is present
if Dr > 0: #Dr representa difucion rotacional
px = self.p[0]
pz = self.p[1]
cross = px*pz
A = np.array([[1-px**2, -cross], [-cross, 1-pz**2]]) #A es una matriz
v = np.sqrt(2*Dr*dt)*np.random.normal(size=2) #v es un vector de valores aleatorios
self.p[0] = self.p[0] + A[0,0]*v[0] + A[0,1]*v[1] #Se calcula un cambio aleatorio en la orientación usando A y v
self.p[1] = self.p[1] + A[1,0]*v[0] + A[1,1]*v[1]
self.p /= (self.p[0]**2 + self.p[1]**2)**(1/2)
# update polar angle
x = self.p[0]
y = self.p[1]
self.theta = np.arctan2(y,x) if y >= 0 else (np.arctan2(y,x) + 2*np.pi)
def calc_velocity(self, Φ, Ψ):
ux, uz, self.w = tgv(self.X[0], self.X[1])
# careful - computing in the following way is significantly slower: self.U = np.array(ux, uz) + Φ*self.p
self.U[0] = ux + Φ*self.p[0]
self.U[1] = uz + Φ*self.p[1]
px = self.p[0]
pz = self.p[1]
ka_dot_p = self.ka[0]*px + self.ka[1]*pz
self.W[0] = 1/2/Ψ*(self.ka[0] - ka_dot_p*px) + 1/2*pz*self.w
self.W[1] = 1/2/Ψ*(self.ka[1] - ka_dot_p*pz) + 1/2*-px*self.w
def check_in_box(self): # Este método verifica si el nadador todavía está dentro del cuadro periódico
if self.X[0] < x0:
self.X[0] += L
elif self.X[0] > x1:
self.X[0] -= L
if self.X[1] < z0:
self.X[1] += L
elif self.X[1] > z1:
self.X[1] -= L
def calc_reward(self, n):
self.r[n] = self.history_X_total[-1][1]-self.history_X_total[-2][1]
def update_state(self):
#componente z de la velocidad angular
W_z = self.W[2]
if W_z <= 0.175**self.sigma*self.sigma):
W_state = "wo"
elif W_z > 0.175**self.sigma*self.sigma):
W_state = "wh"
raise Exception("Invalid value of w detected: ", W_z)
if self.R <= 1.25:
R_state = "ri"
elif self.R > 1.25:
R_state = "rni"
raise Exception ("Invalid value of r detected: ", self.R)
self.my_state = (R_state, W_state)
def take_greedy_action(self, Q):
state_index = state_lookup_table[self.my_state]
action_index = np.argmax(Q[state_index]) # find largest entry in this row of Q (i.e. this state)
if action_index == 0: # aumenta 1/8W
self.W[2] += 1./8*Wc
elif action_index == 1: # disminuye 1/8W
self.W[2] -= 1./8*Wc
raise Exception ("Action index out of bounds: ", action_index)
return action_index
def take_random_action(self):
action_index = np.random.randint(0, 2, 1)
if action_index == 0: # aumenta 1/8W
self.W[2] += 1./8*Wc
else: # disminuye 1/8W
self.W[2] -= 1./8*Wc
return action_index
#Define Taylor-Green vortex
# given position, return local velocity and vorticity
def tgv(x, z):
ux = -1/2*np.cos(x)*np.sin(z)
uz = 1/2*np.sin(x)*np.cos(z)
w = -np.cos(x)*np.cos(z)
return ux, uz, w
def training(alpha0, Φ, Ψ, Ns=4000, Ne=5000, gamma=0.999, eps0=0.0, D0=0, Dr=0, n_updates=1000, \
RIC=False, method="Qlearning", lr_decay=None, omega=0.85, eps_decay=False, Qin=None):
# n_updates - how often to plot the trajectory undertaken by the particle during the learning process
# Ne - number of episodes
# Ns - number of steps in an episode
# alpha0 - learning rate (or starting learning rate when employing LR decay)
# gamma - discount factor, i.e. how much we weigh future to present rewards. Close to 0 = myopic view.
# eps0 - fraction of the time we allow for exploration in selecting the following action. 0 = always greedy.
# D0 - translational diffusivity
# Dr - rotational diffusivity
# RIC - Reset of Initial Conditions. First time a state-action pair is encountered, set Q[s,a] = reward
# method - choose from Q-learning, Double Q-learning (, or Expected SARSA
# lr_decay - whether or not to use learning rate decay. Options are none, or polynomial (lr=1/#(s,a)**omega)
# omega - exponent used in lr_decay: lr = 1/#(s,a)**omega
# eps_decay - whether or not to decay epsilon linearly: eff_eps = eps0/k for the k-th step
# Qin - initial Q matrix. Useful for testing performance after an extensive exploration phase.
# if using the expected SARSA method, turn on epsilon decay since eps = 0 is simply Q-learning anyway
if method=="expSARSA":
eps_decay = True
if eps0 == 0: eps0 = 1
# Total reward for each episode
hist_R_tot_smart = np.zeros(Ne)
hist_R_tot_naive= np.zeros(Ne)
# learning gain per episode
Σ = np.zeros(Ne)
smart_stored_histories = [] # store position = f(t) every so often for an episode (smart particles)
naive_stored_histories = [] # store position = f(t) every so often for an episode (naive particles)
# number of times each state-action pair has been explored
state_action_counter = np.zeros((N_states,N_actions))
# initialize a naive and a smart gyrotactic particle
naive = Swimmer(Ns)
smart = Swimmer(Ns)
# initialize Q matrix to large value
if method=="doubleQ":
Q1 = L*Ns*np.ones((4, 2))
Q2 = L*Ns*np.ones((4, 2))
Q = L*Ns*np.ones((4, 2)) # 4 states, 2 possible actions. Each column is an action, w.
if Qin is not None: Q = Qin
# store average Q for each episode to track convergence
avg_Q_history = np.zeros((Ne,4,2))
# store initial position and orientation for each episode
initial_coords = np.zeros((Ne,3))
# iterate over episodes
k = 0
for ep in tqdm(range(Ne)):
# assign random orientation and position
naive = copy.deepcopy(smart) # have naive and smart share initial conditions for visualization purposes
# store initialization
initial_coords[ep,0:2] = smart.X
initial_coords[ep,2] = smart.theta
# save selected actions and particle orientation for last episodes
if ep == Ne - 1:
chosen_actions = np.zeros(Ns)
theta_history = np.zeros(Ns)
# iterate over stages within an episode
for stage in range(Ns):
# select an action eps-greedily. Note naive never changes its action/strategy (i.e. trying to swim up)
Qinput = Q1 + Q2 if method=="doubleQ" else Q
k = k + 1 # k-th update
eff_eps = eps0/k**omega if eps_decay else eps0 # decrease amount of exploration as time proceeds
if np.random.uniform(0, 1) < eff_eps:
action = smart.take_random_action()
action = smart.take_greedy_action(Qinput)
# record action and orientation on last episode
if ep == Ne - 1:
chosen_actions[stage] = action
theta_history[stage] = smart.theta
# record index of the prior state
old_s = state_lookup_table[smart.my_state]
# given selected action, update the state
naive.update_kinematics(Φ, Ψ, D0, Dr)
smart.update_kinematics(Φ, Ψ, D0, Dr)
smart.update_state() # only need to update smart particle since naive has ka = [0, 1] always
# calculate reward based on new state
new_s = state_lookup_table[smart.my_state]
state_action_counter[new_s,action] += 1
# employ learning rate decay if applicable
alpha = alpha0/(1+state_action_counter[old_s,action])**omega if lr_decay else alpha0
# update Q matrix
if method=="doubleQ":
if np.random.uniform(0, 1) < 0.5: # update Q1
if Q1[old_s, action] == L*Ns and RIC==True: # apply Reset of Initial Conditions (RIC)
Q1[old_s, action] = smart.r[stage]
Q1[old_s, action] = Q1[old_s, action] + alpha*(smart.r[stage] + \
else: # update Q2
if Q2[old_s, action] == L*Ns and RIC==True:
Q2[old_s, action] = smart.r[stage]
Q2[old_s, action] = Q2[old_s, action] + alpha*(smart.r[stage] + \
if method=="expSARSA":
# calculate V, the expected Q value for the next state-actio pair
V = 0
greedy_action = np.argmax(Q[new_s]) # would-be greedy action for new state
for new_action in range(N_actions):
pi = (1 - eff_eps) + eff_eps/N_actions if new_action == greedy_action else eff_eps/N_actions
V = V + pi*Q[new_s, new_action]
if Q[old_s, action] == L*Ns and RIC==True:
Q[old_s, action] = smart.r[stage]
Q[old_s, action] = Q[old_s, action] + alpha*(smart.r[stage] + gamma*V - Q[old_s,action])
if Q[old_s, action] == L*Ns and RIC==True:
Q[old_s, action] = smart.r[stage]
Q[old_s, action] = Q[old_s, action] + alpha*(smart.r[stage] + \
# store average Q for each episode to track convergence
avg_Q_history[ep] = avg_Q_history[ep] + Q1 + Q2 if method=="doubleQ" else avg_Q_history[ep] + Q
avg_Q_history[ep] = avg_Q_history[ep]/Ns
# calculate Rtot for this episode
R_tot_naive = np.sum(naive.r)
R_tot_smart = np.sum(smart.r)
# calculate learning gain for this episode
Σ[ep] = R_tot_smart/R_tot_naive - 1
hist_R_tot_smart[ep] = R_tot_smart
hist_R_tot_naive[ep] = R_tot_naive
# plot trajectory every so often
if ep%n_updates==0 or ep==Ne-1:
smart_history_X_total = np.array(smart.history_X_total)
naive_history_X_total = np.array(naive.history_X_total)
# save optimal policy
if ep==Ne-1:
filename = "Policies/Q_alpha_" + str(alpha).replace(".","d") + "_Ns_" + str(Ns) + "_Ne_" + str(Ne) + \
"_Φ_" + str(Φ).replace(".","d") + "_Ψ_" + str(Ψ).replace(".","d") + "_eps_" \
+ str(eff_eps).replace(".","d") + "_epsdecay_" + str(eps_decay)
if lr_decay: filename = filename + "_omega_" + str(omega)
if method=="doubleQ": filename = filename + "_" + str(method)
if RIC: filename = filename + "_RIC_" + str(RIC)
Qout = Q1 + Q2 if method=="doubleQ" else Q, Qout)
return Qout, Σ, smart, naive, hist_R_tot_smart, hist_R_tot_naive, smart_stored_histories, naive_stored_histories, \
state_action_counter, chosen_actions, avg_Q_history, initial_coords, theta_history
Q = np.random.rand(4, 2)
#Q = np.array([[1, 0],
#[2, 1],
#[0, 0],
#[1, 2]])
Ns = 5000
spinner = Swimmer(Ns, 1, 1)
traj = []
obstacles = spinner.generate_obstacles()
for i in range(Ns):
spinner.interaction_with_obstacles(obstacles, 2.5, 1, 1., 2.5e-4, 10000, 0.001)
action_index = spinner.take_greedy_action(Q)
#print("Mi estado", spinner.my_state)
#print("Valor de Wz después de tomar la acción:", spinner.W[2])
fig, ax= plt.subplots(1,1)
ax.plot(traj[::2], traj[1::2], '.')
ax.plot(obstacles[::2], obstacles[1::2], '.')
#comprobación de que W va cambiando