commit
						3355e5e7a3
					
				@ -0,0 +1,396 @@ | 
				
			||||
import numpy as np | 
				
			||||
import plotly.graph_objects as go | 
				
			||||
from tqdm.notebook import tqdm | 
				
			||||
import plotly.express as px | 
				
			||||
import matplotlib as mpl | 
				
			||||
mpl.rcParams['figure.dpi'] = 300 | 
				
			||||
import matplotlib.pyplot as plt | 
				
			||||
import seaborn as sns | 
				
			||||
import os | 
				
			||||
from wand.image import Image as WImage | 
				
			||||
# sns.set(palette="husl",font_scale=1) | 
				
			||||
# %config InlineBackend.figure_format = 'retina' | 
				
			||||
import copy | 
				
			||||
np.random.seed(0) | 
				
			||||
%load_ext line_profiler | 
				
			||||
 | 
				
			||||
 | 
				
			||||
 | 
				
			||||
#Define constants | 
				
			||||
 | 
				
			||||
#L = 2*np.pi # periodic domain size | 
				
			||||
L=20 | 
				
			||||
# define boundaries of simulation box | 
				
			||||
x0 = 0 | 
				
			||||
x1 = L | 
				
			||||
z0 = 0 | 
				
			||||
z1 = L | 
				
			||||
 | 
				
			||||
# define reinforcement learning problem | 
				
			||||
N_states = 12 # number of states - one for each coarse-grained degree of vorticity | 
				
			||||
N_actions = 4 # number of actions - one for each coarse-grained swimming direction | 
				
			||||
 | 
				
			||||
# numerical parameters | 
				
			||||
dt = 0.01 # timestep size | 
				
			||||
 | 
				
			||||
 | 
				
			||||
 | 
				
			||||
#Utility functions | 
				
			||||
 | 
				
			||||
def moving_average(a, n=3) : | 
				
			||||
    ret = np.cumsum(a, dtype=float) | 
				
			||||
    ret[n:] = ret[n:] - ret[:-n] | 
				
			||||
    return ret[n - 1:] / n | 
				
			||||
 | 
				
			||||
# Runga-Kutta 4(5) integration for one step | 
				
			||||
    # see https://stackoverflow.com/questions/54494770/how-to-set-fixed-step-size-with-scipy-integrate | 
				
			||||
def DoPri45Step(f,t,x,h): | 
				
			||||
 | 
				
			||||
    k1 = f(t,x) | 
				
			||||
    k2 = f(t + 1./5*h, x + h*(1./5*k1) ) | 
				
			||||
    k3 = f(t + 3./10*h, x + h*(3./40*k1 + 9./40*k2) ) | 
				
			||||
    k4 = f(t + 4./5*h, x + h*(44./45*k1 - 56./15*k2 + 32./9*k3) ) | 
				
			||||
    k5 = f(t + 8./9*h, x + h*(19372./6561*k1 - 25360./2187*k2 + 64448./6561*k3 - 212./729*k4) ) | 
				
			||||
    k6 = f(t + h, x + h*(9017./3168*k1 - 355./33*k2 + 46732./5247*k3 + 49./176*k4 - 5103./18656*k5) ) | 
				
			||||
 | 
				
			||||
    v5 = 35./384*k1 + 500./1113*k3 + 125./192*k4 - 2187./6784*k5 + 11./84*k6 | 
				
			||||
    k7 = f(t + h, x + h*v5) | 
				
			||||
    v4 = 5179./57600*k1 + 7571./16695*k3 + 393./640*k4 - 92097./339200*k5 + 187./2100*k6 + 1./40*k7; | 
				
			||||
 | 
				
			||||
    return v4,v5 | 
				
			||||
 | 
				
			||||
 | 
				
			||||
 | 
				
			||||
#Define useful data structures | 
				
			||||
#Define a dictionary of the possible states and their assigned indices | 
				
			||||
 | 
				
			||||
