worker.replay_buffer

replay buffer

IBuffer

class ding.worker.replay_buffer.base_buffer.IBuffer
Overview:

Buffer interface

Interfaces:

default_config, push, update, sample, clear, count, state_dict, load_state_dict

abstract clear() None
Overview:

Clear all the data and reset the related variables.

abstract count() int
Overview:

Count the valid data items in the buffer.

Returns:
  • count (int): Number of valid data.

classmethod default_config() EasyDict
Overview:

Default config of this buffer class.

Returns:
  • default_config (EasyDict)

abstract load_state_dict(_state_dict: Dict[str, Any]) None
Overview:

Load a state dict to reproduce the buffer.

Arguments:
  • _state_dict (Dict[str, Any]): A dict containing all important values in the buffer.

abstract push(data: List[Any] | Any, cur_collector_envstep: int) None
Overview:

Push one or more data items into the buffer.

Arguments:
  • data (Union[List[Any], Any]): The data to push into the buffer. Can be a single item (of type Any) or many items (of type List[Any]).

  • cur_collector_envstep (int): Collector’s current env step.

abstract sample(batch_size: int, cur_learner_iter: int) list
Overview:

Sample batch_size data items.

Arguments:
  • batch_size (int): The number of data items to sample.

  • cur_learner_iter (int): Learner’s current iteration.

Returns:
  • sampled_data (list): A list of data with length batch_size.

abstract state_dict() Dict[str, Any]
Overview:

Provide a state dict to keep a record of current buffer.

Returns:
  • state_dict (Dict[str, Any]): A dict containing all important values in the buffer. With the dict, one can easily reproduce the buffer.

abstract update(info: Dict[str, list]) None
Overview:

Update data info, e.g. priority.

Arguments:
  • info (Dict[str, list]): Info dict. Keys depend on the specific buffer type.
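A minimal sketch of a class that follows the method names of the interface above. It does not import ding: ListBuffer is a hypothetical stand-in built on a plain Python list, its maxlen parameter is an assumption, and the cur_collector_envstep and cur_learner_iter arguments are accepted but ignored.

```python
import random
from typing import Any, Dict, List, Union


class ListBuffer:
    """Hypothetical in-memory buffer mirroring the IBuffer method names."""

    def __init__(self, maxlen: int = 1000) -> None:
        self._maxlen = maxlen
        self._data: List[Any] = []

    def push(self, data: Union[List[Any], Any], cur_collector_envstep: int) -> None:
        # Accept either a single item or a list of items.
        items = data if isinstance(data, list) else [data]
        self._data.extend(items)
        # Drop the oldest items once capacity is exceeded.
        self._data = self._data[-self._maxlen:]

    def sample(self, batch_size: int, cur_learner_iter: int) -> list:
        # Uniform sampling without replacement.
        return random.sample(self._data, batch_size)

    def update(self, info: Dict[str, list]) -> None:
        pass  # nothing to update in a buffer without priorities

    def clear(self) -> None:
        self._data = []

    def count(self) -> int:
        return len(self._data)

    def state_dict(self) -> Dict[str, Any]:
        return {'data': list(self._data), 'maxlen': self._maxlen}

    def load_state_dict(self, _state_dict: Dict[str, Any]) -> None:
        self._data = list(_state_dict['data'])
        self._maxlen = _state_dict['maxlen']


buf = ListBuffer(maxlen=4)
buf.push({'obs': 0}, cur_collector_envstep=0)                         # single item
buf.push([{'obs': i} for i in range(1, 6)], cur_collector_envstep=5)  # list of items
sampled = buf.sample(2, cur_learner_iter=0)

# Reproduce the buffer from its state dict.
restored = ListBuffer()
restored.load_state_dict(buf.state_dict())
```

Six items are pushed in total, but only the four most recent survive because of the assumed capacity of 4.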

NaiveReplayBuffer

class ding.worker.replay_buffer.naive_buffer.NaiveReplayBuffer(cfg: EasyDict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer')
Overview:

Naive replay buffer that can store and sample data. A naive implementation of a replay buffer with no priority or any other advanced features. This buffer is designed for multi-thread/multi-process use and is thread-safe, which means that methods like sample, push and clear are mutually exclusive.

Interface:

start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config

Property:

replay_buffer_size, push_count
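The mutual exclusion described in the overview can be sketched with a single lock shared by push, sample and clear. This is an illustration only, assuming a plain threading.Lock; LockedBuffer is a hypothetical class and not the library's implementation.

```python
import random
import threading
from typing import Any, List


class LockedBuffer:
    """Hypothetical buffer whose public methods all share one lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._data: List[Any] = []

    def push(self, item: Any) -> None:
        with self._lock:
            self._data.append(item)

    def sample(self, size: int) -> list:
        with self._lock:
            if size > len(self._data):
                return []  # not enough data yet
            return random.sample(self._data, size)

    def clear(self) -> None:
        with self._lock:
            self._data.clear()

    def count(self) -> int:
        with self._lock:
            return len(self._data)


def produce(buffer: LockedBuffer, worker_id: int, n: int) -> None:
    for i in range(n):
        buffer.push((worker_id, i))


locked = LockedBuffer()
# Four producer threads push 250 items each; every call takes the lock.
producers = [threading.Thread(target=produce, args=(locked, k, 250)) for k in range(4)]
for t in producers:
    t.start()
for t in producers:
    t.join()
```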

clear() None
Overview:

Clear all the data and reset the related variables.

close() None
Overview:

Clear the buffer. Join the buffer’s used_data_remover thread if track_used_data is enabled.

count() int
Overview:

Count the valid data items in the buffer.

Returns:
  • count (int): Number of valid data.

classmethod default_config() EasyDict
Overview:

Default config of this buffer class.

Returns:
  • default_config (EasyDict)

load_state_dict(_state_dict: dict) None
Overview:

Load a state dict to reproduce the buffer.

Arguments:
  • _state_dict (Dict[str, Any]): A dict containing all important values in the buffer.

push(data: List[Any] | Any, cur_collector_envstep: int) None
Overview:

Push one or more data items into the buffer.

Arguments:
  • data (Union[List[Any], Any]): The data to push into the buffer. Can be a single item (of type Any) or many items (of type List[Any]).

  • cur_collector_envstep (int): Collector’s current env step.

    Not used in naive buffer, but preserved for compatibility.

sample(size: int, cur_learner_iter: int, sample_range: slice | None = None, replace: bool = False) list | None
Overview:

Sample data with length size.

Arguments:
  • size (int): The number of the data that will be sampled.

  • cur_learner_iter (int): Learner’s current iteration. Not used in naive buffer, but preserved for compatibility.

  • sample_range (slice): Buffer slice for sampling, such as slice(-10, None), which samples only among the last 10 data items.

  • replace (bool): Whether to sample with replacement.

Returns:
  • sample_data (list): A list of data with length size.
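The sample_range and replace arguments can be illustrated with plain Python on a list. This is a sketch of the semantics described above, not the buffer's code: sample_from is a hypothetical helper, and returning None when too few items are available is a choice made for this sketch.

```python
import random
from typing import Any, List, Optional


def sample_from(data: List[Any], size: int, sample_range: Optional[slice] = None,
                replace: bool = False) -> Optional[list]:
    """Hypothetical helper: sample `size` items from `data[sample_range]`."""
    candidates = data if sample_range is None else data[sample_range]
    if not candidates:
        return None
    if replace:
        # With replacement: the same item may appear more than once.
        return random.choices(candidates, k=size)
    if size > len(candidates):
        # Not enough distinct items; this sketch signals that with None.
        return None
    return random.sample(candidates, size)


data = list(range(100))
last_ten = slice(-10, None)  # restrict sampling to the 10 most recent items
recent = sample_from(data, 5, sample_range=last_ten)
too_many = sample_from(data, 20, sample_range=last_ten)
with_replacement = sample_from(data, 20, sample_range=last_ten, replace=True)
```

Asking for 20 distinct items out of a 10-item range fails without replacement but succeeds with it.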

start() None
Overview:

Start the buffer’s used_data_remover thread if track_used_data is enabled.

state_dict() dict
Overview:

Provide a state dict to keep a record of current buffer.

Returns:
  • state_dict (Dict[str, Any]): A dict containing all important values in the buffer. With the dict, one can easily reproduce the buffer.

update(info: dict) None
Overview:

Naive Buffer does not need to update any info, but this method is preserved for compatibility.

AdvancedReplayBuffer

class ding.worker.replay_buffer.advanced_buffer.AdvancedReplayBuffer(cfg: dict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer')
Overview:

Prioritized replay buffer derived from NaiveReplayBuffer. This replay buffer adds:

  1. Prioritized experience replay implemented by segment tree.

  2. Data quality monitor. Monitor use count and staleness of each data.

  3. Throughput monitor and control.

  4. Logger. Log 2) and 3) in tensorboard or text.
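Item 1 relies on a segment tree so that priorities can be updated and sampled in O(log n) time. A minimal sum-tree sketch follows, assuming a power-of-two capacity; SumTree and its method names are hypothetical and do not mirror the library's segment tree classes.

```python
from typing import List


class SumTree:
    """Hypothetical array-backed sum segment tree (capacity must be a power of two)."""

    def __init__(self, capacity: int) -> None:
        assert capacity > 0 and capacity & (capacity - 1) == 0
        self.capacity = capacity
        # Node i has children 2i and 2i+1; leaves live at [capacity, 2 * capacity).
        self.tree: List[float] = [0.0] * (2 * capacity)

    def set(self, idx: int, priority: float) -> None:
        pos = idx + self.capacity
        self.tree[pos] = priority
        pos //= 2
        # Recompute the sums on the path from the leaf up to the root.
        while pos >= 1:
            self.tree[pos] = self.tree[2 * pos] + self.tree[2 * pos + 1]
            pos //= 2

    def total(self) -> float:
        return self.tree[1]

    def find_prefix_index(self, mass: float) -> int:
        """Return leaf i such that sum(p[:i]) <= mass < sum(p[:i + 1])."""
        pos = 1
        while pos < self.capacity:
            left = 2 * pos
            if mass < self.tree[left]:
                pos = left
            else:
                mass -= self.tree[left]
                pos = left + 1
        return pos - self.capacity


tree = SumTree(4)
for i, p in enumerate([1.0, 2.0, 3.0, 4.0]):
    tree.set(i, p)
```

Drawing a uniform number in [0, tree.total()) and passing it to find_prefix_index selects leaf i with probability proportional to its priority.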

Interface:

start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config

Property:

beta, replay_buffer_size, push_count

clear() None
Overview:

Clear all the data and reset the related variables.

close() None
Overview:

Clear the buffer. Join the buffer’s used_data_remover thread if track_used_data is enabled. Join the periodic throughput monitor and flush the tensorboard logger.

count() int
Overview:

Count the valid data items in the buffer.

Returns:
  • count (int): Number of valid data.

classmethod default_config() EasyDict
Overview:

Default config of this buffer class.

Returns:
  • default_config (EasyDict)

load_state_dict(_state_dict: dict, deepcopy: bool = False) None
Overview:

Load a state dict to reproduce the buffer.

Arguments:
  • _state_dict (Dict[str, Any]): A dict containing all important values in the buffer.

push(data: List[Any] | Any, cur_collector_envstep: int) None
Overview:

Push one or more data items into the buffer.

Arguments:
  • data (Union[List[Any], Any]): The data to push into the buffer. Can be a single item (of type Any) or many items (of type List[Any]).

  • cur_collector_envstep (int): Collector’s current env step.

sample(size: int, cur_learner_iter: int, sample_range: slice | None = None) list | None
Overview:

Sample data with length size.

Arguments:
  • size (int): The number of the data that will be sampled.

  • cur_learner_iter (int): Learner’s current iteration, used to calculate staleness.

  • sample_range (slice): Buffer slice for sampling, such as slice(-10, None), which samples only among the last 10 data items.

Returns:
  • sample_data (list): A list of data with length size

ReturnsKeys:
  • necessary: original keys (e.g. obs, action, next_obs, reward, info), replay_unique_id, replay_buffer_idx

  • optional (if priority is used): IS, priority
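How the priority and IS keys relate can be sketched with proportional prioritization. The exponent alpha, the weight formula, and the normalization by the largest weight are assumptions taken from the common prioritized experience replay formulation, not from this page; beta matches the beta property listed above in name only. The hypothetical helper below uses random.choices instead of a segment tree to keep the sketch short.

```python
import random
from typing import Dict, List, Union


def prioritized_sample(priorities: List[float], size: int, alpha: float = 0.6,
                       beta: float = 0.4) -> List[Dict[str, Union[int, float]]]:
    """Hypothetical helper: draw indices with probability proportional to priority ** alpha."""
    scaled = [p ** alpha for p in priorities]
    total = sum(scaled)
    probs = [s / total for s in scaled]
    n = len(priorities)
    # Importance-sampling weight (N * P(i)) ** -beta, normalized by the largest weight.
    weights = [(n * p) ** (-beta) for p in probs]
    max_w = max(weights)
    indices = random.choices(range(n), weights=probs, k=size)
    return [
        {'replay_buffer_idx': i, 'priority': priorities[i], 'IS': weights[i] / max_w}
        for i in indices
    ]


per_batch = prioritized_sample([0.1, 1.0, 5.0, 0.5], size=8)
```

Items with high priority are drawn more often and receive a smaller IS weight, which compensates for the sampling bias when the loss is weighted by IS.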

start() None
Overview:

Start the buffer’s used_data_remover thread if track_used_data is enabled.

state_dict() dict
Overview:

Provide a state dict to keep a record of current buffer.

Returns:
  • state_dict (Dict[str, Any]): A dict containing all important values in the buffer. With the dict, one can easily reproduce the buffer.

update(info: dict) None
Overview:

Update a data item’s priority. Use replay_buffer_idx to locate the item and replay_unique_id to verify it.

Arguments:
  • info (dict): Info dict containing all necessary keys for priority update.

ArgumentsKeys:
  • necessary: replay_unique_id, replay_buffer_idx, priority. All values are lists with the same length.
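A sketch of the locate-then-verify step, assuming each stored item is a dict that carries its own replay_unique_id. The slot list, the update_priority helper, and the id strings used here are hypothetical.

```python
from typing import Dict, List, Optional


def update_priority(slots: List[Optional[dict]], info: Dict[str, list]) -> int:
    """Hypothetical helper: apply priority updates and return how many were accepted."""
    accepted = 0
    for uid, idx, priority in zip(info['replay_unique_id'],
                                  info['replay_buffer_idx'],
                                  info['priority']):
        item = slots[idx]  # locate by buffer index
        # Verify by unique id: the slot may have been overwritten since sampling.
        if item is None or item['replay_unique_id'] != uid:
            continue
        item['priority'] = priority
        accepted += 1
    return accepted


slots = [
    {'replay_unique_id': 'buffer_0', 'priority': 1.0},
    {'replay_unique_id': 'buffer_5', 'priority': 1.0},  # slot 1 was reused by a newer item
]
info = {
    'replay_unique_id': ['buffer_0', 'buffer_1'],
    'replay_buffer_idx': [0, 1],
    'priority': [2.5, 0.3],
}
accepted = update_priority(slots, info)
```

The second update is dropped because the item that was sampled from slot 1 is no longer stored there.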

EpisodeReplayBuffer

class ding.worker.replay_buffer.episode_buffer.EpisodeReplayBuffer(cfg: EasyDict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer')
Overview:

Episode replay buffer stores complete episodes, i.e. each element in the buffer is an episode. Some algorithms do not want to sample batch_size complete episodes; instead, they want transitions of some fixed length. As a result, sample should be overridden for those requirements.

Interface:

start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config
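One way such an overridden sample could look: pick episodes at random and cut a fixed-length window of consecutive transitions from each. This is a sketch under assumed conventions (episodes stored as lists of transitions, a hypothetical unroll_len parameter), not the library's method.

```python
import random
from typing import Any, List


def sample_segments(episodes: List[List[Any]], batch_size: int,
                    unroll_len: int) -> List[List[Any]]:
    """Hypothetical helper: return `batch_size` windows of `unroll_len` consecutive transitions."""
    # Only episodes long enough to contain a full window are eligible.
    eligible = [ep for ep in episodes if len(ep) >= unroll_len]
    if not eligible:
        return []
    segments = []
    for _ in range(batch_size):
        ep = random.choice(eligible)
        start = random.randint(0, len(ep) - unroll_len)  # randint is inclusive on both ends
        segments.append(ep[start:start + unroll_len])
    return segments


# Three toy episodes of length 3, 10 and 25; each transition is (episode_id, timestep).
episodes = [[(e, t) for t in range(length)] for e, length in enumerate([3, 10, 25])]
segments = sample_segments(episodes, batch_size=4, unroll_len=5)
```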

__init__(cfg: EasyDict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer') None
Overview:

Initialize the buffer

Arguments:
  • cfg (dict): Config dict.

  • tb_logger (Optional['SummaryWriter']): Outer tb logger. This argument is usually passed in serial mode.

  • exp_name (Optional[str]): Name of this experiment.

  • instance_name (Optional[str]): Name of this instance.

clear() None
Overview:

Clear all the data and reset the related variables.

close() None
Overview:

Clear the buffer. Join the buffer’s used_data_remover thread if track_used_data is enabled.

count() int
Overview:

Count the valid data items in the buffer.

Returns:
  • count (int): Number of valid data.

classmethod default_config() EasyDict
Overview:

Default config of this buffer class.

Returns:
  • default_config (EasyDict)

load_state_dict(_state_dict: dict) None
Overview:

Load a state dict to reproduce the buffer.

Arguments:
  • _state_dict (Dict[str, Any]): A dict containing all important values in the buffer.

push(data: List[Any] | Any, cur_collector_envstep: int) None
Overview:

Push one or more data items into the buffer.

Arguments:
  • data (Union[List[Any], Any]): The data to push into the buffer. Can be a single item (of type Any) or many items (of type List[Any]).

  • cur_collector_envstep (int): Collector’s current env step.

    Not used in naive buffer, but preserved for compatibility.

sample(size: int, cur_learner_iter: int, sample_range: slice | None = None, replace: bool = False) list | None
Overview:

Sample data with length size.

Arguments:
  • size (int): The number of the data that will be sampled.

  • cur_learner_iter (int): Learner’s current iteration. Not used in naive buffer, but preserved for compatibility.

  • sample_range (slice): Buffer slice for sampling, such as slice(-10, None), which samples only among the last 10 data items.

  • replace (bool): Whether to sample with replacement.

Returns:
  • sample_data (list): A list of data with length size.

start() None
Overview:

Start the buffer’s used_data_remover thread if track_used_data is enabled.

state_dict() dict
Overview:

Provide a state dict to keep a record of current buffer.

Returns:
  • state_dict (Dict[str, Any]): A dict containing all important values in the buffer. With the dict, one can easily reproduce the buffer.

update(info: dict) None
Overview:

Naive Buffer does not need to update any info, but this method is preserved for compatibility.

create_buffer

Overview:

Create a buffer according to cfg and other arguments.

Arguments:
  • cfg (EasyDict): Buffer config.

ArgumentsKeys:
  • necessary: type

get_buffer_cls

Overview:

Get a buffer class according to cfg.

Arguments:
  • cfg (EasyDict): Buffer config.

ArgumentsKeys:
  • necessary: type
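Both helpers dispatch on the type key of cfg. The mechanism can be sketched with a plain dictionary registry. The registry, the decorator, the 'toy_naive' type name, and the toy class below are all hypothetical, and a plain dict stands in for EasyDict.

```python
from typing import Callable, Dict

# Hypothetical registry mapping a type name to a buffer class.
BUFFER_REGISTRY: Dict[str, type] = {}


def register(name: str) -> Callable[[type], type]:
    def decorator(cls: type) -> type:
        BUFFER_REGISTRY[name] = cls
        return cls
    return decorator


@register('toy_naive')
class ToyNaiveBuffer:
    def __init__(self, cfg: dict, **kwargs) -> None:
        self.cfg = cfg
        self.kwargs = kwargs


def get_toy_buffer_cls(cfg: dict) -> type:
    """Look up the class named by cfg['type']."""
    buffer_type = cfg['type']
    if buffer_type not in BUFFER_REGISTRY:
        raise KeyError('unknown buffer type: {}'.format(buffer_type))
    return BUFFER_REGISTRY[buffer_type]


def create_toy_buffer(cfg: dict, *args, **kwargs):
    """Instantiate the class named by cfg['type'], forwarding the other arguments."""
    return get_toy_buffer_cls(cfg)(cfg, *args, **kwargs)


toy = create_toy_buffer({'type': 'toy_naive', 'replay_buffer_size': 100}, exp_name='demo')
```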

utils

UsedDataRemover

class ding.worker.replay_buffer.utils.UsedDataRemover
Overview:

UsedDataRemover is a tool to remove data files that will no longer be used.

Interface:

start, close, add_used_data

add_used_data(data: Any) None
Overview:

Add a used data item into self._used_data for later removal.

Arguments:
  • data (Any): The used data item to add to self._used_data.

close() None
Overview:

Delete all data in self._used_data, then join the delete_used_data thread.

start() None
Overview:

Start the delete_used_data thread.
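The start / add_used_data / close lifecycle can be sketched with a worker thread that drains a queue of file paths. FileRemover is a hypothetical class: queuing paths directly and polling with a timeout are assumptions of this sketch, not a description of how UsedDataRemover stores or deletes its data.

```python
import os
import queue
import tempfile
import threading


class FileRemover:
    """Hypothetical background remover: paths are queued, then deleted by a worker thread."""

    def __init__(self) -> None:
        self._paths: "queue.Queue[str]" = queue.Queue()
        self._end_flag = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def add_used_data(self, path: str) -> None:
        self._paths.put(path)

    def _run(self) -> None:
        while not self._end_flag.is_set():
            try:
                path = self._paths.get(timeout=0.05)
            except queue.Empty:
                continue
            self._remove(path)

    @staticmethod
    def _remove(path: str) -> None:
        if os.path.exists(path):
            os.remove(path)

    def close(self) -> None:
        # Stop the worker, wait for it, then delete whatever is still queued.
        self._end_flag.set()
        self._thread.join()
        while not self._paths.empty():
            self._remove(self._paths.get())


fd, used_path = tempfile.mkstemp()
os.close(fd)
remover = FileRemover()
remover.start()
remover.add_used_data(used_path)
remover.close()
```

Whether the worker or the final drain in close deletes the file, it is gone once close returns.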

SampledDataAttrMonitor

class ding.worker.replay_buffer.utils.SampledDataAttrMonitor(time_: BaseTime, expire: int | float)
Overview:

SampledDataAttrMonitor monitors read-out indicators for the expire most recent read-outs. Indicators include: read-out time; average and max of the read-out data items’ use count; average, max and min of the read-out data items’ priority; average and max of staleness.

Interface:

__init__, fixed_time, current_time, freeze, unfreeze, register_attribute_value, __getattr__

Property:

time, expire

PeriodicThruputMonitor

class ding.worker.replay_buffer.utils.PeriodicThruputMonitor(name, cfg, logger, tb_logger)
Overview:

PeriodicThruputMonitor is a tool to record and log (text and tensorboard) how many data items are pushed, sampled, removed, and valid in a period of time. In tensorboard, you can view it under ‘buffer_{$NAME}_sec’.

Interface:

close

Property:

push_data_count, sample_data_count, remove_data_count, valid_count

Note

The thruput_log thread is initialized and started in the __init__ method, so PeriodicThruputMonitor only provides a single interface, close.
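The pattern described in the note, a logging thread started in __init__ and stopped by close, can be sketched as follows. ToyThruputMonitor, its period_sec parameter, its add_push method, and the in-memory history list (used here instead of a logger) are hypothetical.

```python
import threading
from typing import Dict, List


class ToyThruputMonitor:
    """Hypothetical monitor: a thread started in __init__ snapshots and resets counters."""

    def __init__(self, period_sec: float) -> None:
        self.push_data_count = 0
        self.history: List[Dict[str, int]] = []
        self._lock = threading.Lock()
        self._end_flag = threading.Event()
        self._period = period_sec
        self._thread = threading.Thread(target=self._log_loop, daemon=True)
        self._thread.start()  # no separate start() method is needed

    def add_push(self, n: int) -> None:
        with self._lock:
            self.push_data_count += n

    def _flush(self) -> None:
        # Record the count for the elapsed period, then reset it.
        with self._lock:
            self.history.append({'push': self.push_data_count})
            self.push_data_count = 0

    def _log_loop(self) -> None:
        # Event.wait returns False on timeout and True once close() sets the event.
        while not self._end_flag.wait(self._period):
            self._flush()

    def close(self) -> None:
        self._end_flag.set()
        self._thread.join()
        self._flush()  # record the final partial period


monitor = ToyThruputMonitor(period_sec=0.01)
monitor.add_push(3)
monitor.close()
total_pushed = sum(entry['push'] for entry in monitor.history)
```

Because the counter is reset on every flush, summing the history gives the total number of pushes regardless of how many periods elapsed.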