plafosim.cli.plafosim module

class plafosim.cli.plafosim.CustomFormatter(prog, indent_increment=2, max_help_position=24, width=None)[source]

Bases: ArgumentDefaultsHelpFormatter, RawDescriptionHelpFormatter, MetavarTypeHelpFormatter

Formatter class combining multiple formatter classes for argparse.
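A minimal sketch of how several argparse formatter classes can be combined through multiple inheritance, as the Bases line above indicates. The class name CombinedFormatter and the demo parser are hypothetical and not part of PlaFoSim. Note that MetavarTypeHelpFormatter requires every value-taking argument to declare a type.

```python
import argparse


class CombinedFormatter(
    argparse.ArgumentDefaultsHelpFormatter,   # appends "(default: ...)" to help strings
    argparse.RawDescriptionHelpFormatter,     # keeps line breaks in the description
    argparse.MetavarTypeHelpFormatter,        # uses the type name as the metavar
):
    """Hypothetical stand-in that inherits the behaviour of all three formatters."""


parser = argparse.ArgumentParser(
    prog="demo",
    description="line one\n  line two (indentation kept)",
    formatter_class=CombinedFormatter,
)
# Every argument that takes a value needs a type for MetavarTypeHelpFormatter.
parser.add_argument("--lanes", type=int, default=3, help="number of lanes")

help_text = parser.format_help()
print(help_text)
```

The empty class body is enough here because each base class overrides a different hook of the help formatter.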

class _Section(formatter, parent, heading=None)

Bases: object

__init__(formatter, parent, heading=None)
format_help()
__init__(prog, indent_increment=2, max_help_position=24, width=None)
_add_item(func, args)
_dedent()
_expand_help(action)
_fill_text(text, width, indent)
_format_action(action)
_format_action_invocation(action)
_format_actions_usage(actions, groups)
_format_args(action, default_metavar)
_format_text(text)
_format_usage(usage, actions, groups, prefix)
_get_default_metavar_for_optional(action)
_get_default_metavar_for_positional(action)
_get_help_string(action)
_indent()
_iter_indented_subactions(action)
_join_parts(part_strings)
_metavar_formatter(action, default_metavar)
_split_lines(text, width)
add_argument(action)
add_arguments(actions)
add_text(text)
add_usage(usage, actions, groups, prefix=None)
end_section()
format_help()
start_section(heading)
class plafosim.cli.plafosim.Dummy(owner: object, dummy: int = -1, **kw_args)[source]

Bases: FormationAlgorithm

Dummy Platoon Formation Algorithm.

__init__(owner: object, dummy: int = -1, **kw_args)[source]

Initialize an instance of this formation algorithm to be used in a vehicle or an infrastructure.

Parameters:
  • owner (object) – The owning object that is executing this algorithm. This can be either a PlatooningVehicle or an Infrastructure.

  • dummy (int, optional) – The value for the dummy parameter

classmethod add_parser_argument_group(parser: ArgumentParser) _ArgumentGroup[source]

Create and return specific argument group for this algorithm to use in global argument parser.

Parameters:

parser (argparse.ArgumentParser) – The global argument parser

Returns:

The specific argument group for this algorithm

Return type:

argparse._ArgumentGroup
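A minimal sketch of a classmethod that registers an algorithm-specific argument group on a shared parser and returns it. The class DummySketch, the group title, and the --dummy flag are assumptions made for illustration; the actual names used by PlaFoSim may differ.

```python
import argparse


class DummySketch:
    """Hypothetical stand-in, not the PlaFoSim Dummy class."""

    @classmethod
    def add_parser_argument_group(cls, parser):
        # add_argument_group() returns an argparse._ArgumentGroup
        group = parser.add_argument_group(
            "Formation Algorithm: Dummy",  # hypothetical group title
            "Parameters for the dummy formation algorithm",
        )
        group.add_argument(
            "--dummy",  # hypothetical flag name
            type=int,
            default=-1,
            help="The value for the dummy parameter",
        )
        return group


parser = argparse.ArgumentParser(prog="demo")
group = DummySketch.add_parser_argument_group(parser)
args = parser.parse_args(["--dummy", "7"])
print(args.dummy)
```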

do_formation()[source]

Run platoon formation algorithm to search for a platooning opportunity and perform the corresponding join maneuver.

finish()[source]

Clean up the instance of the formation algorithm.

This includes mostly statistic recording.

_abc_impl = <_abc_data object>
property name

Return the name of the formation algorithm.

class plafosim.cli.plafosim.FormationAlgorithm(owner: object)[source]

Bases: ABC

Abstract base class for any type of platoon formation algorithm (i.e., assignment calculation).

Implementing sub-classes need to override the do_formation() method.
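The override requirement can be sketched with Python's abc module. The classes FormationAlgorithmSketch and NoOpFormation below are hypothetical stand-ins written for this example; they only mirror the documented shape (an owner, an abstract do_formation(), and an optional finish()) and are not the PlaFoSim implementation.

```python
from abc import ABC, abstractmethod


class FormationAlgorithmSketch(ABC):
    """Hypothetical stand-in for an abstract formation algorithm."""

    def __init__(self, owner):
        self._owner = owner

    @abstractmethod
    def do_formation(self):
        """Must be overridden by concrete sub-classes."""

    def finish(self):
        """Optional hook; does nothing by default."""


class NoOpFormation(FormationAlgorithmSketch):
    """Concrete sub-class that only counts how often it was called."""

    def __init__(self, owner):
        super().__init__(owner)
        self.calls = 0

    def do_formation(self):
        self.calls += 1


algorithm = NoOpFormation(owner="vehicle-0")
algorithm.do_formation()

# The abstract base itself cannot be instantiated.
try:
    FormationAlgorithmSketch(owner="vehicle-1")
    base_instantiable = True
except TypeError:
    base_instantiable = False
```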

__init__(owner: object)[source]

Initialize an instance of a formation algorithm.

Parameters:

owner (object) – The owning object that is executing this algorithm. This can be either a PlatooningVehicle or an Infrastructure.

abstract add_parser_argument_group(parser: ArgumentParser) _ArgumentGroup[source]

Abstract method for creating and returning the specific argument group for this algorithm.

This method needs to be overridden in implementing sub-classes.

Returns:

The specific argument group for this algorithm.

Return type:

argparse._ArgumentGroup

abstract do_formation()[source]

Abstract method for performing any type of platoon formation (i.e., assignment calculation).

This method needs to be overridden in implementing sub-classes.

finish()[source]

Reserved for future use.

_abc_impl = <_abc_data object>
property name

Return the name of the formation algorithm.

class plafosim.cli.plafosim.Simulator(*, road_length: int = 100000, number_of_lanes: int = 3, ramp_interval: int = 5000, pre_fill: bool = False, number_of_vehicles: int = 100, vehicle_density: float = -1, max_speed: float = 55, acc_headway_time: float = 1.0, cacc_spacing: float = 5.0, penetration_rate: float = 1.0, random_depart_position: bool = False, depart_all_lanes: bool = True, desired_speed: float = 33.0, random_desired_speed: bool = True, speed_variation: float = 0.1, min_desired_speed: float = 22.0, max_desired_speed: float = 44.0, random_depart_speed: bool = False, depart_desired: bool = False, depart_flow: bool = False, depart_method: str = 'interval', depart_interval: float = 2.0, depart_probability: float = 1.0, depart_rate: int = 3600, random_arrival_position: bool = False, minimum_trip_length: int = 0, maximum_trip_length: int = -1000, communication_range: int = 500, distributed_platoon_knowledge: bool = True, distributed_maneuver_knowledge: bool = False, start_as_platoon: bool = False, reduced_air_drag: bool = True, maximum_teleport_distance: int = 2000, maximum_approach_time: int = 60, delay_teleports: bool = True, update_desired_speed: bool = True, formation_algorithm: str | None = None, formation_strategy: str = 'distributed', execution_interval: int = 10, number_of_infrastructures: int = 0, step_length: float = 1.0, max_step: int = 3600, actions: bool = True, collisions: bool = True, random_seed: int = -1, log_level: int = 30, progress: bool = True, gui: bool = False, gui_delay: int = 0, gui_track_vehicle: int = -1, sumo_config: str = 'sumocfg/freeway.sumo.cfg', gui_play: int = True, gui_start: int = 0, draw_ramps: bool = True, draw_ramp_labels: bool = False, draw_road_end: bool = True, draw_road_end_label: bool = True, draw_infrastructures: bool = True, draw_infrastructure_labels: bool = True, screenshot_filename: str | None = None, result_base_filename: str = 'results', record_simulation_trace: bool = False, record_end_trace: bool = True, 
record_vehicle_trips: bool = False, record_vehicle_emissions: bool = False, record_vehicle_traces: bool = False, record_vehicle_changes: bool = False, record_emission_traces: bool = False, record_platoon_trips: bool = False, record_platoon_maneuvers: bool = False, record_platoon_formation: bool = False, record_platoon_traces: bool = False, record_vehicle_platoon_traces: bool = False, record_platoon_changes: bool = False, record_infrastructure_assignments: bool = False, record_vehicle_teleports: bool = False, record_prefilled: bool = False, **kwargs: dict)[source]

Bases: object

A collection of parameters and information of the simulator.

__init__(*, road_length: int = 100000, number_of_lanes: int = 3, ramp_interval: int = 5000, pre_fill: bool = False, number_of_vehicles: int = 100, vehicle_density: float = -1, max_speed: float = 55, acc_headway_time: float = 1.0, cacc_spacing: float = 5.0, penetration_rate: float = 1.0, random_depart_position: bool = False, depart_all_lanes: bool = True, desired_speed: float = 33.0, random_desired_speed: bool = True, speed_variation: float = 0.1, min_desired_speed: float = 22.0, max_desired_speed: float = 44.0, random_depart_speed: bool = False, depart_desired: bool = False, depart_flow: bool = False, depart_method: str = 'interval', depart_interval: float = 2.0, depart_probability: float = 1.0, depart_rate: int = 3600, random_arrival_position: bool = False, minimum_trip_length: int = 0, maximum_trip_length: int = -1000, communication_range: int = 500, distributed_platoon_knowledge: bool = True, distributed_maneuver_knowledge: bool = False, start_as_platoon: bool = False, reduced_air_drag: bool = True, maximum_teleport_distance: int = 2000, maximum_approach_time: int = 60, delay_teleports: bool = True, update_desired_speed: bool = True, formation_algorithm: str | None = None, formation_strategy: str = 'distributed', execution_interval: int = 10, number_of_infrastructures: int = 0, step_length: float = 1.0, max_step: int = 3600, actions: bool = True, collisions: bool = True, random_seed: int = -1, log_level: int = 30, progress: bool = True, gui: bool = False, gui_delay: int = 0, gui_track_vehicle: int = -1, sumo_config: str = 'sumocfg/freeway.sumo.cfg', gui_play: int = True, gui_start: int = 0, draw_ramps: bool = True, draw_ramp_labels: bool = False, draw_road_end: bool = True, draw_road_end_label: bool = True, draw_infrastructures: bool = True, draw_infrastructure_labels: bool = True, screenshot_filename: str | None = None, result_base_filename: str = 'results', record_simulation_trace: bool = False, record_end_trace: bool = True, record_vehicle_trips: bool = 
False, record_vehicle_emissions: bool = False, record_vehicle_traces: bool = False, record_vehicle_changes: bool = False, record_emission_traces: bool = False, record_platoon_trips: bool = False, record_platoon_maneuvers: bool = False, record_platoon_formation: bool = False, record_platoon_traces: bool = False, record_vehicle_platoon_traces: bool = False, record_platoon_changes: bool = False, record_infrastructure_assignments: bool = False, record_vehicle_teleports: bool = False, record_prefilled: bool = False, **kwargs: dict)[source]

Initialize a simulator instance.

_add_vehicle(vid: int, vtype: VehicleType, depart_position: float, arrival_position: float, desired_speed: float, depart_lane: int, depart_speed: float, depart_time: float, depart_delay: float = 0, communication_range: int = 500, pre_filled: bool = False) Vehicle[source]

Add a vehicle to the simulation based on the given parameters.

NOTE: Make sure that you set last_vehicle_id correctly.

Parameters:
  • vid (int) – The id of the vehicle

  • vtype (VehicleType) – The vehicle type of the vehicle

  • depart_position (float) – The departure position of the vehicle

  • arrival_position (float) – The arrival position of the vehicle

  • desired_speed (float) – The desired driving speed of the vehicle

  • depart_lane (int) – The departure lane of the vehicle

  • depart_speed (float) – The departure speed of the vehicle

  • depart_time (float) – The actual departure time of the vehicle

  • depart_delay (float, optional) – The time the vehicle had to wait before starting its trip

  • communication_range (int, optional) – The maximum communication range of the vehicle

  • pre_filled (bool, optional) – Whether this vehicle was pre-filled

Returns:

The added vehicle

Return type:

Vehicle

_call_infrastructure_actions()[source]

Triggers actions on all infrastructures in the simulation.

_call_vehicle_actions()[source]

Triggers actions on all vehicles in the simulation.

_finish()[source]

Clean up the simulation.

_generate_infrastructures(number_of_infrastructures: int)[source]

Generate infrastructures for the simulation.

Parameters:

number_of_infrastructures (int) – The number of infrastructures to generate

_generate_vehicles()[source]

Add pre-filled vehicles to the simulation.

_get_predecessor(vehicle: Vehicle, lane: int = -1) Vehicle[source]

Return the preceding (i.e., front) vehicle for a given vehicle on a given lane.

Parameters:
  • vehicle (Vehicle) – The vehicle to consider

  • lane (int, optional) – The lane to consider. A lane of -1 indicates the vehicle’s current lane.

_get_predecessor_rear_position(vehicle: Vehicle, lane: int = -1) float[source]

Return the rear position of the preceding (i.e., front) vehicle for a given vehicle on a given lane.

Parameters:
  • vehicle (Vehicle) – The vehicle to consider

  • lane (int, optional) – The lane to consider. A lane of -1 indicates the vehicle’s current lane.

_get_predecessor_speed(vehicle: Vehicle, lane: int = -1) float[source]

Return the speed of the preceding (i.e., front) vehicle for a given vehicle on a given lane.

Parameters:
  • vehicle (Vehicle) – The vehicle to consider

  • lane (int, optional) – The lane to consider. A lane of -1 indicates the vehicle’s current lane.

_get_successor(vehicle: Vehicle, lane: int = -1) Vehicle[source]

Return the succeeding (i.e., back) vehicle for a given vehicle on a given lane.

Parameters:
  • vehicle (Vehicle) – The vehicle to consider

  • lane (int, optional) – The lane to consider. A lane of -1 indicates the vehicle’s current lane.

_get_vehicles_df() DataFrame[source]

Return a pandas DataFrame from the internal data structure.

Returns:

The DataFrame containing the vehicles as rows (index: vid; columns: [position, length, lane, ..])

Return type:

pandas.DataFrame

_initialize_gui()[source]

Initialize the GUI.

_initialize_prefilled_platoon()[source]

Initialize all pre-filled vehicles as one platoon.

_initialize_result_recording()[source]

Create output files for all (enabled) statistics and write the headers.

_record_lane_changes(vdf: DataFrame)[source]

Record lane changes.

Parameters:

vdf (pandas.DataFrame) – The DataFrame containing the vehicles as rows (index: vid; columns: [position, length, lane, ..])

_remove_arrived_vehicles(arrived_vehicles: list)[source]

Remove arrived vehicles from the simulation.

Parameters:

arrived_vehicles (list) – The ids of arrived vehicles

_spawn_vehicles(vdf: DataFrame)[source]

Spawns vehicles within the current step.

  1. Calculate how many vehicles should be spawned according to the departure method

  2. Calculate properties for these vehicles (e.g., desired speed)

  3. Add vehicles to spawn queue

  4. Spawn as many vehicles as possible from the queue (sorted by waiting time)

  5. Update queue
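The five steps above can be sketched as a single function. This is an illustration under stated assumptions, not the PlaFoSim code: QueuedVehicle, spawn_step, the to_schedule count, and the free_slots limit (a stand-in for "as many vehicles as possible") are all hypothetical, and the speed range is taken from the documented defaults for min_desired_speed and max_desired_speed.

```python
import random
from dataclasses import dataclass


@dataclass
class QueuedVehicle:
    vid: int
    desired_speed: float
    queued_at: int  # step in which the vehicle entered the spawn queue


def spawn_step(step, queue, to_schedule, free_slots, next_vid, rng):
    # Steps 1-3: to_schedule is assumed to come from the departure method;
    # compute properties for each new vehicle and append it to the queue.
    for _ in range(to_schedule):
        speed = rng.uniform(22.0, 44.0)
        queue.append(QueuedVehicle(next_vid, speed, step))
        next_vid += 1
    # Step 4: vehicles that have waited longest are spawned first.
    queue.sort(key=lambda v: v.queued_at)
    spawned = queue[:free_slots]
    # Step 5: everything that did not fit stays in the queue.
    remaining = queue[free_slots:]
    return spawned, remaining, next_vid


rng = random.Random(1)
queue = []
spawned0, queue, next_vid = spawn_step(0, queue, 3, 1, 0, rng)
spawned1, queue, next_vid = spawn_step(1, queue, 1, 2, next_vid, rng)
```

In this run, three vehicles are scheduled in step 0 but only one fits, so the other two are spawned in step 1 ahead of the newly scheduled vehicle.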

_statistics(vehicles_in_simulator: int, vehicles_in_queue: int, vehicles_spawned: int, vehicles_arrived: int, runtime: float, average_vehicle_speed: float, vehicles_braking_rough: int)[source]

Record some periodic statistics.

Parameters:
  • vehicles_in_simulator (int) – The number of vehicles in the scenario within this step

  • vehicles_in_queue (int) – The number of vehicles in the spawn queue within this step

  • vehicles_spawned (int) – The number of vehicles that departed within this step

  • vehicles_arrived (int) – The number of vehicles that arrived within this step

  • runtime (float) – The run time of this step

  • average_vehicle_speed (float) – The average driving speed among all vehicles in the scenario within this step

  • vehicles_braking_rough (int) – The number of vehicles performing rough braking within this step

_update_gui()[source]

Update the GUI.

_vehicles_to_be_scheduled() int[source]

Calculate how many vehicles should be spawned according to the departure method.

Returns:

The number of vehicles to be spawned within this time step

Return type:

int

_write_back_vehicles_df(vdf: DataFrame)[source]

Write back the vehicle updates from a given pandas DataFrame to the internal data structure.

Parameters:

vdf (pandas.DataFrame) – The DataFrame containing the vehicles as rows (index: vid; columns: [position, length, lane, ..])

run()[source]

Run the simulation with the specified parameters until it is stopped.

Main simulation method.

This is based on Krauss’ multi-lane traffic model: laneChange(); adjust(); move();

stop(msg: str)[source]

Stop the simulation with the given message.

Parameters:

msg (str) – The message to show after stopping the simulation

property number_of_lanes: int

Return the number of lanes.

property road_length: int

Return the road length in m.

property step: int

Return the current simulation step.

property step_length: float

Return the length of a simulation step.

class plafosim.cli.plafosim.SpeedPosition(owner: object, alpha: float = 0.5, speed_deviation_threshold: float = 0.2, position_deviation_threshold: int = 1000, formation_centralized_kind: str = 'greedy', solver_time_limit: int = 60, record_solver_traces: bool = False, record_infrastructure_assignments: bool = False, **kw_args)[source]

Bases: FormationAlgorithm

Platoon Formation Algorithm based on Similarity, considering Speed and Position.

See papers

Julian Heinovski and Falko Dressler, “Where to Decide? Centralized vs. Distributed Vehicle Assignment for Platoon Formation,” IEEE Transactions on Intelligent Transportation Systems, vol. 25 (11), pp. 17317–17334, November 2024. https://www.tkn.tu-berlin.de/bib/heinovski2024where/

and

Julian Heinovski and Falko Dressler, “Platoon Formation: Optimized Car to Platoon Assignment Strategies and Protocols,” Proceedings of 10th IEEE Vehicular Networking Conference (VNC 2018), Taipei, Taiwan, December 2018. https://www.tkn.tu-berlin.de/bib/heinovski2018platoon/

__init__(owner: object, alpha: float = 0.5, speed_deviation_threshold: float = 0.2, position_deviation_threshold: int = 1000, formation_centralized_kind: str = 'greedy', solver_time_limit: int = 60, record_solver_traces: bool = False, record_infrastructure_assignments: bool = False, **kw_args)[source]

Initialize an instance of this formation algorithm to be used in a vehicle or an infrastructure.

Parameters:
  • owner (object) – The owning object that is executing this algorithm. This can be either a PlatooningVehicle or an Infrastructure.

  • alpha (float) – The weighting factor alpha

  • speed_deviation_threshold (float) – The threshold for speed deviation

  • position_deviation_threshold (int) – The threshold for position deviation

  • formation_centralized_kind (str) – TODO

  • solver_time_limit (int) – The time limit in s to apply to the solver

  • record_solver_traces (bool) – Whether to record continuous solver traces

  • record_infrastructure_assignments (bool) – Whether to record infrastructure assignments

_do_formation_centralized()[source]

Run centralized greedy formation approach.

This selects candidates and triggers join maneuvers.

_do_formation_distributed()[source]

Run distributed greedy formation approach.

This selects a candidate and triggers a join maneuver.

_do_formation_optimal()[source]

Run centralized optimal formation approach.

This selects candidates and triggers join maneuvers.

_record_infrastructure_assignments(basename: str)[source]

Record infrastructure assignments.

Parameters:

basename (str) – The basename of the result file

classmethod add_parser_argument_group(parser: ArgumentParser) _ArgumentGroup[source]

Create and return specific argument group for this algorithm to use in global argument parser.

Parameters:

parser (argparse.ArgumentParser) – The global argument parser

Returns:

The specific argument group for this algorithm

Return type:

argparse._ArgumentGroup

cost_speed_position(ds: float, dp: float) float[source]

Return the overall cost (i.e., the weighted deviation) for a candidate.

Parameters:
  • ds (float) – The deviation in speed

  • dp (float) – The deviation in position

Returns:

The weighted relative deviation

Return type:

float
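A minimal sketch of a weighted deviation, assuming the cost is a convex combination of the two deviations controlled by the weighting factor alpha. The exact formula and the explicit alpha argument are assumptions for illustration; the documented method takes only ds and dp and presumably reads alpha from the instance.

```python
import math


def cost_speed_position_sketch(ds: float, dp: float, alpha: float = 0.5) -> float:
    """Weight the speed deviation by alpha and the position deviation by 1 - alpha."""
    return alpha * ds + (1.0 - alpha) * dp


# Equal weighting of a 10 % speed deviation and a 30 % position deviation.
balanced = cost_speed_position_sketch(0.1, 0.3)
# alpha = 1.0 considers the speed deviation only.
speed_only = cost_speed_position_sketch(0.1, 0.3, alpha=1.0)
```

Under this assumption, a candidate with a lower cost is more similar to the searching vehicle in speed and position.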

do_formation()[source]

Run platoon formation algorithms to search for a platooning opportunity and perform the corresponding join maneuver.

dp(vehicle: PlatooningVehicle, platoon: Platoon) float[source]

Return the deviation in position from a given platoon.

NOTE: In the original version of the paper, the deviation calculated here was not normalized.

Parameters:
  • vehicle (PlatooningVehicle) – The vehicle for which the deviation is calculated

  • platoon (Platoon) – The platoon to which the deviation is calculated

Returns:

The relative deviation in position

Return type:

float

ds(vehicle: PlatooningVehicle, platoon: Platoon) float[source]

Return the deviation in speed from a given platoon.

NOTE: In the original version of the paper, the deviation calculated here was not normalized.

Parameters:
  • vehicle (PlatooningVehicle) – The vehicle for which the deviation is calculated

  • platoon (Platoon) – The platoon to which the deviation is calculated

Returns:

The relative deviation in speed

Return type:

float

finish()[source]

Clean up the instance of the formation algorithm.

This includes mostly statistic recording.

_abc_impl = <_abc_data object>
property name

Return the name of the formation algorithm.

plafosim.cli.plafosim.attribute()

perf_counter() -> float

Performance counter for benchmarking.

plafosim.cli.plafosim.create_simulator(**kwargs: dict) Simulator[source]

Create a simulator object from given keyword arguments.

Parameters:

kwargs (dict) – The dictionary of keyword arguments to use for the creation

Returns:

The created simulator object

Return type:

Simulator

plafosim.cli.plafosim.find_resource(path: str) str[source]

Find the resource under path locally or as a packaged resource.

Parameters:

path (str) – The path to search for the resource

Returns:

The path of the resource

Return type:

str

plafosim.cli.plafosim.format_help(parser: ArgumentParser, groups=None) str[source]

Format help message for argument groups.

Taken from https://stackoverflow.com/a/40730878.
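A minimal sketch of formatting help for a subset of argument groups, in the spirit of the referenced approach. The function name format_group_help and the demo groups are hypothetical. The sketch relies on private argparse attributes (_get_formatter, _action_groups, _group_actions), which may change between Python versions, and it omits the usage line for brevity.

```python
import argparse


def format_group_help(parser, groups=None):
    formatter = parser._get_formatter()  # private argparse API
    formatter.add_text(parser.description)
    if groups is None:
        groups = parser._action_groups  # fall back to all groups
    for group in groups:
        formatter.start_section(group.title)
        formatter.add_text(group.description)
        formatter.add_arguments(group._group_actions)
        formatter.end_section()
    formatter.add_text(parser.epilog)
    return formatter.format_help()


parser = argparse.ArgumentParser(prog="demo", description="Demo parser")
g_road = parser.add_argument_group("road properties")
g_road.add_argument("--lanes", type=int, default=3, help="number of lanes")
g_gui = parser.add_argument_group("gui properties")
g_gui.add_argument("--gui-delay", type=int, default=0, help="delay in ms")

# Only the GUI group is rendered.
gui_help = format_group_help(parser, [g_gui])
print(gui_help)
```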

plafosim.cli.plafosim.import_module(name, package=None)[source]

Import a module.

The ‘package’ argument is required when performing a relative import. It specifies the package to use as the anchor point from which to resolve the relative import to an absolute import.

plafosim.cli.plafosim.isclass(object)[source]

Return true if the object is a class.

Class objects provide these attributes:

  • __doc__ – documentation string

  • __module__ – name of module in which this class was defined

plafosim.cli.plafosim.iter_modules(path=None, prefix='')[source]

Yields ModuleInfo for all submodules on path, or, if path is None, all top-level modules on sys.path.

‘path’ should be either None or a list of paths to look for modules in.

‘prefix’ is a string to output on the front of every module name on output.

plafosim.cli.plafosim.load_snapshot(snapshot_filename: str) Simulator[source]

Load a simulator object from a snapshot file.

Parameters:

snapshot_filename (str) – The name of the file containing the snapshot

Returns:

The loaded simulator object

Return type:

Simulator

plafosim.cli.plafosim.main()[source]

The main entry point of PlaFoSim.

plafosim.cli.plafosim.parse_args() -> (<class 'argparse.Namespace'>, <class 'argparse._ArgumentGroup'>)[source]

Parse arguments given to this module.

Returns:

  • args (argparse.Namespace) – The namespace of parsed arguments and corresponding values.

  • g_gui (argparse._ArgumentGroup) – The specific argument group for the GUI properties.

plafosim.cli.plafosim.save_snapshot(simulator: Simulator, snapshot_filename: str)[source]

Store a simulator object to a snapshot file.

Parameters:
  • simulator (Simulator) – The simulator object to store

  • snapshot_filename (str) – The name of the file for storing the snapshot
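A minimal sketch of a snapshot round trip, assuming the snapshot is a pickled object. The serialization format is an assumption, since this page does not state how PlaFoSim stores snapshots. The helper names and the SimpleNamespace stand-in for a simulator are hypothetical.

```python
import os
import pickle
import tempfile
from types import SimpleNamespace


def save_snapshot_sketch(simulator, snapshot_filename):
    # Assumption: the whole object is serialized with pickle.
    with open(snapshot_filename, "wb") as f:
        pickle.dump(simulator, f)


def load_snapshot_sketch(snapshot_filename):
    with open(snapshot_filename, "rb") as f:
        return pickle.load(f)


# Stand-in object carrying a little simulator-like state.
sim = SimpleNamespace(step=42, road_length=100000)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.snapshot")
    save_snapshot_sketch(sim, path)
    restored = load_snapshot_sketch(path)
```

Pickle files should only be loaded from trusted sources, because unpickling can execute arbitrary code.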

plafosim.cli.plafosim.signal(signalnum, handler, /)

Set the action for the given signal.

The action can be SIG_DFL, SIG_IGN, or a callable Python object. The previous action is returned. See getsignal() for possible return values.

* IMPORTANT NOTICE * A signal handler function is called with two arguments: the first is the signal number, the second is the interrupted stack frame.

plafosim.cli.plafosim.strtobool(val)[source]

Convert a string representation of truth to true (1) or false (0).

True values are ‘y’, ‘yes’, ‘t’, ‘true’, ‘on’, and ‘1’; false values are ‘n’, ‘no’, ‘f’, ‘false’, ‘off’, and ‘0’. Raises ValueError if ‘val’ is anything else.
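The documented mapping can be sketched as follows. The name strtobool_sketch is hypothetical, and the case-insensitive comparison is an assumption that is not stated above.

```python
def strtobool_sketch(val: str) -> int:
    """Map a truth string to 1 or 0; raise ValueError for anything else."""
    val = val.lower()  # assumption: matching ignores case
    if val in ("y", "yes", "t", "true", "on", "1"):
        return 1
    if val in ("n", "no", "f", "false", "off", "0"):
        return 0
    raise ValueError(f"invalid truth value {val!r}")


enabled = strtobool_sketch("yes")
disabled = strtobool_sketch("off")
```

A function of this shape is often passed as the type of an argparse argument so that boolean options can be given as "--gui true" or "--gui false"; that usage is a common pattern, not a statement about PlaFoSim's options.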

plafosim.cli.plafosim.timer()

perf_counter() -> float

Performance counter for benchmarking.