Replay Buffers And Storage¶
unienv_data brings the same typed-space approach to stored data.
BatchBase¶
BatchBase is the common interface for dataset-like collections of structured samples.
It gives you a uniform API for:
- indexed reads
- structured or flattened access
- metadata-aware retrieval
- column views and sliced views
- extension from other batches
Because a batch knows the Space of one sample, UniEnv can keep structured data and flattened tensor views in sync.
ReplayBuffer¶
ReplayBuffer is the main mutable buffer abstraction.
Key properties:
- ring-buffer overwrite semantics
- persistence metadata
- pluggable backing storage
- optional multiprocessing-safe counters and locking
- step-wise
append(value)with implicit segment starts mark_segment_end()to finalize the current segmentget_segments()for ReplayBuffer-owned segment metadata, with a legacy fallback for old metadata
Typical creation pattern:
import numpy as np
from unienv_interface.backends.numpy import NumpyComputeBackend
from unienv_interface.space.spaces import BoxSpace, DictSpace
from unienv_data.replay_buffer import ReplayBuffer
from unienv_data.storages.parquet import ParquetStorage
space = DictSpace(
NumpyComputeBackend,
{
"obs": BoxSpace(NumpyComputeBackend, 0.0, 1.0, np.float32, shape=(4,)),
"action": BoxSpace(NumpyComputeBackend, -1.0, 1.0, np.float32, shape=(2,)),
},
)
buffer = ReplayBuffer.create(
ParquetStorage,
space,
cache_path="cache/replay",
capacity=100_000,
)
Load a persisted buffer with:
restored = ReplayBuffer.load_from(
"cache/replay",
backend=NumpyComputeBackend,
)
Use append(value) for one unbatched step/sample at a time. The first append after creation, clear(), or the previous mark_segment_end() opens a segment implicitly.
import torch
from unienv_interface.backends.pytorch import PyTorchComputeBackend
from unienv_interface.space.spaces import BoxSpace
from unienv_data.replay_buffer import ReplayBuffer
from unienv_data.storages.pytorch import PytorchTensorStorage
space = BoxSpace(PyTorchComputeBackend, -10, 10, torch.int64, shape=(2,))
buffer = ReplayBuffer.create(
PytorchTensorStorage,
space,
capacity=4,
is_memmap=False,
)
buffer.append(torch.tensor([1, 2], dtype=torch.int64))
buffer.append(torch.tensor([3, 4], dtype=torch.int64))
buffer.mark_segment_end()
assert buffer.get_segments() == [(0, 2)]
For segment-aware storages, ReplayBuffer.get_segments() returns logical chronological half-open ranges derived from ReplayBuffer-owned physical metadata. segments_known is the flag that says whether ReplayBuffer is tracking segment boundaries itself; physical_segments stores those boundaries as inclusive physical index ranges. With fixed capacity, round-robin overwrite is supported and the ranges stay in logical buffer order.
Visibility note: the base storage append path writes through the active index and may make values visible immediately, but some implementations (notably
VideoStorage) buffer data untilmark_segment_end().
Storage Backends¶
UniEnv includes multiple storage implementations, each useful for a different tradeoff.
ParquetStorage: columnar storage for NumPy-friendly structured dataHDF5Storage: HDF5-backed persistenceFlattenedStorage: storage built around flattened samples, often paired with another inner storagePytorchTensorStorage: tensor or memmap-backed PyTorch storageNPZStorage:.npz-based storageImageStorageandVideoStorage: media-oriented storageDictStorage: structured composition over multiple sub-storages
The storage layer is intentionally separate from the replay-buffer semantics, so you can swap persistence formats without changing higher-level code.
VideoStorage streams frames into a temporary file for the active segment and only registers the final file on mark_segment_end(). Do not read from the active segment before finalizing it.
from unienv_data.replay_buffer import ReplayBuffer
from unienv_data.storages.video_storage import VideoStorage
rb = ReplayBuffer.create(
VideoStorage,
space, # e.g. BoxSpace(..., shape=(H, W, 3))
cache_path="cache/video",
capacity=None,
codec="libx264rgb", # RGB video
file_pixel_format="rgb24",
buffer_pixel_format="rgb24",
file_ext="mkv",
)
for frame in frames:
rb.append(frame)
rb.mark_segment_end()
print(rb.storage.get_segments()) # backend-specific physical file ranges
print(rb.get_segments()) # logical half-open ranges, e.g. [(0, 3)]
For depth or other single-channel videos, prefer ffv1 with a 2D (H, W) space and a grayscale pixel format such as gray16le. For RGB video, use an RGB-capable codec such as libx264rgb (or h264/libx264 with a matching YUV pixel format like yuv420p). Set pixel formats explicitly; there is no automatic RGB fallback.
Trajectories And Samplers¶
Beyond single-transition replay:
StepSamplersamples fixed-size batches from stored dataMultiprocessingSamplersupports higher-throughput sampling workflows
Use these when the consumer of data is a learner or evaluation process rather than interactive environment code. Episode-level grouping now comes from ReplayBuffer segment APIs.
Practical Advice¶
Use ReplayBuffer when you need mutation and online accumulation.
Use read-only BatchBase wrappers and integrations when the data already exists elsewhere.
Pick the storage backend based on the data you need to persist:
- mostly numeric tabular data:
ParquetStorage - existing HDF5 workflows:
HDF5Storage - PyTorch-heavy pipelines or memmap tensors:
PytorchTensorStorage - image or video artifacts:
ImageStorageandVideoStorage