#
# Copyright (c) 2020-2025 Julian Heinovski <heinovski@ccs-labs.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import logging
import random
import sys
import time
from timeit import default_timer as timer
import numpy as np
import pandas as pd
from tqdm import tqdm
from plafosim.emissions import EmissionClass
from plafosim.gui import (
add_gui_vehicle,
check_and_prepare_gui,
close_gui,
draw_infrastructures,
draw_ramps,
draw_road_end,
gui_step,
move_gui_vehicle,
prune_vehicles,
remove_gui_vehicle,
set_gui_window,
start_gui,
)
from plafosim.infrastructure import Infrastructure
from plafosim.mobility import (
HIGHVAL,
CF_Model,
compute_lane_changes,
compute_new_speeds,
get_crashed_vehicles,
get_predecessors,
is_gap_safe,
lane_predecessors,
update_position,
)
from plafosim.platoon_role import PlatoonRole
from plafosim.platooning_vehicle import PlatooningVehicle
from plafosim.spawning import get_arrival_position, get_depart_speed, get_desired_speed
from plafosim.statistics import (
initialize_emission_traces,
initialize_platoon_changes,
initialize_platoon_formation,
initialize_platoon_maneuvers,
initialize_platoon_traces,
initialize_platoon_trips,
initialize_simulation_trace,
initialize_vehicle_changes,
initialize_vehicle_emissions,
initialize_vehicle_platoon_changes,
initialize_vehicle_platoon_traces,
initialize_vehicle_teleports,
initialize_vehicle_traces,
initialize_vehicle_trips,
record_general_data_begin,
record_general_data_end,
record_platoon_change,
record_simulation_trace,
record_vehicle_change,
record_vehicle_platoon_change,
record_vehicle_trace,
)
from plafosim.util import assert_index_equal
from plafosim.vehicle import Vehicle
from plafosim.vehicle_type import VehicleType
LOG = logging.getLogger(__name__)
# assumptions
# you just reach your arrival_position
# position is in the middle of the front bumper
# a vehicle ends at position + length
# crash detection does not work with step length greater than 1
# vehicle properties
# https://sumo.dlr.de/docs/Vehicle_Type_Parameter_Defaults.html
_length = 5 # m # TODO make parameter
_max_speed = 55 # m/s # TODO make parameter
_max_acceleration = 2.5 # m/s^2 # TODO make parameter
# https://copradar.com/chapts/references/acceleration.html
# TODO distinguish between normal (e.g., 4.5) and emergency (e.g., 7-10m/s^2) deceleration
_max_deceleration = 10 # m/s^2 # TODO make parameter
_min_gap = 2.5 # m # TODO make parameter
_desired_headway_time = 1.0 # s # TODO make parameter
# HBEFA3/PC_G_EU4 (a gasoline powered Euro norm 4 passenger car modeled using the HBEFA3 based model), default of SUMO
_emission_class = EmissionClass.PC_G_EU4.name # TODO make parameter
vtype = VehicleType(
"car",
_length,
_max_speed,
_max_acceleration,
_max_deceleration,
_min_gap,
_desired_headway_time,
_emission_class,
) # TODO support multiple vtypes
# vehicle data fram type collections
# TODO: extract to module or class later
CFModelDtype = pd.CategoricalDtype(list(CF_Model), ordered=True)
PlatoonRoleDtype = pd.CategoricalDtype(list(PlatoonRole), ordered=True)
# default values in SI units
# TODO move to __init__?
DEFAULTS = {
'road_length': 100 * 1000, # km -> m
'lanes': 3,
'ramp_interval': 5 * 1000, # km -> m
'pre_fill': False,
'vehicles': 100,
'vehicle_density': -1,
'max_speed': vtype.max_speed, # TODO not used currently
'acc_headway_time': vtype.headway_time,
'cacc_spacing': 5.0, # m
'penetration_rate': 1.0,
'random_depart_position': False,
'depart_all_lanes': True,
'desired_speed': 33.0, # m/s
'random_desired_speed': True,
'speed_variation': 0.1,
'min_desired_speed': 22.0, # m/s
'max_desired_speed': 44.0, # m/s
'random_depart_speed': False,
'depart_desired': False,
'depart_flow': False,
'depart_method': 'interval',
'depart_interval': 2.0, # s
'depart_probability': 1.0,
'depart_rate': 3600, # v/h
'random_arrival_position': False,
'minimum_trip_length': 0,
'maximum_trip_length': -1 * 1000, # km -> m
'communication_range': 500, # m
# TODO apply also to infrastructure-based approaches
'distributed_platoon_knowledge': True,
# TODO apply also to infrastructure-based approaches
'distributed_maneuver_knowledge': False,
'start_as_platoon': False,
'reduced_air_drag': True,
'maximum_teleport_distance': 2000, # m
'maximum_approach_time': 60, # s
'delay_teleports': True,
'update_desired_speed': True,
'formation_algorithm': None,
'formation_strategy': 'distributed', # TODO rename
'execution_interval': 10, # s
'infrastructures': 0,
'step_length': 1.0, # s
'max_step': 1 * 3600, # h -> s
'actions': True,
'collisions': True,
'random_seed': -1,
'log_level': logging.WARNING,
'progress': True,
'gui': False,
'gui_delay': 0, # ms
'gui_track_vehicle': -1,
'sumo_config': 'sumocfg/freeway.sumo.cfg',
'gui_play': True,
'gui_start': 0, # s
'draw_ramps': True,
'draw_ramp_labels': False,
'draw_road_end': True,
'draw_road_end_label': True,
'draw_infrastructures': True,
'draw_infrastructure_labels': True,
'result_base_filename': 'results',
'record_simulation_trace': False,
'record_end_trace': True,
'record_vehicle_trips': False,
'record_vehicle_emissions': False,
'record_vehicle_traces': False,
'record_vehicle_changes': False,
'record_emission_traces': False,
'record_platoon_trips': False,
'record_platoon_maneuvers': False,
'record_platoon_formation': False,
'record_platoon_traces': False,
'record_vehicle_platoon_traces': False,
'record_platoon_changes': False,
'record_infrastructure_assignments': False,
'record_vehicle_teleports': False,
'record_prefilled': False,
}
[docs]def report_rough_braking(
vdf: pd.DataFrame,
new_speed: pd.Series,
step_length: float,
rough_factor: float = 0.5,
) -> int:
"""
Report about vehicle preforming rough braking maneuvers.
Rough braking meaning more than rough_factor times max_deceleration.
Return number of rough braking vehicles
Parameters
----------
vdf : pandas.DataFrame
The Dataframe containing the vehicles as rows
index: vid
columns: [position, length, lane, ..]
new_speed : pandas.Series
The Series containing the new speeds of the vehicles
index: vid
step_length : float
The length of one simulation step in s
rough_factor : float, optional
The factor to apply to the maximum deceleration for setting a rough braking threshold
Returns
-------
int
The number of vehicles performing rough braking
"""
assert_index_equal(vdf, new_speed)
assert step_length > 0
assert rough_factor > 0
if vdf.empty:
return 0
brakers = (
(vdf['speed'] - new_speed) > (vdf.max_deceleration * rough_factor * step_length)
)
if not brakers.any():
return 0
fields = ['lane', 'position', 'speed', 'new_speed', 'max_deceleration', 'deceleration']
brake_df = (
vdf.assign(new_speed=new_speed, deceleration=vdf['speed'] - new_speed)
.loc[brakers, fields]
.sort_values(["lane", "position"], ascending=False)
)
LOG.warning(
"The following vehicles performed rough braking:\n%s",
brake_df
)
return len(brake_df)
[docs]def check_collisions(vdf: pd.DataFrame) -> bool:
"""
Do collision checks for all vehicles in the simulation.
Parameters
----------
vdf : pandas.DataFrame
The Dataframe containing the vehicles as rows
index: vid
columns: [position, length, lane, ..]
Returns
-------
bool
Whether there are collisions between vehicles
"""
if vdf.empty:
return False
crashed_vehicles = get_crashed_vehicles(vdf)
if crashed_vehicles:
for v in crashed_vehicles:
print(f"{v}: {vdf.at[v, 'position']}-{vdf.at[v, 'position']-vdf.at[v, 'length']},{vdf.at[v, 'lane']}")
return True
return False
[docs]def has_collision(
position1: float,
rear_position1: float,
lane1: int,
position2: float,
rear_position2: float,
lane2: int,
) -> bool:
"""
Check for a collision between two vehicles.
Parameters
----------
position1 : float
The current position of vehicle 1
rear_position1 : float
The current rear position of vehicle 1
lane1 : int
The current lane of vehicle 1
position2 : float
The current position of vehicle 2
rear_position2 : float
The current rear position of vehicle 2
lane2 : int
The current lane of vehicle 2
Returns
-------
bool : Whether there is a collision between two vehicles
"""
if lane1 != lane2:
return False
return min(position1, position2) - max(rear_position1, rear_position2) >= 0
[docs]def is_insert_safe(
depart_position: float,
depart_speed: float,
vtype: VehicleType,
other_vehicle: Vehicle,
step_length: float,
) -> bool:
"""
Checks if a vehicle can be inserted safely at a given position.
Parameters
----------
depart_position : float
The planned depature position of the vehicle
depart_speed : float
The planned departure speed of the vehicle
vtype : VehicleType
The vehicle type of the vehicle
other_vehicle: Vehicle
The other vehicle to check
step_length: float
The step length
Returns
-------
bool : Whether the insertion of a vehicle is safe
"""
assert depart_position >= 0
assert depart_speed >= 0
assert isinstance(other_vehicle, Vehicle)
assert step_length > 0
# would it be unsafe to insert the vehicle?
if other_vehicle.position <= depart_position:
# the other vehicle is behind the current vehicle
# check if the other vehicle could crash into the current vehicle within the next time step
return is_gap_safe(
front_position=depart_position,
front_speed=depart_speed,
front_max_deceleration=vtype.max_deceleration,
front_length=vtype.length,
back_position=other_vehicle.position,
back_speed=other_vehicle.speed,
back_max_acceleration=other_vehicle.max_acceleration,
back_min_gap=other_vehicle.min_gap,
step_length=step_length,
)
# the current vehicle is behind the other vehicle
# check if the current vehicle could crash into the other vehicle within the next time step
return is_gap_safe(
front_position=other_vehicle.position,
front_speed=other_vehicle.speed,
front_max_deceleration=other_vehicle.max_deceleration,
front_length=other_vehicle.length,
back_position=depart_position,
back_speed=depart_speed,
back_max_acceleration=vtype.max_acceleration,
back_min_gap=vtype.min_gap,
step_length=step_length,
)
[docs]class Simulator:
"""
A collection of parameters and information of the simulator.
"""
[docs] def __init__(
self,
*,
road_length: int = DEFAULTS['road_length'],
number_of_lanes: int = DEFAULTS['lanes'],
ramp_interval: int = DEFAULTS['ramp_interval'],
pre_fill: bool = DEFAULTS['pre_fill'],
number_of_vehicles: int = DEFAULTS['vehicles'],
vehicle_density: float = DEFAULTS['vehicle_density'],
max_speed: float = DEFAULTS['max_speed'],
acc_headway_time: float = DEFAULTS['acc_headway_time'],
cacc_spacing: float = DEFAULTS['cacc_spacing'],
penetration_rate: float = DEFAULTS['penetration_rate'],
random_depart_position: bool = DEFAULTS['random_depart_position'],
depart_all_lanes: bool = DEFAULTS['depart_all_lanes'],
desired_speed: float = DEFAULTS['desired_speed'],
random_desired_speed: bool = DEFAULTS['random_desired_speed'],
speed_variation: float = DEFAULTS['speed_variation'],
min_desired_speed: float = DEFAULTS['min_desired_speed'],
max_desired_speed: float = DEFAULTS['max_desired_speed'],
random_depart_speed: bool = DEFAULTS['random_depart_speed'],
depart_desired: bool = DEFAULTS['depart_desired'],
depart_flow: bool = DEFAULTS['depart_flow'],
depart_method: str = DEFAULTS['depart_method'],
depart_interval: float = DEFAULTS['depart_interval'],
depart_probability: float = DEFAULTS['depart_probability'],
depart_rate: int = DEFAULTS['depart_rate'],
random_arrival_position: bool = DEFAULTS['random_arrival_position'],
minimum_trip_length: int = DEFAULTS['minimum_trip_length'],
maximum_trip_length: int = DEFAULTS['maximum_trip_length'],
communication_range: int = DEFAULTS['communication_range'],
distributed_platoon_knowledge: bool = DEFAULTS['distributed_platoon_knowledge'],
distributed_maneuver_knowledge: bool = DEFAULTS['distributed_maneuver_knowledge'],
start_as_platoon: bool = DEFAULTS['start_as_platoon'],
reduced_air_drag: bool = DEFAULTS['reduced_air_drag'],
maximum_teleport_distance: int = DEFAULTS['maximum_teleport_distance'],
maximum_approach_time: int = DEFAULTS['maximum_approach_time'],
delay_teleports: bool = DEFAULTS['delay_teleports'],
update_desired_speed: bool = DEFAULTS['update_desired_speed'],
formation_algorithm: str = DEFAULTS['formation_algorithm'],
formation_strategy: str = DEFAULTS['formation_strategy'],
execution_interval: int = DEFAULTS['execution_interval'],
number_of_infrastructures: int = DEFAULTS['infrastructures'],
step_length: float = DEFAULTS['step_length'],
max_step: int = DEFAULTS['max_step'],
actions: bool = DEFAULTS['actions'],
collisions: bool = DEFAULTS['collisions'],
random_seed: int = DEFAULTS['random_seed'],
log_level: int = DEFAULTS['log_level'],
progress: bool = DEFAULTS['progress'],
gui: bool = DEFAULTS['gui'],
gui_delay: int = DEFAULTS['gui_delay'],
gui_track_vehicle: int = DEFAULTS['gui_track_vehicle'],
sumo_config: str = DEFAULTS['sumo_config'],
gui_play: int = DEFAULTS['gui_play'],
gui_start: int = DEFAULTS['gui_start'],
draw_ramps: bool = DEFAULTS['draw_ramps'],
draw_ramp_labels: bool = DEFAULTS['draw_ramp_labels'],
draw_road_end: bool = DEFAULTS['draw_road_end'],
draw_road_end_label: bool = DEFAULTS['draw_road_end_label'],
draw_infrastructures: bool = DEFAULTS['draw_infrastructures'],
draw_infrastructure_labels: bool = DEFAULTS['draw_infrastructure_labels'],
screenshot_filename: str = None,
result_base_filename: str = DEFAULTS['result_base_filename'],
record_simulation_trace: bool = DEFAULTS['record_simulation_trace'],
record_end_trace: bool = DEFAULTS['record_end_trace'],
record_vehicle_trips: bool = DEFAULTS['record_vehicle_trips'],
record_vehicle_emissions: bool = DEFAULTS['record_vehicle_emissions'],
record_vehicle_traces: bool = DEFAULTS['record_vehicle_traces'],
record_vehicle_changes: bool = DEFAULTS['record_vehicle_changes'],
record_emission_traces: bool = DEFAULTS['record_emission_traces'],
record_platoon_trips: bool = DEFAULTS['record_platoon_trips'],
record_platoon_maneuvers: bool = DEFAULTS['record_platoon_maneuvers'],
record_platoon_formation: bool = DEFAULTS['record_platoon_formation'],
record_platoon_traces: bool = DEFAULTS['record_platoon_traces'],
record_vehicle_platoon_traces: bool = DEFAULTS['record_vehicle_platoon_traces'],
record_platoon_changes: bool = DEFAULTS['record_platoon_changes'],
record_infrastructure_assignments: bool = DEFAULTS['record_infrastructure_assignments'],
record_vehicle_teleports: bool = DEFAULTS['record_vehicle_teleports'],
record_prefilled: bool = DEFAULTS['record_prefilled'],
**kwargs: dict,
):
"""
Initialize a simulator instance.
"""
# set up logging
# TODO add custom filter that prepends the log entry with the step time
logging.basicConfig(level=log_level, stream=sys.stdout, format="%(levelname)s [%(name)s]: %(message)s")
# road network properties
self._road_length = road_length # the length of the road
self._number_of_lanes = number_of_lanes # the number of lanes
if road_length % ramp_interval != 0:
sys.exit(f"ERROR [{__name__}]: The road length has to be a multiple of the ramp interval!")
self._ramp_interval = ramp_interval # the distance between any two on-/off-ramps
self._ramp_positions = list(range(0, self._road_length + 1, self._ramp_interval))
# vehicle properties
self._vehicles = {} # the list (dict) of vehicles in the simulation
self._last_vehicle_id = -1 # the id of the last vehicle generated
# set up queue for vehicles to be spawned
self._vehicle_spawn_queue = []
# the maximum number of vehicles
if vehicle_density > 0:
# override vehicles
LOG.debug("Using '--density' instead of '--vehicles' for number of vehicles. Use '--density -1' to disable this.")
number_of_vehicles = int(vehicle_density * (self._road_length / 1000) * self._number_of_lanes)
if vehicle_density == 0:
sys.exit(f"ERROR [{__name__}]: A vehicle density of 0 vehicles per lane per km does not make sense!")
if number_of_vehicles <= 0:
sys.exit(f"ERROR [{__name__}]: A simulation with 0 vehicles does not make sense!")
self._number_of_vehicles = number_of_vehicles
self._max_speed = max_speed # the maximum driving speed # TODO not used currently
self._acc_headway_time = acc_headway_time # the headway time for ACC
if acc_headway_time < 1.0:
LOG.warning("Values for ACC headway time lower 1.0s are not recommended to avoid crashes!")
self._cacc_spacing = cacc_spacing # the constant spacing for CACC
if cacc_spacing < 5.0:
LOG.warning("Values for CACC spacing lower than 5.0m are not recommended to avoid crashes!")
self._penetration_rate = penetration_rate # the penetration rate of platooning vehicles
# trip properties
self._random_depart_position = random_depart_position # whether to use random departure positions
self._depart_all_lanes = depart_all_lanes # whether to use all lanes for departure
self._desired_speed = desired_speed # the desired driving speed
self._random_desired_speed = random_desired_speed # whether to use random desired driving speeds
self._speed_variation = speed_variation # the deviation from the desired driving speed
self._min_desired_speed = min_desired_speed # the minimum desired driving speed
self._max_desired_speed = max_desired_speed # the maximum desired driving speed
if not min_desired_speed <= desired_speed <= max_desired_speed:
sys.exit(f"ERROR [{__name__}]: desired speed has to be between limits!")
self._random_depart_speed = random_depart_speed # whether to use random departure speeds
self._depart_desired = depart_desired # whether to departure with the desired driving speed
if random_depart_position and not depart_desired:
sys.exit(f"ERROR [{__name__}]: '--random-depart-position' can only be used in conjunction with '--depart-desired'!")
self._depart_flow = depart_flow # whether to spawn vehicles in a continuous flow
if not depart_flow and depart_method == "number":
sys.exit(f"ERROR [{__name__}]: The departure method 'number' can only be used in conjunction with '--depart-flow'!")
self._depart_method = depart_method # the departure method to use
if depart_interval <= 0:
sys.exit(f"ERROR [{__name__}]: The departure interval has to be bigger than 0!")
self._depart_interval = depart_interval # the interval between two vehicle departures
if depart_probability < 0 or depart_probability > 1:
sys.exit(f"ERROR [{__name__}]: The departure probability needs to be between 0 and 1!")
if depart_probability == 0:
sys.exit(f"ERROR [{__name__}]: A departure probability of 0 does not make sense!")
self._depart_probability = depart_probability # the departure probability
if depart_rate <= 0:
sys.exit(f"ERROR [{__name__}]: The departure rate has to be at least 1 vehicle per hour!")
self._depart_rate = depart_rate # the departure rate
self._effective_depart_rate = None
if self._depart_method == "interval":
self._effective_depart_rate = step_length / self._depart_interval
elif self._depart_method == "rate":
# spawn #vehicles per hour, similar to SUMO's flow parameter vehsPerHour
self._effective_depart_rate = self._depart_rate / 3600
elif self._depart_method == "number":
LOG.warning("This departure method is not yet tested!")
# spawn #number vehicles, similar to SUMO's flow parameter number
# TODO other departure method for constant number of concurrent vehicles
# thus: all vehicles have an equal spacing
self._effective_depart_rate = self._number_of_vehicles / max_step
elif self._depart_method != "probability":
sys.exit(f"ERROR [{__name__}]: Unknown departure method!")
if self._effective_depart_rate is not None:
if 0.5 < self._effective_depart_rate <= 1 and (not random_depart_position or ramp_interval == road_length):
LOG.warning(f"The current effective departure rate {self._effective_depart_rate} vehicles/step will lead to departure delays for vehicles!")
if self._effective_depart_rate > 1 and not self._random_depart_position:
sys.exit(f"ERROR [{__name__}]: An effective departure rate > 1 vehicles/step is incompatible with spawning only at the origin! Use '--random-departure-position True' to allow spawning at all on-ramps.")
if self._effective_depart_rate < 0:
sys.exit(f"ERROR [{__name__}]: The effective departure rate is < 0 vehicles/step!")
if self._effective_depart_rate > len(self._ramp_positions):
LOG.warning(f"The current effective departure rate {self._effective_depart_rate} vehicles/step will lead to departure delays for vehicles!")
self._random_arrival_position = random_arrival_position # whether to use random arrival positions
if minimum_trip_length > road_length:
sys.exit(f"ERROR [{__name__}]: Minimum trip length cannot be bigger than the length of the entire road!")
self._minimum_trip_length = max(minimum_trip_length, ramp_interval) # the minimum trip length
if maximum_trip_length == -1 * 1000:
self._maximum_trip_length = road_length
else:
if maximum_trip_length < minimum_trip_length:
sys.exit(f"ERROR [{__name__}]: Maximum trip length cannot be smaller than the minimum trip length!")
if maximum_trip_length < ramp_interval:
sys.exit(f"ERROR [{__name__}]: Maximum trip length cannot be smaller than the ramp interval!")
if maximum_trip_length % ramp_interval != 0:
sys.exit(f"ERROR [{__name__}]: Maximum trip length has to be a multiple of the ramp interval!")
if maximum_trip_length == minimum_trip_length:
LOG.debug(f"Using static trip length of {maximum_trip_length}m for all vehicles.")
if not random_arrival_position:
sys.exit(f"ERROR [{__name__}]: Static trip length is only possible in conjunction with random-arrival-position!")
self._maximum_trip_length = maximum_trip_length # the maximum trip length
# communication properties
if communication_range == -1:
self._communication_range = road_length
elif communication_range <= 0:
sys.exit(f"ERROR [{__name__}]: Communication range has to be > 0!")
else:
self._communication_range = communication_range # the maximum communication range between two vehicles
self._distributed_platoon_knowledge = distributed_platoon_knowledge # whether the distributed approach should have perfect platoon knowledge (e.g., platoon role)
self._distributed_maneuver_knowledge = distributed_maneuver_knowledge # whether the distributed approach should have perfect maneuver knowledge
# platoon properties
self._start_as_platoon = start_as_platoon # whether vehicles start as one platoon
if start_as_platoon:
if penetration_rate < 1.0:
sys.exit(f"ERROR [{__name__}]: The penetration rate cannot be smaller than 1.0 when starting as one platoon!")
if formation_algorithm:
sys.exit(f"ERROR [{__name__}]: A formation algorithm cannot be used when all starting as one platoon!")
if not pre_fill:
sys.exit(f"ERROR [{__name__}]: start-as-platoon is only available when using prefill!")
if depart_flow:
sys.exit(f"ERROR [{__name__}]: Vehicles can not spawn in a flow when starting as one platoon!")
if random_depart_position:
sys.exit(f"ERROR [{__name__}]: Vehicles can not have random departure positions when starting as one platoon!")
self._reduced_air_drag = reduced_air_drag # whether the reduced air drag due to platooning should be considered in the emissions calculation
if maximum_teleport_distance == -1:
self._maximum_teleport_distance = road_length
LOG.warning("No maximum teleport distance configured! The vehicle behavior may be unrealistic!")
else:
if maximum_teleport_distance >= ramp_interval:
LOG.warning(f"A maximum teleport distance of {maximum_teleport_distance}m allows teleports beyond the next highway ramp! ")
if 0 < minimum_trip_length <= maximum_teleport_distance:
LOG.warning(f"A maximum teleport distance of {maximum_teleport_distance}m allows teleports beyond the minimum trip length!")
self._maximum_teleport_distance = maximum_teleport_distance # maximum teleport distance
if maximum_approach_time == -1:
LOG.warning("No maximum approach timeout configured! The vehicle behavior may be unrealistic!")
self._maximum_appraoch_time = float('inf')
else:
self._maximum_appraoch_time = maximum_approach_time # maximum approach time
if not delay_teleports:
LOG.warning("Teleports will be executed instantaneous! The vehicle behavior may be unrealistic!")
self._delay_teleports = delay_teleports # whether teleports during a join maneuver should be delayed by the approach time
self._update_desired_speed = update_desired_speed # whether to update the platoon's desired driving speed to the average speed of all members after the formation changed
self._formation_algorithm = formation_algorithm # the formation algorithm to use
if formation_strategy == "centralized" and number_of_infrastructures <= 0:
sys.exit(f"ERROR [{__name__}]: When using a centralized strategy at least 1 infrastructure is needed!")
self._formation_strategy = formation_strategy # the formation strategy to use
# formation properties
self._execution_interval = execution_interval # the interval between two iterations of a formation algorithm
if execution_interval <= 0:
sys.exit(f"ERROR [{__name__}]: Execution interval has to be at least 1 second!")
# infrastructure properties
self._infrastructures = {} # the list (dict) of infrastructures in the simulation
# simulation properties
self._step = 0 # the current simulation step in s
assert step_length > 0
if step_length != 1.0:
LOG.warning("Step lengths other than 1s are not yet properly implemented and tested. Use at your own risk!")
if step_length != 0.5:
LOG.warning("Step lengths other than 0.5s are not yet properly implemented and tested. Use at your own risk!")
if step_length % 1 != 0:
LOG.warning("Non-integer step lengths are not yet properly implemented and tested. Use at your own risk!")
if step_length < 1.0:
LOG.warning("Step lengths smaller than 1s are not recommended in order to avoid collisions and long runtimes!")
if step_length >= vtype.headway_time:
LOG.warning("Step lengths bigger than or equal to the headway time are not recommended in order to avoid rough braking and collisions. Use at your own risk!")
self._step_length = step_length # the length of a simulation step
self._max_step = int(max_step)
self._running = False # whether the simulation is running
self._actions = actions # whether to enable actions
self._collisions = collisions # whether to check for collisions
if random_seed < 0:
random_seed = random.randint(0, 10000)
LOG.debug(f"Using random seed {random_seed}.")
self._rng = random.Random(random_seed)
self._progress = progress # whether to enable the (simulation) progress bar
# GUI properties
self._gui = gui # whether to show a live sumo-gui
if gui:
check_and_prepare_gui()
if road_length > 1000 * 1000:
sys.exit(f"ERROR [{__name__}]: The current maximum road length supported in the GUI is 1000km!")
if number_of_lanes > 4:
sys.exit(f"ERROR [{__name__}]: The current maximum number of lanes supported by the GUI is 4!")
if number_of_lanes < 4:
LOG.warning("The current number of lanes supported by the GUI is 4!")
self._gui_delay = gui_delay # the delay in every simulation step for the GUI
self._gui_track_vehicle = gui_track_vehicle # the id of a vehicle to track in the GUI
self._sumo_config = sumo_config # the name of the SUMO configuration file
self._gui_play = gui_play # whether to start the simulation immediately
if gui_start < 0:
sys.exit(f"ERROR [{__name__}]: GUI start time cannot be negative!")
self._gui_start = gui_start # the time when to connect to the GUI
self._draw_ramps = draw_ramps # whether to draw on-/off-ramps
self._draw_ramp_labels = draw_ramp_labels # whether to draw labels for on-/off-ramps
self._draw_road_end = draw_road_end # whether to draw the end of the road
self._draw_road_end_label = draw_road_end_label # whether to draw a label for the end of the road
self._draw_infrastructures = draw_infrastructures # whether to draw infrastructures
self._draw_infrastructure_labels = draw_infrastructure_labels # whether to draw labels for infrastructures
if screenshot_filename and not gui:
sys.exit(f"ERROR [{__name__}]: Saving screenshots is only possible with the GUI enabled!")
self._screenshot_file = screenshot_filename # the name of the screenshot file
# result recording properties
self._result_base_filename = result_base_filename # the base filename of the result files
self._record_simulation_trace = record_simulation_trace # whether to record a continuous simulation trace
self._record_end_trace = record_end_trace # whether to record another trace item at the trip end
self._record_vehicle_trips = record_vehicle_trips # whether to record vehicles trips
self._record_vehicle_emissions = record_vehicle_emissions # whether to record vehicle emissions
self._record_vehicle_traces = record_vehicle_traces # whether to record continuous vehicle traces
self._record_vehicle_changes = record_vehicle_changes # whether to record vehicle lane changes
self._record_emission_traces = record_emission_traces # whether to record continuous emission traces
self._record_platoon_trips = record_platoon_trips # whether to record platoon trips
self._record_platoon_maneuvers = record_platoon_maneuvers # whether to record platoon maneuvers
self._record_platoon_formation = record_platoon_formation # whether to record platoon formation
self._record_platoon_traces = record_platoon_traces # whether to record continuous platoon traces
self._record_vehicle_platoon_traces = record_vehicle_platoon_traces # whether to record continuous vehicle platoon traces
self._record_platoon_changes = record_platoon_changes # whether to record platoon lane changes
self._record_infrastructure_assignments = record_infrastructure_assignments # whether to record infrastructure assignments
self._record_vehicle_teleports = record_vehicle_teleports # whether to record vehicle teleports
if record_prefilled and not start_as_platoon:
LOG.warning("Recording results for pre-filled vehicles is not recommended to avoid distorted statistics!")
self._record_prefilled = record_prefilled # whether to record results for pre-filled vehicles
# additional keyword arguments (e.g., for formation algorithms)
self._kwargs = kwargs
# statistics
# average number of vehicles in simulation
self._avg_number_vehicles = 0
self._values_in_avg_number_vehicles = 0
# average number of vehicles in the spawn queue
self._avg_number_vehicles_queue = 0
self._values_in_avg_number_vehicles_queue = 0
# average number of vehicles spawned (departure flow)
self._avg_number_vehicles_spawned = 0
self._values_in_avg_number_vehicles_spawned = 0
# average number of vehicles arrival (arrival flow)
self._avg_number_vehicles_arrived = 0
self._values_in_avg_number_vehicles_arrived = 0
# average vehicle speed (HACK)
self._avg_vehicle_speed = 0
self._values_in_avg_vehicle_speed = 0
# average number of vehicles braking rough
self._avg_number_vehicles_braking_rough = 0
self._values_in_avg_number_vehicles_braking_rough = 0
# TODO log generation parameters
if pre_fill:
self._generate_vehicles()
self._last_vehicle_id = list(self._vehicles.keys())[-1]
self._number_of_prefilled_vehicles = len(self._vehicles)
else:
self._number_of_prefilled_vehicles = 0
self._generate_infrastructures(number_of_infrastructures)
@property
def road_length(self) -> int:
"""
Return the road length in m.
"""
return self._road_length
@property
def number_of_lanes(self) -> int:
"""
Return the number of lanes.
"""
return self._number_of_lanes
@property
def step_length(self) -> float:
"""
Return the length of a simulation step.
"""
return self._step_length
@property
def step(self) -> int:
"""
Return the current simulation step.
"""
return self._step
[docs] def _call_vehicle_actions(self):
"""
Triggers actions on all vehicles in the simulation.
"""
for vehicle in self._vehicles.values():
vehicle.action(self._step)
[docs] def _call_infrastructure_actions(self):
"""
Triggers actions on all infrastructures in the simulation.
"""
for infrastructure in self._infrastructures.values():
infrastructure.action(self._step)
[docs] def _get_predecessor(self, vehicle: Vehicle, lane: int = -1) -> Vehicle:
"""
Return the preceding (i.e., front) vehicle for a given vehicle on a given lane.
Parameters
----------
vehicle : Vehicle
The vehicle to consider
lane : int, optional
The lane to consider.
A lane of -1 indicates the vehicle's current lane.
"""
if lane == -1:
# implicitly search on current lane of vehicle
lane = vehicle._lane
predecessor = None # there is no predecessor so far
candidates = {
v._vid: v.rear_position for v in self._vehicles.values() if
v._lane == lane and # correct lane
v._position >= vehicle._position # in front this vehicle
}
# not this vehicle
candidates.pop(vehicle._vid, None)
# find candidate with smallest rear_position (min)
# we do not check for collisions here because this method is also called within an update step
if candidates:
predecessor = self._vehicles[min(candidates, key=candidates.get)]
return predecessor
[docs] def _get_successor(self, vehicle: Vehicle, lane: int = -1) -> Vehicle:
"""
Return the succeeding (i.e., back) vehicle for a given vehicle on a given lane.
Parameters
----------
vehicle : Vehicle
The vehicle to consider
lane : int, optional
The lane to consider.
A lane of -1 indicates the vehicle's current lane.
"""
if lane == -1:
# implicitly search on current lane of vehicle
lane = vehicle._lane
successor = None # there is no successor so far
candidates = {
v._vid: v._position for v in self._vehicles.values() if
v._lane == lane and # correct lane
v._position <= vehicle._position # behind this vehicle
}
# not this vehicle
candidates.pop(vehicle._vid, None)
# find candidate with largest_position (max)
# we do not check for collisions here because this method is also called within an update step
if candidates:
successor = self._vehicles[max(candidates, key=candidates.get)]
return successor
[docs] def _get_predecessor_rear_position(self, vehicle: Vehicle, lane: int = -1) -> float:
"""
Return the rear position of the preceding (i.e., front) vehicle for a given vehicle on a given lane.
Parameters
----------
vehicle : Vehicle
The vehicle to consider
lane : int, optional
The lane to consider.
A lane of -1 indicates the vehicle's current lane.
"""
p = self._get_predecessor(vehicle, lane)
if not p:
return -1
return p.rear_position
[docs] def _get_predecessor_speed(self, vehicle: Vehicle, lane: int = -1) -> float:
"""
Return the speed of the preceding (i.e., front) vehicle for a given vehicle on a given lane.
Parameters
----------
vehicle : Vehicle
The vehicle to consider
lane : int, optional
The lane to consider.
A lane of -1 indicates the vehicle's current lane.
"""
p = self._get_predecessor(vehicle, lane)
if not p:
return -1
return p.speed
[docs] def _remove_arrived_vehicles(self, arrived_vehicles: list):
"""
Remove arrived vehicles from the simulation.
Parameters
----------
arrived_vehicles : list
The ids of arrived vehicles
"""
for vid in arrived_vehicles:
# call finish on arrived vehicle
self._vehicles[vid].finish()
# remove arrived vehicle from the GUI
if self._gui and self._step >= self._gui_start:
remove_gui_vehicle(vid)
# remove from vehicles
del self._vehicles[vid]
[docs] def _generate_vehicles(self):
"""
Add pre-filled vehicles to the simulation.
"""
LOG.debug(f"Pre-filling the road network with {self._number_of_vehicles} vehicles")
assert not self._vehicles
assert self._last_vehicle_id == -1
for vid in tqdm(range(0, self._number_of_vehicles), desc="Generated vehicles", disable=not self._progress):
desired_speed = get_desired_speed(
desired_speed=self._desired_speed,
rng=self._rng,
speed_variation=self._speed_variation,
min_desired_speed=self._min_desired_speed,
max_desired_speed=self._max_desired_speed,
random_desired_speed=self._random_desired_speed,
)
# TODO remove duplicated code
if self._start_as_platoon:
depart_time = 0
depart_position = (self._number_of_vehicles - vid) * (vtype._length + self._cacc_spacing) - self._cacc_spacing
if depart_position >= self._road_length:
sys.exit("Too many vehicles for this road length!")
depart_lane = 0
if vid == 0:
# pick regular departure speed
depart_speed = get_depart_speed(
desired_speed=desired_speed,
rng=self._rng,
depart_desired=self._depart_desired,
random_depart_speed=self._random_depart_speed,
)
else:
# always set speed to leader speed
depart_speed = self._vehicles[0]._depart_speed
else:
depart_time = -1 # since this is a pre-filled vehicle, we cannot say when it departed
# always use desired speed for pre-fill vehicles
depart_speed = desired_speed
# assume we have a collision to check at least once
collision = True
while collision:
collision = False
# actual calculation of position and lane
# always use random position for pre-filled vehicle
# we do not consider departure interval here since this is supposed to be a snapshot from an earlier point of simulation
# make sure to also include the end of the road itself
# consider length, equal to departPos="base" in SUMO
# we assume that a vehicle has to drive at least 1m
depart_position = self._rng.uniform(vtype._length, self._road_length - 1)
# always use random lane for pre-filled vehicle
depart_lane = self._rng.randrange(0, self._number_of_lanes, 1)
LOG.trace(f"Generated random departure position for vehicle {vid}: {depart_position}-{vtype.length},{depart_lane}")
if not self._vehicles:
continue
# avoid a collision with an existing vehicle
for other_vehicle in self._vehicles.values():
if other_vehicle._lane != depart_lane:
# we do not care about other lanes
continue
if other_vehicle.position - other_vehicle.length > depart_position + 100 or other_vehicle.position < depart_position - vtype.length - 100:
# we do not care about vehicles that are too far away
continue
# do we have a collision?
# avoid being inserted in between two platoon members by also considering the min gap
if has_collision(
position1=depart_position + vtype._min_gap, # front collider
rear_position1=depart_position - vtype._length, # rear collider
lane1=depart_lane,
position2=other_vehicle.position + other_vehicle.min_gap, # front collider
rear_position2=other_vehicle.rear_position, # rear collider
lane2=other_vehicle.lane,
):
collision = True
LOG.trace(f"Collision between {vid} and {other_vehicle.vid}")
break
# do we have a "collision" in the next step?
# TODO use corresponding vtype
if not is_insert_safe(
depart_position=depart_position,
depart_speed=depart_speed,
vtype=vtype,
other_vehicle=other_vehicle,
step_length=self._step_length,
):
collision = True
LOG.trace(f"Unsafe insert between {vid} and {other_vehicle.vid}")
break
# use desired headway time and current speed to avoid harsh braking
if other_vehicle._position <= depart_position:
# the other vehicle is behind the current vehicle
gap = depart_position - vtype.length - other_vehicle.position
gap_desired = gap >= max(
other_vehicle.desired_headway_time * other_vehicle.speed * self._step_length,
other_vehicle.min_gap,
)
else:
# the current vehicle is behind the other vehicle
gap = other_vehicle.position - other_vehicle.length - depart_position
gap_desired = gap >= max(
vtype.headway_time * depart_speed * self._step_length, # TODO use corresponding vtype
vtype.min_gap,
)
if not gap_desired:
collision = True
LOG.trace(f"No desired gap between {vid} and {other_vehicle.vid} ({gap}m)")
break
assert gap >= vtype.min_gap, f"{vid}, {other_vehicle.vid}, {gap, vtype.min_gap}"
arrival_position = get_arrival_position(
depart_position=depart_position,
road_length=self._road_length,
ramp_interval=self._ramp_interval,
min_trip_length=self._minimum_trip_length,
max_trip_length=self._maximum_trip_length,
rng=self._rng,
random_arrival_position=self._random_arrival_position,
pre_fill=True,
)
self._add_vehicle(
vid=vid,
vtype=vtype,
depart_position=depart_position,
arrival_position=arrival_position,
desired_speed=desired_speed,
depart_lane=depart_lane,
depart_speed=depart_speed,
depart_time=depart_time,
pre_filled=True,
)
LOG.trace(f"Generated vehicle {vid} at {depart_position}-{depart_position - vtype._length},{depart_lane} with {depart_speed}")
if self._start_as_platoon:
self._initialize_prefilled_platoon()
[docs] def _vehicles_to_be_scheduled(self) -> int:
"""
Calculate how many vehicles should be spawned according to the departure method.
Returns
-------
int : The number of vehicles to be spawned within this time step
"""
vehicles_to_be_scheduled = -1
if not self._depart_flow and self._last_vehicle_id >= self._number_of_vehicles - 1:
# limit the spawn by a maximum number of total vehicles
LOG.debug(f"All {self._number_of_vehicles} vehicles have been spawned already")
# clear scheduled vehicles
vehicles_to_be_scheduled = 0
else:
# 1) number of new vehicles
if self._depart_method == "probability":
# spawn probability per time step, similar to SUMO's flow parameter probability
if self._step == 0:
# special case in step 0 to avoid not having any vehicles and thus stopping the simulation
vehicles_to_be_scheduled = 1
else:
# interpret comparison result as vehicle spawn
vehicles_to_be_scheduled = int(self._rng.random() <= self._depart_probability)
else:
# 1) estimate how many vehicles their need to be with the given effective departure rate
# 2) subtract all already generated (i.e., finished, spawned, queued)
# and all pre-filled vehicles
desired_number_vehicles = int(self._step * self._effective_depart_rate) + 1
# without pre-filled vehicles
total_number_scheduled_vehicles = self._last_vehicle_id + 1 - self._number_of_prefilled_vehicles
vehicles_to_be_scheduled = desired_number_vehicles - total_number_scheduled_vehicles
return vehicles_to_be_scheduled
[docs] def _spawn_vehicles(self, vdf: pd.DataFrame):
"""
Spawns vehicles within the current step.
1) Calculate how many vehicles should be spawned according to the departure method
2) Calculate properties for these vehicles (e.g., desired speed)
3) Add vehicles to spawn queue
4) Spawn as many vehicles as possible from the queue (sorted by waiting time)
5) Update queue
"""
# 1) how many vehicles
vehicles_to_be_scheduled = self._vehicles_to_be_scheduled()
assert vehicles_to_be_scheduled >= 0
LOG.trace(f"I need to schedule {vehicles_to_be_scheduled} new vehicles in this step ({self._step}).")
# 2) vehicle properties
new_vehicles = []
for vid in range(vehicles_to_be_scheduled):
vid += self._last_vehicle_id + 1
desired_speed = get_desired_speed(
desired_speed=self._desired_speed,
rng=self._rng,
speed_variation=self._speed_variation,
min_desired_speed=self._min_desired_speed,
max_desired_speed=self._max_desired_speed,
random_desired_speed=self._random_desired_speed,
)
new_vehicles.append(
{
'vid': vid,
'desired_speed': desired_speed,
'depart_speed': get_depart_speed(
desired_speed=desired_speed,
rng=self._rng,
depart_desired=self._depart_desired,
random_depart_speed=self._random_depart_speed,
),
'schedule_time': self._step,
'min_trip_length': self._minimum_trip_length,
'max_trip_length': self._maximum_trip_length,
}
)
# 3) enqueue
LOG.trace(f"Adding {len(new_vehicles)} vehicles to the spawn queue.")
if new_vehicles:
self._last_vehicle_id = new_vehicles[-1]["vid"]
self._vehicle_spawn_queue.extend(new_vehicles)
# 4) spawn
LOG.trace(f"Trying to spawn {len(self._vehicle_spawn_queue)} new vehicles")
# sort by waiting/schedule time (and vid) for fairness
spawned_vehicles_df, not_spawned_vehicles = compute_vehicle_spawns(
vehicles=sorted(self._vehicle_spawn_queue, key=lambda d: (d['schedule_time'], d['vid'])),
vdf=vdf,
ramp_positions=self._ramp_positions,
number_of_lanes=self._number_of_lanes,
current_step=self._step,
rng=self._rng,
random_arrival_position=self._random_arrival_position,
random_depart_position=self._random_depart_position,
depart_all_lanes=self._depart_all_lanes,
step_length=self._step_length,
)
LOG.debug(f"Spawned {len(spawned_vehicles_df)} new vehicles")
for row in spawned_vehicles_df.itertuples():
# already adds the vehicle to the dict
# TODO remove asserts here
assert row.depart_position >= 0
assert row.depart_position < self._road_length
assert row.arrival_position <= self._road_length
assert row.arrival_position > row.depart_position
assert row.arrival_position - row.depart_position <= self._maximum_trip_length
# TODO duplicate
assert row.arrival_position <= row.depart_position + self._maximum_trip_length
assert row.depart_position - vtype.length <= self._road_length - self._minimum_trip_length
# TODO duplicate
assert row.arrival_position >= row.depart_position + self._minimum_trip_length - vtype.length
# TODO: add to global vdf once it is available
vehicle = self._add_vehicle(
vid=row.vid,
vtype=vtype,
depart_position=row.depart_position,
arrival_position=row.arrival_position,
desired_speed=row.desired_speed,
depart_lane=row.depart_lane,
depart_speed=row.depart_speed,
depart_time=row.depart_time,
depart_delay=row.depart_delay,
)
if self._gui and self._step >= self._gui_start:
add_gui_vehicle(
vehicle.vid,
vehicle.position,
vehicle.lane,
vehicle.speed,
color=(
vehicle.platoon.leader._color
if (isinstance(vehicle, PlatooningVehicle) and vehicle.is_in_platoon)
else vehicle._color
),
track=vehicle.vid == self._gui_track_vehicle,
)
LOG.trace(f"Spawned vehicle {vehicle.vid} ({vehicle.depart_position}-{vehicle.rear_position},{vehicle.depart_lane}).")
# 5) enqueue remaining
if not_spawned_vehicles:
LOG.warning(f"Could not spawn a total of {len(not_spawned_vehicles)} vehicles within this step, putting them back into the queue!")
self._vehicle_spawn_queue = not_spawned_vehicles
assert len({v['vid'] for v in self._vehicle_spawn_queue}) == len(self._vehicle_spawn_queue)
return len(spawned_vehicles_df)
[docs] def _add_vehicle(
self,
vid: int,
vtype: VehicleType,
depart_position: float,
arrival_position: float,
desired_speed: float,
depart_lane: int,
depart_speed: float,
depart_time: float,
depart_delay: float = 0,
communication_range: int = DEFAULTS['communication_range'],
pre_filled: bool = False,
) -> Vehicle:
"""
Add a vehicle to the simulation based on the given parameters.
NOTE: Make sure that you set last_vehicle_id correctly.
Parameters
----------
vid : int
The id of the vehicle
vtype : VehicleType
The vehicle type of the vehicle
depart_position : int
The departure position of the vehicle
arrival_position : int
The arrival position of the vehicle
desired_speed : float
The desired driving speed of the vehicle
depart_lane : int
The departure lane of the vehicle
depart_speed : float
The departure speed of the vehicle
depart_time : float
The actual departure time of the vehicle
depart_delay : float, optional
The time the vehicle had to wait before starting its trip
communication_range : int, optional
The maximum communication range of the vehicle
pre_filled : bool, optional
Whether this vehicle was pre-filled
Returns
-------
Vehicle
The added vehicle
"""
# choose vehicle "type" depending on the penetration rate
if self._rng.random() <= self._penetration_rate:
vehicle = PlatooningVehicle(
simulator=self,
vid=vid,
vehicle_type=vtype,
depart_position=depart_position,
arrival_position=arrival_position,
desired_speed=desired_speed,
depart_lane=depart_lane,
depart_speed=depart_speed,
depart_time=depart_time,
depart_delay=depart_delay,
communication_range=communication_range,
acc_headway_time=self._acc_headway_time,
cacc_spacing=self._cacc_spacing,
formation_algorithm=self._formation_algorithm if self._formation_strategy == "distributed" else None,
execution_interval=self._execution_interval,
pre_filled=pre_filled,
**self._kwargs,
)
else:
vehicle = Vehicle(
simulator=self,
vid=vid,
vehicle_type=vtype,
depart_position=depart_position,
arrival_position=arrival_position,
desired_speed=desired_speed,
depart_lane=depart_lane,
depart_speed=depart_speed,
depart_time=depart_time,
depart_delay=depart_delay,
communication_range=self._communication_range,
pre_filled=pre_filled,
)
# add instance
self._vehicles[vid] = vehicle
return vehicle
[docs] def _generate_infrastructures(self, number_of_infrastructures: int):
"""
Generate infrastructures for the simulation.
Parameters
----------
number_of_infrastructures : int
The number of infrastructures to generate
"""
if number_of_infrastructures <= 0:
return
placement_interval = self._road_length / number_of_infrastructures
for iid in tqdm(range(0, number_of_infrastructures), desc="Generated infrastructures", disable=not self._progress):
position = (iid + 0.5) * placement_interval
infrastructure = Infrastructure(
self,
iid,
position,
self._formation_algorithm if self._formation_strategy == "centralized" else None,
self._execution_interval,
**self._kwargs,
)
self._infrastructures[iid] = infrastructure
LOG.info(f"Generated infrastructure {infrastructure.iid} at {position}")
[docs] def _initialize_result_recording(self):
"""
Create output files for all (enabled) statistics and writes the headers.
"""
# write some general information about the simulation
record_general_data_begin(basename=self._result_base_filename, simulator=self)
if self._record_vehicle_trips:
# create output file for vehicle trips
initialize_vehicle_trips(basename=self._result_base_filename)
if self._record_vehicle_emissions:
# create output file for vehicle emissions
initialize_vehicle_emissions(basename=self._result_base_filename)
if self._record_platoon_trips:
# create output file for platoon trips
initialize_platoon_trips(basename=self._result_base_filename)
if self._record_platoon_maneuvers:
# create output file for platoon maneuvers
initialize_platoon_maneuvers(basename=self._result_base_filename)
if self._record_platoon_formation:
# create output file for platoon formation
initialize_platoon_formation(basename=self._result_base_filename)
# traces
if self._record_vehicle_traces:
# create output file for vehicle traces
initialize_vehicle_traces(basename=self._result_base_filename)
if self._record_vehicle_changes:
# create output file for vehicle lane changes
initialize_vehicle_changes(basename=self._result_base_filename)
if self._record_emission_traces:
# create output file for emission traces
initialize_emission_traces(basename=self._result_base_filename)
if self._record_platoon_traces:
# create output file for platoon traces
initialize_platoon_traces(basename=self._result_base_filename)
if self._record_vehicle_platoon_traces:
# create output file for vehicle platoon traces
initialize_vehicle_platoon_traces(basename=self._result_base_filename)
if self._record_platoon_changes:
# create output file for platoon lane changes
initialize_platoon_changes(basename=self._result_base_filename)
# create output file for vehicle_platoon lane changes
initialize_vehicle_platoon_changes(basename=self._result_base_filename)
if self._record_simulation_trace:
# create output file for continuous simulation trace
initialize_simulation_trace(basename=self._result_base_filename)
if self._record_vehicle_teleports:
# create output file for vehicle teleports
initialize_vehicle_teleports(basename=self._result_base_filename)
[docs] def _initialize_prefilled_platoon(self):
"""
Initialize all pre-filled vehicles as one platoon.
"""
# we avoid the complicated join procedure and simply set all parameters
leader = self._vehicles[0]
leader._platoon_role = PlatoonRole.LEADER
leader._platoon._formation = list(self._vehicles.values())
if self._update_desired_speed:
leader.platoon.update_desired_speed()
leader.platoon.update_limits()
leader._cf_target_speed = leader.platoon.desired_speed
leader._first_platoon_join_time = 0
leader._last_platoon_join_time = 0
leader._first_platoon_join_position = leader._position
leader._last_platoon_join_position = leader._position
for vehicle in list(self._vehicles.values())[1:]:
vehicle._platoon_role = PlatoonRole.FOLLOWER
vehicle._cf_model = CF_Model.CACC
vehicle._blocked_front = False
vehicle._platoon = leader.platoon
vehicle._cf_target_speed = vehicle._platoon.desired_speed
vehicle._first_platoon_join_time = 0
vehicle._last_platoon_join_time = 0
vehicle._first_platoon_join_position = vehicle._position
vehicle._last_platoon_join_position = vehicle._position
[docs] def _initialize_gui(self):
"""
Initialize the GUI.
"""
# start GUI
start_gui(self._sumo_config, self._step_length, self._gui_play)
# set correct boundary and zoom
set_gui_window(road_length=self._road_length)
# draw ramps
if self._draw_ramps:
draw_ramps(
road_length=self._road_length,
interval=self._ramp_interval,
labels=self._draw_ramp_labels,
)
# draw road end
if self._draw_road_end:
draw_road_end(
road_length=self._road_length,
label=self._draw_road_end_label
)
# draw infrastructures
if self._draw_infrastructures:
draw_infrastructures(
infrastructures=self._infrastructures.values(),
labels=self._draw_infrastructure_labels,
)
# draw pre-filled vehicles
for vehicle in self._vehicles.values():
add_gui_vehicle(
vehicle.vid,
vehicle.position,
vehicle.lane,
vehicle.speed,
color=(
vehicle.platoon.leader._color
if (isinstance(vehicle, PlatooningVehicle) and vehicle.is_in_platoon)
else vehicle._color
),
track=vehicle.vid == self._gui_track_vehicle,
)
[docs] def _update_gui(self):
"""
Update the GUI.
"""
for vehicle in self._vehicles.values():
# update vehicles
move_gui_vehicle(vehicle.vid, vehicle.position, vehicle.lane, vehicle.speed)
# remove vehicles not in simulator
prune_vehicles(keep_vids=self._vehicles.keys())
# sleep for visualization
time.sleep(self._gui_delay)
[docs] def run(self):
"""
Run the simulation with the specified parameters until it is stopped.
Main simulation method.
This is based on Krauss' multi lane traffic:
laneChange();
adjust();
move();
"""
if not self._running:
self._running = True
else:
LOG.warning("Simulation is already running!")
self._initialize_result_recording()
progress_bar = tqdm(desc='Simulation progress', total=self._max_step, unit='step', disable=not self._progress)
# let the simulator run
while self._running:
start_time = timer()
if self._step >= self._max_step:
self.stop("Reached step limit")
continue
# initialize the GUI
if self._gui and self._step == self._gui_start:
self._initialize_gui()
# spawn vehicle based on given parameters
vehicles_spawned = self._spawn_vehicles(self._get_vehicles_df())
# statistics
vehicles_in_simulator = len(self._vehicles)
vehicles_in_queue = len(self._vehicle_spawn_queue)
if self._vehicles:
# update the GUI
if self._gui and self._step >= self._gui_start:
self._update_gui()
# call regular actions on vehicles
self._call_vehicle_actions()
# call regular actions on infrastructure
self._call_infrastructure_actions()
# BEGIN VECTORIZATION PART
# TODO move upwards/get rid of it entirely
# convert dict of vehicles to Dataframe (temporary)
vdf = self._get_vehicles_df()
vdf = vdf.sort_values(["position", "lane"], ascending=False)
# perform lane changes (for all vehicles)
# update neighbor data (predecessor, successor, front)
lane_changes = compute_lane_changes(
vdf=vdf,
max_lane=self._number_of_lanes - 1,
step_length=self._step_length,
)
# apply lane changes
vdf['old_lane'] = vdf['lane']
vdf['lane'] = lane_changes['lane']
vdf['lc_reason'] = lane_changes['reason']
# record lane changes
# TODO move to better location
if self._record_vehicle_changes or self._record_platoon_changes:
self._record_lane_changes(vdf)
# update neighbor data (predecessor, successor, front)
predecessor_map = lane_predecessors(vdf, self._number_of_lanes - 1)
predecessor = get_predecessors(
vdf=vdf,
predecessor_map=predecessor_map,
target_lane=vdf.lane,
).rename(columns=lambda col: "predecessor_" + col)
assert predecessor[predecessor.predecessor_vid != -1].predecessor_vid.is_unique
# adjust speed (of all vehicles)
new_speed = compute_new_speeds(
vdf.merge(predecessor, left_index=True, right_index=True),
step_length=self._step_length,
)
# apply new speed
vehicles_braking_rough = report_rough_braking(vdf, new_speed, step_length=self._step_length)
vdf['old_speed'] = vdf['speed']
vdf['speed'] = new_speed
vdf['blocked_front'] = (
(vdf.speed < vdf.max_speed)
& ((vdf.speed - vdf.old_speed) <= 0)
& (vdf.cf_model != CF_Model.CACC)
)
average_vehicle_speed = vdf.speed.mean()
# adjust positions (of all vehicles)
vdf = update_position(vdf, self._step_length)
# convert Dataframe back to dict of vehicles
self._write_back_vehicles_df(vdf)
# get arrived vehicles
arrived_vehicles = vdf[
(vdf.position >= vdf.arrival_position)
].index.values
# remove arrived vehicles from Dataframe
vdf = vdf.drop(arrived_vehicles)
# do collision check (for all vehicles)
# without arrived vehicles
if self._collisions and check_collisions(vdf):
# record final vehicle trace entries
if self._record_vehicle_traces:
for v in self._vehicles.values():
record_vehicle_trace(
basename=self._result_base_filename,
step=self._step + self._step_length,
vehicle=v,
)
sys.exit(f"ERROR [{__name__}]: There were collisions between vehicles!")
# remove arrived vehicles from dict and do finish
self._remove_arrived_vehicles(arrived_vehicles)
# make sure that everything is correct
assert list(vdf.index).sort() == list(self._vehicles.keys()).sort()
del vdf
# END VECTORIZATION PART
else:
if not self._vehicle_spawn_queue:
self.stop("No more vehicles in the simulation") # do we really want to exit here?
# statistics
arrived_vehicles = []
average_vehicle_speed = 0
vehicles_braking_rough = 0
end_time = timer()
# record some periodic statistics
run_time = end_time - start_time
self._statistics(
vehicles_in_simulator=vehicles_in_simulator,
vehicles_in_queue=vehicles_in_queue,
vehicles_spawned=vehicles_spawned,
vehicles_arrived=len(arrived_vehicles),
runtime=run_time,
average_vehicle_speed=average_vehicle_speed,
vehicles_braking_rough=vehicles_braking_rough,
)
# a new step begins
self._step += self._step_length
progress_bar.update(self._step_length)
if self._gui and self._step > self._gui_start:
gui_step(target_step=self._step, screenshot_filename=self._screenshot_file)
# We reach this point only by setting self._running to False
# which is only done by calling self.stop()
self._finish()
return self._step
[docs] def _statistics(
self,
vehicles_in_simulator: int,
vehicles_in_queue: int,
vehicles_spawned: int,
vehicles_arrived: int,
runtime: float,
average_vehicle_speed: float,
vehicles_braking_rough: int,
):
"""
Record some period statistics.
Parameters
----------
vehicles_in_simulator : int
The number of vehicles in the scenario within this step
vehicles_in_queue : int
The number of vehicles in the spawn queue within this step
vehicles_spawned : int
The number of vehicles that departed within this step
vehicles_arrived : int
The number of vehicles that arrived within this step
runtime : float
The run time of this step
average_vehicle_speed : int
The average driving speed amog all vehicles in the scenario within this step
vehicles_braking_rough : int
The number of vehicles performing rough braking within this step
"""
# average number of vehicles in simulation
self._avg_number_vehicles = float(
(self._values_in_avg_number_vehicles * self._avg_number_vehicles + vehicles_in_simulator) /
(self._values_in_avg_number_vehicles + 1)
)
# average number of vehicles in the spawn queue
self._avg_number_vehicles_queue = float(
(self._values_in_avg_number_vehicles_queue * self._avg_number_vehicles_queue + vehicles_in_queue) /
(self._values_in_avg_number_vehicles_queue + 1)
)
# average number of vehicles spawned (departure flow)
self._avg_number_vehicles_spawned = float(
(self._values_in_avg_number_vehicles_spawned * self._avg_number_vehicles_spawned + vehicles_spawned) /
(self._values_in_avg_number_vehicles_spawned + 1)
)
# average number of vehicles arrival (arrival flow)
self._avg_number_vehicles_arrived = float(
(self._values_in_avg_number_vehicles_arrived * self._avg_number_vehicles_arrived + vehicles_arrived) /
(self._values_in_avg_number_vehicles_arrived + 1)
)
# average vehicle speed (HACK)
self._avg_vehicle_speed = float(
(self._values_in_avg_vehicle_speed * self._avg_vehicle_speed + average_vehicle_speed) /
(self._values_in_avg_vehicle_speed + 1)
)
# average number of vehicles braking rough
self._avg_number_vehicles_braking_rough = float(
(self._values_in_avg_number_vehicles_braking_rough * self._avg_number_vehicles_braking_rough + vehicles_braking_rough) /
(self._values_in_avg_number_vehicles_braking_rough + 1)
)
if self._record_simulation_trace:
# write continuous simulation traces
record_simulation_trace(
basename=self._result_base_filename,
step=self._step,
vehicles_in_simulator=vehicles_in_simulator,
vehicles_in_queue=vehicles_in_queue,
vehicles_spawned=vehicles_spawned,
vehicles_arrived=vehicles_arrived,
runtime=runtime,
average_vehicle_speed=average_vehicle_speed,
vehicles_braking_rough=vehicles_braking_rough,
)
[docs] def _record_lane_changes(self, vdf: pd.DataFrame):
"""
Record lane changes.
Parameters
----------
vdf : pd.DataFrame
The Dataframe containing the vehicles as rows
index: vid
columns: [position, length, lane, ..]
"""
for row in vdf[vdf.lane != vdf.old_lane].itertuples():
if row.cf_model != CF_Model.CACC:
if self._record_vehicle_changes:
# record lane change for normal vehicles (not CACC)
record_vehicle_change(
basename=self._result_base_filename,
step=self._step,
vid=row.Index,
position=row.position,
speed=row.speed,
source_lane=row.old_lane,
target_lane=row.lane,
reason=row.lc_reason,
)
if self._record_platoon_changes and row.platoon_role == PlatoonRole.LEADER:
assert row.cf_model == CF_Model.ACC
# record lane change for platoon leaders (ACC)
record_platoon_change(
basename=self._result_base_filename,
step=self._step,
leader=self._vehicles[row.Index],
source_lane=row.old_lane,
target_lane=row.lane,
reason=row.lc_reason,
)
else:
assert row.platoon_role == PlatoonRole.FOLLOWER
if self._record_platoon_changes:
# record lane change for platoon followers (CACC)
record_vehicle_platoon_change(
basename=self._result_base_filename,
step=self._step,
member=self._vehicles[row.Index],
source_lane=row.old_lane,
target_lane=row.lane,
reason=row.lc_reason,
)
[docs] def _get_vehicles_df(self) -> pd.DataFrame:
"""
Return a pandas Dataframe from the internal data structure.
Returns
-------
pandas.DataFrame
The Dataframe containing the vehicles as rows
index: vid
columns: [position, length, lane, ..]
"""
platoon_fields = [
"leader_id",
"platoon_id",
"platoon_desired_speed",
"platoon_max_speed",
"platoon_max_acceleration",
"platoon_max_deceleration",
"platoon_position",
"platoon_rear_position",
]
def get_platoon_data(vehicle):
data = {key: 1e15 for key in platoon_fields}
data["leader_id"] = -1
data["platoon_id"] = -1
data["platoon_role"] = None
if not isinstance(vehicle, PlatooningVehicle):
return data
platoon = vehicle._platoon
data["leader_id"] = platoon._formation[0].vid
data["platoon_id"] = platoon._platoon_id
data["platoon_role"] = vehicle._platoon_role
data["platoon_desired_speed"] = platoon._desired_speed
data["platoon_max_speed"] = platoon.max_speed
data["platoon_max_acceleration"] = platoon.max_acceleration
data["platoon_max_deceleration"] = platoon.max_deceleration
data["platoon_position"] = platoon.position
data["platoon_rear_position"] = platoon.rear_position
return data
fields = [
"arrival_position",
"cf_target_speed",
"position",
"lane",
"speed",
"cf_model",
"vid",
]
vtype_fields = [
"length",
"min_gap",
"max_speed",
"max_acceleration",
"max_deceleration",
]
if not self._vehicles:
return pd.DataFrame({key: [] for key in fields + vtype_fields + platoon_fields + ["desired_headway_time", "acc_lambda"]}).drop(["cf_target_speed"], axis="columns")
return (
pd.DataFrame([
dict(
**{key: vars(vehicle)[f"_{key}"] for key in fields},
**{key: vars(vehicle._vehicle_type)[f"_{key}"] for key in vtype_fields},
**get_platoon_data(vehicle),
desired_headway_time=vehicle.desired_headway_time,
acc_lambda=getattr(vehicle, "_acc_lambda", np.nan),
rear_position=vehicle.rear_position,
)
for vehicle in self._vehicles.values()
])
.astype({"cf_model": CFModelDtype, "platoon_role": PlatoonRoleDtype})
.set_index('vid')
# compute effective limits
# These computations may negatively impact performance, but will
# eventually be removed from here anyway once the Dataframe stays
# and computations/updates move to actions (like platoon updates).
.assign(
max_speed=lambda df: df[["max_speed", "cf_target_speed", "platoon_max_speed", "platoon_desired_speed"]].min(axis="columns"),
max_acceleration=lambda df: df[["max_acceleration", "platoon_max_acceleration"]].min(axis="columns"),
max_deceleration=lambda df: df[["max_deceleration", "platoon_max_deceleration"]].min(axis="columns"),
rear_position=lambda df: df[["rear_position", "platoon_rear_position"]].min(axis="columns"),
)
.drop(["cf_target_speed"], axis="columns")
)
[docs] def _write_back_vehicles_df(self, vdf: pd.DataFrame):
"""
Write back the vehicle updates from a given pandas dataframe to the internal data structure.
Parameters
----------
vdf : pandas.DataFrame
The Dataframe containing the vehicles as rows
index: vid
columns: [position, length, lane, ..]
"""
# make sure that everything is correct
assert list(vdf.index).sort() == list(self._vehicles.keys()).sort()
for row in vdf.itertuples():
# update all fields within the data that we updated with pandas
vehicle = self._vehicles[row.Index]
vehicle._position = row.position
vehicle._speed = row.speed
vehicle._acceleration = row.speed - row.old_speed
vehicle._blocked_front = row.blocked_front
vehicle._lane = row.lane
[docs] def stop(self, msg: str):
"""
Stop the simulation with the given message.
Parameters
----------
msg : str
The message to show after stopping the simulation
"""
self._running = False
if self._progress:
print(f"\n{msg}")
def __str__(self) -> str:
"""
Return a str representation of a simulator instance.
"""
sim_dict = vars(self).copy()
sim_dict.pop('_vehicles')
sim_dict.pop('_infrastructures')
sim_dict.update({'current_number_of_vehicles': len(self._vehicles)})
sim_dict.update({'current_number_of_infrastructures': len(self._infrastructures)})
return str(dict(sorted(sim_dict.items())))
[docs] def _finish(self):
"""
Clean up the simulation.
"""
if self._running:
LOG.warning("Finish called during simulation!")
return
# write some general information about the simulation
record_general_data_end(basename=self._result_base_filename, simulator=self)
# call finish on infrastructures
for infrastructure in self._infrastructures.values():
infrastructure.finish()
# call finish on remaining vehicles?
for vehicle in self._vehicles.values():
# we do not want to write statistics for not finished vehicles
# therefore, we do not call finish here
if self._gui and self._step >= self._gui_start:
remove_gui_vehicle(vehicle._vid)
# remove from vehicles
del vehicle
if self._gui and self._step >= self._gui_start:
close_gui()
DUMMY = pd.DataFrame([
{
'position': HIGHVAL,
'speed': HIGHVAL,
'lane': -1,
'length': 0.1,
'max_acceleration': 0,
'max_deceleration': 0,
'min_gap': 0,
'desired_headway_time': 0,
},
{
'position': -HIGHVAL,
'speed': 0,
'lane': -1,
'length': 0.1,
'max_acceleration': 0,
'max_deceleration': 0,
'min_gap': 0,
'desired_headway_time': 0,
},
])
# TODO move to mobility
[docs]def compute_vehicle_spawns(
vehicles: list,
vdf: pd.DataFrame,
ramp_positions: list,
number_of_lanes: int,
current_step: float,
rng: random.Random,
random_depart_position: bool,
random_arrival_position: bool,
depart_all_lanes: bool,
step_length: float = 1.0,
):
"""
Spawn as many vehicles as possible from the queue (sorted by waiting time).
Assumption: list of vehicles is already sorted ascending by departure priority (e.g., waiting time)
Assumption: ramp positions is sorted in ascending manner
Parameters
----------
vehicles : list(dict)
The list of vehicles to add
vdf : pandas.DataFrame
The Dataframe containing the vehicles as rows
index: vid
columns: [position, length, lane, ..]
ramp_positions : list(int)
The list of available on-ramp positions in m
number_of_lanes : int
The number of available lanes
current_step : float
The current simulation step
rng : random.Random
The random number generator to use
random_depart_position : bool
Whether the vehicles sould depart at a random position
random_arrival_position : bool
Whether the vehicles should arrive at a random position
depart_all_lanes : bool
Whether to use all availale lanes for depature
step_length : float, optioal
The length of a simulation step
"""
if not vehicles:
return pd.DataFrame(), []
# add dummy predecessor to use for all front gap calculations
vdf = pd.concat(
[vdf] + [DUMMY.assign(lane=lane) for lane in range(number_of_lanes)],
ignore_index=True,
).astype({'lane': int})
spawn_positions = ramp_positions if random_depart_position else [ramp_positions[0]]
vehicles_to_spawn = {}
# lane first, then ramp, possibly tuple
# this is done for every possible spawn coordinate (ramp + lane)
# try spawning in lane 0 first
spawn_coordinate_queue = [(position, 0) for position in rng.sample(spawn_positions, len(spawn_positions))]
for position, lane in spawn_coordinate_queue:
if not vehicles:
# no more vehicles to process
break
spawn_position = position + vtype.length
# select predecessors at spawn position
# since we have the dummy, we can assume there is always at least one vehicle
vehicle_after_spawn_position = (
vdf
[(vdf.position >= spawn_position) & (vdf.lane == lane)]
.sort_values('position', ascending=True)
.iloc[0]
)
gap_to_next_vehicle = vehicle_after_spawn_position.position - vtype.length - spawn_position
# select successor of spawn position
# since we have the dummy, we can assume there is always at least one vehicle
vehicle_before_spawn_position = (
vdf
[(vdf.position < spawn_position) & (vdf.lane == lane)]
.sort_values('position', ascending=True)
.iloc[-1]
)
gap_to_previous_vehicle = spawn_position - vtype.length - vehicle_before_spawn_position.position # TODO use corresponding vtype
max_remanining_trip_length = ramp_positions[-1] - position
vehicle_to_spawn_now = None
# TODO use vectorized approach
# assume sorted by waiting time for fairness
# search for vehicle to insert at current spawn position
# check whether it is safe to insert the vehicle
# we use the headway times and the current speed
# and assume that the resulting required gap is big enough to avoid a crash
# in case the corresponding vehicles apply their maximum acceleration/deceleration
# (see mobility.is_gap_safe)
# NOTE: departing with desired speed is not really realistic and unnecessary
# The vehicle could depart already with max(0, rear_vehicle.speed), which will decrease the required gap
for v in vehicles:
# TODO use correct headway time (HUMAN vs. ACC)
# enough space on the road to reach the minimum trip length
trip_possible = max_remanining_trip_length >= v['min_trip_length']
# enough space to the vehicle in front
## avoid a crash
front_gap_safe = is_gap_safe(
front_position=vehicle_after_spawn_position.position,
front_speed=vehicle_after_spawn_position.speed,
front_max_deceleration=vehicle_after_spawn_position.max_deceleration,
front_length=vehicle_after_spawn_position.length,
back_position=spawn_position,
back_speed=v['depart_speed'],
back_max_acceleration=vtype.max_acceleration, # TODO use corresponding vtype
back_min_gap=vtype.min_gap, # TODO use corresponding vtype
step_length=step_length,
)
## avoid decelerating from desired speed, assuming the front vehicles does not change speed
assert vtype.headway_time >= 0
front_gap_desired = gap_to_next_vehicle > max(
# step length not required
vtype.headway_time * v['depart_speed'], # TODO use corresponding vtype
vtype.min_gap,
)
# enough space to the vehicle behind
## avoid a crash
back_gap_safe = is_gap_safe(
front_position=spawn_position,
front_speed=v['depart_speed'],
front_max_deceleration=vtype.max_deceleration, # TODO use corresponding vtype
front_length=vtype.length, # TODO use corresponding vtype
back_position=vehicle_before_spawn_position.position,
back_speed=vehicle_before_spawn_position.speed,
back_max_acceleration=vehicle_before_spawn_position.max_acceleration,
back_min_gap=vehicle_before_spawn_position.min_gap,
step_length=step_length,
)
## avoid deceleration of rear vehicle from desired speed, assuming the speed does not change
assert vehicle_before_spawn_position.desired_headway_time >= 0, vehicle_before_spawn_position
back_gap_desired = gap_to_previous_vehicle > max(
# step length not required
vehicle_before_spawn_position.desired_headway_time * vehicle_before_spawn_position.speed,
vehicle_before_spawn_position.min_gap,
)
if (trip_possible and front_gap_safe and front_gap_desired and back_gap_safe and back_gap_desired):
vehicle_to_spawn_now = v
break
else:
LOG.trace(f"{v['vid']} could not be spawned at {spawn_position} due to constraints (trip: {trip_possible}, front safe: {front_gap_safe}, front desired: {front_gap_desired}, back safe: {back_gap_safe}, back desired: {back_gap_desired})!")
# vehicles are sorted, so we always pick the longest waiting vehicle first :-)
if not vehicle_to_spawn_now:
# no vehicle from the queue fits to the current spawn position
# TODO do not spawn on fastest lane?
if lane < number_of_lanes - 1:
# there might be a spot on a faster lane at this ramp
# thus, try again to insert a vehicle at this ramp
# TODO check if lanes are ordered by speed (cf. keepRight)
if depart_all_lanes:
spawn_coordinate_queue.append((position, lane + 1))
continue
vehicles_to_spawn[vehicle_to_spawn_now['vid']] = {
'vid': vehicle_to_spawn_now['vid'],
'desired_speed': vehicle_to_spawn_now['desired_speed'],
'depart_speed': vehicle_to_spawn_now['depart_speed'],
'speed': vehicle_to_spawn_now['depart_speed'],
'depart_position': spawn_position,
'position': spawn_position,
'depart_lane': lane,
'lane': lane,
'depart_time': current_step,
'depart_delay': current_step - vehicle_to_spawn_now['schedule_time'],
'arrival_position': get_arrival_position(
depart_position=position,
road_length=ramp_positions[-1],
ramp_interval=ramp_positions[1] - ramp_positions[0],
min_trip_length=vehicle_to_spawn_now['min_trip_length'],
max_trip_length=vehicle_to_spawn_now['max_trip_length'],
rng=rng,
random_arrival_position=random_arrival_position,
pre_fill=False,
),
'schedule_time': vehicle_to_spawn_now['schedule_time'],
'min_trip_length': vehicle_to_spawn_now['min_trip_length'],
'max_trip_length': vehicle_to_spawn_now['max_trip_length'],
}
vehicles.remove(vehicle_to_spawn_now)
# TODO make global
columns = {
'vid': int,
'desired_speed': float,
'depart_speed': float,
'speed': float,
'depart_position': float,
'position': float,
'depart_lane': int,
'lane': int,
'depart_time': float,
'depart_delay': float,
'arrival_position': float,
'schedule_time': int,
'min_trip_length': int,
'max_trip_length': int,
}
spawned_vehicles_df = pd.DataFrame(
data=vehicles_to_spawn.values(),
columns=columns.keys()
).astype(columns)
not_spawned_vehicles = vehicles
return spawned_vehicles_df, not_spawned_vehicles