Source code for plafosim.vehicle

#
# Copyright (c) 2020-2025 Julian Heinovski <heinovski@ccs-labs.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#

import logging
from typing import TYPE_CHECKING

from plafosim.mobility import CF_Model
from plafosim.statistics import (
    record_emission_trace_prefix,
    record_emission_trace_suffix,
    record_emission_trace_value,
    record_vehicle_emission,
    record_vehicle_trace,
    record_vehicle_trip,
)
from plafosim.util import speed2distance
from plafosim.vehicle_type import VehicleType

if TYPE_CHECKING:
    from numpy.typing import ArrayLike

    from .simulator import Simulator  # noqa 401
else:
    ArrayLike = float

LOG = logging.getLogger(__name__)

SPEED_NO_PREDECESSOR = 1e15
REARPOSITION_NO_PREDECESSOR = 1e15


[docs]class Vehicle: """ A collection of state information for a vehicle in the simulation. A vehicle can really be anything that can move and can be defined by a vehicle type. It does not necessarily be driven by a computer (i.e., autonomous). However, by default it does have V2X functionality. """
[docs] def __init__( self, simulator: 'Simulator', vid: int, vehicle_type: VehicleType, depart_position: int, arrival_position: int, desired_speed: float, depart_lane: int, depart_speed: float, depart_time: float, depart_delay: float, communication_range: int, pre_filled: bool = False, ): """ Initialize a vehicle instance. Parameters ---------- simulator : Simulator The global simulator object vid : int The id of the vehicle vehicle_type : VehicleType The vehicle type of the vehicle depart_position : int The departure position of the vehicle arrival_position : int The arrival position of the vehicle desired_speed : float The desired driving speed of the vehicle depart_lane : int The departure lane of the vehicle depart_speed : float The departure speed of the vehicle depart_time : float The actual departure time of the vehicle depart_delay : float The time the vehicle had to wait before starting its trip communication_range : int The maximum communication range of the vehicle pre_filled : bool Whether this vehicle was pre-filled """ self._simulator = simulator # the simulator self._started = False # flag indicating whether the vehicles has actually started self._vid = vid # the id of the vehicle self._vehicle_type = vehicle_type # the vehicle type of the vehicle # trip details self._depart_position = depart_position # the departure position of the vehicle self._arrival_position = arrival_position # the arrival position of the vehicle self._desired_speed = desired_speed # the desired driving speed of the vehicle self._depart_lane = depart_lane # the departure lane of the vehicle self._depart_speed = depart_speed # the departure speed of the vehicle self._depart_time = depart_time # the departure time of the vehicle self._depart_delay = depart_delay # the departure delay of the vehicle self._pre_filled = pre_filled # whether this vehicle was pre-filled # vehicle details self._position = self._depart_position # the current position of the vehicle self._lane = self._depart_lane # the current lane of the vehicle self._speed = self._depart_speed # the current speed of the vehicle self._blocked_front = False # whether the vehicle is blocked by a slower vehicle in front self._acceleration = 0 # the current acceleration of the vehicle, used (only) for the emission model self._cf_model = CF_Model.HUMAN # the current car following model self._cf_target_speed = desired_speed # the target speed for the car following # communication properties # TODO move to platooning vehicle self._communication_range = communication_range # the maximum communication range between two vehicles # statistics self._time_loss = 0 # SUMO: "The time lost due to driving below the ideal speed." self._emissions = { "CO": 0, # the total carbon monoxide (CO) emission in mg "CO2": 0, # the total carbon dioxide (CO2) emission in mg "HC": 0, # the total hydro carbon (HC) emission in mg "NOx": 0, # the total nitrogen oxides (NO and NO2) emission in mg "PMx": 0, # the total fine-particle (PMx) emission in mg "fuel": 0, # the total fuel consumption emission in ml } # gui properties self._color = ( self._simulator._rng.randrange(0, 255, 1), self._simulator._rng.randrange(0, 255, 1), self._simulator._rng.randrange(0, 255, 1), )
@property def vid(self) -> int: """ Return the id of the vehicle. """ return self._vid @property def vehicle_type(self) -> VehicleType: """ Return the VehicleType of the vehicle. """ return self._vehicle_type @property def length(self) -> int: """ Return the length of the vehicle. This is based on the vehicle type. """ return self._vehicle_type._length @property def max_speed(self) -> float: """ Return the maximum speed of the vehicle. This is based on the vehicle type. """ return self._vehicle_type._max_speed @property def max_acceleration(self) -> float: """ Return the maximum acceleration of the vehicle. This is based on the vehicle type. """ return self._vehicle_type._max_acceleration @property def max_deceleration(self) -> float: """ Return the maximum deceleration of the vehicle. This is based on the vehicle type. """ return self._vehicle_type._max_deceleration @property def min_gap(self) -> float: """ Return the minimum safety gap to the vehicle in front of the vehicle. This is based on the vehicle type. """ return self._vehicle_type._min_gap @property def headway_time(self) -> float: """ Return the human headway time of the vehicle. This is based on the vehicle type. """ return self._vehicle_type._headway_time @property def desired_headway_time(self) -> float: """ Return the desired headway time of the vehicle. """ return self._vehicle_type._headway_time @property def depart_position(self) -> int: """ Return the departure position of the vehicle. """ return self._depart_position @property def arrival_position(self) -> int: """ Return the arrival position of the vehicle. """ return self._arrival_position @property def desired_speed(self) -> float: """ Return the desired driving speed of the vehicle. """ return self._desired_speed @property def desired_gap(self) -> float: """ Return the desired gap to the vehicle in front of the vehicle. This is based on the desired headway time and the current driving speed. """ # use potential other desired headway time # TODO should this be target speed? return speed2distance(self.desired_headway_time * self._speed, self._simulator._step_length) @property def depart_lane(self) -> int: """ Return the departure lane of the vehicle. """ return self._depart_lane @property def depart_speed(self) -> float: """ Return the departure speed of the vehicle. """ return self._depart_speed @property def depart_time(self) -> float: """ Return the departure time of the vehicle. """ return self._depart_time @property def position(self) -> float: """ Return the current position of the vehicle. """ return self._position @property def rear_position(self) -> int: """ Return the current rear position of the vehicle. """ position = self._position - self._vehicle_type._length assert position >= 0 return position @property def lane(self) -> int: """ Return the current lane of the vehicle. """ return self._lane @property def speed(self) -> float: """ Return the current driving speed of the vehicle. """ return self._speed @property def cf_model(self) -> CF_Model: """ Return the currently activated car following model of the vehicle. """ return self._cf_model @property def travel_distance(self) -> float: """ Return the current traveled distance of the vehicle. """ return self._position - self._depart_position @property def travel_time(self) -> float: """ Return the current traveled time of the vehicle. """ return self._simulator.step - self._depart_time @property def blocked_front(self) -> bool: """ Return whether the vehicle is currently blocked by a slow vehicle in the front. """ return self._blocked_front @property def color(self) -> tuple: """Return the current color of the vehicle.""" return self._color
[docs] def action(self, step: int): """ Triggers actions of a vehicle. Parameters ---------- step : int The current simulation step """ # we started (right now) self._start() # log status information if LOG.getEffectiveLevel() <= logging.TRACE: LOG.trace(self.info()) # record periodic statistics self._statistics() # What has to be triggered periodically? if self._simulator._actions: self._action(step)
[docs] def _action(self, step: float): """ Triggers specific actions of a vehicle. Parameters ---------- step : float The current simulation step """ pass # this vehicle has no application running
# TODO: obsolete?
[docs] def _start(self): """ Start this Vehicle. """ if self._started: return self._started = True
[docs] def info(self) -> str: """ Return information about the vehicle. """ estimated_remaining_travel_time = ( (self._arrival_position - self._position) / self._speed if self._speed > 0 else self.desired_speed # use potential other desired driving speed ) return f"{self._vid} at {self._position}-{self.rear_position}, {self._lane} with {self._speed}, takes {estimated_remaining_travel_time}s to reach {self._arrival_position}"
[docs] def _statistics(self): """ Write continuous statistics for the vehicle. """ if not self._simulator._record_prefilled and self._depart_time == -1: # we do not record statistics for pre-filled vehicles return # calculate time loss # SUMO: "The time lost due to driving below the ideal speed." # can also use higher layer desired speed if self._speed < self._cf_target_speed: self._time_loss += self._simulator.step_length if self._simulator._record_vehicle_traces: # mobility/trip statistics record_vehicle_trace( basename=self._simulator._result_base_filename, step=self._simulator.step, vehicle=self, ) self._calculate_emissions()
# TODO current gap to front
[docs] def _calculate_emissions(self): """ Calculate the emitted pollutant amount using the given speed and acceleration based on the HBEFA3 model. As the functions are defining emissions in g/hour, the function's result is normed by 3.6 (seconds in an hour/1000) yielding mg/s. For fuel ml/s is returned. Negative acceleration results directly in zero emission. The amount emitted by the given emission class when moving with the given velocity and acceleration [mg/s or ml/s] """ if not self._simulator._record_prefilled and self._depart_time == -1: # we do not record statistics for pre-filled vehicles return if self._simulator._record_emission_traces: record_emission_trace_prefix( basename=self._simulator._result_base_filename, step=self._simulator.step, vid=self._vid, ) ec = self._vehicle_type.emission_class for variable in self._emissions.keys(): scale = 3.6 if variable == "fuel": if ec.is_diesel: scale *= 836.0 else: scale *= 742.0 value = ( self._calculate_emission( a=self._acceleration, v=self._speed, f=ec.emission_factors[variable], scale=scale, ) * self._simulator.step_length ) self._emissions[variable] += value if self._simulator._record_emission_traces: record_emission_trace_value(basename=self._simulator._result_base_filename, value=value) if self._simulator._record_emission_traces: record_emission_trace_suffix(basename=self._simulator._result_base_filename)
[docs] def _calculate_emission(self, a: float, v: float, f: list, scale: float) -> float: """ Calculate the actual emission of the vehicle. Parameters ---------- a : float The current acceleration v : float The current speed f : list The emission factors to use for current emission variable to be calculated scale : float The scale to normalize the calculated value Returns ------- float : The calculcated emission in ml/mg per s """ if a < 0: return 0 return max( ( f[0] + f[1] * a * v + f[2] * a * a * v + f[3] * v + f[4] * v * v + f[5] * v * v * v ) / scale, 0.0, )
[docs] def finish(self): """ Clean up the instance of the vehicle. This includes mostly statistic recording. """ if (self._position < self._arrival_position): LOG.warning(f"{self._vid}'s finish method was called even though vehicle did not arrive yet!") return expected_travel_time = (self._arrival_position - self._depart_position) / self._desired_speed # use explicit individual desired speed assert self.travel_time != 0 assert expected_travel_time != 0 travel_time_ratio = self.travel_time / expected_travel_time # NOTE: this also contains teleports average_driving_speed = self.travel_distance / self.travel_time average_deviation_desired_speed = average_driving_speed - self._desired_speed # use explicit individual desired speed LOG.info(f"{self._vid} arrived at {self._position}m,{self._lane} with {self._speed}m/s, took {self.travel_time}s, {self.travel_distance}m, loss: {self._time_loss}s, {travel_time_ratio * 100}% of expected duration") # statistic recording # TODO use pre_filled flag if not self._simulator._record_prefilled and self._depart_time == -1: # we do not record statistics for pre-filled vehicles LOG.debug(f"Not recording statistics for pre-filled vehicle {self._vid}") return # by this check, we should also already avoid logging if the minimum trip length has not been fulfilled # HACK: adding length here to cope for departPos="base" # TODO we might need to travel 'length' more than arrival position if self.travel_distance < (self._simulator._minimum_trip_length - self.length): # we are only interested in vehicles that did complete the minimum trip length # this should only be the case for pre-filled vehicles or if started as platoon # TODO use pre_filled flag assert ( self._simulator._record_prefilled and self._depart_time == -1 ) or self._simulator._start_as_platoon assert travel_time_ratio >= 0 assert average_driving_speed >= 0 if self._simulator._record_end_trace: # call trace recording once again self._statistics() if self._simulator._record_vehicle_trips: record_vehicle_trip( basename=self._simulator._result_base_filename, vehicle=self, time_loss=self._time_loss, depart_delay=self._depart_delay, expected_travel_time=expected_travel_time, travel_time_ratio=travel_time_ratio, average_driving_speed=average_driving_speed, average_deviation_desired_speed=average_deviation_desired_speed, ) if self._simulator._record_vehicle_emissions: record_vehicle_emission(basename=self._simulator._result_base_filename, vehicle=self)
def __str__(self) -> str: """ Return the str representation of the vehicle. """ self_dict = self.__dict__.copy() self_dict.update({'_vehicle_type': str(self._vehicle_type)}) # use str representation of vehicle type return str(self_dict)