# store the initial vehicle ids
self.initial_ids = deepcopy(self.vehicles.get_ids())
def _step(self, rl_actions):
"""
Run one timestep of the environment's dynamics. An autonomous agent
(i.e. autonomous vehicles) performs an action provided by the RL
algorithm. Other cars step forward based on their car following model.
When the end of the episode is reached, reset() should be called to
reset the environment's initial state.

Parameters
----------
rl_actions: numpy ndarray
    a list of actions provided by the RL algorithm

Returns
-------
observation: numpy ndarray
    agent's observation of the current environment
reward: float
    amount of reward associated with the previous state/action pair
done: boolean
    indicates whether the episode has ended
info: dictionary
    contains other diagnostic information from the previous action
"""
self.time_counter += 1
# perform acceleration actions for controlled human-driven vehicles
if len(self.vehicles.get_controlled_ids()) > 0:
accel = []
for veh_id in self.vehicles.get_controlled_ids():
accel_contr = self.vehicles.get_acc_controller(veh_id)
action = accel_contr.get_action(self)
accel.append(action)
self.apply_acceleration(self.vehicles.get_controlled_ids(), accel)
# perform lane change actions for controlled human-driven vehicles
if len(self.vehicles.get_controlled_lc_ids()) > 0:
new_lane = []
for veh_id in self.vehicles.get_controlled_lc_ids():
lc_contr = self.vehicles.get_lane_changing_controller(veh_id)
target_lane = lc_contr.get_action(self)
new_lane.append(target_lane)
self.apply_lane_change(self.vehicles.get_controlled_lc_ids(),
target_lane=new_lane)
# (optionally) perform routing actions for all vehicles in the network,
# including RL and SUMO-controlled vehicles
routing_ids = []
routing_actions = []
for veh_id in self.vehicles.get_ids():
if self.vehicles.get_routing_controller(veh_id) is not None:
routing_ids.append(veh_id)
route_contr = self.vehicles.get_routing_controller(veh_id)
routing_actions.append(route_contr.choose_route(self))
self.choose_routes(veh_ids=routing_ids, route_choices=routing_actions)
self.apply_rl_actions(rl_actions)
self.additional_command()
self.traci_connection.simulationStep()
# update all internal classes with current state data
self._update()
# collect the list of vehicle ids sorted by position
self.sorted_ids, self.sorted_extra_data = self.sort_by_position()