unienv_interface.world.funcnode¶
FuncWorldNode
¶
Bases: ABC, Generic[WorldStateT, NodeStateT, ContextType, ObsType, ActType, BArrayType, BDeviceType, BDtypeType, BRNGType]
A functional (stateless) node that manages one aspect of a world.
Unlike WorldNode, every method receives and returns explicit state objects
(world_state, node_state) instead of mutating internal attributes.
initial creates the node state for the first time (the environment has not
been set up yet). reset is called on subsequent episode boundaries when the
node state already exists.
Lifecycle — initial flow (environment not yet created)::
FuncWorldNode.initial(world_state, priority=...)
-> returns (world_state, node_state)
Lifecycle — reset flow::
FuncWorld.reset()
-> FuncWorldNode.reset(world_state, node_state, priority=...)
-> FuncWorldNode.after_reset(world_state, node_state, priority=...)
-> FuncWorldNode.get_context(...) / get_observation(...) / get_info(...) / render(...)
Lifecycle — reload flow::
FuncWorld.reload() # World prepares (clear, load assets)
-> FuncWorldNode.reload(world_state, priority=...) # Nodes add entities
FuncWorld.after_reload() # World compiles scene
-> FuncWorldNode.after_reload(world_state, node_state, priority=...) # Nodes cache refs
-> FuncWorldNode.get_context(...) / get_observation(...) / get_info(...) / render(...)
reload re-generates the simulation environment (e.g. re-reading assets,
rebuilding the scene). This is typically much more expensive than reset
and should only be called when the environment configuration has changed.
By default reload delegates to initial.
Lifecycle — step flow::
FuncWorldNode.set_next_action(world_state, node_state, action)
-> FuncWorldNode.pre_environment_step(world_state, node_state, dt, priority=...)
-> FuncWorld.step()
-> FuncWorldNode.post_environment_step(world_state, node_state, dt, priority=...)
-> FuncWorldNode.get_observation(world_state, node_state)
-> FuncWorldNode.get_reward(...) / get_termination(...) / get_truncation(...) / get_info(...) / render(...)
Implementing a node¶
Subclasses should:
- Set
name,world, and any relevant spaces / signal flags / render attributes (render_mode,supported_render_modes) in__init__(or as class-level attributes). - Implement
initialandreset(both abstract), plus any other lifecycle methods the node needs (pre_environment_step,post_environment_step, etc.). - Populate the corresponding priority sets for every lifecycle method they implement. Callers check these sets before dispatching a method call — a node will only have a given method called for priorities present in its corresponding set. A node with an empty priority set for a given method will never have that method called.
Example::
class MySensor(FuncWorldNode[...]):
initial_priorities = {0}
reset_priorities = {0}
after_reset_priorities = {0}
post_environment_step_priorities = {0}
def initial(self, world_state, *, priority=0, seed=None, **kwargs):
...
def reset(self, world_state, node_state, *, priority=0, seed=None, mask=None, **kwargs):
...
Multiple priorities (e.g. {0, 1}) allow the same method to be invoked at
different stages; the caller iterates priorities in the desired order.
Space contract¶
action_space, observation_space, and context_space are static
interface metadata. They must be fully declared in __init__ and must
not change after construction. CombinedFuncWorldNode reads them once at
construction time; there is no refresh mechanism.
Unlike stateful WorldNode subclasses (which may need to build a simulation
scene before knowing DOF counts), functional nodes are expected to derive their
spaces purely from their configuration parameters.
context_space
class-attribute
instance-attribute
¶
context_space: Optional[Space[ContextType, BDeviceType, BDtypeType, BRNGType]] = None
observation_space
class-attribute
instance-attribute
¶
observation_space: Optional[Space[ObsType, BDeviceType, BDtypeType, BRNGType]] = None
action_space
class-attribute
instance-attribute
¶
action_space: Optional[Space[ActType, BDeviceType, BDtypeType, BRNGType]] = None
supported_render_modes
class-attribute
instance-attribute
¶
supported_render_modes: Sequence[str] = ()
world
class-attribute
instance-attribute
¶
world: Optional[FuncWorld[WorldStateT, BArrayType, BDeviceType, BDtypeType, BRNGType]] = None
after_reset_priorities
class-attribute
instance-attribute
¶
after_reset_priorities: Set[int] = set()
after_reload_priorities
class-attribute
instance-attribute
¶
after_reload_priorities: Set[int] = set()
pre_environment_step_priorities
class-attribute
instance-attribute
¶
pre_environment_step_priorities: Set[int] = set()
post_environment_step_priorities
class-attribute
instance-attribute
¶
post_environment_step_priorities: Set[int] = set()
backend
property
¶
backend: ComputeBackend[BArrayType, BDeviceType, BDtypeType, BRNGType]
Backend provided by the attached world.
effective_update_timestep
property
¶
effective_update_timestep: Optional[float]
Resolved update period, defaulting to control_timestep when unset.
initial
abstractmethod
¶
initial(world_state: WorldStateT, *, priority: int = 0, seed: Optional[int] = None, **kwargs) -> Tuple[WorldStateT, NodeStateT]
Create the node state for a freshly initialized world.
reload
¶
reload(world_state: WorldStateT, *, priority: int = 0, seed: Optional[int] = None, **kwargs) -> Tuple[WorldStateT, NodeStateT]
Reload the node. By default, this just calls initial with the same parameters.
Simulation environments can override this to completely re-read assets and reload the node.
reset
abstractmethod
¶
reset(world_state: WorldStateT, node_state: NodeStateT, *, priority: int = 0, seed: Optional[int] = None, mask: Optional[BArrayType] = None, **kwargs) -> Tuple[WorldStateT, NodeStateT]
This method is called after FuncWorld.reset(...) has been called.
after_reset
¶
after_reset(world_state: WorldStateT, node_state: NodeStateT, *, priority: int = 0, mask: Optional[BArrayType] = None) -> Tuple[WorldStateT, NodeStateT]
This method is called after all FuncWorldNodes has been called with reset (e.g. the environment reset is effectively done).
Use get_context, get_observation, and get_info to read the post-reset state.
after_reload
¶
after_reload(world_state: WorldStateT, node_state: NodeStateT, *, priority: int = 0, mask: Optional[BArrayType] = None) -> Tuple[WorldStateT, NodeStateT]
This method is called after the world has been rebuilt following a reload.
At this point: - All nodes have added their entities during the reload phase - The world scene has been built (simulation is ready) - Nodes can now cache references to entities, geoms, etc.
By default, this calls after_reset(). Override if you need specific
post-reload initialization that differs from post-reset.
Use get_context, get_observation, and get_info to read the state.
pre_environment_step
¶
pre_environment_step(world_state: WorldStateT, node_state: NodeStateT, dt: Union[float, BArrayType], *, priority: int = 0) -> Tuple[WorldStateT, NodeStateT]
This method is called before each environment step. Args: world_state (WorldStateT): The current state of the world. node_state (NodeStateT): The current state of the node. dt (Union[float, BArrayType]): The time delta since the last step.
get_context
¶
get_context(world_state: WorldStateT, node_state: NodeStateT) -> Optional[ContextType]
Get the current context from the node. If the context space is None, this method should not be called.
get_observation
¶
get_observation(world_state: WorldStateT, node_state: NodeStateT) -> ObsType
Get the current observation from the sensor. If the observation space is None, this method should not be called.
get_reward
¶
get_reward(world_state: WorldStateT, node_state: NodeStateT) -> Union[float, BArrayType]
Get the current reward from the environment. If the reward space is None, this method should not be called.
get_termination
¶
get_termination(world_state: WorldStateT, node_state: NodeStateT) -> Union[bool, BArrayType]
Get the current termination status from the environment. If the termination space is None, this method should not be called.
get_truncation
¶
get_truncation(world_state: WorldStateT, node_state: NodeStateT) -> Union[bool, BArrayType]
Get the current truncation status from the environment. If the truncation space is None, this method should not be called.
get_info
¶
get_info(world_state: WorldStateT, node_state: NodeStateT) -> Optional[Dict[str, Any]]
Get the current info from the environment.
render
¶
render(world_state: WorldStateT, node_state: NodeStateT) -> Union[np.ndarray, BArrayType, Sequence[Union[np.ndarray, BArrayType]], Dict[str, Union[np.ndarray, BArrayType, Sequence[Union[np.ndarray, BArrayType]]]], None]
Render the current state of the node.
If can_render is False, this method should not be called.
set_next_action
¶
set_next_action(world_state: WorldStateT, node_state: NodeStateT, action: ActType) -> Tuple[WorldStateT, NodeStateT]
Update the next action to be taken by the node.
This method should be called before pre_environment_step call.
If this method is not called after a world step or an action of None is given in the call, the node will compute a dummy action to try retain the same state of the robot.
Note that if the action space is None, this method should not be called.
post_environment_step
¶
post_environment_step(world_state: WorldStateT, node_state: NodeStateT, dt: Union[float, BArrayType], *, priority: int = 0) -> Tuple[WorldStateT, NodeStateT]
This method is called after the environment step to update the sensor's internal state. Args: world_state (WorldStateT): The current state of the world. node_state (NodeStateT): The current state of the node. dt (Union[float, BArrayType]): The time delta since the last step.
close
¶
close(world_state: WorldStateT, node_state: NodeStateT) -> WorldStateT
Release node-owned resources and return the final world state.
get_node
¶
get_node(nested_keys: Union[str, Sequence[str]]) -> Optional[FuncWorldNode]
Return a node by key path.
strkeys are treated as a single direct-child key.- Empty key sequence returns
self. - Vanilla nodes have no children and return
Nonefor non-empty lookup.
get_nodes_by_fn
¶
get_nodes_by_fn(fn: Callable[[FuncWorldNode], bool]) -> list[FuncWorldNode]
Return nodes in this subtree matching fn.
Vanilla nodes only evaluate self.
get_nodes_by_type
¶
get_nodes_by_type(node_type: Type[FuncWorldNode]) -> list[FuncWorldNode]
Return nodes in this subtree that are instances of node_type.
has_wrapper_attr
¶
has_wrapper_attr(name: str) -> bool
Checks if the attribute name exists in the environment.
set_wrapper_attr
¶
set_wrapper_attr(name: str, value: Any)
Sets the attribute name on the environment with value.