You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							396 lines
						
					
					
						
							13 KiB
						
					
					
				
			
		
		
	
	
							396 lines
						
					
					
						
							13 KiB
						
					
					
				| import numpy as np
 | |
| import plotly.graph_objects as go
 | |
| from tqdm.notebook import tqdm
 | |
| import plotly.express as px
 | |
| import matplotlib as mpl
 | |
| mpl.rcParams['figure.dpi'] = 300
 | |
| import matplotlib.pyplot as plt
 | |
| import seaborn as sns
 | |
| import os
 | |
| from wand.image import Image as WImage
 | |
| # sns.set(palette="husl",font_scale=1)
 | |
| # %config InlineBackend.figure_format = 'retina'
 | |
| import copy
 | |
| np.random.seed(0)
 | |
| %load_ext line_profiler
 | |
| 
 | |
| 
 | |
| 
 | |
| #Define constants
 | |
| 
 | |
| #L = 2*np.pi # periodic domain size
 | |
| L=20
 | |
| # define boundaries of simulation box
 | |
| x0 = 0
 | |
| x1 = L
 | |
| z0 = 0
 | |
| z1 = L
 | |
| 
 | |
| # define reinforcement learning problem
 | |
| N_states = 12 # number of states - one for each coarse-grained degree of vorticity
 | |
| N_actions = 4 # number of actions - one for each coarse-grained swimming direction
 | |
| 
 | |
| # numerical parameters
 | |
| dt = 0.01 # timestep size
 | |
| 
 | |
| 
 | |
| 
 | |
| #Utility functions
 | |
| 
 | |
| def moving_average(a, n=3) :
 | |
|     ret = np.cumsum(a, dtype=float)
 | |
|     ret[n:] = ret[n:] - ret[:-n]
 | |
|     return ret[n - 1:] / n
 | |
| 
 | |
| # Runga-Kutta 4(5) integration for one step
 | |
|     # see https://stackoverflow.com/questions/54494770/how-to-set-fixed-step-size-with-scipy-integrate
 | |
| def DoPri45Step(f,t,x,h):
 | |
| 
 | |
|     k1 = f(t,x)
 | |
|     k2 = f(t + 1./5*h, x + h*(1./5*k1) )
 | |
|     k3 = f(t + 3./10*h, x + h*(3./40*k1 + 9./40*k2) )
 | |
|     k4 = f(t + 4./5*h, x + h*(44./45*k1 - 56./15*k2 + 32./9*k3) )
 | |
|     k5 = f(t + 8./9*h, x + h*(19372./6561*k1 - 25360./2187*k2 + 64448./6561*k3 - 212./729*k4) )
 | |
|     k6 = f(t + h, x + h*(9017./3168*k1 - 355./33*k2 + 46732./5247*k3 + 49./176*k4 - 5103./18656*k5) )
 | |
| 
 | |
|     v5 = 35./384*k1 + 500./1113*k3 + 125./192*k4 - 2187./6784*k5 + 11./84*k6
 | |
|     k7 = f(t + h, x + h*v5)
 | |
|     v4 = 5179./57600*k1 + 7571./16695*k3 + 393./640*k4 - 92097./339200*k5 + 187./2100*k6 + 1./40*k7;
 | |
| 
 | |
|     return v4,v5
 | |
| 
 | |
| 
 | |
| 
 | |
| #Define useful data structures
 | |
| #Define a dictionary of the possible states and their assigned indices
 | |
| 
 | |
| direction_states = ["right","down","left","up"] # coarse-grained directions
 | |
| vort_states = ["w+", "w0", "w-"] # coarse-grained levels of vorticity
 | |
| product_states = [(x,y) for x in direction_states for y in vort_states]  # all possible states
 | |
| state_lookup_table = {product_states[i]:i for i in range(len(product_states))} # returns index of given state
 | |
| # print(product_states) # to view mapping
 | |
| 
 | |
| #Define an agent class for reinforcement learning
 | |
| 
 | |
| class Agent:
 | |
|     def __init__(self, Ns):
 | |
|         self.r = np.zeros(Ns) # reward for each stage
 | |
