@@ -20,7 +20,7 @@ from wand.image import Image as WImage
# sns.set(palette="husl",font_scale=1)
# %config InlineBackend.figure_format = 'retina'
import copy
np.random.seed(4032)
np.random.seed(48632)
#%load_ext line_profiler
@@ -40,7 +40,7 @@ N_states = 4 # number of states - one for each coarse-grained degree of vorticity
N_actions = 2 # number of actions - one for each coarse-grained swimming direction
# numerical parameters
dt = 0.0001 # timestep size
#dt = 0.00001 # timestep size
@@ -128,7 +128,7 @@ class Swimmer(Agent):
# check whether the particle initially starts inside an obstacle
for i in range(len(self.obstacles) // 2):
obstacle_position = np.array([self.obstacles[2 * i], self.obstacles[2 * i + 1], 0])
if np.linalg.norm(self.X - obstacle_position) < 0.5 * self.sigma:
if np.linalg.norm(self.X - obstacle_position) < 0.8 * self.sigma:
valid_initial_position = False
break
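# As a standalone illustration of the rejection test above (assuming the flat
# [x0, z0, x1, z1, ...] obstacle layout and the box size L used elsewhere in the
# notebook; the helper name place_swimmer is hypothetical):
import numpy as np

def place_swimmer(obstacles, sigma, L, rng=None):
    rng = rng if rng is not None else np.random.default_rng()
    centres = np.asarray(obstacles, dtype=float).reshape(-1, 2)
    while True:
        X = rng.uniform(0.0, L, size=2)  # candidate position inside the box
        # accept only if the candidate clears every obstacle centre by 0.8*sigma
        if np.all(np.linalg.norm(centres - X, axis=1) >= 0.8 * sigma):
            return X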
@@ -145,7 +145,7 @@ class Swimmer(Agent):
# distance between the swimmer and the obstacle
self.R = np.random.uniform(0, 2.5, 1)
print(self.R)
#print(self.R)
# preferred swimming direction (equal to [1,0], [0,1], [-1,0], or [0,-1])
self.ka = np.array([0, 1])
@@ -211,6 +211,7 @@ class Swimmer(Agent):
# update the spinner position
self.X[:-1] += dr
self.X_total[:-1] += dr
self.U = np.array([dr[0] / dt, dr[1] / dt, 0])
all_dists = np.empty(len(obstacles) // 2)
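# all_dists is presumably filled with the distance to each obstacle (that loop
# lies outside this hunk); the same quantity can be computed in one vectorised
# call. A small standalone sketch with illustrative names (obstacles is the flat
# [x0, z0, ...] array, pos the swimmer's x-z position):
import numpy as np

def nearest_obstacle_distance(obstacles, pos):
    centres = np.asarray(obstacles, dtype=float).reshape(-1, 2)
    return np.amin(np.linalg.norm(centres - np.asarray(pos, dtype=float), axis=1))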
@@ -220,27 +221,47 @@ class Swimmer(Agent):
self.R = np.amin(all_dists)
# check that the spinner remains inside the periodic box
self.history_X_total.append(self.X_total)
self.periodic_boundaries()
self.history_X.append(self.X)
self.check_in_box()
def reinitialize(self):
self.X = np.array([np.random.uniform(0, L), np.random.uniform(0, L)])
self.X_total = self.X
def reinitialize(self, obstacles):
# absolute position. -inf. <= x_total < inf. and -inf. <= z_total < inf.
self.X = self.history_X[0]
self.X_total = self.history_X_total[0]
# particle orientation
self.theta = np.random.uniform(0, 2 * np.pi) # polar angle theta in the x-z plane
self.p = np.array([np.cos(self.theta), np.sin(self.theta)]) # p = [px, pz]^T # swimmer orientation
self.p = np.array([np.cos(self.theta), np.sin(self.theta)]) # p = [px, pz]^T
self.U = np.zeros(2)
self.W = np.array([0, 0, 1])
# translational and rotational velocity
self.U = np.zeros(3, float)
self.W = np.array([0., 0., 1.]) # random angular velocity
# distance between the swimmer and the obstacle
self.R = np.random.uniform(0, 2.5, 1)
#print(self.R)
# preferred swimming direction (equal to [1,0], [0,1], [-1,0], or [0,-1])
self.ka = np.array([0, 1])
self.history_X = [self.X]
self.history_X_total = [self.X_total]
# history of local and global position. Only store information for this episode.
self.history_X.clear()
self.history_X_total.clear()
self.history_X.append(self.X)
self.history_X_total.append(self.X_total)
# local vorticity at the current location
_, _, self.w = tgv(self.X[0], self.X[1])
self.R = np.random.uniform(0, 2.5)
# update coarse-grained state
self.update_state()
self.t = 0
self.obstacles = obstacles
def update_kinematics(self, Φ, Ψ, D0=0, Dr=0, int_method="euler"): # updates the swimmer's position and orientation using the specified integration method
if int_method == "rk45":
@@ -431,7 +452,7 @@ def tgv(x, z):
w = -np.cos(x) * np.cos(z)
return ux, uz, w
def training(alpha0, Φ, Ψ, Ns=4000, Ne=5000, gamma=0.999, eps0=0.0, D0=0, Dr=0, n_updates=1000, \
def training(alpha0, kappa, alphaMAG, beta, gammaYUK, Pe, dt, ni, sigma, Ns=4000, Ne=5000, gamma=0.999, eps0=0.0, n_updates=1000, \
RIC=False, method="Qlearning", lr_decay=None, omega=0.85, eps_decay=False, Qin=None):
# n_updates - how often to plot the trajectory undertaken by the particle during the learning process
# Ne - number of episodes
@@ -467,9 +488,10 @@ def training(alpha0, Φ, Ψ, Ns=4000, Ne=5000, gamma=0.999, eps0=0.0, D0=0, Dr=0
state_action_counter = np.zeros((N_states, N_actions))
# initialize a naive and a smart gyrotactic particle
naive = Swimmer(Ns)
smart = Swimmer(Ns)
naive = Swimmer(Ns, ni, sigma)
smart = Swimmer(Ns, ni, sigma)
naive.obstacles = smart.obstacles
obstacles = naive.obstacles
# initialize Q matrix to large value
if method == "doubleQ":
Q1 = L * Ns * np.ones((4, 2))
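# The large starting value L * Ns is an optimistic initialisation: every untried
# state-action pair looks better than anything yet experienced, so even greedy
# selection keeps exploring early on. A standalone sketch with illustrative
# stand-in numbers (not the notebook's actual L and Ns):
import numpy as np

N_STATES, N_ACTIONS = 4, 2
Q_optimistic = 1.0e4 * np.ones((N_STATES, N_ACTIONS))  # stand-in for L * Ns

def greedy_action(Q, state):
    # break ties at random so the optimistic start does not always favour action 0
    best = np.flatnonzero(Q[state] == Q[state].max())
    return int(np.random.choice(best))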
@@ -483,20 +505,21 @@ def training(alpha0, Φ, Ψ, Ns=4000, Ne=5000, gamma=0.999, eps0=0.0, D0=0, Dr=0
avg_Q_history = np.zeros((Ne, 4, 2))
# store initial position and orientation for each episode
initial_coords = np.zeros((Ne, 3))
initial_coords = np.empty([Ne, 3], float)
for k in range(Ne):
initial_coords[k, :] = smart.X
# iterate over episodes
k = 0
for ep in tqdm(range(Ne)):
# assign random orientation and position
smart.reinitialize()
naive.reinitialize()
smart.reinitialize(obstacles)
naive.reinitialize(obstacles)
naive = copy.deepcopy(smart) # have naive and smart share initial conditions for visualization purposes
# store initialization
initial_coords[ep, 0:2] = smart.X
initial_coords[ep, 2] = smart.theta
initial_coords[ep, 0:3] = smart.X
# save selected actions and particle orientation for last episodes
if ep == Ne - 1:
@@ -525,8 +548,8 @@ def training(alpha0, Φ, Ψ, Ns=4000, Ne=5000, gamma=0.999, eps0=0.0, D0=0, Dr=0
old_s = state_lookup_table[smart.my_state]
# given selected action, update the state
naive.update_kinematics(Φ, Ψ, D0, Dr)
smart.update_kinematics(Φ, Ψ, D0, Dr)
naive.interaction_with_obstacles(naive.obstacles, kappa, alphaMAG, beta, gammaYUK, Pe, dt)
smart.interaction_with_obstacles(smart.obstacles, kappa, alphaMAG, beta, gammaYUK, Pe, dt)
smart.update_state() # only need to update smart particle since naive has ka = [0, 1] always
# calculate reward based on new state
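# With method == "Qlearning", the reward computed next feeds the standard tabular
# one-step backup; a minimal standalone sketch (the argument names and alpha are
# illustrative, the notebook's own update lies outside this hunk):
import numpy as np

def q_learning_update(Q, old_s, action, reward, new_s, alpha, gamma=0.999):
    # move Q[old_s, action] toward the bootstrapped target reward + gamma * max_a' Q[new_s, a']
    Q[old_s, action] += alpha * (reward + gamma * np.max(Q[new_s]) - Q[old_s, action])
    return Q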
@@ -597,7 +620,7 @@ def training(alpha0, Φ, Ψ, Ns=4000, Ne=5000, gamma=0.999, eps0=0.0, D0=0, Dr=0
# save optimal policy
if ep == Ne - 1:
filename = "Policies/Q_alpha_" + str(alpha).replace(".", "d") + "_Ns_" + str(Ns) + "_Ne_" + str(Ne) + \
"_Φ_" + str(Φ).replace(".", "d") + "_Ψ_" + str(Ψ).replace(".", "d") + "_eps_" \
"_sigma_" + str(sigma).replace(".", "d") + "_Pe_" + str(Pe).replace(".", "d") + "_eps_" \
+ str(eff_eps).replace(".", "d") + "_epsdecay_" + str(eps_decay)
if lr_decay: filename = filename + "_omega_" + str(omega)
if method == "doubleQ": filename = filename + "_" + str(method)
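# For illustration only, with assumed values alpha = 1.0, Ns = 10000, Ne = 20,
# sigma = 1.0, Pe = 10000, eff_eps = 0.0 and eps_decay = False, the sigma/Pe
# variant above yields
#   Policies/Q_alpha_1d0_Ns_10000_Ne_20_sigma_1d0_Pe_10000_eps_0d0_epsdecay_False
# ("." is replaced by "d" so the values sit safely in a file name; np.save then
# appends the .npy extension).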
@@ -606,7 +629,7 @@ def training(alpha0, Φ, Ψ, Ns=4000, Ne=5000, gamma=0.999, eps0=0.0, D0=0, Dr=0
np.save(filename, Qout)
return Qout, Σ, smart, naive, hist_R_tot_smart, hist_R_tot_naive, smart_stored_histories, naive_stored_histories, \
state_action_counter, chosen_actions, avg_Q_history, initial_coords, theta_history
state_action_counter, chosen_actions, avg_Q_history, initial_coords, theta_history, obstacles
#Plot
Q = np.random.rand(4, 2)
@@ -618,34 +641,49 @@ Q = np.random.rand(4, 2)
print(Q)
Ns = 5000
Ns = 10000
spinner = Swimmer(Ns, 1, 1)
traj = []
obstacles = spinner.generate_obstacles()
for i in range(Ns):
spinner.interaction_with_obstacles(obstacles, 2.5, 1, 1., 2.5e-4, 10000, 0.001)
traj.append(spinner.X[0])
traj.append(spinner.X[1])
#obstacles = spinner.generate_obstacles()
#for i in range(Ns):
#spinner.interaction_with_obstacles(obstacles, 2.5, 1, 1., 2.5e-4, 10000, 0.00001)
#traj.append(spinner.X[0])
#traj.append(spinner.X[1])
spinner.periodic_boundaries()
#spinner.periodic_boundaries()
action_index = spinner.take_greedy_action(Q)
#action_index = spinner.take_greedy_action(Q)
spinner.update_state()
#spinner.update_state()
#print("My state", spinner.my_state)
#print("Value of Wz after taking the action:", spinner.W[2])
my_alpha0 = 1.0
my_eps0 = 1.0
Ne = 20
stepsupdate = 2
Q, Σ, smart, naive, hist_R_tot_smart, hist_R_tot_naive, smart_stored_histories, naive_stored_histories, \
state_action_counter, chosen_actions, avg_Q_hist, initial_coords, theta_history, obstacles \
= training(my_alpha0, 2.5, 1, 1., 2.5e-4, 10000, 0.00001, 1., 1., Ns, Ne, 0.999, 0.0, stepsupdate)
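# For readability, the positional arguments above map onto the new training()
# signature as alpha0=my_alpha0, kappa=2.5, alphaMAG=1, beta=1., gammaYUK=2.5e-4,
# Pe=10000, dt=0.00001, ni=1., sigma=1., followed by Ns, Ne, gamma=0.999,
# eps0=0.0 and n_updates=stepsupdate.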
#print(smart_stored_histories[1][1][3, :])
#print(len(smart_stored_histories), smart_stored_histories[0].shape)
fig, ax = plt.subplots(1, 1)
ax.plot(traj[::2], traj[1::2], '.')
ax.plot(obstacles[::2], obstacles[1::2], '.')
#ax.plot(np.array(traj[::2]) + L/8., np.array(traj[1::2]) + L/8., '.')
ax.plot(np.array(obstacles[::2]) + L/8., np.array(obstacles[1::2]) + L/8., '.')
for i in range(0, stepsupdate, Ne):
ax.plot(smart_stored_histories[i][1][:, 0], smart_stored_histories[i][1][:, 1], '.', label='episode %d' % i)
if i == Ne - 1:
ax.plot(naive_stored_histories[i][1][:, 0], naive_stored_histories[i][1][:, 1], '.', label='naive spinner')
ax.set_aspect('equal')
print(obstacles[::2])
#ax.legend()
plt.show()