worker.collector.comm¶
base_comm_collector¶
Please Reference ding/worker/collector/comm/base_comm_collector.py for usage
BaseCommCollector¶
- class ding.worker.collector.comm.base_comm_collector.BaseCommCollector(cfg)[源代码]¶
- Overview:
Abstract baseclass for common collector.
- Interfaces:
__init__, get_policy_update_info, send_metadata, send_stepdata start, close, _create_collector
- Property:
collector_uid
- _create_collector(task_info: dict) BaseParallelCollector[源代码]¶
- Overview:
Receive
task_infopassed from coordinator and create a collector.- Arguments:
task_info (
dict): Task info dict from coordinator. Should be like Returns:collector (
BaseParallelCollector): Created base collector.
- Note:
Four methods(‘send_metadata’, ‘send_stepdata’, ‘get_policy_update_info’), and policy are set. The reason why they are set here rather than base collector is, they highly depend on the specific task. Only after task info is passed from coordinator to comm collector through learner slave, can they be clarified and initialized.
- abstract get_policy_update_info(path: str) Any[源代码]¶
- Overview:
Get policy information in corresponding path. Will be registered in base collector.
- Arguments:
path (
str): path to policy update information.
- abstract send_metadata(metadata: Any) None[源代码]¶
- Overview:
Store meta data in queue, which will be retrieved by callback function “deal_with_collector_data” in collector slave, then will be sent to coordinator. Will be registered in base collector.
- Arguments:
metadata (
Any): meta data.
create_comm_collector¶
- Overview:
Given the key(comm_collector_name), create a new comm collector instance if in comm_map’s values, or raise an KeyError. In other words, a derived comm collector must first register, then can call
create_comm_collectorto get the instance.- Arguments:
cfg (
EasyDict): Collector config. Necessary keys: [import_names, comm_collector_type].
- Returns:
collector (
BaseCommCollector): The created new comm collector, should be an instance of one of comm_map’s values.
flask_fs_collector¶
Please Reference ding/worker/collector/comm/flask_fs_collector.py for usage
CollectorSlave¶
- class ding.worker.collector.comm.flask_fs_collector.CollectorSlave(*args, callback_fn: Dict[str, Callable], **kwargs)[源代码]¶
- Overview:
A slave, whose master is coordinator. Used to pass message between comm collector and coordinator.
- Interfaces:
__init__, _process_task
- __init__(*args, callback_fn: Dict[str, Callable], **kwargs) None[源代码]¶
- Overview:
Init callback functions additionally. Callback functions are methods in comm collector.
- _process_task(task: dict) dict | TaskFail[源代码]¶
- Overview:
Process a task according to input task info dict, which is passed in by master coordinator. For each type of task, you can refer to corresponding callback function in comm collector for details.
- Arguments:
cfg (
EasyDict): Task dict. Must contain key “name”.
- Returns:
result (
Union[dict, TaskFail]): Task result dict, or task fail exception.
FlaskFileSystemCollector¶
- class ding.worker.collector.comm.flask_fs_collector.FlaskFileSystemCollector(cfg: dict)[源代码]¶
- Overview:
An implementation of CommLearner, using flask and the file system.
- Interfaces:
__init__, deal_with_resource, deal_with_collector_start, deal_with_collector_data, deal_with_collector_close, get_policy_update_info, send_stepdata, send_metadata, start, close
- __init__(cfg: dict) None[源代码]¶
- Overview:
Initialization method.
- Arguments:
cfg (
EasyDict): Config dict
- deal_with_collector_data() dict[源代码]¶
- Overview:
Callback function in
CollectorSlave. Get data sample dict from_metadata_queue, which will be sent to coordinator afterwards.- Returns:
data (
Any): Data sample dict.
- deal_with_collector_start(task_info: dict) None[源代码]¶
- Overview:
Callback function in
CollectorSlave. Create a collector and start a collector thread of the created one.- Arguments:
task_info (
dict): Task info dict.
- Note:
In
_create_collectormethod in base classBaseCommCollector, 4 methods ‘send_metadata’, ‘send_stepdata’, ‘get_policy_update_info’, and policy are set. You can refer to it for details.
- deal_with_resource() dict[源代码]¶
- Overview:
Callback function in
CollectorSlave. Return how many resources are needed to start current collector.- Returns:
resource (
dict): Resource info dict, including [‘gpu’, ‘cpu’].
- get_policy_update_info(path: str) dict[源代码]¶
- Overview:
Get policy information in corresponding path.
- Arguments:
path (
str): path to policy update information.
- send_metadata(metadata: dict) None[源代码]¶
- Overview:
Store learn info dict in queue, which will be retrieved by callback function “deal_with_collector_learn” in collector slave, then will be sent to coordinator.
- Arguments:
metadata (
Any): meta data.
utils¶
Please Reference ding/worker/collector/comm/utils.py for usage
NaiveCollector¶
- class ding.worker.collector.comm.utils.NaiveCollector(*args, prefix='', **kwargs)[源代码]¶
- Overview:
A slave, whose master is coordinator. Used to pass message between comm collector and coordinator.
- Interfaces:
_process_task, _get_timestep
- _process_task(task)[源代码]¶
- Overview:
Process a task according to input task info dict, which is passed in by master coordinator. For each type of task, you can refer to corresponding callback function in comm collector for details.
- Arguments:
cfg (
EasyDict): Task dict. Must contain key “name”.
- Returns:
result (
Union[dict, TaskFail]): Task result dict, or task fail exception.