Source code for plafosim.infrastructure

#
# Copyright (c) 2020-2025 Julian Heinovski <heinovski@ccs-labs.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#

import logging
import sys
from typing import TYPE_CHECKING

from plafosim.algorithms import *  # noqa 401
from plafosim.platooning_vehicle import PlatooningVehicle

if TYPE_CHECKING:
    from plafosim.simulator import Simulator  # noqa 401

LOG = logging.getLogger(__name__)


[docs]class Infrastructure: """ A class representing road side infrastructure / road side units. The infrastructure can execute formation algorithms. """
[docs] def __init__( self, simulator: 'Simulator', iid: int, position: int, formation_algorithm: str, execution_interval: int, **kw_args, ): """ Initialize an Infrastructure instance. Parameters ---------- simulator : Simulator The global simulator object iid : int The id of the infrastructure position : int The position (x) of the infrastructure formation_algorithm : str The platoon formation (i.e., assignment calculation) algorithm to run execution_interval : int The execution interval for the formation algorithm """ self._simulator = simulator # the simulator self._iid = iid # the id of the infrastructure self._position = position # the x position of the infrastructure if formation_algorithm: # initialize formation algorithm try: self._formation_algorithm = globals()[formation_algorithm](self, **kw_args) except KeyError: sys.exit(f"ERROR [{__name__}]: Unknown formation algorithm {formation_algorithm}! Did you import it?") self._execution_interval = execution_interval # initialize timer self._last_formation_step = 0 # initialize with vehicle start else: self._formation_algorithm = None
@property def iid(self) -> int: """ Return the id of an infrastructure. """ return self._iid @property def position(self) -> int: """ Return the position of an infrastructure. """ return self._position
[docs] def action(self, step: int): """ Triggers actions of an infrastructure. Parameters ---------- step : int The current simulation step """ # What has to be triggered periodically? if self._simulator._actions: self._action(step)
[docs] def _action(self, step: int): """ Triggers concrete actions of an infrastructure. Parameters ---------- step : int The current simulation step """ LOG.trace(f"{self.iid} was triggered at {step}") if self._formation_algorithm: if step >= self._last_formation_step + self._execution_interval: # search for a platoon (depending on the algorithm) self._formation_algorithm.do_formation() self._last_formation_step = step
# TODO currently not used --> remove?
[docs] def _get_neighbors(self): neighbors = [] for vehicle in self._simulator._vehicles.values(): # filter vehicles that are technically not able to do platooning if not isinstance(vehicle, PlatooningVehicle): continue # filter based on communication range communication_range = self._simulator.road_length if abs(vehicle.position - self.position) > communication_range: continue neighbors.append(vehicle) return neighbors
[docs] def finish(self): """ Clean up the instance of the infrastructure. """ if self._formation_algorithm is not None: self._formation_algorithm.finish()