-
Notifications
You must be signed in to change notification settings - Fork 20
mode
client的动态类继承结合client_manager组成了我们实现多样的运行模式的基硋。
client通过running_mode_for_client函数动态继承不同的模式类,目前支持thread、process、MPMT、distributed四种模式,来实现单个客户端内的运行模式。通过client_manager来实现不同客户端间的组织形式。
running_mode_for_client通过mode参数控制,它会通过globalvar获取config中的mode参数,根据mode参数选择不同的模式类。因为globalvar将信息存储在了sys.args中,所以保证了client在各个情况下均能获取到config中的mode参数。
所有的mode均遵循了类线程的设计模式,即有run、join、start等方法,方便client的启动和停止。
"mode": "thread"默认的运行模式,节省内存,适合对时间不敏感的任务。 由于python的GIL锁,多线程并不能真正的并行,所以该模式下的client并不能真正的并行。
"mode": "process"适合对时间敏感的任务,可以真正的并行。该模式下的每个client均为一个进程,进程之间互不影响,可以真正的并行。 对内存消耗较大,适合对时间敏感的任务。
MPMT模式是一种多进程多线程的模式。 该模式会启动多个进程,每个进程中运行多个client,客户端使用thread模式运行。 是一种介于process和thread之间的模式,适合对时间敏感的任务。
"mode": {
"path": "core.MPMT.MPMT"
},另外,MPMT模式通过在client_manager处更改process_num来自定义运行速度。
"client_manager": {
"path": "clientmanager.MPMTClientManager.MPMTClientManager",
"process_num": 2
}目前设备之间的通信方式只支持mqtt,在运行之前推荐在server端安装mqtt服务器,指定mqtt服务器ip,请查看Detailed Configuration。若没有安装,框架会自动使用公开的mqtt服务器。
该模式支持分布式系统,支持多台机器的联合训练。
目前每个设备支持thread和process两种模式。
即在mode中设置单机的运行模式。
"mode": "thread"or
"mode": "procsess"然后在client_manager中设置分布式的client_manager。
"client_manager": {
"path": "clientmanager.DistributedClientManager.DistributedClientManager",
"sub_path": "clientmanager.DistributedClientManager.SubNormalClientManager",
"client_num": [25, 25],
"event": {
"type": "mqtt"
},
"communication_proxy": {
"type": "mqtt",
"host": "127.0.0.1",
"port": 1883
}
}path为server端所用的服务器管理类,sub_path为client端所用的客户端管理类,client_num为每个client_manager所管理的client数量,event为设备之间event所使用的格式,目前只支持mqtt,communication_proxy为设备之间所用的通信代理类,用于传输数据,目前只支持mqtt。
服务器端和客户端端的配置文件使用同一文件,通过如下命令运行server端和client端。
# server
python main.py config.json --uid 123
# sub client manager
python main_for_distributed.py config.json --uid 123 --mid 0uid 为实验的唯一标识,用于各设备确定通信对象,mid 为client_manager的唯一标识,用于和配置中client_num对应,所以mid应该从0开始递增。
如需使用wandb,应关闭client处的wandb,只在server处开启wandb。
如果单机器上运行多个sub client manager,需要为每个sub client manager设置不同的message_queue,也就是设置不同的port,即在global中设置:
"message_queue": {
"port": 50001,
"type": "mqtt",
"mask_list": ["train_dataset", "test_dataset"]
}原有的message_queue为50000仅支持单机上多进程使用,因此分布式模式下需要对其进行包装。
type为指定的包装方式(通信方式),目前只支持mqtt。
mask_list为不需要传输的数据,如train_dataset和test_dataset,这些数据在main_for_distributed中自动加载,不需要再次传输。若客户端设备没有数据集且没有外部网络,可以将mask_list设置为空,以便服务器传输数据集。
如果有自己的mqtt服务器,在global中可以指定host和port。
"mqtt": {
"host": "127.0.0.1",
"port": 1883
}Getting Started - 整体流程 - Module Guide - 现有算法 - Contact Us