plafosim.cli.plafosim module
- class plafosim.cli.plafosim.CustomFormatter(prog, indent_increment=2, max_help_position=24, width=None)[source]
Bases:
ArgumentDefaultsHelpFormatter,RawDescriptionHelpFormatter,MetavarTypeHelpFormatterMetaclass combining multiple formatter classes for argparse.
- class _Section(formatter, parent, heading=None)
Bases:
object- __init__(formatter, parent, heading=None)
- format_help()
- __init__(prog, indent_increment=2, max_help_position=24, width=None)
- _add_item(func, args)
- _dedent()
- _expand_help(action)
- _fill_text(text, width, indent)
- _format_action(action)
- _format_action_invocation(action)
- _format_actions_usage(actions, groups)
- _format_args(action, default_metavar)
- _format_text(text)
- _format_usage(usage, actions, groups, prefix)
- _get_default_metavar_for_optional(action)
- _get_default_metavar_for_positional(action)
- _get_help_string(action)
- _indent()
- _iter_indented_subactions(action)
- _join_parts(part_strings)
- _metavar_formatter(action, default_metavar)
- _split_lines(text, width)
- add_argument(action)
- add_arguments(actions)
- add_text(text)
- add_usage(usage, actions, groups, prefix=None)
- end_section()
- format_help()
- start_section(heading)
- class plafosim.cli.plafosim.Dummy(owner: object, dummy: int = -1, **kw_args)[source]
Bases:
FormationAlgorithmDummy Platoon Formation Algorithm.
- __init__(owner: object, dummy: int = -1, **kw_args)[source]
Initialize an instance of this formation algorithm to be used in a vehicle or an infrastructure.
- Parameters:
owner (object) – The owning object that is execution this algorithm. This can be either a PlatooningVehicle or an Infrastructure.
dummy (int, optional) – The value for the dummy parameter
- classmethod add_parser_argument_group(parser: ArgumentParser) _ArgumentGroup[source]
Create and return specific argument group for this algorithm to use in global argument parser.
- Parameters:
parser (argparse.ArgumentParser) – The global argument parser
- Returns:
The specific argument group for this algorithm
- Return type:
argparse._ArgumentGroup
- do_formation()[source]
Run platoon formation algorithm to search for a platooning opportunity and perform the corresponding join maneuver.
- finish()[source]
Clean up the instance of the formation algorithm.
This includes mostly statistic recording.
- _abc_impl = <_abc_data object>
- property name
Print the name of the formation algorithm.
- class plafosim.cli.plafosim.FormationAlgorithm(owner: object)[source]
Bases:
ABCAbstract base class for any type of platoon formation algorithm (i.e., assignment calculation).
Implementing sub-classes need to override the do_formation() method.
- __init__(owner: object)[source]
Initialize an instance of a formation algorithm.
- Parameters:
owner (object) – The owning object that is execution this algorithm. This can be either a PlatooningVehicle or an Infrastructure.
- abstract add_parser_argument_group(parser: ArgumentParser) _ArgumentGroup[source]
Abstract method for performing any type of platoon formation (i.e., assignment calculation).
This methods needs to be overridden in implementing sub-classes.
- Returns:
The specific argument group for this algorithm.
- Return type:
argparse._ArgumentGroup
- abstract do_formation()[source]
Abstract method for performing any type of platoon formation (i.e., assignment calculation).
This methods needs to be overridden in implementing sub-classes.
- _abc_impl = <_abc_data object>
- property name
Print the name of the formation algorithm.
- class plafosim.cli.plafosim.Simulator(*, road_length: int = 100000, number_of_lanes: int = 3, ramp_interval: int = 5000, pre_fill: bool = False, number_of_vehicles: int = 100, vehicle_density: float = -1, max_speed: float = 55, acc_headway_time: float = 1.0, cacc_spacing: float = 5.0, penetration_rate: float = 1.0, random_depart_position: bool = False, depart_all_lanes: bool = True, desired_speed: float = 33.0, random_desired_speed: bool = True, speed_variation: float = 0.1, min_desired_speed: float = 22.0, max_desired_speed: float = 44.0, random_depart_speed: bool = False, depart_desired: bool = False, depart_flow: bool = False, depart_method: str = 'interval', depart_interval: float = 2.0, depart_probability: float = 1.0, depart_rate: int = 3600, random_arrival_position: bool = False, minimum_trip_length: int = 0, maximum_trip_length: int = -1000, communication_range: int = 500, distributed_platoon_knowledge: bool = True, distributed_maneuver_knowledge: bool = False, start_as_platoon: bool = False, reduced_air_drag: bool = True, maximum_teleport_distance: int = 2000, maximum_approach_time: int = 60, delay_teleports: bool = True, update_desired_speed: bool = True, formation_algorithm: str | None = None, formation_strategy: str = 'distributed', execution_interval: int = 10, number_of_infrastructures: int = 0, step_length: float = 1.0, max_step: int = 3600, actions: bool = True, collisions: bool = True, random_seed: int = -1, log_level: int = 30, progress: bool = True, gui: bool = False, gui_delay: int = 0, gui_track_vehicle: int = -1, sumo_config: str = 'sumocfg/freeway.sumo.cfg', gui_play: int = True, gui_start: int = 0, draw_ramps: bool = True, draw_ramp_labels: bool = False, draw_road_end: bool = True, draw_road_end_label: bool = True, draw_infrastructures: bool = True, draw_infrastructure_labels: bool = True, screenshot_filename: str | None = None, result_base_filename: str = 'results', record_simulation_trace: bool = False, record_end_trace: bool = True, record_vehicle_trips: bool = False, record_vehicle_emissions: bool = False, record_vehicle_traces: bool = False, record_vehicle_changes: bool = False, record_emission_traces: bool = False, record_platoon_trips: bool = False, record_platoon_maneuvers: bool = False, record_platoon_formation: bool = False, record_platoon_traces: bool = False, record_vehicle_platoon_traces: bool = False, record_platoon_changes: bool = False, record_infrastructure_assignments: bool = False, record_vehicle_teleports: bool = False, record_prefilled: bool = False, **kwargs: dict)[source]
Bases:
objectA collection of parameters and information of the simulator.
- __init__(*, road_length: int = 100000, number_of_lanes: int = 3, ramp_interval: int = 5000, pre_fill: bool = False, number_of_vehicles: int = 100, vehicle_density: float = -1, max_speed: float = 55, acc_headway_time: float = 1.0, cacc_spacing: float = 5.0, penetration_rate: float = 1.0, random_depart_position: bool = False, depart_all_lanes: bool = True, desired_speed: float = 33.0, random_desired_speed: bool = True, speed_variation: float = 0.1, min_desired_speed: float = 22.0, max_desired_speed: float = 44.0, random_depart_speed: bool = False, depart_desired: bool = False, depart_flow: bool = False, depart_method: str = 'interval', depart_interval: float = 2.0, depart_probability: float = 1.0, depart_rate: int = 3600, random_arrival_position: bool = False, minimum_trip_length: int = 0, maximum_trip_length: int = -1000, communication_range: int = 500, distributed_platoon_knowledge: bool = True, distributed_maneuver_knowledge: bool = False, start_as_platoon: bool = False, reduced_air_drag: bool = True, maximum_teleport_distance: int = 2000, maximum_approach_time: int = 60, delay_teleports: bool = True, update_desired_speed: bool = True, formation_algorithm: str | None = None, formation_strategy: str = 'distributed', execution_interval: int = 10, number_of_infrastructures: int = 0, step_length: float = 1.0, max_step: int = 3600, actions: bool = True, collisions: bool = True, random_seed: int = -1, log_level: int = 30, progress: bool = True, gui: bool = False, gui_delay: int = 0, gui_track_vehicle: int = -1, sumo_config: str = 'sumocfg/freeway.sumo.cfg', gui_play: int = True, gui_start: int = 0, draw_ramps: bool = True, draw_ramp_labels: bool = False, draw_road_end: bool = True, draw_road_end_label: bool = True, draw_infrastructures: bool = True, draw_infrastructure_labels: bool = True, screenshot_filename: str | None = None, result_base_filename: str = 'results', record_simulation_trace: bool = False, record_end_trace: bool = True, record_vehicle_trips: bool = False, record_vehicle_emissions: bool = False, record_vehicle_traces: bool = False, record_vehicle_changes: bool = False, record_emission_traces: bool = False, record_platoon_trips: bool = False, record_platoon_maneuvers: bool = False, record_platoon_formation: bool = False, record_platoon_traces: bool = False, record_vehicle_platoon_traces: bool = False, record_platoon_changes: bool = False, record_infrastructure_assignments: bool = False, record_vehicle_teleports: bool = False, record_prefilled: bool = False, **kwargs: dict)[source]
Initialize a simulator instance.
- _add_vehicle(vid: int, vtype: VehicleType, depart_position: float, arrival_position: float, desired_speed: float, depart_lane: int, depart_speed: float, depart_time: float, depart_delay: float = 0, communication_range: int = 500, pre_filled: bool = False) Vehicle[source]
Add a vehicle to the simulation based on the given parameters.
NOTE: Make sure that you set last_vehicle_id correctly.
- Parameters:
vid (int) – The id of the vehicle
vtype (VehicleType) – The vehicle type of the vehicle
depart_position (int) – The departure position of the vehicle
arrival_position (int) – The arrival position of the vehicle
desired_speed (float) – The desired driving speed of the vehicle
depart_lane (int) – The departure lane of the vehicle
depart_speed (float) – The departure speed of the vehicle
depart_time (float) – The actual departure time of the vehicle
depart_delay (float, optional) – The time the vehicle had to wait before starting its trip
communication_range (int, optional) – The maximum communication range of the vehicle
pre_filled (bool, optional) – Whether this vehicle was pre-filled
- Returns:
The added vehicle
- Return type:
- _generate_infrastructures(number_of_infrastructures: int)[source]
Generate infrastructures for the simulation.
- Parameters:
number_of_infrastructures (int) – The number of infrastructures to generate
- _get_predecessor(vehicle: Vehicle, lane: int = -1) Vehicle[source]
Return the preceding (i.e., front) vehicle for a given vehicle on a given lane.
- Parameters:
vehicle (Vehicle) – The vehicle to consider
lane (int, optional) – The lane to consider. A lane of -1 indicates the vehicle’s current lane.
- _get_predecessor_rear_position(vehicle: Vehicle, lane: int = -1) float[source]
Return the rear position of the preceding (i.e., front) vehicle for a given vehicle on a given lane.
- Parameters:
vehicle (Vehicle) – The vehicle to consider
lane (int, optional) – The lane to consider. A lane of -1 indicates the vehicle’s current lane.
- _get_predecessor_speed(vehicle: Vehicle, lane: int = -1) float[source]
Return the speed of the preceding (i.e., front) vehicle for a given vehicle on a given lane.
- Parameters:
vehicle (Vehicle) – The vehicle to consider
lane (int, optional) – The lane to consider. A lane of -1 indicates the vehicle’s current lane.
- _get_successor(vehicle: Vehicle, lane: int = -1) Vehicle[source]
Return the succeeding (i.e., back) vehicle for a given vehicle on a given lane.
- Parameters:
vehicle (Vehicle) – The vehicle to consider
lane (int, optional) – The lane to consider. A lane of -1 indicates the vehicle’s current lane.
- _get_vehicles_df() DataFrame[source]
Return a pandas Dataframe from the internal data structure.
- Returns:
The Dataframe containing the vehicles as rows index: vid columns: [position, length, lane, ..]
- Return type:
pandas.DataFrame
- _initialize_result_recording()[source]
Create output files for all (enabled) statistics and writes the headers.
- _record_lane_changes(vdf: DataFrame)[source]
Record lane changes.
- Parameters:
vdf (pd.DataFrame) – The Dataframe containing the vehicles as rows index: vid columns: [position, length, lane, ..]
- _remove_arrived_vehicles(arrived_vehicles: list)[source]
Remove arrived vehicles from the simulation.
- Parameters:
arrived_vehicles (list) – The ids of arrived vehicles
- _spawn_vehicles(vdf: DataFrame)[source]
Spawns vehicles within the current step.
Calculate how many vehicles should be spawned according to the departure method
Calculate properties for these vehicles (e.g., desired speed)
Add vehicles to spawn queue
Spawn as many vehicles as possible from the queue (sorted by waiting time)
Update queue
- _statistics(vehicles_in_simulator: int, vehicles_in_queue: int, vehicles_spawned: int, vehicles_arrived: int, runtime: float, average_vehicle_speed: float, vehicles_braking_rough: int)[source]
Record some period statistics.
- Parameters:
vehicles_in_simulator (int) – The number of vehicles in the scenario within this step
vehicles_in_queue (int) – The number of vehicles in the spawn queue within this step
vehicles_spawned (int) – The number of vehicles that departed within this step
vehicles_arrived (int) – The number of vehicles that arrived within this step
runtime (float) – The run time of this step
average_vehicle_speed (int) – The average driving speed amog all vehicles in the scenario within this step
vehicles_braking_rough (int) – The number of vehicles performing rough braking within this step
- _vehicles_to_be_scheduled() int[source]
Calculate how many vehicles should be spawned according to the departure method.
- Returns:
int
- Return type:
The number of vehicles to be spawned within this time step
- _write_back_vehicles_df(vdf: DataFrame)[source]
Write back the vehicle updates from a given pandas dataframe to the internal data structure.
- Parameters:
vdf (pandas.DataFrame) – The Dataframe containing the vehicles as rows index: vid columns: [position, length, lane, ..]
- run()[source]
Run the simulation with the specified parameters until it is stopped.
Main simulation method.
This is based on Krauss’ multi lane traffic: laneChange(); adjust(); move();
- stop(msg: str)[source]
Stop the simulation with the given message.
- Parameters:
msg (str) – The message to show after stopping the simulation
- property number_of_lanes: int
Return the number of lanes.
- property road_length: int
Return the road length in m.
- property step: int
Return the current simulation step.
- property step_length: float
Return the length of a simulation step.
- class plafosim.cli.plafosim.SpeedPosition(owner: object, alpha: float = 0.5, speed_deviation_threshold: float = 0.2, position_deviation_threshold: int = 1000, formation_centralized_kind: str = 'greedy', solver_time_limit: int = 60, record_solver_traces: bool = False, record_infrastructure_assignments: bool = False, **kw_args)[source]
Bases:
FormationAlgorithmPlatoon Formation Algorithm based on Similarity, considering Speed and Position.
See papers
Julian Heinovski and Falko Dressler, “Where to Decide? Centralized vs. Distributed Vehicle Assignment for Platoon Formation,” IEEE Transactions on Intelligent Transportation Systems, vol. 25 (11), pp. 17317–17334, November 2024. https://www.tkn.tu-berlin.de/bib/heinovski2024where/
and
Julian Heinovski and Falko Dressler, “Platoon Formation: Optimized Car to Platoon Assignment Strategies and Protocols,” Proceedings of 10th IEEE Vehicular Networking Conference (VNC 2018), Taipei, Taiwan, December 2018. https://www.tkn.tu-berlin.de/bib/heinovski2018platoon/
- __init__(owner: object, alpha: float = 0.5, speed_deviation_threshold: float = 0.2, position_deviation_threshold: int = 1000, formation_centralized_kind: str = 'greedy', solver_time_limit: int = 60, record_solver_traces: bool = False, record_infrastructure_assignments: bool = False, **kw_args)[source]
Initialize an instance of this formation algorithm to be used in a vehicle or an infrastructure.
- Parameters:
owner (object) – The owning object that is execution this algorithm. This can be either a PlatooningVehicle or an Infrastructure.
alpha (float) – The weighting factor alpha
speed_deviation_threshold (float) – The threshold for speed deviation
position_deviation_threshold (int) – The threshold for position deviation
formation_centralized_kind (str) – TODO
solver_time_limit (int) – The time limit in s to apply to the solver
record_solver_traces (bool) – Whether to record continuous solver traces
record_infrastructure_assignments (bool) – Whether to record infrastructure assignments
- _do_formation_centralized()[source]
Run centralized greedy formation approach.
This selects candidates and triggers join maneuvers.
- _do_formation_distributed()[source]
Run distributed greedy formation approach.
This selects a candidate and triggers a join maneuver.
- _do_formation_optimal()[source]
Run centralized optimal formation approach.
This selects candidates and triggers join maneuvers.
- _record_infrastructure_assignments(basename: str)[source]
Record infrastructure assignments.
- Parameters:
basename (str) – The basename of the result file
- classmethod add_parser_argument_group(parser: ArgumentParser) _ArgumentGroup[source]
Create and return specific argument group for this algorithm to use in global argument parser.
- Parameters:
parser (argparse.ArgumentParser) – The global argument parser
- Returns:
The specific argument group for this algorithm
- Return type:
argparse._ArgumentGroup
- cost_speed_position(ds: float, dp: float) float[source]
Return the overall cost (i.e., the weighted deviation) for a candidate.
- Parameters:
ds (float) – The deviation in speed
dp (int) – The deviation in position
- Returns:
The weighted relative deviation
- Return type:
float
- do_formation()[source]
Run platoon formation algorithms to search for a platooning opportunity and perform the corresponding join maneuver.
- dp(vehicle: PlatooningVehicle, platoon: Platoon) float[source]
Return the deviation in position from a given platoon.
NOTE: In the original version of the paper, the deviation calculated here was not normalized.
- Parameters:
vehicle (PlatooningVehicle) – The vehicle for which the deviation is calculated
platoon (Platoon) – The platoon to which the deviation is calculated
- Returns:
The relative deviation in position
- Return type:
float
- ds(vehicle: PlatooningVehicle, platoon: Platoon) float[source]
Return the deviation in speed from a given platoon.
NOTE: In the original version of the paper, the deviation calculated here was not normalized.
- Parameters:
vehicle (PlatooningVehicle) – The vehicle for which the deviation is calculated
platoon (Platoon) – The platoon to which the deviation is calculated
- Returns:
The relative deviation in speed
- Return type:
float
- finish()[source]
Clean up the instance of the formation algorithm.
This includes mostly statistic recording.
- _abc_impl = <_abc_data object>
- property name
Print the name of the formation algorithm.
- plafosim.cli.plafosim.attribute()
perf_counter() -> float
Performance counter for benchmarking.
- plafosim.cli.plafosim.create_simulator(**kwargs: dict) Simulator[source]
Create a simulator object from given keyword arguments.
- Parameters:
kwargs (dict) – The dictionary of keyword arguments to use for the creation
- Returns:
Simulator
- Return type:
The created simulator object
- plafosim.cli.plafosim.find_resource(path: str) str[source]
Find the resouces under relpath locally or as a packaged resource.
- Parameters:
path (str) – The path to search for the ressource
- Returns:
str
- Return type:
The path of the resource
- plafosim.cli.plafosim.format_help(parser: ArgumentParser, groups=None) str[source]
Format help message for argument groups.
Taken from https://stackoverflow.com/a/40730878.
- plafosim.cli.plafosim.import_module(name, package=None)[source]
Import a module.
The ‘package’ argument is required when performing a relative import. It specifies the package to use as the anchor point from which to resolve the relative import to an absolute import.
- plafosim.cli.plafosim.isclass(object)[source]
Return true if the object is a class.
- Class objects provide these attributes:
__doc__ documentation string __module__ name of module in which this class was defined
- plafosim.cli.plafosim.iter_modules(path=None, prefix='')[source]
Yields ModuleInfo for all submodules on path, or, if path is None, all top-level modules on sys.path.
‘path’ should be either None or a list of paths to look for modules in.
‘prefix’ is a string to output on the front of every module name on output.
- plafosim.cli.plafosim.load_snapshot(snapshot_filename: str) Simulator[source]
Load a simulator object from a snapshot file.
- Parameters:
snapshot_filename (str) – The name of the file containing the snapshot
- Returns:
Simulator
- Return type:
The loaded simulator object
- plafosim.cli.plafosim.parse_args() -> (<class 'argparse.Namespace'>, <class 'argparse._ArgumentGroup'>)[source]
Parse arguments given to this module.
- Returns:
args (argparse.Namespace) – The namespace of parsed arguments and corresponding values.
g_gui (argparse._ArgumentGroup) – The specific argument group for the GUI properties.
- plafosim.cli.plafosim.save_snapshot(simulator: Simulator, snapshot_filename: str)[source]
Store a simulator object to a snapshot file.
- Parameters:
simulator (Simulator) – The simulator object to store
snapshot_filename (str) – The name of the file for storing the snapshot
- plafosim.cli.plafosim.signal(signalnum, handler, /)
Set the action for the given signal.
The action can be SIG_DFL, SIG_IGN, or a callable Python object. The previous action is returned. See getsignal() for possible return values.
* IMPORTANT NOTICE * A signal handler function is called with two arguments: the first is the signal number, the second is the interrupted stack frame.
- plafosim.cli.plafosim.strtobool(val)[source]
Convert a string representation of truth to true (1) or false (0).
True values are ‘y’, ‘yes’, ‘t’, ‘true’, ‘on’, and ‘1’; false values are ‘n’, ‘no’, ‘f’, ‘false’, ‘off’, and ‘0’. Raises ValueError if ‘val’ is anything else.
- plafosim.cli.plafosim.timer()
perf_counter() -> float
Performance counter for benchmarking.