Source code for simulation.fish

# Copyright 2025 Asaph Zylbertal & Sam Pink
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
import pymunk
from .constants import MEDIUM_MASS, PHYS_DAMP, PIXELS_PER_MM
from .eye import Eye

[docs] class Fish: """ A class representing a fish agent in a 2D physics-based simulation environment. The Fish class models a fish with physical properties (body, head, mouth, tail), visual perception through two eyes, energy management, and action execution capabilities. It uses the pymunk physics engine for realistic movement and collision detection. Attributes: max_uv_range (float): Maximum range for UV light perception. env_variables (dict): Dictionary containing environment configuration parameters. body (pymunk.Body): The physics body representing the fish. rng (numpy.random.Generator): Random number generator for stochastic actions. actions (list): List of available actions the fish can perform. num_actions (int): Number of available actions. distance_to_impulse_factor (float): Conversion factor from distance to physics impulse. mouth (pymunk.Circle): Circular collision shape representing the fish's mouth. head (pymunk.Circle): Circular collision shape representing the fish's head. tail (pymunk.Poly): Polygonal collision shape representing the fish's tail. verg_angle (float): Vergence angle between the two eyes in radians. retinal_field (float): Visual field angle for each eye in radians. conv_state (int): Current convergence state of the eyes. left_eye (Eye): Left eye object for visual perception. right_eye (Eye): Right eye object for visual perception. hungry (float): Hunger level of the fish. prey_consumed (bool): Whether prey was consumed in the current step. touched_edge (bool): Whether the fish has touched the environment edge. touched_predator (bool): Whether the fish has touched a predator. making_capture (bool): Whether the fish is currently performing a capture action. capture_possible (bool): Whether a capture is currently possible. prev_action_distance (float): Distance component of the previous action. prev_action_angle (float): Angle component of the previous action. prev_action (int): Index of the previously executed action. energy_level (float): Current energy level of the fish (0.0 to 1.0). d_scaling_energy_cost (float): Energy cost scaling factor for distance-based movement. a_scaling_energy_cost (float): Energy cost scaling factor for angular movement. baseline_energy_use (float): Baseline energy consumption per time step. action_energy_reward_scaling (float): Scaling factor for energy-based rewards. touched_edge_this_step (bool): Whether the fish touched the edge in the current step. deterministic_action (bool): Whether to use deterministic (mean) action values. - The fish uses a physics simulation with configurable mass, inertia, and damping. - Collision types are set for different body parts to enable specific interactions. - Energy management includes consumption tracking and rewards based on energy efficiency. - The fish can operate in test modes (sensory system testing, paralysis) for debugging. """ def __init__(self, env_variables, max_uv_range, rng, actions): # For the purpose of producing a calibration curve. fish_inertia = pymunk.moment_for_circle(MEDIUM_MASS, 0, env_variables['fish_head_radius'], (0, 0)) self.max_uv_range = max_uv_range self.env_variables = env_variables self.body = pymunk.Body(MEDIUM_MASS, fish_inertia) self.rng = rng self.actions = actions self.num_actions = len(actions) # From mm, should be distance * pixels_per_mm * mass * (1-dampening^dt) / dt phys_dt = self.env_variables['sim_step_duration_seconds'] / self.env_variables['phys_steps_per_sim_step'] self.distance_to_impulse_factor = PIXELS_PER_MM * MEDIUM_MASS * (1 - PHYS_DAMP ** phys_dt) / phys_dt # Mouth self.mouth = pymunk.Circle(self.body, env_variables['fish_mouth_radius'], offset=(0, 0)) self.mouth.elasticity = 1.0 self.mouth.collision_type = 3 # Head self.head = pymunk.Circle(self.body, env_variables['fish_head_radius'], offset=(-env_variables['fish_head_radius'], 0)) self.head.elasticity = 1.0 self.head.collision_type = 6 # # Tail tail_coordinates = ((-env_variables['fish_head_radius'], 0), (-env_variables['fish_head_radius'], - env_variables['fish_head_radius']), (-env_variables['fish_head_radius'] - env_variables['fish_tail_length'], 0), (-env_variables['fish_head_radius'], env_variables['fish_head_radius'])) self.tail = pymunk.Poly(self.body, tail_coordinates) self.tail.elasticity = 1.0 self.tail.collision_type = 6 # Init visual fields. self.verg_angle = env_variables['eyes_verg_angle'] * (np.pi / 180) self.retinal_field = env_variables['eyes_visual_field'] * (np.pi / 180) self.conv_state = 0 self.left_eye = Eye(self.verg_angle, self.retinal_field, True, env_variables, max_uv_range=self.max_uv_range, rng=self.rng) self.right_eye = Eye(self.verg_angle, self.retinal_field, False, env_variables, max_uv_range=self.max_uv_range, rng=self.rng) self.hungry = 0 self.prey_consumed = False self.touched_edge = False self.touched_predator = False self.making_capture = False self.capture_possible = False self.prev_action_distance = 0 self.prev_action_angle = 0 self.prev_action = 0 # Energy system (new simulation) self.energy_level = 1.0 self.d_scaling_energy_cost = self.env_variables['energy_distance_factor'] self.a_scaling_energy_cost = self.env_variables['energy_angle_factor'] self.baseline_energy_use = self.env_variables['energy_baseline'] self.action_energy_reward_scaling = self.env_variables['reward_energy_use_factor'] # Touch edge - for penalty self.touched_edge_this_step = False self.deterministic_action = self.env_variables['fish_deterministic_action']
[docs] def draw_angle_dist(self, action): """ Draw angle and distance values from a multivariate normal distribution. This method samples from a multivariate normal distribution defined by the given action parameters and returns both the sampled values and the mean values. Args: action (dict): A dictionary containing: - 'mean': array-like of shape (2,) representing the mean values [distance_mean, angle_mean] - 'cov': array-like of shape (2, 2) representing the covariance matrix Returns: tuple: A 4-tuple containing: - float: Sampled angle value - float: Sampled distance value - float: Mean angle value - float: Mean distance value """ bout_vals = self.rng.multivariate_normal(action['mean'], action['cov'], 1) return bout_vals[0, 1], bout_vals[0, 0], action['mean'][1], action['mean'][0]
[docs] def take_action(self, action_id): """ Execute a specific action for the fish agent in the simulation. This method updates the fish's position and orientation based on the selected action. It handles different simulation modes including sensory system testing and fish paralysis. Args: action_id (int): The identifier of the action to be executed. Must be within the range of available actions (0 to num_actions-1). Side Effects: - Updates self.prev_action with the current action_id - Updates self.prev_action_distance and self.prev_action_angle - Modifies self.body.angle based on the action or test mode sequence - Applies impulse to self.body if fish is not paralyzed - Sets self.making_capture to True if action is a capture action Behavior Modes: - test_sensory_system: Rotates fish body by 0.1 radians, no movement - fish_paralyze: Sets distance and angle changes to 0 - normal: Draws angle and distance from action distribution, applies impulse Notes: - If deterministic_action is True, uses mean values instead of sampling - The impulse applied is scaled by distance_to_impulse_factor """ self.prev_action = action_id if self.env_variables['test_sensory_system']: self.body.angle += 0.1 self.prev_action_distance = 0 self.prev_action_angle = 0 else: if "fish_paralyze" in self.env_variables and self.env_variables["fish_paralyze"]: self.prev_action_distance = 0 self.prev_action_angle = 0 else: angle_change, distance, mean_angle, mean_distance = self.draw_angle_dist(self.actions[action_id]) if self.deterministic_action: angle_change = mean_angle distance = mean_distance if self.actions[action_id]['is_capture']: self.making_capture = True self.prev_action_angle = angle_change self.body.angle += self.prev_action_angle self.prev_action_distance = distance self.body.apply_impulse_at_local_point((self.distance_to_impulse_factor * self.prev_action_distance, 0)) if not action_id in range(self.num_actions): print("Invalid action given")
[docs] def update_energy_level(self, consumption): """ Update the fish's energy level based on consumption and energy costs. This method calculates the net energy change by considering energy gained from consumption and energy expended through movement and actions. It also computes a reward based on energy expenditure. Args: consumption (float): Amount of energy gained from consuming food or prey. Returns: float: Reward value calculated as negative energy use scaled by action_energy_reward_scaling. Side Effects: - Updates self.energy_level based on net energy change (unless fixed energy level mode is enabled via env_variables) - Caps energy_level at 1.0 maximum Notes: - If env_variables["fish_fixed_energy_level"] is True, energy level remains unchanged - Energy level is capped at 1.0 after update """ energy_gain = consumption energy_use = self.d_scaling_energy_cost * (abs(self.prev_action_distance)) + \ self.a_scaling_energy_cost * (abs(self.prev_action_angle)) + \ self.baseline_energy_use if self.actions[self.prev_action]['is_capture']: energy_use *= self.env_variables['capture_swim_energy_cost_scaling'] energy_change = energy_gain - energy_use reward = -energy_use * self.action_energy_reward_scaling if "fish_fixed_energy_level" in self.env_variables and self.env_variables["fish_fixed_energy_level"]: pass else: self.energy_level += energy_change self.energy_level = min(self.energy_level, 1.0) # Ensure energy level does not exceed 1.0 return reward