unienv_interface.world.node¶
WorldNode
¶
Bases: ABC, Generic[ContextType, ObsType, ActType, BArrayType, BDeviceType, BDtypeType, BRNGType]
A stateful node that manages one aspect of a world (e.g. a sensor, a robot, a reward function).
The environment is initialized during the first call to reset or reload.
Lifecycle — reset flow::
World.reset()
-> WorldNode.reset(priority=...)
-> WorldNode.after_reset(priority=...)
-> WorldNode.get_context() / get_observation() / get_info() / render()
Lifecycle — reload flow::
World.reload()
-> WorldNode.reload(priority=...)
-> WorldNode.after_reset(priority=...)
-> WorldNode.get_context() / get_observation() / get_info() / render()
reload re-generates the simulation environment (e.g. re-reading assets,
rebuilding the scene). This is typically much more expensive than reset
and should only be called when the environment configuration has changed.
By default reload delegates to reset.
Lifecycle — step flow::
WorldNode.set_next_action(action)
-> WorldNode.pre_environment_step(dt, priority=...)
-> World.step()
-> WorldNode.post_environment_step(dt, priority=...)
-> WorldNode.get_observation()
-> WorldNode.get_reward() / get_termination() / get_truncation() / get_info() / render()
Implementing a node¶
Subclasses should:
- Set
name,world, and any relevant spaces / signal flags / render attributes (render_mode,supported_render_modes) in__init__. - Override the lifecycle methods they need (
reset,pre_environment_step, etc.). - Populate the corresponding priority sets for every lifecycle method they override. Callers check these sets before dispatching a method call — a node will only have a given method called for priorities present in its corresponding set. A node with an empty priority set for a given method will never have that method called.
Example::
class MySensor(WorldNode[...]):
after_reset_priorities = {0}
post_environment_step_priorities = {0}
def post_environment_step(self, dt, *, priority=0):
... # read sensor data
Multiple priorities (e.g. {0, 1}) allow the same method to be invoked at
different stages; the caller iterates priorities in the desired order.
Space lifecycle contract¶
action_space, observation_space, and context_space follow a
two-phase lifecycle:
Phase 1 — construction (__init__):
Nodes should expose a preliminary space if the dimensionality is known
before the scene is built. For example, an EEF controller always produces a
6-D action regardless of the number of robot DOFs, so the space can be set to
a BoxSpace with [-inf, inf] bounds immediately. If the dimensionality
is genuinely unknown (e.g. a joint-position controller whose DOF count comes
from the compiled scene), the space may be None at this stage.
Phase 2 — post-build (end of after_reload):
By the time the lowest-priority after_reload has returned, every node
must have set its final, tightly-bounded spaces. CombinedWorldNode
calls _refresh_spaces() at this point so that the aggregated spaces seen
by the environment composer reflect the true, post-build state.
Priority conventions (in reload / reset / after_reload / after_reset):
Higher priority runs first (sorted(..., reverse=True)).
- [100, 200) -> Floor / surrounding environment setup (e.g. table, ground plane)
- [50, 100) -> Robot setup (add URDF/MJCF entity, init controller, apply rest pose)
- [0, 50) -> Object / prop setup; general post-step observation updates
- (-, 0) -> Sensors / renderers that must run after all entities are settled (e.g. cameras: -50)
context_space
class-attribute
instance-attribute
¶
context_space: Optional[Space[ContextType, BDeviceType, BDtypeType, BRNGType]] = None
observation_space
class-attribute
instance-attribute
¶
observation_space: Optional[Space[ObsType, BDeviceType, BDtypeType, BRNGType]] = None
action_space
class-attribute
instance-attribute
¶
action_space: Optional[Space[ActType, BDeviceType, BDtypeType, BRNGType]] = None
supported_render_modes
class-attribute
instance-attribute
¶
supported_render_modes: Sequence[str] = ()
world
class-attribute
instance-attribute
¶
world: Optional[World[BArrayType, BDeviceType, BDtypeType, BRNGType]] = None
after_reset_priorities
class-attribute
instance-attribute
¶
after_reset_priorities: Set[int] = set()
after_reload_priorities
class-attribute
instance-attribute
¶
after_reload_priorities: Set[int] = set()
pre_environment_step_priorities
class-attribute
instance-attribute
¶
pre_environment_step_priorities: Set[int] = set()
post_environment_step_priorities
class-attribute
instance-attribute
¶
post_environment_step_priorities: Set[int] = set()
backend
property
¶
backend: ComputeBackend[BArrayType, BDeviceType, BDtypeType, BRNGType]
Backend provided by the attached world.
effective_update_timestep
property
¶
effective_update_timestep: Optional[float]
Resolved update period, defaulting to control_timestep when unset.
pre_environment_step
¶
pre_environment_step(dt: Union[float, BArrayType], *, priority: int = 0) -> None
This method is called before the environment step Args: dt (float/BArrayType): The time elapsed between the last world step and the current step (NOT the current step and next step).
get_context
¶
get_context() -> Optional[ContextType]
Get the current context from the node. If the context space is None, this method should not be called.
get_observation
¶
get_observation() -> ObsType
Get the current observation from the sensor. If the observation space is None, this method should not be called.
get_reward
¶
get_reward() -> Union[float, BArrayType]
Get the current reward from the environment.
If has_reward is False, this method should not be called.
get_termination
¶
get_termination() -> Union[bool, BArrayType]
Get the current termination signal from the environment.
If has_termination_signal is False, this method should not be called.
get_truncation
¶
get_truncation() -> Union[bool, BArrayType]
Get the current truncation signal from the environment.
If has_truncation_signal is False, this method should not be called.
get_info
¶
get_info() -> Optional[Dict[str, Any]]
Get optional auxiliary information with this node.
render
¶
render() -> Union[np.ndarray, BArrayType, Sequence[Union[np.ndarray, BArrayType]], Dict[str, Union[np.ndarray, BArrayType, Sequence[Union[np.ndarray, BArrayType]]]], None]
Render the current state of the node.
If can_render is False, this method should not be called.
set_next_action
¶
set_next_action(action: ActType) -> None
Update the next action to be taken by the node.
This method should be called before pre_environment_step call.
If this method is not called after a world step or an action of None is given in the call, the node will compute a dummy action to try retain the same state of the robot.
Note that if the action space is None, this method should not be called.
post_environment_step
¶
post_environment_step(dt: Union[float, BArrayType], *, priority: int = 0) -> None
This method is called after the environment step to update the sensor's internal state. Args: dt (float/BArrayType): The time elapsed between the last world step and the current step.
reset
¶
reset(*, priority: int = 0, seed: Optional[int] = None, mask: Optional[BArrayType] = None, **kwargs) -> None
This method is called after World.reset(...) has been called.
Reset the node and update its internal state.
reload
¶
reload(*, priority: int = 0, seed: Optional[int] = None, mask: Optional[BArrayType] = None, **kwargs) -> None
Reload the node. By default, this just calls reset with the same parameters.
Simulation environments can override this to completely re-read assets and reload the node.
after_reset
¶
after_reset(*, priority: int = 0, mask: Optional[BArrayType] = None) -> None
This method is called after all WorldNodes has been called with reset (e.g. the environment reset is effectively done).
Use get_context, get_observation, and get_info to read the post-reset state.
after_reload
¶
after_reload(*, priority: int = 0, mask: Optional[BArrayType] = None) -> None
This method is called after the world has been rebuilt following a reload.
At this point: - All nodes have added their entities during the reload phase - The world scene has been built (simulation is ready) - Nodes can now cache references to entities, geoms, etc.
By default, this calls after_reset(). Override if you need specific
post-reload initialization that differs from post-reset.
Use get_context, get_observation, and get_info to read the state.
get_node
¶
get_node(nested_keys: Union[str, Sequence[str]]) -> Optional[WorldNode]
Return a node by key path.
strkeys are treated as a single direct-child key.- Empty key sequence returns
self. - Vanilla nodes have no children and return
Nonefor non-empty lookup.
get_nodes_by_fn
¶
get_nodes_by_fn(fn: Callable[[WorldNode], bool]) -> list[WorldNode]
Return nodes in this subtree matching fn.
Vanilla nodes only evaluate self.
get_nodes_by_type
¶
get_nodes_by_type(node_type: Type[WorldNode]) -> list[WorldNode]
Return nodes in this subtree that are instances of node_type.
has_wrapper_attr
¶
has_wrapper_attr(name: str) -> bool
Checks if the attribute name exists in the environment.
set_wrapper_attr
¶
set_wrapper_attr(name: str, value: Any)
Sets the attribute name on the environment with value.