unienv_data.storages.parquet — API reference

unienv_data.storages.parquet

ParquetBatchType module-attribute

ParquetBatchType = Union[Dict[str, Any], NumpyArrayType, str]

ParquetSpaceType module-attribute

ParquetSpaceType = Union[DictSpace, BoxSpace, TextSpace, BinarySpace]

ParquetStorage

ParquetStorage(single_instance_space: ParquetSpaceType, storage_dir: str, capacity: Optional[int], piece_size: int, num_pieces: int, total_len: int, column_specs: List[Tuple[str, dtype, Tuple[int, ...]]], column_shapes: Dict[str, Tuple[int, ...]], read_only: bool = False, compression: Union[str, Dict[str, str], None] = 'snappy')

Bases: SpaceStorage[ParquetBatchType, NumpyArrayType, NumpyDeviceType, NumpyDtypeType, NumpyRNGType]

single_file_ext class-attribute instance-attribute

single_file_ext: Optional[str] = None

DEFAULT_KEY class-attribute instance-attribute

DEFAULT_KEY: str = 'data'

capacity instance-attribute

capacity = capacity

is_mutable property

is_mutable: bool

is_multiprocessing_safe property

is_multiprocessing_safe: bool

cache_filename property

cache_filename: Optional[Union[str, PathLike]]

backend property

backend: ComputeBackend[BArrayType, BDeviceType, BDtypeType, BRNGType]

device property

device: Optional[BDeviceType]

single_instance_space instance-attribute

single_instance_space = single_instance_space

create classmethod

create(single_instance_space: ParquetSpaceType, *args, capacity: Optional[int] = None, cache_path: Optional[Union[str, PathLike]] = None, multiprocessing: bool = False, piece_size: int = 1024, compression: Union[str, Dict[str, str], None] = 'snappy', **kwargs) -> ParquetStorage

load_from classmethod

load_from(path: Union[str, PathLike], single_instance_space: ParquetSpaceType, *, capacity: Optional[int] = None, read_only: bool = True, multiprocessing: bool = False, **kwargs) -> ParquetStorage

build_space_from_parquet_file staticmethod

build_space_from_parquet_file(path: Union[str, PathLike]) -> Tuple[int, Optional[int], ParquetSpaceType]

Infer the element count, capacity, and space from a parquet storage directory. Returns a tuple (count, capacity, space), matching the declared return type Tuple[int, Optional[int], ParquetSpaceType].

load_replay_buffer_from_raw_parquet staticmethod

load_replay_buffer_from_raw_parquet(path: Union[str, PathLike]) -> ReplayBuffer[ParquetBatchType, NumpyArrayType, NumpyDeviceType, NumpyDtypeType, NumpyRNGType]

get

get(index) -> ParquetBatchType

set

set(index, value: ParquetBatchType) -> None

extend_length

extend_length(length: int) -> None

shrink_length

shrink_length(length: int) -> None

dumps

dumps(path: Union[str, PathLike]) -> None

close

close() -> None

get_column

get_column(nested_keys: Sequence[str]) -> ParquetStorage

clear

clear() -> None

Clear all data inside the storage. If the storage has unlimited capacity, the length is reset to 0; if the storage has a fixed capacity, it is reset to its initial state instead.