if self.timer % 100 == 0:
for veh_id in self.controlled_ids:
car_type = self.vehicles[veh_id]["type"]
newlane = self.scenario.type_params[car_type][2](veh_id, self)
traci.vehicle.changeLane(veh_id, newlane, 10000)
if self.timer > self.perturbation_at and self.timer < (self.perturbation_at + self.perturbation_length):
self.apply_action(self.perturbed_id, self.env_params["max-deacc"])
After Change
else:
self.perturbed_id = list(self.vehicles.keys())[random.randint(0, len(self.vehicles.keys())-1)]
def step(self, rl_actions):
Run one timestep of the environment"s dynamics. "Self-driving cars" will
step forward based on rl_actions, provided by the RL algorithm. Other cars
will step forward based on their car following model. When end of episode
is reached, reset() should be called to reset the environment"s internal state.
Input
-----
rl_actions : an action provided by the rl algorithm
Outputs
-------
(observation, reward, done, info)
observation : agent"s observation of the current environment
reward [Float] : amount of reward due to the previous action
done : a boolean, indicating whether the episode has ended
info : a dictionary containing other diagnostic information from the previous action
logging.debug("================= performing step =================")
for veh_id in self.controlled_ids:
action = self.vehicles[veh_id]["controller"].get_action(self)
safe_action = self.vehicles[veh_id]["controller"].get_safe_action(self, action)
self.apply_action(veh_id, action=safe_action)
logging.debug("Car with id " + veh_id + " is on route " + str(traci.vehicle.getRouteID(veh_id)))
for index, veh_id in enumerate(self.rl_ids):
action = rl_actions[index]
safe_action = self.vehicles[veh_id]["controller"].get_safe_action(self, action)
self.apply_action(veh_id, action=safe_action)
self.timer += 1
// TODO: Turn 100 into a hyperparameter
// if it"s been long enough try and change lanes
if self.timer % 100 == 0:
for veh_id in self.controlled_ids:
// car_type = self.vehicles[veh_id]["type"]
// newlane = self.scenario.type_params[car_type][2](veh_id, self)
newlane = self.vehicles[veh_id]["lane_changer"].get_action(self)
traci.vehicle.changeLane(veh_id, newlane, 10000)
if self.timer > self.perturbation_at and self.timer < (self.perturbation_at + self.perturbation_length):
self.apply_action(self.perturbed_id, self.env_params["max-deacc"])