Skip to content
desperadoccy edited this page Jan 15, 2025 · 7 revisions

client的动态类继承结合client_manager组成了我们实现多样的运行模式的基硋。

client通过running_mode_for_client函数动态继承不同的模式类,目前支持threadprocessMPMTdistributed四种模式,来实现单个客户端内的运行模式。通过client_manager来实现不同客户端间的组织形式。

running_mode_for_client通过mode参数控制,它会通过globalvar获取config中的mode参数,根据mode参数选择不同的模式类。因为globalvar将信息存储在了sys.args中,所以保证了client在各个情况下均能获取到config中的mode参数。 所有的mode均遵循了类线程的设计模式,即有run、join、start等方法,方便client的启动和停止。

thread

"mode": "thread"

默认的运行模式,节省内存,适合对时间不敏感的任务。 由于python的GIL锁,多线程并不能真正的并行,所以该模式下的client并不能真正的并行。

process

"mode": "process"

适合对时间敏感的任务,可以真正的并行。该模式下的每个client均为一个进程,进程之间互不影响,可以真正的并行。 对内存消耗较大,适合对时间敏感的任务。

MPMT

MPMT模式是一种多进程多线程的模式。 该模式会启动多个进程,每个进程中运行多个client,客户端使用thread模式运行。 是一种介于process和thread之间的模式,适合对时间敏感的任务。

"mode": {
    "path": "core.MPMT.MPMT"
},

另外,MPMT模式通过在client_manager处更改process_num来自定义运行速度。

"client_manager": {
    "path": "clientmanager.MPMTClientManager.MPMTClientManager",
    "process_num": 2
}

distributed

目前设备之间的通信方式只支持mqtt,在运行之前推荐在server端安装mqtt服务器,指定mqtt服务器ip,请查看Detailed Configuration。若没有安装,框架会自动使用公开的mqtt服务器。

该模式支持分布式系统,支持多台机器的联合训练。 目前每个设备支持threadprocess两种模式。

即在mode中设置单机的运行模式。

"mode": "thread"

or

"mode": "procsess"

然后在client_manager中设置分布式的client_manager。

  "client_manager": {
    "path": "clientmanager.DistributedClientManager.DistributedClientManager",
    "sub_path": "clientmanager.DistributedClientManager.SubNormalClientManager",
    "client_num": [25, 25],
    "event": {
      "type": "mqtt"
    },
    "communication_proxy": {
      "type": "mqtt",
      "host": "127.0.0.1",
      "port": 1883
    }
  }

path为server端所用的服务器管理类,sub_path为client端所用的客户端管理类,client_num为每个client_manager所管理的client数量,event为设备之间event所使用的格式,目前只支持mqttcommunication_proxy为设备之间所用的通信代理类,用于传输数据,目前只支持mqtt

Getting Started

服务器端和客户端端的配置文件使用同一文件,通过如下命令运行server端和client端。

# server
python main.py config.json --uid 123
# sub client manager
python main_for_distributed.py config.json --uid 123 --mid 0

uid 为实验的唯一标识,用于各设备确定通信对象,mid 为client_manager的唯一标识,用于和配置中client_num对应,所以mid应该从0开始递增。

如需使用wandb,应关闭client处的wandb,只在server处开启wandb。

Detailed Configuration

如果单机器上运行多个sub client manager,需要为每个sub client manager设置不同的message_queue,也就是设置不同的port,即在global中设置:

"message_queue": {
    "port": 50001,
    "type": "mqtt",
    "mask_list": ["train_dataset", "test_dataset"]
}

原有的message_queue50000仅支持单机上多进程使用,因此分布式模式下需要对其进行包装。

type为指定的包装方式(通信方式),目前只支持mqttmask_list为不需要传输的数据,如train_datasettest_dataset,这些数据在main_for_distributed中自动加载,不需要再次传输。若客户端设备没有数据集且没有外部网络,可以将mask_list设置为空,以便服务器传输数据集。

如果有自己的mqtt服务器,在global中可以指定hostport

"mqtt": {
    "host": "127.0.0.1",
    "port": 1883
}

Clone this wiki locally