Skip to content

unienv_data.samplers.multiprocessing_sampler

TaskInfoT module-attribute

TaskInfoT = TypeVar('TaskInfoT')

ResultT module-attribute

ResultT = TypeVar('ResultT')

MultiProcessingSampleManager

MultiProcessingSampleManager(target_fn: Callable[[TaskInfoT], ResultT], n_workers: int = 4, ctx: Optional[BaseContext] = None, done_event=None, doze_time: float = 0.001, daemon: Optional[bool] = None)

Bases: Generic[TaskInfoT, ResultT]

sampler_result_queue instance-attribute

sampler_result_queue: Queue = Queue()

sampler_work_queue instance-attribute

sampler_work_queue: Queue = Queue()

done_event instance-attribute

done_event = done_event or Event()

target_fn instance-attribute

target_fn = target_fn

doze_time instance-attribute

doze_time = doze_time

remaining_work instance-attribute

remaining_work = 0

workers instance-attribute

workers: List[Process] = []

closed instance-attribute

closed = False

n_workers property

n_workers

close

close()

fetch

fetch(block: bool = True, timeout: Optional[float] = None) -> Optional[ResultT]

add_work

add_work(work_info: TaskInfoT)

MultiprocessingSampler

MultiprocessingSampler(sampler: BatchSampler[SamplerBatchT, SamplerArrayType, SamplerDeviceType, SamplerDtypeType, SamplerRNGType, BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], n_workers: int = 4, n_buffers: int = 8, ctx: Optional[BaseContext] = None, doze_time: float = 0.005, daemon: Optional[bool] = None)

Bases: BatchSampler[SamplerBatchT, SamplerArrayType, SamplerDeviceType, SamplerDtypeType, SamplerRNGType, BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]

mp_ctx instance-attribute

mp_ctx = ctx

sampler instance-attribute

sampler = sampler

closed instance-attribute

closed = False

n_workers instance-attribute

n_workers = n_workers

n_buffers instance-attribute

n_buffers = n_buffers

functions_to_close instance-attribute

functions_to_close = []

sampled_space property

sampled_space

sampled_metadata_space property

sampled_metadata_space

backend property

backend

device property

device

data property

data

rng property writable

rng

data_rng property writable

data_rng

is_mutable class-attribute instance-attribute

is_mutable: bool = False

single_space instance-attribute

single_space = single_space

single_metadata_space instance-attribute

single_metadata_space = single_metadata_space

batch_size instance-attribute

batch_size = batch_size

manual_seed

manual_seed(seed)

get_at

get_at(idx)

get_at_with_metadata

get_at_with_metadata(idx)

get_flattened_at

get_flattened_at(idx)

get_flattened_at_with_metadata

get_flattened_at_with_metadata(idx)

close

close()

set_flattened_at

set_flattened_at(idx: Union[IndexableType, BArrayType], value: BArrayType) -> None

Overwrite existing samples using flattened data.

append_flattened

append_flattened(value: BArrayType) -> None

Append one flattened sample to the batch.

extend_flattened

extend_flattened(value: BArrayType) -> None

Append a batched block of flattened samples.

set_at

set_at(idx: Union[IndexableType, BArrayType], value: BatchT) -> None

Overwrite existing samples using structured data.

remove_at

remove_at(idx: Union[IndexableType, BArrayType]) -> None

Remove one or more samples from the batch.

append

append(value: BatchT) -> None

Append one structured sample to the batch.

extend

extend(value: BatchT) -> None

Append a batched block of structured samples.

extend_from

extend_from(other: BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], chunk_size: int = 8, tqdm: bool = False) -> None

Copy data from another batch in bounded-size chunks.

get_slice

get_slice(idx: Union[IndexableType, BArrayType]) -> BatchBase[BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType]

Create a lazy view over a subset of indices.

get_column

get_column(nested_keys: Sequence[str]) -> BatchBase[Any, BArrayType, BDeviceType, BDtypeType, BRNGType]

Create a lazy view over a nested field inside each sample.

sample_index

sample_index() -> SamplerArrayType

Draw random indices for one batch.

sample_flat

sample_flat() -> SamplerArrayType

Sample a batch and return it in flattened form.

sample_flat_with_metadata

sample_flat_with_metadata() -> Tuple[SamplerArrayType, Optional[Dict[str, Any]]]

Sample a flattened batch together with optional metadata.

sample

sample() -> SamplerBatchT

Sample a batch in its structured form.

sample_with_metadata

sample_with_metadata() -> Tuple[SamplerBatchT, Optional[Dict[str, Any]]]

Sample a structured batch together with optional metadata.

epoch_iter

epoch_iter() -> Iterator[SamplerBatchT]

Iterate once over a random permutation of the source data.

epoch_iter_with_metadata

epoch_iter_with_metadata() -> Iterator[Tuple[SamplerBatchT, Optional[Dict[str, Any]]]]

Iterate once over shuffled structured batches plus metadata.

epoch_flat_iter

epoch_flat_iter() -> Iterator[SamplerArrayType]

Iterate once over shuffled batches in flattened form.

epoch_flat_iter_with_metadata

epoch_flat_iter_with_metadata() -> Iterator[Tuple[SamplerArrayType, Optional[Dict[str, Any]]]]

Iterate once over shuffled flattened batches plus metadata.

worker_loop

worker_loop(fetch_fn: Callable[[BatchSampler[SamplerBatchT, SamplerArrayType, SamplerDeviceType, SamplerDtypeType, SamplerRNGType, BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], Any], Any], workid_queue: Queue, result_queue: Queue, done_event: Event, doze_time: float = 0.005)

close_fn_for_wrapped_fn

close_fn_for_wrapped_fn(wrapped_fn)

wrap_sample_function

wrap_sample_function(sampler: BatchSampler[SamplerBatchT, SamplerArrayType, SamplerDeviceType, SamplerDtypeType, SamplerRNGType, BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], n_workers: int, n_buffers: int, fn: Callable[[], Any], ctx: Optional[BaseContext] = None, doze_time: float = 0.001, daemon: Optional[bool] = None, done_event=None)

wrap_epoch_iter_function

wrap_epoch_iter_function(sampler: BatchSampler[SamplerBatchT, SamplerArrayType, SamplerDeviceType, SamplerDtypeType, SamplerRNGType, BatchT, BArrayType, BDeviceType, BDtypeType, BRNGType], n_workers: int, n_buffers: int, getter_fn: Callable[[Any], Any], ctx: Optional[BaseContext] = None, doze_time: float = 0.005, daemon: Optional[bool] = None, done_event=None)