unienv_data¶
BatchBase
¶
BatchBase(single_space: Space[BatchT, BDeviceType, BDtypeType, BRNGType], single_metadata_space: Optional[DictSpace[BDeviceType, BDtypeType, BRNGType]] = None)
Bases: ABC, Generic[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Base interface for dataset-like collections of structured samples.
BatchBase stores the per-sample space definition and provides a uniform
API for retrieving either structured values or their flattened backend-array
representation. Concrete subclasses decide where data lives and which
mutation operations are supported.
Initialize the batch contract for one logical sample.
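The relationship between a structured sample and its flattened form can be illustrated with a small standalone sketch. It does not use unienv_data: the helper names `flatten_sample` and `unflatten_sample`, the `LAYOUT` dictionary, and the use of plain Python lists in place of backend arrays are all assumptions made for illustration.

```python
from typing import Dict, List

# Hypothetical per-sample layout: field name -> number of scalar entries.
LAYOUT: Dict[str, int] = {"obs": 3, "action": 2}


def flatten_sample(sample: Dict[str, List[float]]) -> List[float]:
    """Concatenate the fields in a fixed order into one flat vector."""
    flat: List[float] = []
    for key, size in LAYOUT.items():
        assert len(sample[key]) == size, f"{key} has the wrong size"
        flat.extend(sample[key])
    return flat


def unflatten_sample(flat: List[float]) -> Dict[str, List[float]]:
    """Split a flat vector back into the structured fields."""
    out: Dict[str, List[float]] = {}
    start = 0
    for key, size in LAYOUT.items():
        out[key] = flat[start:start + size]
        start += size
    return out


sample = {"obs": [0.1, 0.2, 0.3], "action": [1.0, -1.0]}
flat = flatten_sample(sample)
restored = unflatten_sample(flat)
```

In this toy model the get_at / get_flattened_at pairs correspond to returning `restored`-style or `flat`-style values for the same stored data.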
get_flattened_at
¶
get_flattened_at(idx: Union[IndexableType, BArrayType]) -> BArrayType
Fetch samples as flattened backend arrays.
get_flattened_at_with_metadata
¶
get_flattened_at_with_metadata(idx: Union[IndexableType, BArrayType]) -> Tuple[BArrayType, Optional[Dict[str, Any]]]
Fetch flattened samples together with optional per-sample metadata.
set_flattened_at
¶
set_flattened_at(idx: Union[IndexableType, BArrayType], value: BArrayType) -> None
Overwrite existing samples using flattened data.
append_flattened
¶
append_flattened(value: BArrayType) -> None
Append one flattened sample to the batch.
extend_flattened
¶
extend_flattened(value: BArrayType) -> None
Append a batched block of flattened samples.
get_at
¶
get_at(idx: Union[IndexableType, BArrayType]) -> BatchT
Fetch samples in their structured form.
get_at_with_metadata
¶
get_at_with_metadata(idx: Union[IndexableType, BArrayType]) -> Tuple[BatchT, Optional[Dict[str, Any]]]
Fetch structured samples together with optional metadata.
set_at
¶
set_at(idx: Union[IndexableType, BArrayType], value: BatchT) -> None
Overwrite existing samples using structured data.
remove_at
¶
remove_at(idx: Union[IndexableType, BArrayType]) -> None
Remove one or more samples from the batch.
extend_from
¶
extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None
Copy data from another batch in bounded-size chunks.
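As a rough illustration of copying in bounded-size chunks, the sketch below moves items between two plain Python lists. The function name `extend_in_chunks` is hypothetical and the lists stand in for batches. The `tqdm` flag, which by its name presumably toggles a progress bar, is left out.

```python
from typing import List


def extend_in_chunks(dst: List[int], src: List[int], chunk_size: int = 8) -> int:
    """Copy src into dst at most chunk_size items at a time; return the chunk count."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    n_chunks = 0
    for start in range(0, len(src), chunk_size):
        # Only one chunk of the source is materialized per iteration.
        dst.extend(src[start:start + chunk_size])
        n_chunks += 1
    return n_chunks


dst = [0, 1]
n_chunks = extend_in_chunks(dst, list(range(10, 30)), chunk_size=8)
```

Bounding the chunk size keeps peak memory proportional to `chunk_size` rather than to the size of the source.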
get_slice
¶
get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a subset of indices.
get_column
¶
get_column(nested_keys: Sequence[str]) -> BatchBase[Any, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a nested field inside each sample.
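What "lazy view" means for get_slice and get_column can be sketched with a toy class that stores only indices and reads from its parent on access. `LazySliceView` is a hypothetical name, and a list of strings stands in for the parent batch. This is an illustration of the idea, not the library's implementation.

```python
from typing import List, Sequence


class LazySliceView:
    """Hypothetical view: stores indices, reads from the parent on access."""

    def __init__(self, parent: List[str], indices: Sequence[int]) -> None:
        self._parent = parent
        self._indices = list(indices)

    def __len__(self) -> int:
        return len(self._indices)

    def get_at(self, i: int) -> str:
        # Translate the view-local index into a parent index at read time.
        return self._parent[self._indices[i]]


parent = ["a", "b", "c", "d"]
view = LazySliceView(parent, [3, 1])
before = view.get_at(0)
parent[3] = "z"  # mutate the parent after the view was created
after = view.get_at(0)
```

Because no data is copied when the view is created, later changes to the parent are visible through the view.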
BatchSampler
¶
BatchSampler(single_space: Space[BatchT, BDeviceType, BDtypeType, BRNGType], single_metadata_space: Optional[DictSpace[BDeviceType, BDtypeType, BRNGType]] = None, batch_size: int = 1)
Bases: Generic[SamplerBatchT, SamplerArrayType, SamplerDeviceType, SamplerDtypeType, SamplerRNGType, BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], BatchBase[SamplerBatchT, SamplerArrayType, SamplerDeviceType, SamplerDtypeType, SamplerRNGType]
Read-only batch wrapper that samples mini-batches from another batch.
Configure the sampling batch shape without binding the source data yet.
sampled_space
property
¶
sampled_space: Space[SamplerBatchT, SamplerDeviceType, SamplerDtypeType, SamplerRNGType]
sampled_metadata_space
property
¶
sampled_metadata_space: Optional[DictSpace[SamplerDeviceType, SamplerDtypeType, SamplerRNGType]]
get_flattened_at
¶
get_flattened_at(idx: Union[IndexableType, BArrayType]) -> BArrayType
Fetch samples as flattened backend arrays.
get_flattened_at_with_metadata
¶
get_flattened_at_with_metadata(idx: Union[IndexableType, BArrayType]) -> Tuple[BArrayType, Optional[Dict[str, Any]]]
Fetch flattened samples together with optional per-sample metadata.
set_flattened_at
¶
set_flattened_at(idx: Union[IndexableType, BArrayType], value: BArrayType) -> None
Overwrite existing samples using flattened data.
append_flattened
¶
append_flattened(value: BArrayType) -> None
Append one flattened sample to the batch.
extend_flattened
¶
extend_flattened(value: BArrayType) -> None
Append a batched block of flattened samples.
get_at
¶
get_at(idx: Union[IndexableType, BArrayType]) -> BatchT
Fetch samples in their structured form.
get_at_with_metadata
¶
get_at_with_metadata(idx: Union[IndexableType, BArrayType]) -> Tuple[BatchT, Optional[Dict[str, Any]]]
Fetch structured samples together with optional metadata.
set_at
¶
set_at(idx: Union[IndexableType, BArrayType], value: BatchT) -> None
Overwrite existing samples using structured data.
remove_at
¶
remove_at(idx: Union[IndexableType, BArrayType]) -> None
Remove one or more samples from the batch.
extend_from
¶
extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None
Copy data from another batch in bounded-size chunks.
get_slice
¶
get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a subset of indices.
get_column
¶
get_column(nested_keys: Sequence[str]) -> BatchBase[Any, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a nested field inside each sample.
manual_seed
¶
manual_seed(seed: int) -> None
Reset sampler RNG state, including the optional data-index RNG.
sample_flat_with_metadata
¶
sample_flat_with_metadata() -> Tuple[SamplerArrayType, Optional[Dict[str, Any]]]
Sample a flattened batch together with optional metadata.
sample_with_metadata
¶
sample_with_metadata() -> Tuple[SamplerBatchT, Optional[Dict[str, Any]]]
Sample a structured batch together with optional metadata.
epoch_iter
¶
epoch_iter() -> Iterator[SamplerBatchT]
Iterate once over a random permutation of the source data.
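One pass over a random permutation can be sketched with the standard library alone. The function `epoch_batches` and its parameters are hypothetical. Whether the real sampler yields or drops a final short batch is not stated here, so this sketch simply yields it.

```python
import random
from typing import Iterator, List


def epoch_batches(n_items: int, batch_size: int, seed: int) -> Iterator[List[int]]:
    """Yield index batches that together cover one random permutation."""
    rng = random.Random(seed)  # local RNG, so a fixed seed reproduces the order
    order = list(range(n_items))
    rng.shuffle(order)
    for start in range(0, n_items, batch_size):
        yield order[start:start + batch_size]


batches = list(epoch_batches(n_items=10, batch_size=4, seed=0))
seen = sorted(i for batch in batches for i in batch)
```

Every index appears exactly once per epoch, and reusing the seed reproduces the same batch order.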
epoch_iter_with_metadata
¶
epoch_iter_with_metadata() -> Iterator[Tuple[SamplerBatchT, Optional[Dict[str, Any]]]]
Iterate once over shuffled structured batches plus metadata.
epoch_flat_iter
¶
epoch_flat_iter() -> Iterator[SamplerArrayType]
Iterate once over shuffled batches in flattened form.
epoch_flat_iter_with_metadata
¶
epoch_flat_iter_with_metadata() -> Iterator[Tuple[SamplerArrayType, Optional[Dict[str, Any]]]]
Iterate once over shuffled flattened batches plus metadata.
SpaceStorage
¶
SpaceStorage(single_instance_space: Space[BatchT, BDeviceType, BDtypeType, BRNGType])
Bases: ABC, Generic[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
SpaceStorage is an abstract base class for storages that hold instances of a specific space.
It provides a common interface for creating, loading, and managing the storage of instances of a given space.
Note that for a space storage to support multiprocessing, it must pickle and unpickle correctly: check the default behavior and, where needed, implement the __getstate__ and __setstate__ methods.
Bind the storage to the space of one stored element.
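The `__getstate__` / `__setstate__` pattern mentioned above can be sketched with a toy class that holds a lock, which cannot be pickled. `ToyStorage` is hypothetical and unrelated to any concrete SpaceStorage subclass. The round trip uses `copy.deepcopy`, which goes through the same two hooks as `pickle`.

```python
import copy
import threading
from typing import Any, Dict, List


class ToyStorage:
    """Hypothetical storage holding a lock, which cannot be pickled."""

    def __init__(self, data: List[int]) -> None:
        self.data = data
        self._lock = threading.Lock()

    def __getstate__(self) -> Dict[str, Any]:
        # Drop the unpicklable lock; keep everything else.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        # Restore the saved fields and create a fresh lock for the new object.
        self.__dict__.update(state)
        self._lock = threading.Lock()


original = ToyStorage([1, 2, 3])
clone = copy.deepcopy(original)
```

The same idea applies to file handles, memory maps, or other process-local resources: exclude them from the saved state and reopen them on restore.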
single_file_ext
class-attribute
instance-attribute
¶
single_file_ext: Optional[str] = None
Presumably the file extension used by storages that persist to a single file (e.g. .pt for PyTorch); None by default.
capacity
class-attribute
instance-attribute
¶
capacity: Optional[int] = None
The total capacity (number of single instances) of the storage. If None, the storage has unlimited capacity.
cache_filename
class-attribute
instance-attribute
¶
cache_filename: Optional[Union[str, PathLike]] = None
The cache path for the storage. If None, the storage will not use caching.
is_multiprocessing_safe
class-attribute
instance-attribute
¶
is_multiprocessing_safe: bool = False
Whether the storage instance can be safely used in multiprocessing environments after creation.
Storage mutability is a separate property: a storage that is not mutable is read-only.
create
classmethod
¶
create(single_instance_space: Space[BatchT, BDeviceType, BDtypeType, BRNGType], *args, capacity: Optional[int], cache_path: Optional[Union[str, PathLike]] = None, multiprocessing: bool = False, **kwargs) -> SpaceStorage[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a new storage instance for samples from single_instance_space.
load_from
classmethod
¶
load_from(path: Union[str, PathLike], single_instance_space: Space[BatchT, BDeviceType, BDtypeType, BRNGType], *, capacity: Optional[int] = None, read_only: bool = True, multiprocessing: bool = False, **kwargs) -> SpaceStorage[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Restore a storage instance previously written to disk.
append
¶
append(value) -> None
Append one structured sample to the storage.
Appended values may be immediately visible or only visible after
mark_segment_end() depending on the storage implementation.
extend_length
¶
extend_length(length: int) -> None
Used by storages with capacity = None to extend the length of the storage. Calls on a storage with a fixed capacity are ignored.
shrink_length
¶
shrink_length(length: int) -> None
Used by storages with capacity = None to shrink the length of the storage. Calls on a storage with a fixed capacity are ignored.
get
abstractmethod
¶
get(index: Union[IndexableType, BArrayType]) -> BatchT
Read one or more samples from storage.
set
abstractmethod
¶
set(index: Union[IndexableType, BArrayType], value: BatchT) -> None
Write one or more samples into storage.
clear
¶
clear() -> None
Clear all data inside the storage and set the length to 0 if the storage has unlimited capacity. For storages with fixed capacity, this should reset the storage to its initial state.
dumps
abstractmethod
¶
dumps(path: Union[str, PathLike]) -> None
Dumps the storage to the specified path.
This is used for storages that have a single file extension (e.g. .pt for PyTorch).
ToBackendOrDeviceBatch
¶
ToBackendOrDeviceBatch(batch: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], backend: Optional[ComputeBackend[WrapperBArrayT, WrapperBDeviceT, WrapperBDtypeT, WrapperBRngT]] = None, device: Optional[WrapperBDeviceT] = None)
Bases: Generic[WrapperBatchT, WrapperBArrayT, WrapperBDeviceT, WrapperBDtypeT, WrapperBRngT, BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], BatchBase[WrapperBatchT, WrapperBArrayT, WrapperBDeviceT, WrapperBDtypeT, WrapperBRngT]
backend
property
¶
backend: ComputeBackend[WrapperBArrayT, WrapperBDeviceT, WrapperBDtypeT, WrapperBRngT]
append_flattened
¶
append_flattened(value: BArrayType) -> None
Append one flattened sample to the batch.
extend_from
¶
extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None
Copy data from another batch in bounded-size chunks.
get_slice
¶
get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a subset of indices.
get_column
¶
get_column(nested_keys: Sequence[str]) -> BatchBase[Any, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a nested field inside each sample.
CombinedBatch
¶
CombinedBatch(batches: Sequence[BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]], device: Optional[BDeviceType] = None, use_thread_pool: bool = False)
Bases: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
append_flattened
¶
append_flattened(value: BArrayType) -> None
Append one flattened sample to the batch.
extend_from
¶
extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None
Copy data from another batch in bounded-size chunks.
get_slice
¶
get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a subset of indices.
get_column
¶
get_column(nested_keys: Sequence[str]) -> BatchBase[Any, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a nested field inside each sample.
get_flattened_at_with_metadata
¶
get_flattened_at_with_metadata(idx: Union[IndexableType, BArrayType]) -> Tuple[BArrayType, Dict[str, Any]]
get_at_with_metadata
¶
get_at_with_metadata(idx: Union[IndexableType, BArrayType]) -> Tuple[BatchT, Dict[str, Any]]
set_flattened_at
¶
set_flattened_at(idx: Union[IndexableType, BArrayType], value: BArrayType) -> None
SliceStackedBatch
¶
SliceStackedBatch(batch: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], fixed_offset: BArrayType, get_valid_mask_function: Optional[Callable[[SliceStackedBatch, BArrayType, BatchT, Dict[str, Any]], BArrayType]] = None, fill_invalid_data: bool = True, stack_metadata: bool = False)
Bases: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
A batch that stacks frames at the given fixed offsets. This is a read-only batch, since it is a view of the original batch. To change the data, mutate the original batch instead.
set_flattened_at
¶
set_flattened_at(idx: Union[IndexableType, BArrayType], value: BArrayType) -> None
Overwrite existing samples using flattened data.
append_flattened
¶
append_flattened(value: BArrayType) -> None
Append one flattened sample to the batch.
extend_flattened
¶
extend_flattened(value: BArrayType) -> None
Append a batched block of flattened samples.
set_at
¶
set_at(idx: Union[IndexableType, BArrayType], value: BatchT) -> None
Overwrite existing samples using structured data.
remove_at
¶
remove_at(idx: Union[IndexableType, BArrayType]) -> None
Remove one or more samples from the batch.
extend_from
¶
extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None
Copy data from another batch in bounded-size chunks.
get_slice
¶
get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a subset of indices.
get_column
¶
get_column(nested_keys: Sequence[str]) -> BatchBase[Any, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a nested field inside each sample.
expand_index
¶
expand_index(index: BArrayType) -> BArrayType
Expand indexes to slice the data.
Args:
    index (BArrayType): A 1D tensor of indices to expand. Can be boolean or integer.
Returns:
    BArrayType: A 2D tensor of indices, where each row corresponds to the expanded indices for each batch item.
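A minimal sketch of the index expansion, assuming that each row is the anchor index plus the constructor's `fixed_offset` values and that a boolean index is first converted to the positions where it is True. Nested Python lists stand in for backend arrays, and out-of-range entries are not clipped here.

```python
from typing import List, Sequence, Union


def expand_index(
    index: Sequence[Union[int, bool]],
    fixed_offset: Sequence[int],
) -> List[List[int]]:
    """Return a (B, T) nested list with entry [b][t] = index[b] + fixed_offset[t]."""
    if len(index) > 0 and all(isinstance(i, bool) for i in index):
        # A boolean mask selects the positions where it is True.
        positions = [pos for pos, keep in enumerate(index) if keep]
    else:
        positions = [int(i) for i in index]
    return [[p + off for off in fixed_offset] for p in positions]


expanded = expand_index([5, 9], fixed_offset=[-2, -1, 0])
from_mask = expand_index([False, True, False, True], fixed_offset=[0, 1])
```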
get_valid_mask_flattened
¶
get_valid_mask_flattened(expanded_idx: BArrayType, data: BArrayType, metadata: Dict[str, Any]) -> BArrayType
Get the valid mask for the flattened data.
Args:
    expanded_idx (B, T): A 2D tensor of indices to slice the data.
    data (B, T, *D): The data to slice.
Returns:
    Valid Mask (B, T)
get_valid_mask
¶
get_valid_mask(expanded_idx: BArrayType, data: BatchT, metadata: Dict[str, Any]) -> BArrayType
Get the valid mask for the data.
Args:
    expanded_idx (B, T): A 2D tensor of indices to slice the data.
    data (BatchT): The data to slice.
Returns:
    Valid Mask (B, T)
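One plausible way to build a (B, T) valid mask is to mark a stacked frame as valid only when it lies inside the data and belongs to the same episode as its anchor frame. The sketch below works under that assumption. The function name, the separate `anchor_idx` argument, and the per-frame `episode_ids` list are hypothetical and do not mirror the library's callback signature.

```python
from typing import List


def valid_mask_from_episode_ids(
    expanded_idx: List[List[int]],
    anchor_idx: List[int],
    episode_ids: List[int],
) -> List[List[bool]]:
    """Mark entry [b][t] valid when it is in range and shares the anchor's episode."""
    n = len(episode_ids)
    mask: List[List[bool]] = []
    for row, anchor in zip(expanded_idx, anchor_idx):
        anchor_episode = episode_ids[anchor]
        # The range check comes first so out-of-range indices are never looked up.
        mask.append([0 <= i < n and episode_ids[i] == anchor_episode for i in row])
    return mask


episode_ids = [0, 0, 0, 1, 1]           # one id per stored frame
expanded_idx = [[-1, 0, 1], [2, 3, 4]]  # windows around anchors 0 and 3
mask = valid_mask_from_episode_ids(expanded_idx, [0, 3], episode_ids)
```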
fill_data_with_stack_mask
¶
fill_data_with_stack_mask(space: Optional[Space[Any, BDeviceType, BDtypeType, BRNGType]], data: Union[BArrayType, BatchT, Any], valid_mask: BArrayType) -> Union[BArrayType, BatchT, Any]
Fill the data with the mask as if the frames were frame-stacked.
Args:
    space (Optional[Space]): The space to fill the data with. If None, the data is assumed to be a backend array.
    data (Union[BArrayType, BatchT, Any]): The data to fill, sized (B, T, D).
    valid_mask (BArrayType): The mask to fill the data with, sized (B, T).
Returns:
    Union[BArrayType, BatchT, Any]: The filled data, sized (B, T, D).
get_at_with_metadata
¶
get_at_with_metadata(idx: Union[IndexableType, BArrayType]) -> Tuple[BatchT, Dict[str, Any]]
get_valid_mask_function_with_episodeid_key
staticmethod
¶
get_valid_mask_function_with_episodeid_key(episode_id_key: Union[str, int] = 'episode_id', is_in_metadata: bool = False) -> Callable[[SliceStackedBatch, BArrayType, BatchT], BArrayType]
get_valid_mask_function_with_episode_end_key
staticmethod
¶
get_valid_mask_function_with_episode_end_key(episode_end_key: Union[str, int] = 'episode_end', is_in_metadata: bool = False) -> Callable[[SliceStackedBatch, BArrayType, BatchT], BArrayType]
FrameStackedBatch
¶
FrameStackedBatch(batch: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], prefetch_horizon: int = 0, postfetch_horizon: int = 0, get_valid_mask_function: Optional[Callable[[SliceStackedBatch, BArrayType, BatchT, Dict[str, Any]], BArrayType]] = None, fill_invalid_data: bool = True, stack_metadata: bool = False)
Bases: SliceStackedBatch[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
A batch that stacks frames in a sliding-window manner. It supports prefetching and postfetching of frames, which is useful for training models that require temporal context (e.g. Diffusion Policy, ACT). This is a read-only batch, since it is a view of the original batch. To change the data, mutate the original batch instead.
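A sliding window can be described by a list of offsets relative to the anchor frame. The sketch below assumes that `prefetch_horizon` counts frames before the anchor and `postfetch_horizon` counts frames after it. That reading of the two parameters, and the helper name `sliding_window_offsets`, are assumptions rather than documented behavior.

```python
from typing import List


def sliding_window_offsets(prefetch_horizon: int = 0, postfetch_horizon: int = 0) -> List[int]:
    """Offsets relative to the anchor frame: past frames, the anchor, future frames."""
    if prefetch_horizon < 0 or postfetch_horizon < 0:
        raise ValueError("horizons must be non-negative")
    return list(range(-prefetch_horizon, postfetch_horizon + 1))


offsets = sliding_window_offsets(prefetch_horizon=2, postfetch_horizon=1)
window = [10 + off for off in offsets]  # frames gathered around anchor index 10
```

Under this reading, both horizons at their default of 0 give a window containing only the anchor frame.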
set_flattened_at
¶
set_flattened_at(idx: Union[IndexableType, BArrayType], value: BArrayType) -> None
Overwrite existing samples using flattened data.
append_flattened
¶
append_flattened(value: BArrayType) -> None
Append one flattened sample to the batch.
extend_flattened
¶
extend_flattened(value: BArrayType) -> None
Append a batched block of flattened samples.
get_at_with_metadata
¶
get_at_with_metadata(idx: Union[IndexableType, BArrayType]) -> Tuple[BatchT, Dict[str, Any]]
set_at
¶
set_at(idx: Union[IndexableType, BArrayType], value: BatchT) -> None
Overwrite existing samples using structured data.
remove_at
¶
remove_at(idx: Union[IndexableType, BArrayType]) -> None
Remove one or more samples from the batch.
extend_from
¶
extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None
Copy data from another batch in bounded-size chunks.
get_slice
¶
get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a subset of indices.
get_column
¶
get_column(nested_keys: Sequence[str]) -> BatchBase[Any, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a nested field inside each sample.
expand_index
¶
expand_index(index: BArrayType) -> BArrayType
Expand indexes to slice the data.
Args:
    index (BArrayType): A 1D tensor of indices to expand. Can be boolean or integer.
Returns:
    BArrayType: A 2D tensor of indices, where each row corresponds to the expanded indices for each batch item.
get_valid_mask_flattened
¶
get_valid_mask_flattened(expanded_idx: BArrayType, data: BArrayType, metadata: Dict[str, Any]) -> BArrayType
Get the valid mask for the flattened data.
Args:
    expanded_idx (B, T): A 2D tensor of indices to slice the data.
    data (B, T, *D): The data to slice.
Returns:
    Valid Mask (B, T)
get_valid_mask
¶
get_valid_mask(expanded_idx: BArrayType, data: BatchT, metadata: Dict[str, Any]) -> BArrayType
Get the valid mask for the data.
Args:
    expanded_idx (B, T): A 2D tensor of indices to slice the data.
    data (BatchT): The data to slice.
Returns:
    Valid Mask (B, T)
fill_data_with_stack_mask
¶
fill_data_with_stack_mask(space: Optional[Space[Any, BDeviceType, BDtypeType, BRNGType]], data: Union[BArrayType, BatchT, Any], valid_mask: BArrayType) -> Union[BArrayType, BatchT, Any]
Fill the data with the mask as if the frames were frame-stacked.
Args:
    space (Optional[Space]): The space to fill the data with. If None, the data is assumed to be a backend array.
    data (Union[BArrayType, BatchT, Any]): The data to fill, sized (B, T, D).
    valid_mask (BArrayType): The mask to fill the data with, sized (B, T).
Returns:
    Union[BArrayType, BatchT, Any]: The filled data, sized (B, T, D).
get_valid_mask_function_with_episodeid_key
staticmethod
¶
get_valid_mask_function_with_episodeid_key(episode_id_key: Union[str, int] = 'episode_id', is_in_metadata: bool = False) -> Callable[[SliceStackedBatch, BArrayType, BatchT], BArrayType]
get_valid_mask_function_with_episode_end_key
staticmethod
¶
get_valid_mask_function_with_episode_end_key(episode_end_key: Union[str, int] = 'episode_end', is_in_metadata: bool = False) -> Callable[[SliceStackedBatch, BArrayType, BatchT], BArrayType]
SubIndexedBatch
¶
SubIndexedBatch(batch: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], sub_indexes: Union[IndexableType, BArrayType])
Bases: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
A batch that offers a view of the original batch with a fixed index range. This does not support batch extension.
append_flattened
¶
append_flattened(value: BArrayType) -> None
Append one flattened sample to the batch.
remove_at
¶
remove_at(idx: Union[IndexableType, BArrayType]) -> None
Remove one or more samples from the batch.
extend_from
¶
extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None
Copy data from another batch in bounded-size chunks.
get_slice
¶
get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a subset of indices.
get_column
¶
get_column(nested_keys: Sequence[str]) -> BatchBase[Any, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a nested field inside each sample.
SubItemBatch
¶
SubItemBatch(batch: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], sub_indexes: Sequence[Any])
Bases: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
A batch that offers a view of the original batch with a fixed index. This is a read-only batch, since it is a view of the original batch. To change the data, mutate the original batch instead.
set_flattened_at
¶
set_flattened_at(idx: Union[IndexableType, BArrayType], value: BArrayType) -> None
Overwrite existing samples using flattened data.
append_flattened
¶
append_flattened(value: BArrayType) -> None
Append one flattened sample to the batch.
extend_flattened
¶
extend_flattened(value: BArrayType) -> None
Append a batched block of flattened samples.
set_at
¶
set_at(idx: Union[IndexableType, BArrayType], value: BatchT) -> None
Overwrite existing samples using structured data.
remove_at
¶
remove_at(idx: Union[IndexableType, BArrayType]) -> None
Remove one or more samples from the batch.
extend_from
¶
extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None
Copy data from another batch in bounded-size chunks.
get_slice
¶
get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a subset of indices.
get_column
¶
get_column(nested_keys: Sequence[str]) -> BatchBase[Any, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a nested field inside each sample.
TransformedBatch
¶
TransformedBatch(batch: BatchBase[SourceDataT, SourceBArrT, SourceBDeviceT, SourceBDTypeT, SourceBDRNGT], transformation: DataTransformation, metadata_transformation: Optional[DataTransformation] = None)
Bases: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
metadata_transformation
instance-attribute
¶
metadata_transformation = metadata_transformation if single_metadata_space is not None else None
append_flattened
¶
append_flattened(value: BArrayType) -> None
Append one flattened sample to the batch.
extend_from
¶
extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None
Copy data from another batch in bounded-size chunks.
get_slice
¶
get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a subset of indices.
get_column
¶
get_column(nested_keys: Sequence[str]) -> BatchBase[Any, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a nested field inside each sample.
get_flattened_at_with_metadata
¶
get_flattened_at_with_metadata(idx: Union[IndexableType, BArrayType]) -> Tuple[BArrayType, Optional[Dict[str, Any]]]
set_flattened_at
¶
set_flattened_at(idx: Union[IndexableType, BArrayType], value: BArrayType) -> None
get_at_with_metadata
¶
get_at_with_metadata(idx: Union[IndexableType, BArrayType]) -> Tuple[BatchT, Optional[Dict[str, Any]]]
StepSampler
¶
StepSampler(data: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], batch_size: int, seed: Optional[int] = None, device: Optional[BDeviceType] = None)
Bases: BatchSampler[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType, BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
sampled_space
property
¶
sampled_space: Space[SamplerBatchT, SamplerDeviceType, SamplerDtypeType, SamplerRNGType]
sampled_metadata_space
property
¶
sampled_metadata_space: Optional[DictSpace[SamplerDeviceType, SamplerDtypeType, SamplerRNGType]]
set_flattened_at
¶
set_flattened_at(idx: Union[IndexableType, BArrayType], value: BArrayType) -> None
Overwrite existing samples using flattened data.
append_flattened
¶
append_flattened(value: BArrayType) -> None
Append one flattened sample to the batch.
extend_flattened
¶
extend_flattened(value: BArrayType) -> None
Append a batched block of flattened samples.
set_at
¶
set_at(idx: Union[IndexableType, BArrayType], value: BatchT) -> None
Overwrite existing samples using structured data.
remove_at
¶
remove_at(idx: Union[IndexableType, BArrayType]) -> None
Remove one or more samples from the batch.
extend_from
¶
extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None
Copy data from another batch in bounded-size chunks.
get_slice
¶
get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a subset of indices.
get_column
¶
get_column(nested_keys: Sequence[str]) -> BatchBase[Any, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a nested field inside each sample.
manual_seed
¶
manual_seed(seed: int) -> None
Reset sampler RNG state, including the optional data-index RNG.
sample_flat_with_metadata
¶
sample_flat_with_metadata() -> Tuple[SamplerArrayType, Optional[Dict[str, Any]]]
Sample a flattened batch together with optional metadata.
sample_with_metadata
¶
sample_with_metadata() -> Tuple[SamplerBatchT, Optional[Dict[str, Any]]]
Sample a structured batch together with optional metadata.
epoch_iter
¶
epoch_iter() -> Iterator[SamplerBatchT]
Iterate once over a random permutation of the source data.
epoch_iter_with_metadata
¶
epoch_iter_with_metadata() -> Iterator[Tuple[SamplerBatchT, Optional[Dict[str, Any]]]]
Iterate once over shuffled structured batches plus metadata.
epoch_flat_iter
¶
epoch_flat_iter() -> Iterator[SamplerArrayType]
Iterate once over shuffled batches in flattened form.
epoch_flat_iter_with_metadata
¶
epoch_flat_iter_with_metadata() -> Iterator[Tuple[SamplerArrayType, Optional[Dict[str, Any]]]]
Iterate once over shuffled flattened batches plus metadata.
MultiprocessingSampler
¶
MultiprocessingSampler(sampler: BatchSampler[SamplerBatchT, SamplerArrayType, SamplerDeviceType, SamplerDtypeType, SamplerRNGType, BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], n_workers: int = 4, n_buffers: int = 8, ctx: Optional[BaseContext] = None, doze_time: float = 0.005, daemon: Optional[bool] = None)
Bases: BatchSampler[SamplerBatchT, SamplerArrayType, SamplerDeviceType, SamplerDtypeType, SamplerRNGType, BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
set_flattened_at
¶
set_flattened_at(idx: Union[IndexableType, BArrayType], value: BArrayType) -> None
Overwrite existing samples using flattened data.
append_flattened
¶
append_flattened(value: BArrayType) -> None
Append one flattened sample to the batch.
extend_flattened
¶
extend_flattened(value: BArrayType) -> None
Append a batched block of flattened samples.
set_at
¶
set_at(idx: Union[IndexableType, BArrayType], value: BatchT) -> None
Overwrite existing samples using structured data.
remove_at
¶
remove_at(idx: Union[IndexableType, BArrayType]) -> None
Remove one or more samples from the batch.
extend_from
¶
extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None
Copy data from another batch in bounded-size chunks.
get_slice
¶
get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a subset of indices.
get_column
¶
get_column(nested_keys: Sequence[str]) -> BatchBase[Any, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a nested field inside each sample.
sample_flat_with_metadata
¶
sample_flat_with_metadata() -> Tuple[SamplerArrayType, Optional[Dict[str, Any]]]
Sample a flattened batch together with optional metadata.
sample_with_metadata
¶
sample_with_metadata() -> Tuple[SamplerBatchT, Optional[Dict[str, Any]]]
Sample a structured batch together with optional metadata.
epoch_iter
¶
epoch_iter() -> Iterator[SamplerBatchT]
Iterate once over a random permutation of the source data.
epoch_iter_with_metadata
¶
epoch_iter_with_metadata() -> Iterator[Tuple[SamplerBatchT, Optional[Dict[str, Any]]]]
Iterate once over shuffled structured batches plus metadata.
epoch_flat_iter
¶
epoch_flat_iter() -> Iterator[SamplerArrayType]
Iterate once over shuffled batches in flattened form.
epoch_flat_iter_with_metadata
¶
epoch_flat_iter_with_metadata() -> Iterator[Tuple[SamplerArrayType, Optional[Dict[str, Any]]]]
Iterate once over shuffled flattened batches plus metadata.
ReplayBuffer
¶
ReplayBuffer(storage: SpaceStorage[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], storage_path_relative: str, count: int = 0, offset: int = 0, cache_path: Optional[Union[str, PathLike]] = None, multiprocessing: bool = False, *, maintain_segment_metadata: bool = False, physical_segments: Optional[Sequence[Tuple[int, int]]] = None, active_segment_start_physical: Optional[int] = None, active_segment_length: int = 0)
Bases: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Ring-buffer style batch storage built on top of a SpaceStorage.
The buffer exposes the standard BatchBase interface while handling
round-robin overwrite semantics, persistence metadata, and optional
multiprocessing-safe counters.
Wrap a storage object with replay-buffer indexing semantics.
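The round-robin overwrite semantics can be sketched with a toy fixed-capacity buffer. The attribute names `count` and `offset` echo the constructor parameters above, but their interpretation here (number of valid entries, and physical position of the oldest entry) is an assumption. `ToyRingBuffer` is a hypothetical class, not the library's code.

```python
from typing import List, Optional


class ToyRingBuffer:
    """Hypothetical fixed-capacity buffer that overwrites its oldest entry."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.slots: List[Optional[int]] = [None] * capacity
        self.count = 0   # number of valid entries
        self.offset = 0  # physical position of the oldest entry

    def append(self, value: int) -> None:
        write_pos = (self.offset + self.count) % self.capacity
        self.slots[write_pos] = value
        if self.count < self.capacity:
            self.count += 1
        else:
            # Buffer is full: the oldest entry was overwritten, so advance.
            self.offset = (self.offset + 1) % self.capacity

    def get_at(self, logical_idx: int) -> int:
        if not 0 <= logical_idx < self.count:
            raise IndexError(logical_idx)
        value = self.slots[(self.offset + logical_idx) % self.capacity]
        assert value is not None
        return value


buf = ToyRingBuffer(capacity=3)
for v in [10, 11, 12, 13, 14]:
    buf.append(v)
contents = [buf.get_at(i) for i in range(buf.count)]
```

Logical index 0 always refers to the oldest surviving entry, regardless of where it sits physically.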
append_flattened
¶
append_flattened(value: BArrayType) -> None
Append one flattened sample to the batch.
remove_at
¶
remove_at(idx: Union[IndexableType, BArrayType]) -> None
Remove one or more samples from the batch.
extend_from
¶
extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None
Copy data from another batch in bounded-size chunks.
get_slice
¶
get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a lazy view over a subset of indices.
create
staticmethod
¶
create(storage_cls: Type[SpaceStorage[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]], single_instance_space: Space[BatchT, BDeviceType, BDtypeType, BRNGType], *args, cache_path: Optional[Union[str, PathLike]] = None, capacity: Optional[int] = None, multiprocessing: bool = False, maintain_segment_metadata: bool = False, **kwargs) -> ReplayBuffer[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Create a new replay buffer and its backing storage.
get_length_from_path
staticmethod
¶
get_length_from_path(path: Union[str, PathLike]) -> Optional[int]
get_capacity_from_path
staticmethod
¶
get_capacity_from_path(path: Union[str, PathLike]) -> Optional[int]
get_space_from_path
staticmethod
¶
get_space_from_path(path: Union[str, PathLike], *, backend: ComputeBackend[BArrayType, BDeviceType, BDtypeType, BRNGType], device: Optional[BDeviceType] = None) -> Optional[Space[BatchT, BDeviceType, BDtypeType, BRNGType]]
load_from
staticmethod
¶
load_from(path: Union[str, PathLike], *, backend: ComputeBackend[BArrayType, BDeviceType, BDtypeType, BRNGType], device: Optional[BDeviceType] = None, read_only: bool = True, multiprocessing: bool = False, single_instance_space: Optional[Space[BatchT, BDeviceType, BDtypeType, BRNGType]] = None, **storage_kwargs) -> ReplayBuffer[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]
Load a replay buffer plus its backing storage from disk.
If single_instance_space is provided, it is validated against the
space reconstructed from the on-disk metadata and, if compatible,
reused instead of keeping the reconstructed copy. This avoids
allocating duplicate Space objects when many buffers with
identical spaces are loaded simultaneously (e.g. for
CombinedBatch).
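The validate-then-reuse behaviour described above can be sketched with a toy space type. `ToySpace` and `resolve_space` are hypothetical names, and raising `ValueError` on a mismatch is an assumption; the text above does not say what happens when the spaces are incompatible.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class ToySpace:
    """Stand-in for a Space; only shape and dtype are compared here."""
    shape: Tuple[int, ...]
    dtype: str


def resolve_space(reconstructed: ToySpace, provided: Optional[ToySpace]) -> ToySpace:
    if provided is None:
        return reconstructed
    if provided != reconstructed:
        # Assumption: an incompatible space is reported as an error.
        raise ValueError("provided space does not match on-disk metadata")
    return provided  # reuse the caller's object, drop the reconstructed copy


shared = ToySpace(shape=(4,), dtype="float32")
# Three "loads" that each reconstruct an equal space end up sharing one object.
spaces = [resolve_space(ToySpace((4,), "float32"), shared) for _ in range(3)]
```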
dumps
¶
dumps(path: Union[str, PathLike])
Persist the replay buffer metadata and its backing storage.
get_flattened_at_with_metadata
¶
get_flattened_at_with_metadata(idx: Union[IndexableType, BArrayType]) -> BArrayType
set_flattened_at
¶
set_flattened_at(idx: Union[IndexableType, BArrayType], value: BArrayType) -> None
ReplayBufferCollectionWrapper
¶
ReplayBufferCollectionWrapper(env: Env[BArrayType, ContextType, ObsType, ActType, RenderFrame, BDeviceType, BDtypeType, BRNGType], replay_buffers: Union[ReplayBuffer, Sequence[ReplayBuffer]], replay_mode: Literal['auto', 'shared', 'per_env'] = 'auto', transition_builder: Optional[Union[Callable[..., Any], DataTransformation]] = None, validate_transition: bool = False, dump_on_episode_end: bool = False, dump_on_reset_boundary: bool = False, dump_path: Optional[PathLike] = None)
Bases: Wrapper[BArrayType, ContextType, ObsType, ActType, RenderFrame, BDeviceType, BDtypeType, BRNGType, BArrayType, ContextType, ObsType, ActType, RenderFrame, BDeviceType, BDtypeType, BRNGType]
Automatically record every transition into one or more replay buffers.
Parameters¶
env : Env
The environment to wrap.
replay_buffers : ReplayBuffer | Sequence[ReplayBuffer]
One replay buffer (shared mode) or one per environment slot (per‑env
mode).
replay_mode : "auto", "shared" or "per_env"
* "shared" - a single flat replay buffer.
* "per_env" - one buffer per batched slot, with episode segments.
* "auto" - resolve based on batch_size and buffer count.
transition_builder : callable or DataTransformation, optional
Either a callable (cached_obs, cached_context, action, next_obs,
reward, terminated, truncated, env_index, rb_single_space) ->
transition, or a DataTransformation that maps a canonical
raw transition dict to the replay buffer's format. When None,
the default DictSpace‑based builder is used.
validate_transition : bool
When True, every built transition is validated against the target
replay buffer's space (the single‑instance space for append, the batched
space for shared extend). Off by default for speed.
dump_on_episode_end : bool
When True, automatically call dumps(path) on the replay
buffer(s) after an episode ends (i.e., when terminated or truncated
is true). Requires dump_path to be set.
dump_on_reset_boundary : bool
When True, automatically call dumps(path) on reset boundaries
(full reset or masked reset). Requires dump_path to be set.
dump_path : str | os.PathLike | None
Filesystem path where the replay buffer(s) should be dumped when
auto-dump is triggered. May be reassigned after construction to
redirect subsequent dumps. Required if dump_on_episode_end or
dump_on_reset_boundary is True. Repeated dumps overwrite
the same path.
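As an illustration of the callable form of transition_builder, the sketch below defines a function with the nine documented parameters. The returned dictionary keys (`obs`, `action`, `next_obs`, `reward`, `done`) are hypothetical and would need to match the target replay buffer's space; scalar rewards and flags are assumed, as for a single environment slot.

```python
from typing import Any, Dict


def build_transition(
    cached_obs: Any,
    cached_context: Any,
    action: Any,
    next_obs: Any,
    reward: Any,
    terminated: Any,
    truncated: Any,
    env_index: Any,
    rb_single_space: Any,
) -> Dict[str, Any]:
    # Key names are illustrative only; they are not taken from the library.
    return {
        "obs": cached_obs,
        "action": action,
        "next_obs": next_obs,
        "reward": float(reward),
        # Collapse the two end-of-episode flags into one (a design choice
        # of this sketch, not something the wrapper requires).
        "done": bool(terminated or truncated),
    }


example = build_transition([0.0], None, 1, [0.5], 1, False, True, 0, None)
```

A builder like this ignores `cached_context`, `env_index` and `rb_single_space`; a real one might use `rb_single_space` to cast values to the expected dtype.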
shared
classmethod
¶
shared(env: Env, replay_buffer: ReplayBuffer, **kwargs: Any) -> 'ReplayBufferCollectionWrapper'
Create a wrapper in shared mode with a single replay buffer.
per_env
classmethod
¶
per_env(env: Env, replay_buffers: Sequence[ReplayBuffer], **kwargs: Any) -> 'ReplayBufferCollectionWrapper'
Create a wrapper in per‑env mode with one buffer per slot.
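The two constructors above fix the mode explicitly, whereas "auto" resolves it from batch_size and the buffer count. The exact rule is not stated here, so the sketch below shows only one plausible rule under that assumption; `resolve_mode` is a hypothetical helper.

```python
from typing import Literal


def resolve_mode(batch_size: int, num_buffers: int) -> Literal["shared", "per_env"]:
    # Assumed rule: one buffer means shared; one buffer per slot means per_env.
    if num_buffers == 1:
        return "shared"
    if num_buffers == batch_size:
        return "per_env"
    raise ValueError(
        f"cannot resolve mode: {num_buffers} buffers for batch_size={batch_size}"
    )


mode_single = resolve_mode(batch_size=4, num_buffers=1)
mode_many = resolve_mode(batch_size=4, num_buffers=4)
```

Calling `shared(...)` or `per_env(...)` directly avoids relying on any such inference.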
reset
¶
reset(*args: Any, mask: Optional[BArrayType] = None, seed: Optional[int] = None, **kwargs: Any) -> Tuple[ContextType, ObsType, Dict[str, Any]]
step
¶
step(action: ActType) -> Tuple[ObsType, Union[SupportsFloat, BArrayType], Union[bool, BArrayType], Union[bool, BArrayType], Dict[str, Any]]
reset_async
¶
reset_async(*args: Any, mask: Optional[BArrayType] = None, seed: Optional[int] = None, **kwargs: Any) -> None
step_wait
¶
step_wait() -> Tuple[ObsType, Union[SupportsFloat, BArrayType], Union[bool, BArrayType], Union[bool, BArrayType], Dict[str, Any]]
set_replay_buffers
¶
set_replay_buffers(replay_buffers: Union[ReplayBuffer, Sequence[ReplayBuffer]], *, replay_mode: Optional[Literal['auto', 'shared', 'per_env']] = None, require_same_space: bool = True, allow_mid_episode: bool = False, finalize_old_segments: bool = False, dump_old: bool = False) -> List[ReplayBuffer]
Swap the replay buffer(s) without rebuilding the wrapper.
Parameters¶
replay_buffers : ReplayBuffer | Sequence[ReplayBuffer]
The new replay buffer(s) to use.
replay_mode : "auto", "shared", "per_env", or None
If provided, change the replay mode. If None, keep the
current mode.
require_same_space : bool
If True, the new buffers must have the same single_space
as the old buffers. Default is True.
allow_mid_episode : bool
If True, allow swapping even if this splits an episode
across old/new buffers. Default is False.
finalize_old_segments : bool
If True, close any open segments in the old buffers before
swapping. Default is False.
dump_old : bool
If True, dump the old buffers before swapping. Requires
dump_path to be set. Where segments need finalizing, the dump
runs only after finalization. Default is False.
Returns¶
List[ReplayBuffer]
The old replay buffer(s) that were replaced.
Raises¶
ValueError
If the new buffers don't match the expected layout or space.
RuntimeError
If there are open segments while both allow_mid_episode and
finalize_old_segments are False.
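The interaction of the three flags can be sketched as a small planning function. `plan_swap` and the step names are hypothetical; the sketch only encodes what is documented above: open segments raise RuntimeError unless one of the two flags is set, and a dump comes after any finalization.

```python
from typing import List


def plan_swap(
    has_open_segments: bool,
    *,
    allow_mid_episode: bool = False,
    finalize_old_segments: bool = False,
    dump_old: bool = False,
) -> List[str]:
    steps: List[str] = []
    if has_open_segments:
        if finalize_old_segments:
            steps.append("finalize")  # close open segments first
        elif not allow_mid_episode:
            raise RuntimeError(
                "open segments: set allow_mid_episode or finalize_old_segments"
            )
    if dump_old:
        steps.append("dump")  # runs after finalization, if any
    steps.append("swap")
    return steps


safe_plan = plan_swap(True, finalize_old_segments=True, dump_old=True)
```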