|         self.t = 0            # time
 | |
| 
 | |
|     # calculate reward given from entering a new state after a selected action is undertaken
 | |
|     def calc_reward(self):
 | |
|         # enforce implementation by subclass
 | |
|         if self.__class__ == AbstractClass:
 | |
|                 raise NotImplementedError
 | |
| 
 | |
|     def update_state(self):
 | |
|         # enforce implementation by subclass
 | |
|         if self.__class__ == AbstractClass:
 | |
|                 raise NotImplementedError
 | |
| 
 | |
|     def take_random_action(self):
 | |
|         # enforce implementation by subclass
 | |
|         if self.__class__ == AbstractClass:
 | |
|                 raise NotImplementedError
 | |
| 
 | |
|     def take_greedy_action(self, Q):
 | |
|         # enforce implementation by subclass
 | |
|         if self.__class__ == AbstractClass:
 | |
|                 raise NotImplementedError
 | |
|                 
 | |
| #Define swimmer class derived from agent
 | |
| 
 | |
| class Swimmer(Agent):
 | |
|     def __init__(self, Ns):
 | |
|         # call init for superclass
 | |
|         super().__init__(Ns)
 | |
| 
 | |
|         # local position within the periodic box. X = [x, z]^T with 0 <= x < 2 pi and 0 <= z < 2 pi
 | |
|         self.X = np.array([np.random.uniform(0, L), np.random.uniform(0, L), 0])
 | |
| 
 | |
|         # absolute position. -inf. <= x_total < inf. and -inf. <= z_total < inf.
 | |
|         self.X_total = self.X
 | |
| 
 | |
|         # particle orientation
 | |
|         self.theta = np.random.uniform(0, 2*np.pi) # polar angle theta in the x-z plane
 | |
|         self.p = np.array([np.cos(self.theta), np.sin(self.theta)]) # p = [px, pz]^T
 | |
| 
 | |
|         # translational and rotational velocity
 | |
|         self.U = np.zeros(3)
 | |
|         self.W = np.array([0, 0, 1]) #Velocidad angular aleatoria
 | |
| 
 | |
|         # preferred swimming direction (equal to [1,0], [0,1], [-1,0], or [0,-1])
 | |
|         self.ka = np.array([0,1])
 | |
| 
 | |
|         # history of local and global position. Only store information for this episode.
 | |
|         self.history_X = [self.X]
 | |
|         self.history_X_total = [self.X_total]
 | |
| 
 | |
|         # local vorticity at the current location
 | |
|         _, _, self.w = tgv(self.X[0], self.X[1])
 | |
| 
 | |
|         # update coarse-grained state
 | |
|         self.update_state()
 | |
| 
 | |
|         #obstáculos
 | |
|         self.obstacles= self.generate_obstacles()
 | |
| 
 | |
|     def generate_obstacles(self):
 | |
|         obstacles=[] #el numero de obstáculos será 10*10
 | |
|         cell_spacing= L/30
 | |
| 
 | |
|         for i in range(20):
 | |
|           for j in range(20):
 | |
|             obstacle_x= i + cell_spacing
 | |
|             obstacle_z= j + cell_spacing
 | |
|             obstacles.append(obstacle_x)
 | |
|             obstacles.append(obstacle_z)
 | |
| 
 | |
|         return obstacles
 | |
| 
 | |
|     def interaction_with_obstacles(self,obstacles, sigma,ni,kappa,alpha,beta,gamma,Pe,dt):
 | |
|       F= np.array([0.,0.,0.])
 | |
