// so density in vehicles/km would be 1000 * self.density
self.density = self.scenario.num_vehicles / self.scenario.net_params["length"]
def setup_initial_state(self):
Store initial state so that simulation can be reset at the end.
TODO: Make traci calls as bulk as possible
Initial state is a dictionary: key = vehicle IDs, value = state describing car
// collect ids and prepare id and vehicle lists
self.ids = self.traci_connection.vehicle.getIDList()
self.controlled_ids.clear()
self.sumo_ids.clear()
self.rl_ids.clear()
self.vehicles.clear()
// create the list of colors used to different between different types of
// vehicles visually on sumo"s gui
self.colors = {}
key_index = 1
color_choice = np.random.choice(len(COLORS))
for key in self.scenario.type_params.keys():
self.colors[key] = COLORS[(color_choice + key_index) % len(COLORS)]
key_index += 1
for veh_id in self.ids:
// import initial state from traci and place in vehicle dict
vehicle = dict()
vehicle["id"] = veh_id
veh_type = self.traci_connection.vehicle.getTypeID(veh_id)
vehicle["type"] = veh_type
self.traci_connection.vehicle.setColor(veh_id, self.colors[veh_type])
vehicle["edge"] = self.traci_connection.vehicle.getRoadID(veh_id)
vehicle["position"] = self.traci_connection.vehicle.getLanePosition(veh_id)
vehicle["lane"] = self.traci_connection.vehicle.getLaneIndex(veh_id)
vehicle["speed"] = self.traci_connection.vehicle.getSpeed(veh_id)
vehicle["length"] = self.traci_connection.vehicle.getLength(veh_id)
vehicle["max_speed"] = self.traci_connection.vehicle.getMaxSpeed(veh_id)
// specify acceleration controller
controller_params = self.scenario.type_params[veh_type][1]
vehicle["controller"] = controller_params[0](veh_id=veh_id, **controller_params[1])
if controller_params[0] == SumoController:
self.sumo_ids.append(veh_id)
elif controller_params[0] == RLController:
self.rl_ids.append(veh_id)
else:
self.controlled_ids.append(veh_id)
// specify lane-changing controller
lane_changer_params = self.scenario.type_params[veh_type][2]
if lane_changer_params is not None:
vehicle["lane_changer"] = lane_changer_params[0](veh_id=veh_id, **lane_changer_params[1])
else:
vehicle["lane_changer"] = None
self.vehicles[veh_id] = vehicle
self.vehicles[veh_id]["absolute_position"] = self.get_x_by_id(veh_id)
// the time step of the last lane change is always present in the environment,
// but only used by sub-classes that apply lane changing
self.vehicles[veh_id]["last_lc"] = -1 * self.lane_change_duration
// set speed mode
self.set_speed_mode(veh_id)
// set lane change mode
self.set_lane_change_mode(veh_id)
// Saving initial state
// route_id = self.traci_connection.vehicle.getRouteID(veh_id)
route_id = "route" + vehicle["edge"]
pos = self.traci_connection.vehicle.getPosition(veh_id)
self.initial_state[veh_id] = (vehicle["type"], route_id, vehicle["lane"],
vehicle["position"], vehicle["speed"], pos)
// collect list of sorted vehicle ids
self.sorted_ids = self.sort_by_position()
// collect headway, leader id, and follower id data
// vehicles = self.get_headway_dict()
for veh_id in self.ids:
// self.vehicles[veh_id]["headway"] = vehicles[veh_id]["headway"]
// self.vehicles[veh_id]["leader"] = vehicles[veh_id]["leader"]
// self.vehicles[veh_id]["follower"] = vehicles[veh_id]["follower"]
headway = self.traci_connection.vehicle.getLeader(veh_id, 200)
if headway is None:
self.vehicles[veh_id]["leader"] = ""
self.vehicles[veh_id]["follower"] = ""
self.vehicles[veh_id]["headway"] = self.scenario.length - self.vehicles[veh_id]["length"]
else:
self.vehicles[veh_id]["headway"] = headway[1]
self.vehicles[veh_id]["leader"] = headway[0]
self.vehicles[headway[0]]["follower"] = veh_id
// dictionary of initial observations used while resetting vehicles after each rollout
self.initial_observations = deepcopy(dict(self.vehicles))
// contains the last lc before the current step
self.prev_last_lc = dict()