unienv_interface.world.node

WorldNode

Bases: ABC, Generic[ContextType, ObsType, ActType, BArrayType, BDeviceType, BDtypeType, BRNGType]

A stateful node that manages one aspect of a world (e.g. a sensor, a robot, a reward function).

The environment is initialized during the first call to reset or reload.

Lifecycle — reset flow::

World.reset()
  -> WorldNode.reset(priority=...)
  -> WorldNode.after_reset(priority=...)
  -> WorldNode.get_context() / get_observation() / get_info() / render()

Lifecycle — reload flow::

World.reload()
  -> WorldNode.reload(priority=...)
  -> WorldNode.after_reload(priority=...)  (calls after_reset by default)
  -> WorldNode.get_context() / get_observation() / get_info() / render()

reload re-generates the simulation environment (e.g. re-reading assets, rebuilding the scene). This is typically much more expensive than reset and should only be called when the environment configuration has changed. By default reload delegates to reset.

Lifecycle — step flow::

WorldNode.set_next_action(action)
  -> WorldNode.pre_environment_step(dt, priority=...)
  -> World.step()
  -> WorldNode.post_environment_step(dt, priority=...)
  -> WorldNode.get_observation()
  -> WorldNode.get_reward() / get_termination() / get_truncation() / get_info() / render()
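
The step flow above can be sketched with stand-in classes. This is a minimal illustration, not the library's dispatcher: `RecordingNode` and `run_step` are hypothetical names, the world step is represented by a plain callback assumed to return the elapsed time, and iterating step priorities in descending order is also an assumption.

```python
from typing import Callable, List, Optional


class RecordingNode:
    """Hypothetical stand-in for a WorldNode that logs each lifecycle call."""

    pre_environment_step_priorities = {0}
    post_environment_step_priorities = {0}

    def __init__(self) -> None:
        self.calls: List[str] = []
        self._next_action: Optional[float] = None

    def set_next_action(self, action: Optional[float]) -> None:
        self.calls.append("set_next_action")
        self._next_action = action

    def pre_environment_step(self, dt: float, *, priority: int = 0) -> None:
        self.calls.append("pre_environment_step")

    def post_environment_step(self, dt: float, *, priority: int = 0) -> None:
        self.calls.append("post_environment_step")

    def get_observation(self) -> Optional[float]:
        self.calls.append("get_observation")
        return self._next_action


def run_step(node: RecordingNode, world_step: Callable[[], float],
             action: float, last_dt: float) -> Optional[float]:
    """Drive one control step in the documented order (assumed driver logic)."""
    node.set_next_action(action)
    for p in sorted(node.pre_environment_step_priorities, reverse=True):
        node.pre_environment_step(last_dt, priority=p)
    dt = world_step()  # stands in for World.step(); assumed to return elapsed time
    for p in sorted(node.post_environment_step_priorities, reverse=True):
        node.post_environment_step(dt, priority=p)
    return node.get_observation()


node = RecordingNode()
obs = run_step(node, world_step=lambda: 0.02, action=1.5, last_dt=0.02)
```

After the call, `node.calls` lists the four lifecycle methods in the order shown in the flow diagram.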

Implementing a node

Subclasses should:

  1. Set name, world, and any relevant spaces / signal flags / render attributes (render_mode, supported_render_modes) in __init__.
  2. Override the lifecycle methods they need (reset, pre_environment_step, etc.).
  3. Populate the corresponding priority set for every lifecycle method they override. Callers check these sets before dispatching: a method is called only for the priorities present in its set, so a node with an empty set for a method never has that method called.

Example::

   class MySensor(WorldNode[...]):
       after_reset_priorities = {0}
       post_environment_step_priorities = {0}

       def post_environment_step(self, dt, *, priority=0):
           ...  # read sensor data

Multiple priorities (e.g. {0, 1}) allow the same method to be invoked at different stages; the caller iterates priorities in the desired order.
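
As a rough sketch of how a caller might honor the priority sets, the snippet below dispatches reset over several nodes. `StubNode` and `dispatch_reset` are hypothetical names introduced for illustration; the real caller may iterate differently.

```python
from typing import List, Set, Tuple


class StubNode:
    """Hypothetical node that reports which reset priority it was called with."""

    def __init__(self, name: str, reset_priorities: Set[int]) -> None:
        self.name = name
        self.reset_priorities = reset_priorities

    def reset(self, *, priority: int = 0) -> Tuple[str, int]:
        return (self.name, priority)


def dispatch_reset(nodes: List[StubNode]) -> List[Tuple[str, int]]:
    """Call reset once per (node, priority) pair, highest priority first."""
    all_priorities = set().union(*(n.reset_priorities for n in nodes))
    log = []
    for priority in sorted(all_priorities, reverse=True):
        for node in nodes:
            if priority in node.reset_priorities:  # skip nodes that did not opt in
                log.append(node.reset(priority=priority))
    return log


nodes = [
    StubNode("camera", {-50}),
    StubNode("robot", {60, 0}),   # same method invoked at two stages
    StubNode("logger", set()),    # empty set: reset is never called
]
call_log = dispatch_reset(nodes)
```

Here the robot is reset at priorities 60 and 0, the camera at -50, and the logger is skipped entirely because its set is empty.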

Space lifecycle contract

action_space, observation_space, and context_space follow a two-phase lifecycle:

Phase 1 — construction (__init__): Nodes should expose a preliminary space if the dimensionality is known before the scene is built. For example, an EEF controller always produces a 6-D action regardless of the number of robot DOFs, so the space can be set to a BoxSpace with [-inf, inf] bounds immediately. If the dimensionality is genuinely unknown (e.g. a joint-position controller whose DOF count comes from the compiled scene), the space may be None at this stage.

Phase 2 — post-build (end of after_reload): By the time the lowest-priority after_reload has returned, every node must have set its final, tightly-bounded spaces. CombinedWorldNode calls _refresh_spaces() at this point so that the aggregated spaces seen by the environment composer reflect the true, post-build state.
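
The two phases can be illustrated with stand-in classes. Everything here is an assumption made for the sketch: `Box` is a simplified placeholder rather than the library's BoxSpace, `BuiltScene` is a made-up object carrying a DOF count, and the numeric bounds are arbitrary.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Box:
    """Hypothetical stand-in for a box-shaped space."""
    low: Tuple[float, ...]
    high: Tuple[float, ...]


class BuiltScene:
    """Made-up scene object; in practice this value exists only after the build."""
    num_dofs = 7


class EEFController:
    """Dimensionality known up front: expose a preliminary 6-D space in __init__."""
    after_reload_priorities = {60}

    def __init__(self) -> None:
        inf = float("inf")
        self.action_space: Optional[Box] = Box((-inf,) * 6, (inf,) * 6)

    def after_reload(self, *, priority: int = 0) -> None:
        # Phase 2: replace the unbounded space with final bounds (values arbitrary).
        self.action_space = Box((-1.0,) * 6, (1.0,) * 6)


class JointController:
    """Dimensionality unknown until the scene is built: start with None."""
    after_reload_priorities = {60}

    def __init__(self, scene: BuiltScene) -> None:
        self.scene = scene
        self.action_space: Optional[Box] = None

    def after_reload(self, *, priority: int = 0) -> None:
        n = self.scene.num_dofs  # only read once the scene is ready
        self.action_space = Box((-3.14,) * n, (3.14,) * n)


eef, joint = EEFController(), JointController(BuiltScene())
before = (len(eef.action_space.low), joint.action_space)
eef.after_reload(priority=60)
joint.after_reload(priority=60)
after = (eef.action_space.high, len(joint.action_space.low))
```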


Priority conventions (in reload / reset / after_reload / after_reset): Higher priority runs first (sorted(..., reverse=True)).

  • [100, 200) -> Floor / surrounding environment setup (e.g. table, ground plane)
  • [50, 100) -> Robot setup (add URDF/MJCF entity, init controller, apply rest pose)
  • [0, 50) -> Object / prop setup; general post-step observation updates
  • (-inf, 0) -> Sensors / renderers that must run after all entities are settled (e.g. cameras: -50)
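
The bands above can be expressed as a small lookup, which may help when choosing a priority for a new node. `PRIORITY_BANDS` and `band_for` are hypothetical helpers, not part of the documented API, and the sketch treats the last band as unbounded below.

```python
import math

# Half-open bands [low, high) taken from the conventions listed above.
PRIORITY_BANDS = [
    (100, 200, "floor / surrounding environment"),
    (50, 100, "robot"),
    (0, 50, "objects / props"),
    (-math.inf, 0, "sensors / renderers"),
]


def band_for(priority: int) -> str:
    """Return the conventional role for a priority value."""
    for low, high, label in PRIORITY_BANDS:
        if low <= priority < high:
            return label
    return "outside the documented bands"


# Higher priority runs first, so sort in descending order.
run_order = sorted([-50, 10, 60, 150], reverse=True)
roles = [band_for(p) for p in run_order]
```

With these example values the floor is set up first and the camera-like node at -50 runs last.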

name instance-attribute

name: str

control_timestep class-attribute instance-attribute

control_timestep: Optional[float] = None

update_timestep class-attribute instance-attribute

update_timestep: Optional[float] = None

context_space class-attribute instance-attribute

context_space: Optional[Space[ContextType, BDeviceType, BDtypeType, BRNGType]] = None

observation_space class-attribute instance-attribute

observation_space: Optional[Space[ObsType, BDeviceType, BDtypeType, BRNGType]] = None

action_space class-attribute instance-attribute

action_space: Optional[Space[ActType, BDeviceType, BDtypeType, BRNGType]] = None

has_reward class-attribute instance-attribute

has_reward: bool = False

has_termination_signal class-attribute instance-attribute

has_termination_signal: bool = False

has_truncation_signal class-attribute instance-attribute

has_truncation_signal: bool = False

supported_render_modes class-attribute instance-attribute

supported_render_modes: Sequence[str] = ()

render_mode class-attribute instance-attribute

render_mode: Optional[str] = None

world class-attribute instance-attribute

world: Optional[World[BArrayType, BDeviceType, BDtypeType, BRNGType]] = None

reset_priorities class-attribute instance-attribute

reset_priorities: Set[int] = set()

reload_priorities class-attribute instance-attribute

reload_priorities: Set[int] = set()

after_reset_priorities class-attribute instance-attribute

after_reset_priorities: Set[int] = set()

after_reload_priorities class-attribute instance-attribute

after_reload_priorities: Set[int] = set()

pre_environment_step_priorities class-attribute instance-attribute

pre_environment_step_priorities: Set[int] = set()

post_environment_step_priorities class-attribute instance-attribute

post_environment_step_priorities: Set[int] = set()

backend property

backend: ComputeBackend[BArrayType, BDeviceType, BDtypeType, BRNGType]

Backend provided by the attached world.

device property

device: Optional[BDeviceType]

Device provided by the attached world.

can_render property

can_render: bool

Whether the node currently exposes a render mode.

effective_update_timestep property

effective_update_timestep: Optional[float]

Resolved update period, defaulting to control_timestep when unset.
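
The fallback described here amounts to a one-line rule. The function below is a standalone sketch of that rule, with `resolve_update_timestep` as a hypothetical name; the actual property may be implemented differently.

```python
from typing import Optional


def resolve_update_timestep(
    update_timestep: Optional[float], control_timestep: Optional[float]
) -> Optional[float]:
    """Return update_timestep if set, otherwise fall back to control_timestep."""
    return update_timestep if update_timestep is not None else control_timestep


explicit = resolve_update_timestep(0.01, 0.02)
fallback = resolve_update_timestep(None, 0.02)
neither = resolve_update_timestep(None, None)
```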

unwrapped property

unwrapped: WorldNode

prev_wrapper_layer property

prev_wrapper_layer: Optional[WorldNode]

pre_environment_step

pre_environment_step(dt: Union[float, BArrayType], *, priority: int = 0) -> None

This method is called before the environment step.

Args:
  dt (float/BArrayType): The time elapsed between the last world step and the current step (NOT the current step and next step).

get_context

get_context() -> Optional[ContextType]

Get the current context from the node. If the context space is None, this method should not be called.

get_observation

get_observation() -> ObsType

Get the current observation from the node. If the observation space is None, this method should not be called.

get_reward

get_reward() -> Union[float, BArrayType]

Get the current reward from the environment. If has_reward is False, this method should not be called.

get_termination

get_termination() -> Union[bool, BArrayType]

Get the current termination signal from the environment. If has_termination_signal is False, this method should not be called.

get_truncation

get_truncation() -> Union[bool, BArrayType]

Get the current truncation signal from the environment. If has_truncation_signal is False, this method should not be called.

get_info

get_info() -> Optional[Dict[str, Any]]

Get optional auxiliary information associated with this node.

render

render() -> Union[np.ndarray, BArrayType, Sequence[Union[np.ndarray, BArrayType]], Dict[str, Union[np.ndarray, BArrayType, Sequence[Union[np.ndarray, BArrayType]]]], None]

Render the current state of the node.

If can_render is False, this method should not be called.

set_next_action

set_next_action(action: ActType) -> None

Set the next action to be taken by the node. Call this method before pre_environment_step. If it is not called after a world step, or if it is called with an action of None, the node computes a dummy action that tries to retain the robot's current state. If the action space is None, this method should not be called.
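
One way to picture the fallback behavior is a controller that holds its current position when no action is pending. `HoldingController` is a hypothetical class, and "hold the current position" is only one possible interpretation of the dummy action.

```python
from typing import List, Optional


class HoldingController:
    """Hypothetical controller that falls back to a hold target without an action."""

    def __init__(self, current_position: float) -> None:
        self.current_position = current_position
        self._next_action: Optional[float] = None
        self.applied: List[float] = []

    def set_next_action(self, action: Optional[float]) -> None:
        self._next_action = action

    def pre_environment_step(self, dt: float, *, priority: int = 0) -> None:
        # With no pending action, target the current position (the dummy action).
        if self._next_action is None:
            target = self.current_position
        else:
            target = self._next_action
        self.applied.append(target)
        self.current_position = target
        self._next_action = None  # an action is consumed by exactly one step


ctrl = HoldingController(current_position=0.3)
ctrl.pre_environment_step(0.02)   # no action set: hold 0.3
ctrl.set_next_action(0.5)
ctrl.pre_environment_step(0.02)   # apply 0.5
ctrl.set_next_action(None)
ctrl.pre_environment_step(0.02)   # explicit None: hold 0.5
```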

post_environment_step

post_environment_step(dt: Union[float, BArrayType], *, priority: int = 0) -> None

This method is called after the environment step to update the sensor's internal state.

Args:
  dt (float/BArrayType): The time elapsed between the last world step and the current step.

reset

reset(*, priority: int = 0, seed: Optional[int] = None, mask: Optional[BArrayType] = None, **kwargs) -> None

This method is called after World.reset(...) has been called. Reset the node and update its internal state.

reload

reload(*, priority: int = 0, seed: Optional[int] = None, mask: Optional[BArrayType] = None, **kwargs) -> None

Reload the node. By default, this just calls reset with the same parameters. Simulation environments can override this to completely re-read assets and reload the node.

after_reset

after_reset(*, priority: int = 0, mask: Optional[BArrayType] = None) -> None

This method is called after reset has been called on all WorldNodes (i.e. the environment reset is effectively done). Use get_context, get_observation, and get_info to read the post-reset state.

after_reload

after_reload(*, priority: int = 0, mask: Optional[BArrayType] = None) -> None

This method is called after the world has been rebuilt following a reload.

At this point:

  • All nodes have added their entities during the reload phase.
  • The world scene has been built (simulation is ready).
  • Nodes can now cache references to entities, geoms, etc.

By default, this calls after_reset(). Override if you need specific post-reload initialization that differs from post-reset.

Use get_context, get_observation, and get_info to read the state.
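
The default delegation (reload to reset, after_reload to after_reset) and a typical override can be sketched as follows. `BaseNode` and `SceneNode` are hypothetical stand-ins with simplified signatures, not the library's classes.

```python
from typing import List


class BaseNode:
    """Hypothetical base mirroring the documented default delegation."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def reset(self, *, priority: int = 0, **kwargs) -> None:
        self.calls.append("reset")

    def reload(self, *, priority: int = 0, **kwargs) -> None:
        self.reset(priority=priority, **kwargs)  # default: reload behaves like reset

    def after_reset(self, *, priority: int = 0) -> None:
        self.calls.append("after_reset")

    def after_reload(self, *, priority: int = 0) -> None:
        self.after_reset(priority=priority)  # default: same post-processing


class SceneNode(BaseNode):
    """Overrides the reload path to rebuild assets and cache handles."""

    def reload(self, *, priority: int = 0, **kwargs) -> None:
        self.calls.append("reload: rebuild assets")

    def after_reload(self, *, priority: int = 0) -> None:
        self.calls.append("after_reload: cache entity handles")
        super().after_reload(priority=priority)


plain, scene = BaseNode(), SceneNode()
for n in (plain, scene):
    n.reload()
    n.after_reload()
```

The plain node ends up running only its reset logic, while the overriding node does its reload-specific work and then reuses after_reset.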

get_node

get_node(nested_keys: Union[str, Sequence[str]]) -> Optional[WorldNode]

Return a node by key path.

  • str keys are treated as a single direct-child key.
  • Empty key sequence returns self.
  • Vanilla nodes have no children and return None for non-empty lookup.
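
The lookup rules above can be sketched with two stand-in classes. `LeafNode` follows the three rules as stated; `GroupNode` is a hypothetical container added only to show how a nested key path might resolve, and its behavior is an assumption.

```python
from typing import Dict, List, Optional, Sequence, Union

Keys = Union[str, Sequence[str]]


def _as_list(nested_keys: Keys) -> List[str]:
    # A str is a single direct-child key, not a sequence of characters.
    return [nested_keys] if isinstance(nested_keys, str) else list(nested_keys)


class LeafNode:
    """Hypothetical vanilla node: no children."""

    def __init__(self, name: str) -> None:
        self.name = name

    def get_node(self, nested_keys: Keys) -> Optional["LeafNode"]:
        return self if not _as_list(nested_keys) else None


class GroupNode(LeafNode):
    """Hypothetical container that resolves the first key among its children."""

    def __init__(self, name: str, children: Dict[str, LeafNode]) -> None:
        super().__init__(name)
        self.children = children

    def get_node(self, nested_keys: Keys) -> Optional[LeafNode]:
        keys = _as_list(nested_keys)
        if not keys:
            return self
        child = self.children.get(keys[0])
        return None if child is None else child.get_node(keys[1:])


cam = LeafNode("cam")
arm = GroupNode("arm", {"cam": cam})
root = GroupNode("root", {"arm": arm})
```

For example, `root.get_node(["arm", "cam"])` walks two levels down, while `cam.get_node("anything")` returns None because a vanilla node has no children.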

get_nodes_by_fn

get_nodes_by_fn(fn: Callable[[WorldNode], bool]) -> list[WorldNode]

Return nodes in this subtree matching fn.

Vanilla nodes only evaluate self.

get_nodes_by_type

get_nodes_by_type(node_type: Type[WorldNode]) -> list[WorldNode]

Return nodes in this subtree that are instances of node_type.
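
For a vanilla node, both query methods reduce to checking the node itself. The sketch below uses hypothetical `Node` and `Camera` classes, and expressing get_nodes_by_type in terms of get_nodes_by_fn is an assumption rather than a confirmed implementation detail.

```python
from typing import Callable, List, Type


class Node:
    """Hypothetical vanilla node with no children."""

    def get_nodes_by_fn(self, fn: Callable[["Node"], bool]) -> List["Node"]:
        # A vanilla node only evaluates itself.
        return [self] if fn(self) else []

    def get_nodes_by_type(self, node_type: Type["Node"]) -> List["Node"]:
        return self.get_nodes_by_fn(lambda n: isinstance(n, node_type))


class Camera(Node):
    pass


cam, plain = Camera(), Node()
cams_from_cam = cam.get_nodes_by_type(Camera)
cams_from_plain = plain.get_nodes_by_type(Camera)
```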

close

close() -> None

Release any node-owned resources.

has_wrapper_attr

has_wrapper_attr(name: str) -> bool

Checks if the attribute name exists in the environment.

get_wrapper_attr

get_wrapper_attr(name: str) -> Any

Gets the attribute name from the environment.

set_wrapper_attr

set_wrapper_attr(name: str, value: Any)

Sets the attribute name on the environment with value.