Source code for plafosim.algorithms.speed_position

#
# Copyright (c) 2020-2025 Julian Heinovski <heinovski@ccs-labs.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#

import argparse
import logging
import sys
from distutils.util import strtobool
from timeit import default_timer as timer
from typing import TYPE_CHECKING

from ..formation_algorithm import FormationAlgorithm
from ..platoon_role import PlatoonRole

if TYPE_CHECKING:
    from ..platooning_vehicle import Platoon  # noqa 401
    from ..platooning_vehicle import PlatooningVehicle  # noqa 401

LOG = logging.getLogger(__name__)

# default values for this algorithm's parameters
DEFAULTS = {
    'alpha': 0.5,
    'speed_deviation_threshold': 0.2,
    'position_deviation_threshold': 1000,  # m
    'formation_centralized_kind': 'greedy',
    'solver_time_limit': 60,  # s
    'record_solver_traces': False,
    'record_infrastructure_assignments': False,
}


[docs]def initialize_solver_traces(basename: str): """ Initialize the solver trace file. Parameters ---------- basename : str The basename of the trace file """ assert basename with open(f'{basename}_solver_traces.csv', 'w') as f: f.write( "step," "id," "numVariables," "numConstraints," "runTime," "resultStatus," "solutionValue," "bestBound," "solutionQuality" "\n" )
[docs]def record_solver_trace( basename: str, step: float, iid: int, variables: int, constraints: int, run_time: float, result_status: int, solution_value: float, best_bound: float, solution_quality: float, ): """ Record one line in the solver trace file. Parameters ---------- basename : str The basename of the trace file step : float The current simulation step iid : int The id of the infrastructure which executed the solver run variables : int The number of variables constraints : int The number of constraints run_time : float The run time of the solver result_status : int The result status of the solver solution_value : float The solution value of the solver best_bound : float The best bound of the problem solution_quality : float The quality of the solution """ assert basename with open(f'{basename}_solver_traces.csv', 'a') as f: f.write( f"{step}," f"{iid}," f"{variables}," f"{constraints}," f"{run_time}," f"{result_status}," f"{solution_value}," f"{best_bound}," f"{solution_quality}" "\n" )
[docs]def initialize_infrastructure_assignments(basename: str): """ Initialize the infrastructure assignments result file. Parameters ---------- basename : str The basename of the result file """ assert basename with open(f'{basename}_infrastructure_assignments.csv', 'w') as f: f.write( "id," "assignmentsSolved," "assignmentsNotSolved," "assignmentsSolvedOptimal," "assignmentsSolvedFeasible," "assignmentsNone," "assignmentsSelf," "assignmentsCandidateJoinedAlready," "assignmentsVehicleBecameLeader," "assignmentsSuccessful" "\n" )
[docs]class SpeedPosition(FormationAlgorithm): """ Platoon Formation Algorithm based on Similarity, considering Speed and Position. See papers Julian Heinovski and Falko Dressler, "Where to Decide? Centralized vs. Distributed Vehicle Assignment for Platoon Formation," IEEE Transactions on Intelligent Transportation Systems, vol. 25 (11), pp. 17317–17334, November 2024. https://www.tkn.tu-berlin.de/bib/heinovski2024where/ and Julian Heinovski and Falko Dressler, "Platoon Formation: Optimized Car to Platoon Assignment Strategies and Protocols," Proceedings of 10th IEEE Vehicular Networking Conference (VNC 2018), Taipei, Taiwan, December 2018. https://www.tkn.tu-berlin.de/bib/heinovski2018platoon/ """
[docs] def __init__( self, owner: object, alpha: float = DEFAULTS['alpha'], speed_deviation_threshold: float = DEFAULTS['speed_deviation_threshold'], position_deviation_threshold: int = DEFAULTS['position_deviation_threshold'], formation_centralized_kind: str = DEFAULTS['formation_centralized_kind'], solver_time_limit: int = DEFAULTS['solver_time_limit'], record_solver_traces: bool = DEFAULTS['record_solver_traces'], record_infrastructure_assignments: bool = DEFAULTS['record_infrastructure_assignments'], **kw_args, ): """ Initialize an instance of this formation algorithm to be used in a vehicle or an infrastructure. Parameters ---------- owner : object The owning object that is execution this algorithm. This can be either a PlatooningVehicle or an Infrastructure. alpha : float The weighting factor alpha speed_deviation_threshold : float The threshold for speed deviation position_deviation_threshold : int The threshold for position deviation formation_centralized_kind : str TODO solver_time_limit : int The time limit in s to apply to the solver record_solver_traces : bool Whether to record continuous solver traces record_infrastructure_assignments : bool Whether to record infrastructure assignments """ super().__init__(owner) if alpha < 0 or alpha > 1: sys.exit(f"ERROR [{__name__}]: The weighting factor alpha needs to be in [0,1]!") self._alpha = alpha # the weight of the speed deviation # the maximum deviation from the desired driving speed if speed_deviation_threshold == -1: # use 100% speed deviation from own desired driving speed # e.g., des = 30 m/s, possible speed in [0, 60] m/s self._speed_deviation_threshold = 1.0 else: self._speed_deviation_threshold = speed_deviation_threshold # the maximum deviation from the current position if position_deviation_threshold == -1: # use maximum possible distance aka total road length self._position_deviation_threshold = self._owner._simulator.road_length else: self._position_deviation_threshold = position_deviation_threshold self._formation_centralized_kind = formation_centralized_kind # the kind of the centralized formation if formation_centralized_kind == "optimal": if solver_time_limit <= 0: LOG.warning("Running the solver without a time limit may lead to long simulation times!") elif solver_time_limit < 2: LOG.warning("The time limit for the solver should be at least 2s! Otherwise it may not be possible for the solver to produce a solution (especially with many vehicles)!") self._solver_time_limit = solver_time_limit # the time limit for the optimal solver per assignment problem self._record_solver_traces = record_solver_traces # whether to record continuous solver traces self._record_infrastructure_assignments = record_infrastructure_assignments # whether to record infrastructure assignments # statistics self._assignments_solved = 0 self._assignments_not_solved = 0 self._assignments_solved_optimal = 0 self._assignments_solved_feasible = 0 self._assignments_none = 0 self._assignments_self = 0 self._assignments_candidate_joined_already = 0 self._assingments_vehicle_became_leader = 0 self._assignments_successful = 0 from ..infrastructure import Infrastructure if isinstance(self._owner, Infrastructure) and self._formation_centralized_kind == 'optimal': if self._record_solver_traces: # create output file for solver traces initialize_solver_traces(basename=self._owner._simulator._result_base_filename) if self._record_infrastructure_assignments: # create output file for infrastructure assignments initialize_infrastructure_assignments(basename=self._owner._simulator._result_base_filename)
[docs] @classmethod def add_parser_argument_group(self, parser: argparse.ArgumentParser) -> argparse._ArgumentGroup: """ Create and return specific argument group for this algorithm to use in global argument parser. Parameters ---------- parser : argparse.ArgumentParser The global argument parser Returns ------- argparse._ArgumentGroup The specific argument group for this algorithm """ group = parser.add_argument_group(f"Formation Algorithm -- {self.__name__}") group.add_argument( "--alpha", type=float, default=DEFAULTS['alpha'], help="The weight of the speed deviation in comparison to the position deviation", ) group.add_argument( "--speed-deviation-threshold", type=float, default=DEFAULTS['speed_deviation_threshold'], help="The maximum allowed relative deviation from the desired speed for considering neighbors as candidates. A value of -1 disables the threshold (i.e., 100%% desired speed)", ) group.add_argument( "--position-deviation-threshold", type=int, default=DEFAULTS['position_deviation_threshold'], help="The maximum allowed absolute deviation from the current position for considering neighbors as candidates. A value of -1 disables the threshold (i.e., total road length)", ) group.add_argument( "--formation-centralized-kind", type=str, default=DEFAULTS['formation_centralized_kind'], choices=["greedy", "optimal"], help="The kind of the centralized formation", ) group.add_argument( "--solver-time-limit", type=int, default=int(DEFAULTS['solver_time_limit']), help="The time limit for the optimal solver per assignment problem in s. Influences the quality of the solution. A value below 1 disables the limit.", ) group.add_argument( "--record-solver-traces", type=lambda x: bool(strtobool(x)), default=DEFAULTS['record_solver_traces'], choices=(True, False), help="Whether to record solver traces", ) group.add_argument( "--record-infrastructure-assignments", type=lambda x: bool(strtobool(x)), default=DEFAULTS['record_infrastructure_assignments'], choices=(True, False), help="Whether to record infrastructure assignments", ) return group
[docs] def ds(self, vehicle: 'PlatooningVehicle', platoon: 'Platoon') -> float: """ Return the deviation in speed from a given platoon. NOTE: In the original version of the paper, the deviation calculated here was not normalized. Parameters ---------- vehicle : PlatooningVehicle The vehicle for which the deviation is calculated platoon : Platoon The platoon to which the deviation is calculated Returns ------- float The relative deviation in speed """ diff = abs(vehicle._desired_speed - platoon.desired_speed) # normalize deviation by maximum allowed speed deviation # a value in [0, 1] from maximum return (diff / (self._speed_deviation_threshold * vehicle._desired_speed))
[docs] def dp(self, vehicle: 'PlatooningVehicle', platoon: 'Platoon') -> float: """ Return the deviation in position from a given platoon. NOTE: In the original version of the paper, the deviation calculated here was not normalized. Parameters ---------- vehicle : PlatooningVehicle The vehicle for which the deviation is calculated platoon : Platoon The platoon to which the deviation is calculated Returns ------- float The relative deviation in position """ # NOTE: we have multiple options # a) take the leader's position # b) take the last's position # c) take the platoon's middle position # d) take minimum from a) and b) diff = min( # TODO consider vehicles' length as well abs(vehicle.position - platoon.position), # vehicle is in front of platoon abs(platoon.last.position - vehicle.position), # platoon is in front of vehicle ) # normalize deviation by maximum allowed position deviation # a value in [0, 1] from maximum return (diff / self._position_deviation_threshold)
[docs] def cost_speed_position(self, ds: float, dp: float) -> float: """ Return the overall cost (i.e., the weighted deviation) for a candidate. Parameters ---------- ds : float The deviation in speed dp : int The deviation in position Returns ------- float The weighted relative deviation """ fx = (self._alpha * ds) + ((1.0 - self._alpha) * dp) assert 0.0 <= fx <= 1.0 return fx
[docs] def do_formation(self): """ Run platoon formation algorithms to search for a platooning opportunity and perform the corresponding join maneuver. """ from ..infrastructure import Infrastructure if isinstance(self._owner, Infrastructure): LOG.info(f"{self._owner.iid} is running formation algorithm {self.name} ({self._formation_centralized_kind}) at {self._owner._simulator.step}") if self._formation_centralized_kind == 'optimal': # optimal self._do_formation_optimal() else: # greedy self._do_formation_centralized() else: # greedy self._do_formation_distributed()
[docs] def _record_infrastructure_assignments(self, basename: str): """ Record infrastructure assignments. Parameters ---------- basename : str The basename of the result file """ assert basename with open(f'{basename}_infrastructure_assignments.csv', 'a') as f: f.write( f"{self._owner.iid}," f"{self._assignments_solved}," f"{self._assignments_not_solved}," f"{self._assignments_solved_optimal}," f"{self._assignments_solved_feasible}," f"{self._assignments_none}," f"{self._assignments_self}," f"{self._assignments_candidate_joined_already}," f"{self._assingments_vehicle_became_leader}," f"{self._assignments_successful}" "\n" )
[docs] def finish(self): """ Clean up the instance of the formation algorithm. This includes mostly statistic recording. """ # write statistics if not self._owner._simulator._record_platoon_formation: return if self._formation_centralized_kind != 'optimal': return from ..infrastructure import Infrastructure assert isinstance(self._owner, Infrastructure) if self._owner._simulator._record_infrastructure_assignments: self._record_infrastructure_assignments(self._owner._simulator._result_base_filename)
[docs] def _do_formation_distributed(self): """ Run distributed greedy formation approach. This selects a candidate and triggers a join maneuver. """ # we can only run the algorithm if we are not yet in a platoon # because this algorithm does not support changing the platoon later on if self._owner.platoon_role != PlatoonRole.NONE: LOG.trace(f"{self._owner.vid} is already in a platoon") return # only if not currently in a maneuver if self._owner.in_maneuver: LOG.trace(f"{self._owner.vid} is already in a maneuver") return LOG.debug(f"{self._owner.vid} is running formation algorithm {self.name} (distributed)") self._owner._formation_iterations += 1 found_candidates = [] # get all available platoons or platoon candidates for platoon in self._owner._get_available_platoons(): # calculate deviation values ds = self.ds(self._owner, platoon) dp = self.dp(self._owner, platoon) # FIXME HACK for skipping platoons behind us if self._owner.position > platoon.rear_position: LOG.trace(f"{self._owner.vid}'s platoon {platoon.platoon_id} (leader {platoon.leader.vid}) not applicable because of its absolute position") # we do not record stats here since this is different to the communication aspect filtering continue # remove platoon if not in speed range if ds > 1.0: LOG.trace(f"{self._owner.vid}'s platoon {platoon.platoon_id} (leader {platoon.leader.vid}) not applicable because of its speed difference") continue # remove platoon if not in position range if dp > 1.0: LOG.trace(f"{self._owner.vid}'s platoon {platoon.platoon_id} (leader {platoon.leader.vid}) not applicable because of its position difference") continue # calculate deviation/cost fx = self.cost_speed_position(ds, dp) # add platoon to list found_candidates.append({'vid': self._owner.vid, 'pid': platoon.platoon_id, 'lid': platoon.leader.vid, 'cost': fx}) # stats if platoon.size > 1: self._owner._candidates_found_platoon += 1 else: self._owner._candidates_found_individual += 1 # the number of candidates found in this iteration LOG.debug(f"{self._owner.vid} found {len(found_candidates)} applicable candidates") self._owner._candidates_found += len(found_candidates) if not found_candidates: LOG.debug(f"{self._owner.vid} has no candidates") return # find best candidate to join # pick the platoon with the lowest deviation best = min(found_candidates, key=lambda x: x['cost']) LOG.debug(f"{self._owner.vid}'s best platoon is {best['pid']} (leader {best['lid']}) with {best['cost']}") # perform a join maneuver with the candidate's platoon self._owner._join(best['pid'], best['lid'])
[docs] def _do_formation_centralized(self): """ Run centralized greedy formation approach. This selects candidates and triggers join maneuvers. """ all_found_candidates = [] from ..platooning_vehicle import PlatooningVehicle # noqa 811 # select all searching vehicles for vehicle in self._owner._simulator._vehicles.values(): # filter vehicles that are technically not able to do platooning if not isinstance(vehicle, PlatooningVehicle): LOG.trace(f"{vehicle.vid} is not capable of platooning") continue # filter vehicles which are already in a platoon if vehicle.platoon_role != PlatoonRole.NONE: LOG.trace(f"{vehicle.vid} is already in a platoon") continue # filter vehicles which are already in a maneuver if vehicle.in_maneuver: LOG.trace(f"{vehicle.vid} is already in a maneuver") continue vehicle._formation_iterations += 1 # get all available platoons or platoon candidates for other_vehicle in self._owner._simulator._vehicles.values(): # filter same car because we assume driving alone is worse than to do platooning if other_vehicle is vehicle: continue # filter vehicles that are technically not able to do platooning if not isinstance(other_vehicle, PlatooningVehicle): LOG.trace(f"{other_vehicle.vid} is not capable of platooning") continue # here, the logic similar to the distributed approach begins # filter vehicles which are not available to become a new leader # we only have this information due to oracle knowledge in the centralized version # we use this to replace management of neighbors and their advertisements if other_vehicle.platoon_role != PlatoonRole.NONE and other_vehicle.platoon_role != PlatoonRole.LEADER: # we can only join an existing platoon or built a new one LOG.trace(f"{other_vehicle.vid} is not available") vehicle._candidates_filtered += 1 vehicle._candidates_filtered_follower += 1 continue # filter vehicles which are already in a maneuver # we only have this information due to oracle knowledge in the centralized version if other_vehicle.in_maneuver: LOG.trace(f"{other_vehicle.vid} is already in a maneuver") vehicle._candidates_filtered += 1 vehicle._candidates_filtered_maneuver += 1 continue platoon = other_vehicle.platoon # for one vehicle A we are looking at a different vehicle B to # check whether it is useful that A joins B # calculate deviation values ds = self.ds(vehicle, platoon) dp = self.dp(vehicle, platoon) # FIXME HACK for skipping platoons behind us if vehicle.position > platoon.rear_position: LOG.trace(f"{vehicle.vid}'s platoon {platoon.platoon_id} (leader {platoon.leader.vid}) not applicable because of its absolute position") # we do not record stats here since this is different to the communication aspect filtering continue # remove platoon if not in speed range if ds > 1.0: # simply applying the threshold LOG.trace(f"{vehicle.vid}'s platoon {platoon.platoon_id} (leader {platoon.leader.vid}) not applicable because of its speed difference") continue # remove platoon if not in position range if dp > 1.0: # simply applying the threshold LOG.trace(f"{vehicle.vid}'s platoon {platoon.platoon_id} (leader {platoon.leader.vid}) not applicable because of its position difference") continue # calculate deviation/cost fx = self.cost_speed_position(ds, dp) # NOTE: The distinction of vid and lid is superfluous, since we can only join either individual vehicles or platoon leaders. # More specifically, only individual vehicles or platoon leader are able to "advertise" their platoon (see above) assert other_vehicle.vid == platoon.leader.vid, f"We can only join individual vehicles or platoon leaders! {other_vehicle.vid}, {platoon.platoon_id}, {platoon.leader.vid}" # add platoon to list all_found_candidates.append({'vid': vehicle.vid, 'pid': platoon.platoon_id, 'lid': platoon.leader.vid, 'cost': fx}) LOG.trace(f"{vehicle.vid} found applicable platoon {platoon.platoon_id} (leader {platoon.leader.vid})") # stats vehicle._candidates_found += 1 if platoon.size > 1: vehicle._candidates_found_platoon += 1 else: vehicle._candidates_found_individual += 1 # end vehicle # end all vehicles if len(all_found_candidates) == 0: LOG.debug(f"{self._owner.iid} found no possible matches") return # get unique list of searching vehicles from within the possible matches uids = set(x['vid'] for x in all_found_candidates) # TODO apply random to uids? # go through all searching vehicles (i.e., their ids) for v in uids: # get vehicle data and candidates vehicle = self._owner._simulator._vehicles[v] # get all candidates that vehicle could join found_candidates = [x for x in all_found_candidates if x['vid'] == v] if len(found_candidates) == 0: # this vehicle has no candidates (anymore) LOG.trace(f"{vehicle.vid} has no candidates (anymore)") continue # find best candidate to join # pick the platoon with the lowest deviation best = min(found_candidates, key=lambda x: x['cost']) LOG.trace(f"{v}'s best platoon is {best['pid']} (leader {best['lid']}) with cost {best['cost']}") # perform a join maneuver with the candidate's platoon vehicle._join(best['pid'], best['lid']) # remove all matches from the list of possible matches that would include the selected vehicle # this is exactly avoiding using candidates that became followers meanwhile or are at least within a maneuver def is_available(x: dict) -> bool: """ Return whether an entry from the list of possible matches is (still) available. Parameters ---------- x : dict The entry from the list of possible matches """ return ( x['vid'] != best['vid'] and # noqa 504 # this vehicle does not search anymore, since it just started a join maneuver x['lid'] != best['vid'] and # noqa 504 # this vehicle will not be applicable as leader anymore, since it just started a join maneuver x['vid'] != best['lid'] and # noqa 504 # the other vehicle is not searching anymore, since it will become a leader in the current join maneuver x['lid'] != best['lid'] # the other vehicle is not available as leader anymore, since it is busy with the current join maneuver ) all_found_candidates = [x for x in all_found_candidates if is_available(x)]
[docs] def _do_formation_optimal(self): """ Run centralized optimal formation approach. This selects candidates and triggers join maneuvers. """ from ortools import __version__ as solver_version from ortools.linear_solver import pywraplp solver = pywraplp.Solver(f"{self.name} solver (version {solver_version}) @ {self._owner.iid}", pywraplp.Solver.SCIP_MIXED_INTEGER_PROGRAMMING) solver.SetNumThreads(1) if self._solver_time_limit > 0: # influences the quality of the solution solver.set_time_limit(self._solver_time_limit * 1000) # s --> ms # import sys # infinity = sys.float_info.max # does work # infinity = solver.infinity() does work? individual = 1 # big magic number for the cost of driving individually objective = solver.Objective() objective.SetMinimization() decision_variables = {} from ..platooning_vehicle import PlatooningVehicle # noqa 811 LOG.debug(f"{self._owner.iid} is adding assignment variables for vehicles") # select all searching vehicles for vehicle in self._owner._simulator._vehicles.values(): # filter vehicles that are technically not able to do platooning if not isinstance(vehicle, PlatooningVehicle): LOG.trace(f"{vehicle.vid} is not capable of platooning") continue # filter vehicles which are already in a platoon if vehicle.platoon_role != PlatoonRole.NONE: LOG.trace(f"{vehicle.vid} is already in a platoon") continue # filter vehicles which are already in a maneuver if vehicle.in_maneuver: LOG.trace(f"{vehicle.vid} is already in a maneuver") continue vehicle._formation_iterations += 1 # allow a vehicle to be assigned to exactly one platoon constraint_one_target_platoon = solver.RowConstraint(1, 1, f"only one platoon for {vehicle.vid}") # get all available platoons or platoon candidates for other_vehicle in self._owner._simulator._vehicles.values(): # NOTE: we do not filter same car because staying individual is possible here # filter vehicles that are technically not able to do platooning if not isinstance(other_vehicle, PlatooningVehicle): LOG.trace(f"{other_vehicle.vid} is not capable of platooning") continue # here, the logic similar to the distributed approach begins # filter vehicles which are not available to become a new leader # we only have this information due to oracle knowledge in the centralized version # we use this to replace management of neighbors and their advertisements if other_vehicle.platoon_role != PlatoonRole.NONE and other_vehicle.platoon_role != PlatoonRole.LEADER: # we can only join an existing platoon or built a new one LOG.trace(f"{other_vehicle.vid} is not available") vehicle._candidates_filtered += 1 vehicle._candidates_filtered_follower += 1 continue # filter vehicles which are already in a maneuver # we only have this information due to oracle knowledge in the centralized version if other_vehicle.in_maneuver: LOG.trace(f"{other_vehicle.vid} is already in a maneuver") vehicle._candidates_filtered += 1 vehicle._candidates_filtered_maneuver += 1 continue platoon = other_vehicle.platoon # for one vehicle A we are looking at a different vehicle B to # check whether it is useful that A joins B # we assume driving alone is worse than to do platooning if platoon is vehicle.platoon: fx = individual LOG.trace(f"Considering driving individually for vehicle {vehicle.vid} (with cost {fx})") elif vehicle.position > platoon.rear_position: # FIXME HACK for skipping platoons behind us # we do not record stats here since this is different to the communication aspect filtering LOG.trace(f"{vehicle.vid}'s platoon {platoon.platoon_id} (leader {platoon.leader.vid}) not applicable because of its absolute position") continue else: # calculate deviation values ds = self.ds(vehicle, platoon) dp = self.dp(vehicle, platoon) # remove platoon if not in speed range if ds > 1.0: # simply applying the threshold LOG.trace(f"{vehicle.vid}'s platoon {platoon.platoon_id} (leader {platoon.leader.vid}) not applicable because of its speed difference ({ds})") continue # remove platoon if not in position range elif dp > 1.0: # simply applying the threshold LOG.trace(f"{vehicle.vid}'s platoon {platoon.platoon_id} (leader {platoon.leader.vid}) not applicable because of its position difference ({dp})") continue else: # calculate deviation/cost fx = self.cost_speed_position(ds, dp) LOG.trace(f"Considering platoon {platoon.platoon_id} (leader {platoon.leader.vid}) for vehicle {vehicle.vid} with cost {fx}") # stats vehicle._candidates_found += 1 if platoon.size > 1: vehicle._candidates_found_platoon += 1 else: vehicle._candidates_found_individual += 1 # NOTE: The distinction of vid and lid is superfluous, since we can only join either individual vehicles or platoon leaders. # More specifically, only individual vehicles or platoon leader are able to "advertise" their platoon (see above) assert other_vehicle.vid == platoon.leader.vid, f"We can only join individual vehicles or platoon leaders! {other_vehicle.vid}, {platoon.platoon_id}, {platoon.leader.vid}" # define (0,1) decision variable for assignment of vehicle to platoon variable = solver.IntVar(0, 1, f"{vehicle.vid} -> {platoon.platoon_id} ({platoon.leader.vid})") # add variable to assignment matrix decision_variables[variable.index()] = { 'vid': vehicle.vid, 'pid': platoon.platoon_id, 'lid': platoon.leader.vid, 'cost': fx, 'var': variable.index(), } # add decision variable from vehicle to platoon to row sum constraint_one_target_platoon.SetCoefficient(variable, 1) # add decision variable from vehicle to platoon with corresponding cost to the row sum objective.SetCoefficient(variable, fx) # end vehicle # end all vehicles # print cost matrix if LOG.getEffectiveLevel() <= logging.DEBUG: print(' ', end=' ') for lid in set([x['lid'] for x in decision_variables.values()]): print(lid, ' ', end=' ') print('(lid)') for vid in set([x['vid'] for x in decision_variables.values()]): print(vid, end=' ') for lid in set([x['lid'] for x in decision_variables.values()]): variables = [x['var'] for x in decision_variables.values() if x['vid'] == vid and x['lid'] == lid] if variables: print(round(decision_variables[solver.variable(variables[0]).index()]['cost'], 1), end=' ') else: print(' - ', end=' ') print() print('(vid)') # add more platoon constraints # NOTE: We need to take the leader here, since the platoon id does not necessarily match the leader id (e.g., when the leader left already) # Also, we want to compare the vehicle id to the leader id to make sure that only one assignment is done for this vehicle LOG.debug(f"{self._owner.iid} is adding additional platoon constraints") # get all platoons for lid in set([x['lid'] for x in decision_variables.values()]): # create constraint for this platoon # assign only one (other) vehicle to this leader constraint_one_member_per_platoon = solver.RowConstraint(0, 1, f"one other member for leader {lid}") # assign a vehicle only if no other vehicles has been assigned to this vehicle constraint_one_assignment_per_vehicle = solver.RowConstraint(0, 1, f"one assignment per vehicle {lid}") # get all variables that assign a vehicle to this vehicle (as leader) for var in [x['var'] for x in decision_variables.values() if x['lid'] == lid and x['vid'] != lid]: # not a self-assignment variable = solver.variable(var) constraint_one_member_per_platoon.SetCoefficient(variable, 1) constraint_one_assignment_per_vehicle.SetCoefficient(variable, 1) # get all variables that assign this vehicle to someone else (as leader) for var in [x['var'] for x in decision_variables.values() if x['vid'] == lid and x['lid'] != lid]: # not a self-assignment constraint_one_assignment_per_vehicle.SetCoefficient(solver.variable(var), 1) if solver.NumConstraints() == 0: LOG.info(f"{self._owner.iid} has no vehicles to run the solver for") return # run the solver to calculate the optimal assignments LOG.info(f"{self._owner.iid} is running the solver for {solver.NumVariables()} possible assignments, and {solver.NumConstraints()} constraints") # solver debug output if LOG.getEffectiveLevel() <= logging.DEBUG: solver.EnableOutput() start_time = timer() result_status = solver.Solve() end_time = timer() run_time = end_time - start_time LOG.info(f"{self._owner.iid}'s solver ran for {run_time}s") LOG.info(f"{self._owner.iid}'s solver ran for {solver.iterations()} iterations") # broken? solution_quality = objective.BestBound() / objective.Value() # record solver trace if self._record_solver_traces: record_solver_trace( basename=self._owner._simulator._result_base_filename, step=self._owner._simulator.step, iid=self._owner.iid, variables=solver.NumVariables(), constraints=solver.NumConstraints(), run_time=run_time, result_status=result_status, # this is a simple int solution_value=objective.Value(), best_bound=objective.BestBound(), solution_quality=solution_quality, ) # TODO record mean runtime if result_status == solver.OPTIMAL: # 0 LOG.info(f"{self._owner.iid}'s solution is optimal") self._assignments_solved += 1 self._assignments_solved_optimal += 1 elif result_status == solver.FEASIBLE: # 1 # from or-tools' documentation: # The solver had enough time to find some solution that satisfies all # constraints, but it did not prove optimality (which means it may or may # not have reached the optimal). # This can happen for large LP models (Linear Programming), and is a frequent # response for time-limited MIPs (Mixed Integer Programming). In the MIP # case, the difference between the solution 'objective_value' and # 'best_objective_bound' fields of the MPSolutionResponse will give an # indication of how far this solution is from the optimal one. LOG.info(f"{self._owner.iid}'s solution is not optimal") self._assignments_solved += 1 self._assignments_solved_feasible += 1 LOG.debug(f"{self._owner.iid}'s optimal objective value is {objective.Value()}") LOG.debug(f"{self._owner.iid}'s best bound is {objective.BestBound()}") LOG.info(f"{solution_quality} approximation of the optimal solution") elif result_status >= solver.INFEASIBLE: # 2 LOG.warning(f"{self._owner.iid}'s optimization problem was not solvable!") self._assignments_not_solved += 1 return if objective.Value() == 0: LOG.info(f"{self._owner.iid} made no assignment!") self._assignments_none += 1 return # print assingment matrix if LOG.getEffectiveLevel() <= logging.DEBUG: print(' ', end=' ') for lid in set([x['lid'] for x in decision_variables.values()]): print(lid, '', end=' ') print('(lid)') for vid in set([x['vid'] for x in decision_variables.values()]): print(vid, end=' ') for lid in set([x['lid'] for x in decision_variables.values()]): variables = [x['var'] for x in decision_variables.values() if x['vid'] == vid and x['lid'] == lid] if variables: print(int(solver.variable(variables[0]).solution_value()), '', end=' ') else: print('- ', end=' ') print() print('(vid)') LOG.debug("Applying solver solution...") for variable in solver.variables(): if variable.solution_value() > 0: mapping = decision_variables[variable.index()] LOG.trace(f"{mapping['vid']} was assigned to leader {mapping['lid']} (platoon {mapping['pid']}) with cost {mapping['cost']}") # get vehicle & platoon data # HACK for oracle knowledge leader = self._owner._simulator._vehicles[mapping['lid']] vehicle = self._owner._simulator._vehicles[mapping['vid']] target_platoon = leader.platoon if mapping['vid'] == mapping['lid']: # self-assignment assert mapping['cost'] == individual LOG.trace(f"{vehicle.vid} keeps driving individually") self._assignments_self += 1 self._assignments_successful += 1 continue if target_platoon.platoon_id != mapping['pid']: # meanwhile, the leader became a platoon member (during application of the solver's solution) # NOTE: this should never happen assert (leader.is_in_platoon() and leader.platoon_role == PlatoonRole.FOLLOWER) LOG.warning(f"{vehicle.vid}'s assigned leader {leader.vid} (platoon {mapping['pid']}) meanwhile joined another platoon {target_platoon.platoon_id}!") self._assignments_candidate_joined_already += 1 sys.exit(f"ERROR [{__name__}]: This should never happen!") continue else: assert not leader.in_maneuver assert (not leader.is_in_platoon() or leader.platoon_role == PlatoonRole.LEADER) # let vehicle join platoon if vehicle.is_in_platoon(): # meanwhile, we became a platoon leader (during application of the solver's solution) # NOTE: this should never happen assert vehicle.platoon_role == PlatoonRole.LEADER LOG.warning(f"{vehicle.vid} meanwhile became the leader of platoon {vehicle.platoon.platoon_id}. Hence, no assignment is possible/necessary anymore") self._assingments_vehicle_became_leader += 1 sys.exit(f"ERROR [{__name__}]: This should never happen!") continue assert not vehicle.in_maneuver assert not leader.in_maneuver # actual join vehicle._join(target_platoon.platoon_id, target_platoon.leader.vid) self._assignments_successful += 1