direction_states = ["right","down","left","up"] # coarse-grained directions | 
				
			||||
vort_states = ["w+", "w0", "w-"] # coarse-grained levels of vorticity | 
				
			||||
product_states = [(x,y) for x in direction_states for y in vort_states]  # all possible states | 
				
			||||
state_lookup_table = {product_states[i]:i for i in range(len(product_states))} # returns index of given state | 
				
			||||
# print(product_states) # to view mapping | 
				
			||||
 | 
				
			||||
#Define an agent class for reinforcement learning | 
				
			||||
 | 
				
			||||
class Agent: | 
				
			||||
    def __init__(self, Ns): | 
				
			||||
        self.r = np.zeros(Ns) # reward for each stage | 
				
			||||
        self.t = 0            # time | 
				
			||||
 | 
				
			||||
    # calculate reward given from entering a new state after a selected action is undertaken | 
				
			||||
    def calc_reward(self): | 
				
			||||
        # enforce implementation by subclass | 
				
			||||
        if self.__class__ == AbstractClass: | 
				
			||||
                raise NotImplementedError | 
				
			||||
 | 
				
			||||
    def update_state(self): | 
				
			||||
        # enforce implementation by subclass | 
				
			||||
        if self.__class__ == AbstractClass: | 
				
			||||
                raise NotImplementedError | 
				
			||||
 | 
				
			||||
    def take_random_action(self): | 
				
			||||
        # enforce implementation by subclass | 
				
			||||
        if self.__class__ == AbstractClass: | 
				
			||||
                raise NotImplementedError | 
				
			||||
 | 
				
			||||
    def take_greedy_action(self, Q): | 
				
			||||
        # enforce implementation by subclass | 
				
			||||
        if self.__class__ == AbstractClass: | 
				
			||||
                raise NotImplementedError | 
				
			||||
                 | 
				
			||||
#Define swimmer class derived from agent | 
				
			||||
 | 
				
			||||