|       for i in range(len(obstacles)//2):
 | |
|         #F1
 | |
|         obstacle_position = np.array([obstacles[2*i],obstacles[2*i+1], 0])
 | |
|         r=self.X - obstacle_position
 | |
|         r_norm=np.linalg.norm(r)
 | |
|         Re= sigma**2*np.linalg.norm(self.W)/ni
 | |
|         S=1/(1+np.exp(-kappa*((Re/r_norm**3)-Re)))
 | |
|         F1=alpha*(Re/r**3)*np.cross(self.U,self.W)*S
 | |
| 
 | |
|         #F2
 | |
|         F2=beta*np.cross(self.W,r)/r_norm**3
 | |
| 
 | |
|         #F de atracción
 | |
|         F_attr= gamma*(np.exp(-r_norm/kappa)/r_norm**2)*(kappa+r_norm)*r
 | |
| 
 | |
| 
 | |
|         #Fuerza total
 | |
|         F+=F1+F2+F_attr
 | |
| 
 | |
|       xi=np.random.normal(0,1, size=2) #vector de números aleatorios generados a partir de una distribución normal estándar con dos componentes, xi creo que es un vector de ruido estocástico (modela el ruido térmico)
 | |
|       dr_therm = np.sqrt(2*sigma**2*dt/Pe)*xi
 | |
| 
 | |
|       dr = F[:-1]*dt + dr_therm
 | |
| 
 | |
|       #actualizamos la posición del spinner
 | |
|       self.X[:-1] += dr
 | |
| 
 | |
|       #comprobamos que el spinner siga dentro del box periódico
 | |
|       self.check_in_box()
 | |
| 
 | |
| 
 | |
|     def reinitialize(self):
 | |
|         self.X = np.array([np.random.uniform(0, L), np.random.uniform(0, L)])
 | |
|         self.X_total = self.X
 | |
| 
 | |
|         self.theta = np.random.uniform(0, 2*np.pi) # polar angle theta in the x-z plane
 | |
|         self.p = np.array([np.cos(self.theta), np.sin(self.theta)]) # p = [px, pz]^T # orientación del nadador
 | |
| 
 | |
|         self.U = np.zeros(2)
 | |
|         self.W = np.zeros(2)
 | |
| 
 | |
|         self.ka = np.array([0,1])
 | |
| 
 | |
|         self.history_X = [self.X]
 | |
|         self.history_X_total = [self.X_total]
 | |
| 
 | |
|         self.t = 0
 | |
| 
 | |
|     def update_kinematics(self, Φ, Ψ, D0 = 0, Dr = 0, int_method = "euler"): # Actualiza la posición y orientación del nadador según un método de integración especificado.
 | |
|         if int_method == "rk45":
 | |
|             y0 = np.concatenate((self.X,self.p))
 | |
|             _, v5 = DoPri45Step(self.calc_velocity_rk45,self.t,y0,dt)
 | |
|             y = y0 + dt*v5
 | |
|             self.X = y[:2]
 | |
|             self.p = y[2:]
 | |
|             dx = self.X - self.history_X[-1]
 | |
|             self.X_total = self.X_total + dx
 | |
| 
 | |
|             # check if still in the periodic box
 | |
|             self.check_in_box()
 | |
| 
 | |
|             # ensure the vector p has unit length
 | |
|             self.p /= (self.p[0]**2 + self.p[1]**2)**(1/2)
 | |
| 
 | |
|             # update polar angle
 | |
|             x = self.p[0]
 | |
|             yy = self.p[1]
 | |
|             self.theta = np.arctan2(yy,x) if yy >= 0 else (np.arctan2(yy,x) + 2*np.pi)
 | |
| 
 | |
|             # store positions
 | |
|             self.history_X.append(self.X)
 | |
|             self.history_X_total.append(self.X_total)
 | |
| 
 | |
|         elif int_method == "euler":
 | |
|             # calculate new translational and rotational velocity
 | |
|             self.calc_velocity(Φ, Ψ)
 | |
| 
 | |
|             self.update_position(int_method, D0)
 | |
|             self.update_orientation(int_method, Dr)
 | |
|         else:
 | |
|             raise Exception("Integration method must be 'Euler' or 'rk45'")
 | |
| 
 | |
|         self.t = self.t + dt
 | |
| 
 | |
|     def calc_velocity_rk45(self, t, y): #calcula la velocidad del nadador en un determinado tiempo 't' y estado 'y' utilizando Rk45
 | |
|         x = y[0]
 | |
|         z = y[1]
 | |
|         px = y[2]
 | |
|         pz = y[3]
 | |
|         ux, uz, self.w = tgv(x, z) #tgv proporciona velocidades de flujo en la posición (x,z), w es la vorticidad
 | |
| 
 | |
|         #cálculo de las velocidades translacionales
 | |
|         U0 = ux + Φ*px #ux y uz son las velocidades del flujo
 | |
|         U1 = uz + Φ*pz
 | |
| 
 | |
|        #cálculo de las velocidades rotacionales
 | |
|         ka_dot_p = self.ka[0]*px + self.ka[1]*pz #alineación del vector de nado preferido con la dirección del nadador
 | |
|         W0 = 1/2/Ψ*(self.ka[0] - ka_dot_p*px) + 1/2*pz*self.w
 | |
|         W1 = 1/2/Ψ*(self.ka[1] - ka_dot_p*pz) + 1/2*-px*self.w
 | |
| 
 | |
|         return np.array([U0, U1, W0, W1])
 | |
| 
 | |
| 
 | |
|     def update_position(self, int_method, D0): #D0 representa la difusión
 | |
|         # use explicit euler to update
 | |
|         dx = dt*self.U
 | |
|         if D0 > 0: dx = dx + np.sqrt(2*D0*dt)*np.random.normal(size=2) #posible efecto de la difusión browniana
 | |
|         self.X = self.X + dx
 | |
|         self.X_total = self.X_total + dx
 | |
| 
 | |
|         # check if still in the periodic box
 | |
|         self.check_in_box()
 | |
| 
 | |
|         # store positions
 | |
|         self.history_X.append(self.X)
 | |
|         self.history_X_total.append(self.X_total)
 | |
| 
 | |
| 
 | |
|     def update_orientation(self, int_method, Dr):
 | |
|         self.p = self.p + dt*self.W #W velocidad angular
 | |
| 
 | |
|         # ensure the vector p has unit length
 | |
|         self.p /= (self.p[0]**2 + self.p[1]**2)**(1/2)
 | |
| 
 | |
|         # if rotational diffusion is present
 | |
|         if Dr > 0: #Dr representa difucion rotacional
 | |
|             px = self.p[0]
 | |
|             pz = self.p[1]
 | |
|             cross = px*pz
 | |
|             A = np.array([[1-px**2, -cross], [-cross, 1-pz**2]]) #A es una matriz
 | |
|             v = np.sqrt(2*Dr*dt)*np.random.normal(size=2) #v es un vector de valores aleatorios
 | |
|             self.p[0] = self.p[0] + A[0,0]*v[0] + A[0,1]*v[1] #Se calcula un cambio aleatorio en la orientación usando A y v
 | |
|             self.p[1] = self.p[1] + A[1,0]*v[0] + A[1,1]*v[1]
 | |
|             self.p /= (self.p[0]**2 + self.p[1]**2)**(1/2)
 | |
| 
 | |
|         # update polar angle
 | |
|         x = self.p[0]
 | |
|         y = self.p[1]
 | |
|         self.theta = np.arctan2(y,x) if y >= 0 else (np.arctan2(y,x) + 2*np.pi)
 | |
| 
 | |
| 
 | |
| 
 | |
|     def calc_velocity(self, Φ, Ψ):
 | |
|         ux, uz, self.w = tgv(self.X[0], self.X[1])
 | |
| 
 | |
|         # careful - computing in the following way is significantly slower: self.U = np.array(ux, uz) + Φ*self.p
 | |
|         self.U[0] = ux + Φ*self.p[0]
 | |
|         self.U[1] = uz + Φ*self.p[1]
 | |
| 
 | |
|         px = self.p[0]
 | |
|         pz = self.p[1]
 | |
|         ka_dot_p = self.ka[0]*px + self.ka[1]*pz
 | |
|         self.W[0] = 1/2/Ψ*(self.ka[0] - ka_dot_p*px) + 1/2*pz*self.w
 | |
|         self.W[1] = 1/2/Ψ*(self.ka[1] - ka_dot_p*pz) + 1/2*-px*self.w
 | |
| 
 | |
| 
 | |
|     def check_in_box(self): # Este método verifica si el nadador todavía está dentro del cuadro periódico
 | |
|         if self.X[0] < x0:
 | |
|             self.X[0] += L
 | |
|         elif self.X[0] > x1:
 | |
|             self.X[0] -= L
 | |
|         if self.X[1] < z0:
 | |
|             self.X[1] += L
 | |
|         elif self.X[1] > z1:
 | |
|             self.X[1] -= L
 | |
| 
 | |
|     def calc_reward(self, n):
 | |
|         self.r[n] = self.history_X_total[-1][1]-self.history_X_total[-2][1]
 | |
| 
 | |
|     def update_state(self):
 | |
|         if self.w < -0.33:
 | |
|             w_state = "w-"
 | |
|         elif self.w >= -0.33 and self.w <= 0.33:
 | |
|             w_state = "w0"
 | |
|         elif self.w > 0.33:
 | |
|             w_state = "w+"
 | |
|         else:
 | |
|             raise Exception("Invalid value of w detected: ", w)
 | |
| 
 | |
|         if self.theta >= np.pi/4 and self.theta < 3*np.pi/4:
 | |
|             p_state = "up"
 | |
|         elif self.theta >= 3*np.pi/4 and self.theta < 5*np.pi/4:
 | |
|             p_state = "left"
 | |
|         elif self.theta >= 5*np.pi/4 and self.theta < 7*np.pi/4:
 | |
|             p_state = "down"
 | |
|         elif (self.theta >= 7*np.pi/4 and self.theta <= 2*np.pi) or (self.theta >= 0 and self.theta < np.pi/4):
 | |
|             p_state = "right"
 | |
|         else:
 | |
|             raise Exception("Invalid value of theta detected: ", theta)
 | |
| 
 | |
|         self.my_state = (p_state, w_state)
 | |
| 
 | |
|     def take_greedy_action(self, Q):
 | |
|         state_index = state_lookup_table[self.my_state]
 | |
|         action_index = np.argmax(Q[state_index])  # find largest entry in this row of Q (i.e. this state)
 | |
|         if action_index == 0:   # up
 | |
|             self.ka = [0, 1]
 | |
|         elif action_index == 1: # down
 | |
|             self.ka = [0, -1]
 | |
|         elif action_index == 2: # right
 | |
|             self.ka = [1, 0]
 | |
|         else:                   # left
 | |
|             self.ka = [-1, 0]
 | |
|         return action_index
 | |
| 
 | |
|     def take_random_action(self):
 | |
|         action_index = np.random.randint(4)
 | |
|         if action_index == 0:   # up
 | |
|             self.ka = [0, 1]
 | |
|         elif action_index == 1: # down
 | |
|             self.ka = [0, -1]
 | |
|         elif action_index == 2: # right
 | |
|             self.ka = [1, 0]
 | |
|         else:                   # left
 | |
|             self.ka = [-1, 0]
 | |
|         return action_index
 | |
|     
 | |
|  
 | |
| #Define Taylor-Green vortex
 | |
| 
 | |
| # given position, return local velocity and vorticity
 | |
| def tgv(x, z):
 | |
|     ux = -1/2*np.cos(x)*np.sin(z)
 | |
|     uz = 1/2*np.sin(x)*np.cos(z)
 | |
|     w = -np.cos(x)*np.cos(z)
 | |
|     return ux, uz, w
 | |
| 
 | |
| 
 | |
| #Plot
 | |
| 
 | |
| Ns = 5000
 | |
| spinner = Swimmer(Ns)
 | |
| traj = []
 | |
| obstacles = spinner.generate_obstacles()
 | |
| for i in range(Ns):
 | |
|   spinner.interaction_with_obstacles(obstacles, 1, 1e-6, 2.5, 1, 1., 2.5e-4, 10000, 0.001)
 | |
|   traj.append(spinner.X[0])
 | |
|   traj.append(spinner.X[1])
 | |
| 
 | |
| fig, ax= plt.subplots(1,1)
 | |
| ax.plot(traj[::2], traj[1::2])
 | |
| ax.plot(obstacles[::2], obstacles[1::2], '.')
 | |
| print(obstacles[::2])
 | |
| plt.show()
 | |
| 
 |