Shortcuts

worker.collector.comm

base_comm_collector

Please Reference ding/worker/collector/comm/base_comm_collector.py for usage

BaseCommCollector

class ding.worker.collector.comm.base_comm_collector.BaseCommCollector(cfg)[源代码]
Overview:

Abstract baseclass for common collector.

Interfaces:

__init__, get_policy_update_info, send_metadata, send_stepdata start, close, _create_collector

Property:

collector_uid

__init__(cfg)[源代码]
Overview:

Initialization method.

Arguments:
  • cfg (EasyDict): Config dict

_create_collector(task_info: dict) BaseParallelCollector[源代码]
Overview:

Receive task_info passed from coordinator and create a collector.

Arguments:
  • task_info (dict): Task info dict from coordinator. Should be like Returns:

  • collector (BaseParallelCollector): Created base collector.

Note:

Four methods(‘send_metadata’, ‘send_stepdata’, ‘get_policy_update_info’), and policy are set. The reason why they are set here rather than base collector is, they highly depend on the specific task. Only after task info is passed from coordinator to comm collector through learner slave, can they be clarified and initialized.

close() None[源代码]
Overview:

Close comm collector.

abstract get_policy_update_info(path: str) Any[源代码]
Overview:

Get policy information in corresponding path. Will be registered in base collector.

Arguments:
  • path (str): path to policy update information.

abstract send_metadata(metadata: Any) None[源代码]
Overview:

Store meta data in queue, which will be retrieved by callback function “deal_with_collector_data” in collector slave, then will be sent to coordinator. Will be registered in base collector.

Arguments:
  • metadata (Any): meta data.

abstract send_stepdata(stepdata: Any) None[源代码]
Overview:

Save step data in corresponding path. Will be registered in base collector.

Arguments:
  • stepdata (Any): step data.

start() None[源代码]
Overview:

Start comm collector.

create_comm_collector

Overview:

Given the key(comm_collector_name), create a new comm collector instance if in comm_map’s values, or raise an KeyError. In other words, a derived comm collector must first register, then can call create_comm_collector to get the instance.

Arguments:
  • cfg (EasyDict): Collector config. Necessary keys: [import_names, comm_collector_type].

Returns:
  • collector (BaseCommCollector): The created new comm collector, should be an instance of one of comm_map’s values.

flask_fs_collector

Please Reference ding/worker/collector/comm/flask_fs_collector.py for usage

CollectorSlave

class ding.worker.collector.comm.flask_fs_collector.CollectorSlave(*args, callback_fn: Dict[str, Callable], **kwargs)[源代码]
Overview:

A slave, whose master is coordinator. Used to pass message between comm collector and coordinator.

Interfaces:

__init__, _process_task

__init__(*args, callback_fn: Dict[str, Callable], **kwargs) None[源代码]
Overview:

Init callback functions additionally. Callback functions are methods in comm collector.

_process_task(task: dict) dict | TaskFail[源代码]
Overview:

Process a task according to input task info dict, which is passed in by master coordinator. For each type of task, you can refer to corresponding callback function in comm collector for details.

Arguments:
  • cfg (EasyDict): Task dict. Must contain key “name”.

Returns:
  • result (Union[dict, TaskFail]): Task result dict, or task fail exception.

FlaskFileSystemCollector

class ding.worker.collector.comm.flask_fs_collector.FlaskFileSystemCollector(cfg: dict)[源代码]
Overview:

An implementation of CommLearner, using flask and the file system.

Interfaces:

__init__, deal_with_resource, deal_with_collector_start, deal_with_collector_data, deal_with_collector_close, get_policy_update_info, send_stepdata, send_metadata, start, close

__init__(cfg: dict) None[源代码]
Overview:

Initialization method.

Arguments:
  • cfg (EasyDict): Config dict

close() None[源代码]
Overview:

Close comm collector itself and the collector slave.

deal_with_collector_data() dict[源代码]
Overview:

Callback function in CollectorSlave. Get data sample dict from _metadata_queue, which will be sent to coordinator afterwards.

Returns:
  • data (Any): Data sample dict.

deal_with_collector_start(task_info: dict) None[源代码]
Overview:

Callback function in CollectorSlave. Create a collector and start a collector thread of the created one.

Arguments:
  • task_info (dict): Task info dict.

Note:

In _create_collector method in base class BaseCommCollector, 4 methods ‘send_metadata’, ‘send_stepdata’, ‘get_policy_update_info’, and policy are set. You can refer to it for details.

deal_with_resource() dict[源代码]
Overview:

Callback function in CollectorSlave. Return how many resources are needed to start current collector.

Returns:
  • resource (dict): Resource info dict, including [‘gpu’, ‘cpu’].

get_policy_update_info(path: str) dict[源代码]
Overview:

Get policy information in corresponding path.

Arguments:
  • path (str): path to policy update information.

send_metadata(metadata: dict) None[源代码]
Overview:

Store learn info dict in queue, which will be retrieved by callback function “deal_with_collector_learn” in collector slave, then will be sent to coordinator.

Arguments:
  • metadata (Any): meta data.

send_stepdata(path: str, stepdata: list) None[源代码]
Overview:

Save collector’s step data in corresponding path.

Arguments:
  • path (str): Path to save data.

  • stepdata (Any): Data of one step.

start() None[源代码]
Overview:

Start comm collector itself and the collector slave.

utils

Please Reference ding/worker/collector/comm/utils.py for usage

NaiveCollector

class ding.worker.collector.comm.utils.NaiveCollector(*args, prefix='', **kwargs)[源代码]
Overview:

A slave, whose master is coordinator. Used to pass message between comm collector and coordinator.

Interfaces:

_process_task, _get_timestep

_process_task(task)[源代码]
Overview:

Process a task according to input task info dict, which is passed in by master coordinator. For each type of task, you can refer to corresponding callback function in comm collector for details.

Arguments:
  • cfg (EasyDict): Task dict. Must contain key “name”.

Returns:
  • result (Union[dict, TaskFail]): Task result dict, or task fail exception.