class Swimmer(Agent): | 
				
			||||
    def __init__(self, Ns): | 
				
			||||
        # call init for superclass | 
				
			||||
        super().__init__(Ns) | 
				
			||||
 | 
				
			||||
        # local position within the periodic box. X = [x, z]^T with 0 <= x < 2 pi and 0 <= z < 2 pi | 
				
			||||
        self.X = np.array([np.random.uniform(0, L), np.random.uniform(0, L), 0]) | 
				
			||||
 | 
				
			||||
        # absolute position. -inf. <= x_total < inf. and -inf. <= z_total < inf. | 
				
			||||
        self.X_total = self.X | 
				
			||||
 | 
				
			||||
        # particle orientation | 
				
			||||
        self.theta = np.random.uniform(0, 2*np.pi) # polar angle theta in the x-z plane | 
				
			||||
        self.p = np.array([np.cos(self.theta), np.sin(self.theta)]) # p = [px, pz]^T | 
				
			||||
 | 
				
			||||
        # translational and rotational velocity | 
				
			||||
        self.U = np.zeros(3) | 
				
			||||
        self.W = np.array([0, 0, 1]) #Velocidad angular aleatoria | 
				
			||||
 | 
				
			||||
        # preferred swimming direction (equal to [1,0], [0,1], [-1,0], or [0,-1]) | 
				
			||||
        self.ka = np.array([0,1]) | 
				
			||||
 | 
				
			||||
        # history of local and global position. Only store information for this episode. | 
				
			||||
        self.history_X = [self.X] | 
				
			||||
        self.history_X_total = [self.X_total] | 
				
			||||
 | 
				
			||||
        # local vorticity at the current location | 
				
			||||
        _, _, self.w = tgv(self.X[0], self.X[1]) | 
				
			||||
 | 
				
			||||
        # update coarse-grained state | 
				
			||||
        self.update_state() | 
				
			||||
 | 
				
			||||
        #obstáculos | 
				
			||||
        self.obstacles= self.generate_obstacles() | 
				
			||||
 | 
				
			||||
    def generate_obstacles(self): | 
				
			||||
        obstacles=[] #el numero de obstáculos será 10*10 | 
				
			||||
        cell_spacing= L/30 | 
				
			||||
 | 
				
			||||
        for i in range(20): | 
				
			||||
          for j in range(20): | 
				
			||||
            obstacle_x= i + cell_spacing | 
				
			||||
            obstacle_z= j + cell_spacing | 
				
			||||
            obstacles.append(obstacle_x) | 
				
			||||
            obstacles.append(obstacle_z) | 
				
			||||
 | 
				
			||||
        return obstacles | 
				
			||||
 | 
				
			||||
    def interaction_with_obstacles(self,obstacles, sigma,ni,kappa,alpha,beta,gamma,Pe,dt): | 
				
			||||
      F= np.array([0.,0.,0.]) | 
				
			||||
      for i in range(len(obstacles)//2): | 
				
			||||
        #F1 | 
				
			||||
        obstacle_position = np.array([obstacles[2*i],obstacles[2*i+1], 0]) | 
				
			||||
        r=self.X - obstacle_position | 
				
			||||
        r_norm=np.linalg.norm(r) | 
				
			||||
        Re= sigma**2*np.linalg.norm(self.W)/ni | 
				
			||||
        S=1/(1+np.exp(-kappa*((Re/r_norm**3)-Re))) | 
				
			||||
        F1=alpha*(Re/r**3)*np.cross(self.U,self.W)*S | 
				
			||||
 | 
				
			||||
        #F2 | 
				
			||||
        F2=beta*np.cross(self.W,r)/r_norm**3 | 
				
			||||
 | 
				
			||||
        #F de atracción | 
				
			||||
        F_attr= gamma*(np.exp(-r_norm/kappa)/r_norm**2)*(kappa+r_norm)*r | 
				
			||||
 | 
				
			||||
 | 
				
			||||
        #Fuerza total | 
				
			||||
        F+=F1+F2+F_attr | 
				
			||||
 | 
				
			||||
      xi=np.random.normal(0,1, size=2) #vector de números aleatorios generados a partir de una distribución normal estándar con dos componentes, xi creo que es un vector de ruido estocástico (modela el ruido térmico) | 
				
			||||
      dr_therm = np.sqrt(2*sigma**2*dt/Pe)*xi | 
				
			||||
 | 
				
			||||
      dr = F[:-1]*dt + dr_therm | 
				
			||||
 | 
				
			||||
      #actualizamos la posición del spinner | 
				
			||||
      self.X[:-1] += dr | 
				
			||||
 | 
				
			||||
      #comprobamos que el spinner siga dentro del box periódico | 
				
			||||
      self.check_in_box() | 
				
			||||
 | 
				
			||||
 | 
				
			||||
    def reinitialize(self): | 
				
			||||
        self.X = np.array([np.random.uniform(0, L), np.random.uniform(0, L)]) | 
				
			||||
        self.X_total = self.X | 
				
			||||
 | 
				
			||||
        self.theta = np.random.uniform(0, 2*np.pi) # polar angle theta in the x-z plane | 
				
			||||
        self.p = np.array([np.cos(self.theta), np.sin(self.theta)]) # p = [px, pz]^T # orientación del nadador | 
				
			||||
 | 
				
			||||
        self.U = np.zeros(2) | 
				
			||||
        self.W = np.zeros(2) | 
				
			||||
 | 
				
			||||
        self.ka = np.array([0,1]) | 
				
			||||
 | 
				
			||||
        self.history_X = [self.X] | 
				
			||||
        self.history_X_total = [self.X_total] | 
				
			||||
 | 
				
			||||
        self.t = 0 | 
				
			||||
 | 
				
			||||
    def update_kinematics(self, Φ, Ψ, D0 = 0, Dr = 0, int_method = "euler"): # Actualiza la posición y orientación del nadador según un método de integración especificado. | 
				
			||||
        if int_method == "rk45": | 
				
			||||
            y0 = np.concatenate((self.X,self.p)) | 
				
			||||
            _, v5 = DoPri45Step(self.calc_velocity_rk45,self.t,y0,dt) | 
				
			||||
            y = y0 + dt*v5 | 
				
			||||
            self.X = y[:2] | 
				
			||||
            self.p = y[2:] | 
				
			||||
            dx = self.X - self.history_X[-1] | 
				
			||||
            self.X_total = self.X_total + dx | 
				
			||||
 | 
				
			||||
            # check if still in the periodic box | 
				
			||||
            self.check_in_box() | 
				
			||||
 | 
				
			||||
            # ensure the vector p has unit length | 
				
			||||
            self.p /= (self.p[0]**2 + self.p[1]**2)**(1/2) | 
				
			||||
 | 
				
			||||
            # update polar angle | 
				
			||||
            x = self.p[0] | 
				
			||||
            yy = self.p[1] | 
				
			||||
            self.theta = np.arctan2(yy,x) if yy >= 0 else (np.arctan2(yy,x) + 2*np.pi) | 
				
			||||
 | 
				
			||||
            # store positions | 
				
			||||
            self.history_X.append(self.X) | 
				
			||||
            self.history_X_total.append(self.X_total) | 
				
			||||
 | 
				
			||||
        elif int_method == "euler": | 
				
			||||
            # calculate new translational and rotational velocity | 
				
			||||
            self.calc_velocity(Φ, Ψ) | 
				
			||||
 | 
				
			||||
            self.update_position(int_method, D0) | 
				
			||||
            self.update_orientation(int_method, Dr) | 
				
			||||
        else: | 
				
			||||
            raise Exception("Integration method must be 'Euler' or 'rk45'") | 
				
			||||
 | 
				
			||||
        self.t = self.t + dt | 
				
			||||
 | 
				
			||||
    def calc_velocity_rk45(self, t, y): #calcula la velocidad del nadador en un determinado tiempo 't' y estado 'y' utilizando Rk45 | 
				
			||||
        x = y[0] | 
				
			||||
        z = y[1] | 
				
			||||
        px = y[2] | 
				
			||||
        pz = y[3] | 
				
			||||
        ux, uz, self.w = tgv(x, z) #tgv proporciona velocidades de flujo en la posición (x,z), w es la vorticidad | 
				
			||||
 | 
				
			||||
        #cálculo de las velocidades translacionales | 
				
			||||
        U0 = ux + Φ*px #ux y uz son las velocidades del flujo | 
				
			||||
        U1 = uz + Φ*pz | 
				
			||||
 | 
				
			||||
       #cálculo de las velocidades rotacionales | 
				
			||||
        ka_dot_p = self.ka[0]*px + self.ka[1]*pz #alineación del vector de nado preferido con la dirección del nadador | 
				
			||||
        W0 = 1/2/Ψ*(self.ka[0] - ka_dot_p*px) + 1/2*pz*self.w | 
				
			||||
        W1 = 1/2/Ψ*(self.ka[1] - ka_dot_p*pz) + 1/2*-px*self.w | 
				
			||||
 | 
				
			||||
        return np.array([U0, U1, W0, W1]) | 
				
			||||
 | 
				
			||||
 | 
				
			||||
    def update_position(self, int_method, D0): #D0 representa la difusión | 
				
			||||
        # use explicit euler to update | 
				
			||||
        dx = dt*self.U | 
				
			||||
        if D0 > 0: dx = dx + np.sqrt(2*D0*dt)*np.random.normal(size=2) #posible efecto de la difusión browniana | 
				
			||||
        self.X = self.X + dx | 
				
			||||
        self.X_total = self.X_total + dx | 
				
			||||
 | 
				
			||||
        # check if still in the periodic box | 
				
			||||
        self.check_in_box() | 
				
			||||
 | 
				
			||||
        # store positions | 
				
			||||
        self.history_X.append(self.X) | 
				
			||||
        self.history_X_total.append(self.X_total) | 
				
			||||
 | 
				
			||||
 | 
				
			||||
    def update_orientation(self, int_method, Dr): | 
				
			||||
        self.p = self.p + dt*self.W #W velocidad angular | 
				
			||||
 | 
				
			||||
        # ensure the vector p has unit length | 
				
			||||
        self.p /= (self.p[0]**2 + self.p[1]**2)**(1/2) | 
				
			||||
 | 
				
			||||
        # if rotational diffusion is present | 
				
			||||
        if Dr > 0: #Dr representa difucion rotacional | 
				
			||||
            px = self.p[0] | 
				
			||||
            pz = self.p[1] | 
				
			||||
            cross = px*pz | 
				
			||||
            A = np.array([[1-px**2, -cross], [-cross, 1-pz**2]]) #A es una matriz | 
				
			||||
            v = np.sqrt(2*Dr*dt)*np.random.normal(size=2) #v es un vector de valores aleatorios | 
				
			||||
            self.p[0] = self.p[0] + A[0,0]*v[0] + A[0,1]*v[1] #Se calcula un cambio aleatorio en la orientación usando A y v | 
				
			||||
            self.p[1] = self.p[1] + A[1,0]*v[0] + A[1,1]*v[1] | 
				
			||||
            self.p /= (self.p[0]**2 + self.p[1]**2)**(1/2) | 
				
			||||
 | 
				
			||||
        # update polar angle | 
				
			||||
        x = self.p[0] | 
				
			||||
        y = self.p[1] | 
				
			||||
        self.theta = np.arctan2(y,x) if y >= 0 else (np.arctan2(y,x) + 2*np.pi) | 
				
			||||
 | 
				
			||||
 | 
				
			||||
 | 
				
			||||
    def calc_velocity(self, Φ, Ψ): | 
				
			||||
        ux, uz, self.w = tgv(self.X[0], self.X[1]) | 
				
			||||
 | 
				
			||||
        # careful - computing in the following way is significantly slower: self.U = np.array(ux, uz) + Φ*self.p | 
				
			||||
        self.U[0] = ux + Φ*self.p[0] | 
				
			||||
        self.U[1] = uz + Φ*self.p[1] | 
				
			||||
 | 
				
			||||
        px = self.p[0] | 
				
			||||
        pz = self.p[1] | 
				
			||||
        ka_dot_p = self.ka[0]*px + self.ka[1]*pz | 
				
			||||
        self.W[0] = 1/2/Ψ*(self.ka[0] - ka_dot_p*px) + 1/2*pz*self.w | 
				
			||||
        self.W[1] = 1/2/Ψ*(self.ka[1] - ka_dot_p*pz) + 1/2*-px*self.w | 
				
			||||
 | 
				
			||||
 | 
				
			||||
    def check_in_box(self): # Este método verifica si el nadador todavía está dentro del cuadro periódico | 
				
			||||
        if self.X[0] < x0: | 
				
			||||
            self.X[0] += L | 
				
			||||
        elif self.X[0] > x1: | 
				
			||||
            self.X[0] -= L | 
				
			||||
        if self.X[1] < z0: | 
				
			||||
            self.X[1] += L | 
				
			||||
        elif self.X[1] > z1: | 
				
			||||
            self.X[1] -= L | 
				
			||||
 | 
				
			||||
    def calc_reward(self, n): | 
				
			||||
        self.r[n] = self.history_X_total[-1][1]-self.history_X_total[-2][1] | 
				
			||||
 | 
				
			||||
    def update_state(self): | 
				
			||||
        if self.w < -0.33: | 
				
			||||
            w_state = "w-" | 
				
			||||
        elif self.w >= -0.33 and self.w <= 0.33: | 
				
			||||
            w_state = "w0" | 
				
			||||
        elif self.w > 0.33: | 
				
			||||
            w_state = "w+" | 
				
			||||
        else: | 
				
			||||
            raise Exception("Invalid value of w detected: ", w) | 
				
			||||
 | 
				
			||||
        if self.theta >= np.pi/4 and self.theta < 3*np.pi/4: | 
				
			||||
            p_state = "up" | 
				
			||||
        elif self.theta >= 3*np.pi/4 and self.theta < 5*np.pi/4: | 
				
			||||
            p_state = "left" | 
				
			||||
        elif self.theta >= 5*np.pi/4 and self.theta < 7*np.pi/4: | 
				
			||||
            p_state = "down" | 
				
			||||
        elif (self.theta >= 7*np.pi/4 and self.theta <= 2*np.pi) or (self.theta >= 0 and self.theta < np.pi/4): | 
				
			||||
            p_state = "right" | 
				
			||||
        else: | 
				
			||||
            raise Exception("Invalid value of theta detected: ", theta) | 
				
			||||
 | 
				
			||||
        self.my_state = (p_state, w_state) | 
				
			||||
 | 
				
			||||
    def take_greedy_action(self, Q): | 
				
			||||
        state_index = state_lookup_table[self.my_state] | 
				
			||||
        action_index = np.argmax(Q[state_index])  # find largest entry in this row of Q (i.e. this state) | 
				
			||||
        if action_index == 0:   # up | 
				
			||||
            self.ka = [0, 1] | 
				
			||||
        elif action_index == 1: # down | 
				
			||||
            self.ka = [0, -1] | 
				
			||||
        elif action_index == 2: # right | 
				
			||||
            self.ka = [1, 0] | 
				
			||||
        else:                   # left | 
				
			||||
            self.ka = [-1, 0] | 
				
			||||
        return action_index | 
				
			||||
 | 
				
			||||
    def take_random_action(self): | 
				
			||||
        action_index = np.random.randint(4) | 
				
			||||
        if action_index == 0:   # up | 
				
			||||
            self.ka = [0, 1] | 
				
			||||
        elif action_index == 1: # down | 
				
			||||
            self.ka = [0, -1] | 
				
			||||
        elif action_index == 2: # right | 
				
			||||
            self.ka = [1, 0] | 
				
			||||
        else:                   # left | 
				
			||||
            self.ka = [-1, 0] | 
				
			||||
        return action_index | 
				
			||||
     | 
				
			||||
  | 
				
			||||
#Define Taylor-Green vortex | 
				
			||||
 | 
				
			||||
# given position, return local velocity and vorticity | 
				
			||||
def tgv(x, z): | 
				
			||||
    ux = -1/2*np.cos(x)*np.sin(z) | 
				
			||||
    uz = 1/2*np.sin(x)*np.cos(z) | 
				
			||||
    w = -np.cos(x)*np.cos(z) | 
				
			||||
    return ux, uz, w | 
				
			||||
 | 
				
			||||
 | 
				
			||||
#Plot | 
				
			||||
 | 
				
			||||
Ns = 5000 | 
				
			||||
spinner = Swimmer(Ns) | 
				
			||||
traj = [] | 
				
			||||
obstacles = spinner.generate_obstacles() | 
				
			||||
for i in range(Ns): | 
				
			||||
  spinner.interaction_with_obstacles(obstacles, 1, 1e-6, 2.5, 1, 1., 2.5e-4, 10000, 0.001) | 
				
			||||
  traj.append(spinner.X[0]) | 
				
			||||
  traj.append(spinner.X[1]) | 
				
			||||
 | 
				
			||||
fig, ax= plt.subplots(1,1) | 
				
			||||
ax.plot(traj[::2], traj[1::2]) | 
				
			||||
ax.plot(obstacles[::2], obstacles[1::2], '.') | 
				
			||||
print(obstacles[::2]) | 
				
			||||
plt.show() | 
				
			||||
					Loading…
					
					
				
		Reference in new issue