# Custom observations

## Overview
One of the main objectives of the Flatland challenge is to find a suitable observation with which to solve the task at hand. Three observation builders are provided with Flatland out of the box; however, it is unlikely that they will be sufficient for this challenge.
Flatland was built with as much flexibility as possible when it comes to building custom observations. Whenever the environment needs to compute new observations for each agent, it queries an object derived from the `ObservationBuilder` base class, which takes the current state of the environment and returns the desired observation.
We will go through 3 examples to explain how to build custom observations:
## Simple (but useless) observation
In this first example we implement all the methods necessary for an observation builder to be valid and work with Flatland. This observation builder will simply return a vector of size 5 filled with the ID of the agent. This is a toy example and wouldn’t help an actual agent to learn anything.
Custom observation builders need to derive from the `flatland.core.env_observation_builder.ObservationBuilder` base class and must implement at least two methods: `reset(self)` and `get(self, handle)`.
Below is a simple example that returns observation vectors of size 5 featuring only the ID (handle) of the agent whose observation vector is being computed:
```python
import numpy as np

from flatland.core.env_observation_builder import ObservationBuilder


class SimpleObs(ObservationBuilder):
    """
    Simplest observation builder. The object returns observation vectors with 5 identical components,
    all equal to the ID of the respective agent.
    """

    def reset(self):
        return

    def get(self, handle):
        observation = handle * np.ones(5)
        return observation
```
We can pass an instance of our custom observation builder `SimpleObs` to the `RailEnv` creator as follows:
```python
env = RailEnv(width=30,
              height=30,
              number_of_agents=3,
              rail_generator=sparse_rail_generator(),
              line_generator=sparse_line_generator(),
              obs_builder_object=SimpleObs())
env.reset()
```
Anytime `env.reset()` or `env.step()` is called, the observation builder will return the custom observation of all agents initialized in the env. Not very useful, but it is a start!

The code sample above is available in `custom_observation_example_01_SimpleObs.py`.
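To make the structure of those returned observations concrete, here is a Flatland-free sketch: the environment hands back a dictionary keyed by agent handle, where each value is whatever the builder's `get(handle)` produced. The `SimpleObsSketch` and `collect_observations` names below are illustrative stand-ins, not part of the Flatland API:

```python
import numpy as np

# Flatland-free stand-in for SimpleObs, to show the structure of the
# observations returned by env.reset() / env.step(): a dict keyed by agent
# handle, whose values come from the builder's get(handle).

class SimpleObsSketch:
    def get(self, handle):
        # Same toy observation as SimpleObs above: 5 copies of the agent ID
        return handle * np.ones(5)

def collect_observations(builder, num_agents):
    # Mimics the dict of observations the environment hands back
    return {handle: builder.get(handle) for handle in range(num_agents)}

obs = collect_observations(SimpleObsSketch(), 3)
print(obs[2])  # [2. 2. 2. 2. 2.]
```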
In the next example, we highlight how to inherit from existing observation builders and how to access internal variables of Flatland.
## Using predictors and rendering observations
Because the re-scheduling task of the Flatland challenge requires some short-term planning, we allow the use of custom predictors that help predict upcoming conflicts and help agents resolve them in a timely manner.
The Flatland environment comes with a built-in predictor called `ShortestPathPredictorForRailEnv`, to give you an idea of what you can do with these predictors.
Any custom predictor can be passed to the observation builder and will then be used to build the observation. In this example we will illustrate how an observation builder can be used to detect conflicts using a predictor.
Note that the toy `ObservePredictions` observation we will create only contains information about potential conflicts and has no features about the agents' objectives, so it would not be sufficient to solve real tasks!
You can also render your custom observation or predictor information as an overlay on the environment. All you need to do in order to render your custom observation is to populate `self.env.dev_obs_dict[handle]` for every agent (all handles). For the predictor, you can similarly use `self.env.dev_pred_dict[handle]`.
In contrast to the previous examples, we also implement the `get_many(self, handles=None)` method for this custom observation builder. The reasoning here is that we want to call the predictor only once per `env.step()`. The base implementation of `get_many(self, handles=None)` calls the `get(handle)` function for all handles, which means that it normally does not need to be reimplemented, except for cases such as this one.
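The motivation for overriding `get_many` can be made concrete with a small Flatland-free sketch: if the predictor is queried inside `get()`, the default loop-over-handles behaviour queries it once per agent, whereas querying it once in `get_many()` and caching the result queries it only once per step. The classes and counter below are illustrative, not Flatland API:

```python
class CountingPredictor:
    # Hypothetical predictor that counts how often it is queried
    def __init__(self):
        self.calls = 0

    def get(self):
        self.calls += 1
        return {}  # real predictions would go here

class NaiveBuilder:
    # Queries the predictor inside get(): one predictor call per agent
    def __init__(self, predictor):
        self.predictor = predictor

    def get(self, handle):
        self.predictor.get()
        return handle

    def get_many(self, handles):
        # Mirrors the default behaviour: simply loop over get()
        return {h: self.get(h) for h in handles}

class CachingBuilder(NaiveBuilder):
    # Overrides get_many() to query the predictor only once per step
    def get_many(self, handles):
        self.predictions = self.predictor.get()
        return {h: self.get_cached(h) for h in handles}

    def get_cached(self, handle):
        return handle  # would use self.predictions here

naive = NaiveBuilder(CountingPredictor())
naive.get_many(range(3))
print(naive.predictor.calls)    # 3 calls, one per agent

caching = CachingBuilder(CountingPredictor())
caching.get_many(range(3))
print(caching.predictor.calls)  # 1 call for the whole step
```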
```python
import numpy as np

from flatland.core.grid.grid_utils import coordinate_to_position
from flatland.envs.observations import TreeObsForRailEnv


class ObservePredictions(TreeObsForRailEnv):
    """
    We use the provided ShortestPathPredictor to illustrate the usage of predictors in your custom observation.

    We derive our observation builder from TreeObsForRailEnv, to exploit the existing implementation to compute
    the minimum distances from each grid node to each agent's target. This is necessary so that we can pass the
    distance map to the ShortestPathPredictor.

    Here we also want to highlight how you can visualize your observation.
    """

    def __init__(self, predictor):
        super().__init__(max_depth=0)
        self.predictor = predictor

    def reset(self):
        # Recompute the distance map, if the environment has changed.
        super().reset()

    def get_many(self, handles=None):
        """
        Because we do not want to call the predictor separately for every agent, we implement the get_many
        function. Here we can call the predictor just once for all the agents and use the predictions to
        generate our observations.

        :param handles: list of agent handles to compute observations for
        :return: dictionary mapping each handle to its observation
        """
        self.predictions = self.predictor.get()

        self.predicted_pos = {}
        for t in range(len(self.predictions[0])):
            pos_list = []
            for a in handles:
                pos_list.append(self.predictions[a][t][1:3])
            # We transform (x, y) coordinates into a single integer for simpler comparison
            self.predicted_pos.update({t: coordinate_to_position(self.env.width, pos_list)})

        observations = {}
        # Collect the observations of all the agents
        for h in handles:
            observations[h] = self.get(h)
        return observations

    def get(self, handle):
        """
        Let's write a simple observation which just indicates whether or not the agent's own predicted path
        overlaps with other predicted paths at any time. This is useless for the task of navigation but might
        help when looking for conflicts. A more complex implementation can be found in the TreeObsForRailEnv
        class.

        Each agent receives an observation of length 10, where each element represents a prediction step and
        its value is:
        - 0 if no overlap is happening
        - 1 if any other path crosses the predicted cell

        :param handle: handle (index) of an agent
        :return: observation of the agent referenced by handle
        """
        observation = np.zeros(10)

        # We track which cells were considered while building the observation and make them accessible
        # for rendering
        visited = set()

        for _idx in range(10):
            # Check if any of the other predictions overlap with the agent's own prediction
            x_coord = self.predictions[handle][_idx][1]
            y_coord = self.predictions[handle][_idx][2]

            # We add every observed cell to the observation rendering
            visited.add((x_coord, y_coord))
            if self.predicted_pos[_idx][handle] in np.delete(self.predicted_pos[_idx], handle, 0):
                # Another agent is predicted to pass through the same cell at the same predicted time,
                # so we flag this prediction step
                observation[_idx] = 1

        # This variable will be accessed by the renderer to visualize the observation
        self.env.dev_obs_dict[handle] = visited

        return observation
```
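The per-timestep overlap test at the heart of `get()` can be exercised outside of Flatland. The sketch below uses hand-written, hypothetical predictions and an illustrative `flatten` helper (any injective mapping from cells to integers works here, in the same spirit as `coordinate_to_position`); it shows how flattening the (x, y) coordinates turns the conflict check into a simple membership test per timestep:

```python
# Standalone sketch of the per-timestep conflict check used in
# ObservePredictions.get(). The predictions below are hand-written
# stand-ins, not the output of a real predictor.

def flatten(width, positions):
    # Illustrative helper: map each (x, y) cell to a single integer.
    # Any injective mapping works for the membership test below.
    return [x * width + y for (x, y) in positions]

def conflict_vector(handle, predicted_pos, n_steps):
    # One entry per prediction step: 1 if any other agent is predicted
    # to occupy the same cell at the same time, 0 otherwise
    observation = [0] * n_steps
    for t in range(n_steps):
        own = predicted_pos[t][handle]
        others = [p for i, p in enumerate(predicted_pos[t]) if i != handle]
        if own in others:
            observation[t] = 1
    return observation

width = 30
# Predicted (x, y) positions for 2 agents over 3 timesteps (hypothetical data):
predictions = {
    0: [(0, 0), (0, 1), (0, 2)],  # agent 0
    1: [(1, 2), (0, 1), (1, 0)],  # agent 1 meets agent 0 at t=1
}
predicted_pos = {
    t: flatten(width, [predictions[a][t] for a in predictions])
    for t in range(3)
}
print(conflict_vector(0, predicted_pos, 3))  # [0, 1, 0]
```

Flattening the coordinates lets the builder compare whole position lists per timestep at once instead of comparing coordinate pairs element-wise.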
We can then use this new observation builder and the renderer to visualize the observation of each agent.
```python
import time

import numpy as np

from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.rendertools import RenderTool

# Create the predictor (predicting 10 steps ahead)
CustomPredictor = ShortestPathPredictorForRailEnv(10)

# Pass the predictor to the observation builder
CustomObsBuilder = ObservePredictions(CustomPredictor)

# Initialize the environment
env = RailEnv(width=30,
              height=30,
              number_of_agents=3,
              rail_generator=sparse_rail_generator(),
              line_generator=sparse_line_generator(),
              obs_builder_object=CustomObsBuilder)
obs, info = env.reset()
env_renderer = RenderTool(env)

# We render the initial step and show the observed cells as colored boxes
env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)

action_dict = {}
for step in range(100):
    for a in range(env.get_num_agents()):
        action = np.random.randint(0, 5)
        action_dict[a] = action
    obs, all_rewards, done, _ = env.step(action_dict)
    print("Rewards: ", all_rewards, " [done=", done, "]")
    env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)
    time.sleep(0.5)
```
The code sample above is available in `custom_observation_example_03_ObservePredictions.py`.
## Going further
When building your custom observation builder, you might want to aggregate and define your own features that are different from the raw environment data. The next section explains how to access such information.