diff --git a/py-scripts/DeviceConfig.py b/py-scripts/DeviceConfig.py index fa3507edc..21e20250e 100755 --- a/py-scripts/DeviceConfig.py +++ b/py-scripts/DeviceConfig.py @@ -99,6 +99,7 @@ class ADB_DEVICES(Realm): def __init__(self, lanforge_ip=None, port=8080, _debug_on=False, + app_flags='0' ): super().__init__(lfclient_host=lanforge_ip, debug_=_debug_on) @@ -110,8 +111,9 @@ def __init__(self, lanforge_ip=None, # adb get url self.adb_url = '/adb/' - + self.app_flags = app_flags # stop app + async def stop_app(self, port_list=None): port_list = [] if port_list is None else port_list if (port_list == []): @@ -276,6 +278,9 @@ async def configure_wifi(self, port_list=None): ) ) + if self.app_flags.lower() != 'na': + adb_cmd += ' --es app_flags {}'.format(self.app_flags) + data = { 'shelf': 1, 'resource': port_data["shelf"], @@ -915,7 +920,7 @@ class DeviceConfig(Realm): def __init__(self, lanforge_ip=None, port=8080, file_name=None, _debug_on=False, csv_name=None, create_csv=False, - wait_time=60 + wait_time=60, app_flags='0' ): super().__init__(lfclient_host=lanforge_ip, debug_=_debug_on) @@ -926,7 +931,7 @@ def __init__(self, lanforge_ip=None, self.csv_name = csv_name self.wait_time = wait_time # Objects for alptops and adb class - self.adb_obj = ADB_DEVICES(lanforge_ip=self.lanforge_ip) + self.adb_obj = ADB_DEVICES(lanforge_ip=self.lanforge_ip, app_flags=app_flags) self.laptop_obj = LAPTOPS(lanforge_ip=self.lanforge_ip) # available devices @@ -942,6 +947,7 @@ def __init__(self, lanforge_ip=None, # wifi profiles self.profile_data = [] + self.app_flags = app_flags def get_all_devices(self, adb=True, laptops=True): adb_devices = [] @@ -1713,6 +1719,60 @@ def get_device_data(port_key, resource_key, port_data, resource_data): config_dev_list.append(dev['eid']) return config_dev_list + def filter_device_list(self, dev_list, name_to_res, res_to_name): + final_dev_list = [] + + for dev in dev_list: + + if len(dev.split('.')) == 2: + duplicated_with = f"Serial {res_to_name.get(dev, None)}" + dev_str = f"{dev} / {res_to_name.get(dev, None)}" + else: + duplicated_with = f"Resource ID {name_to_res.get(dev, None)}" + dev_str = f'{name_to_res.get(dev, "None (resource ID not allocated)")} / {dev}' + + # Device not found + if dev not in name_to_res and dev not in res_to_name: + logger.info(f"{dev_str} -> Not found in LANforge") + continue + + # Valid and unique device + if ( + dev not in final_dev_list + and res_to_name.get(dev, None) not in final_dev_list + and name_to_res.get(dev, None) not in final_dev_list + ): + final_dev_list.append(dev) + logger.info(f"{dev_str} -> Found in LANforge and ready to configure") + + # Duplicate cases + else: + if dev in final_dev_list: + logger.info( + f"{dev_str} -> The device {dev} is duplicated with itself in the provided device list" + ) + else: + logger.info( + f"{dev_str} -> The device {dev} is duplicated with {duplicated_with} in the provided device list" + ) + + return final_dev_list + + def change_port_to_ip(self, upstream_port): + if upstream_port.count('.') != 3: + target_port_list = self.name_to_eid(upstream_port) + shelf, resource, port, _ = target_port_list + try: + target_port_ip = self.json_get(f'/port/{shelf}/{resource}/{port}?fields=ip')['interface']['ip'] + upstream_port = target_port_ip + except Exception: + logging.warning(f'The upstream port is not an ethernet port. Proceeding with the given upstream_port {upstream_port}.') + logging.info(f"Upstream port IP {upstream_port}") + else: + logging.info(f"Upstream port IP {upstream_port}") + + return upstream_port + if __name__ == "__main__": help_summary = '''\ @@ -1756,10 +1816,16 @@ def get_device_data(port_key, resource_key, port_data, resource_data): Command to Connect devices in a group to a profile python3 DeviceConfig.py --lanforge_ip 192.168.204.74 --connect_profile --file_name g219 --wait_time 20 + EXAMPLE5: To configure specific devices using device list and SSID config + python3 DeviceConfig.py --lanforge_ip 192.168.247.77 --config --ssid Starlink123 --passwd lanforge --security wpa2 --upstream_port eth2 --wait_time 120 --device_list 1.10,1.11 + + EXAMPLE6: To configure all devices connected to lanforge using device list and SSID config + python3 DeviceConfig.py --lanforge_ip 192.168.247.77 --config --ssid Starlink123 --passwd lanforge --security wpa2 --upstream_port eth2 --wait_time 120 --device_list all + """ ) - parser.add_argument('--lanforge_ip', type=str, default='localhost', help='') + parser.add_argument('--lanforge_ip', '--mgr', dest="lanforge_ip", type=str, default='localhost', help='') parser.add_argument("--create_file", action="store_true") parser.add_argument('--file_name', type=str, default='', help='') parser.add_argument("--create_group", help='flag to use script to create a group', action="store_true") @@ -1777,95 +1843,238 @@ def get_device_data(port_key, resource_key, port_data, resource_data): parser.add_argument('--csv_name', type=str, default='', help='') parser.add_argument('--help_summary', help='Show summary of what this script does', action='store_true') parser.add_argument('--wait_time', type=int, help='Enter the maximum wait time for configurations to apply', default=60) - + parser.add_argument('--config', help='flag to use script for configuration of devices based on group and profile config', action='store_true') + + parser.add_argument('--device_list', help="Enter the devices on which the test should be run", default=[]) + parser.add_argument('--mgr_port', '--port', default=8080, help='port LANforge GUI HTTP service is running on') + parser.add_argument('--upstream_port', '-u', default='eth1', help='non-station port that generates traffic: ., e.g: 1.eth1') + parser.add_argument('--ssid', help='WiFi SSID for script objects to associate to') + parser.add_argument('--passwd', '--password', '--key', default="[BLANK]", help='WiFi passphrase/password/key') + parser.add_argument('--security', help='WiFi Security protocol: < open | wep | wpa | wpa2 | wpa3 >', default="open") + parser.add_argument("--eap_method", type=str, default='DEFAULT', help="Specify the EAP method for authentication.") + parser.add_argument("--eap_identity", type=str, default='', help="Specify the EAP identity for authentication.") + parser.add_argument("--ieee8021x", action="store_true", help='Enables 802.1X enterprise authentication for test stations.') + parser.add_argument("--ieee80211u", action="store_true", help='Enables IEEE 802.11u (Hotspot 2.0) support.') + parser.add_argument("--ieee80211w", type=int, default=1, help='Enables IEEE 802.11w (Management Frame Protection) support.') + parser.add_argument("--enable_pkc", action="store_true", help='Enables pkc support.') + parser.add_argument("--bss_transition", action="store_true", help='Enables BSS transition support.') + parser.add_argument("--power_save", action="store_true", help='Enables power-saving features.') + parser.add_argument("--disable_ofdma", action="store_true", help='Disables OFDMA support.') + parser.add_argument("--roam_ft_ds", action="store_true", help='Enables fast BSS transition (FT) support') + parser.add_argument("--key_management", type=str, default='DEFAULT', help='Specify the key management method (e.g., WPA-PSK, WPA-EAP') + parser.add_argument("--pairwise", type=str, default='NA') + parser.add_argument("--private_key", type=str, default='NA', help='Specify EAP private key certificate file.') + parser.add_argument("--ca_cert", type=str, default='NA', help='Specifiy the CA certificate file name') + parser.add_argument("--client_cert", type=str, default='NA', help='Specify the client certificate file name') + parser.add_argument("--pk_passwd", type=str, default='NA', help='Specify the password for the private key') + parser.add_argument("--pac_file", type=str, default='NA', help='Specify the pac file name') + parser.add_argument( + "--app_flags", + default='0', + help="Specify WiFi connection method for the WeCan app: 'NA' omits app_flags from adb command, 0 = 'connect to WiFi within app'/'best effort', 1 = 'none'/'connect to WiFi with gestures'.") args = parser.parse_args() if args.help_summary: print(help_summary) exit(0) - obj = DeviceConfig(lanforge_ip=args.lanforge_ip, file_name=args.file_name, wait_time=args.wait_time) + obj = DeviceConfig(lanforge_ip=args.lanforge_ip, file_name=args.file_name, wait_time=args.wait_time, app_flags=args.app_flags) + if args.config: + if args.ssid is None: + logger.error('For configuration need to Specify SSID , Password(Optional for "open" type security) and Security') + exit(1) + else: + if args.passwd == '[BLANK]' and args.security.lower() != 'open' or args.passwd != '[BLANK]' and args.security.lower() == 'open': + logger.error('Please provide valid passwd and security configuration') + exit(1) + if args.upstream_port: + upstream_port_ip = obj.change_port_to_ip(args.upstream_port) + config_dict = { + 'ssid': args.ssid, + 'passwd': args.passwd, + 'enc': args.security, + 'eap_method': args.eap_method, + 'eap_identity': args.eap_identity, + 'ieee80211': args.ieee8021x, + 'ieee80211u': args.ieee80211u, + 'ieee80211w': args.ieee80211w, + 'enable_pkc': args.enable_pkc, + 'bss_transition': args.bss_transition, + 'power_save': args.power_save, + 'disable_ofdma': args.disable_ofdma, + 'roam_ft_ds': args.roam_ft_ds, + 'key_management': args.key_management, + 'pairwise': args.pairwise, + 'private_key': args.private_key, + 'ca_cert': args.ca_cert, + 'client_cert': args.client_cert, + 'pk_passwd': args.pk_passwd, + 'pac_file': args.pac_file, + 'server_ip': upstream_port_ip + } + all_devices = obj.get_all_devices() + device_list = [] + res_to_name = {} + name_to_res = {} + all_device_list = [] + for device in all_devices: + if device["type"] == 'laptop': + device_list.append(device["shelf"] + '.' + device["resource"] + " " + device["hostname"]) + all_device_list.append(device["shelf"] + '.' + device["resource"]) + else: + device_list.append(device["shelf"] + '.' + device["resource"] + " " + device["serial"]) + all_device_list.append(device["serial"]) + logger.info("Available devices: %s", device_list) + if not args.device_list: + dev_list = input("Enter the desired resources to configure in comma based / enter 'all' to select all devices:") + if dev_list.lower() == "all": + dev_list = all_device_list.copy() + else: + dev_list = dev_list.split(',') + else: + flag = False + for dev in args.device_list.split(','): + if dev.lower() == "all": + flag = True + dev_list = all_device_list.copy() + break + if not flag: + dev_list = args.device_list.split(',') + if len(dev_list) == 0: + print("No devices entered for configuration, Exiting") + exit(1) + + for device in all_devices: + if device["type"] == 'laptop': + name_to_res[device["hostname"]] = device["shelf"] + '.' + device["resource"] + res_to_name[device["shelf"] + '.' + device["resource"]] = device["hostname"] + else: + name_to_res[device["serial"]] = device["eid"] + res_to_name[device["eid"]] = device["serial"] + filtered_dev_list = obj.filter_device_list(dev_list, name_to_res, res_to_name) + if len(filtered_dev_list) == 0: + print("No valid devices found for configuration, Exiting") + exit(1) + print("Entered device list:", dev_list) + time.sleep(3) + print('Final filtered device list:', filtered_dev_list) + dev_list = asyncio.run(obj.connectivity(device_list=filtered_dev_list, wifi_config=config_dict)) + all_devices = obj.get_all_devices() + for device in all_devices: + if device["type"] == 'laptop': + name_to_res[device["hostname"]] = device["shelf"] + '.' + device["resource"] + res_to_name[device["shelf"] + '.' + device["resource"]] = device["hostname"] + else: + name_to_res[device["serial"]] = device["eid"] + res_to_name[device["eid"]] = device["serial"] + config_dev_list = dev_list.copy() + for dev in dev_list: + config_dev_list.append(res_to_name.get(dev, dev)) + prev_dev_list = filtered_dev_list.copy() + final_config_dev_list = [] + for dev in prev_dev_list: + if dev in config_dev_list: + final_config_dev_list.append(dev) + failed_logs = [] + success_logs = [] + for dev in prev_dev_list: + if len(dev.split('.')) == 2: + dev_str = f"{dev} / {res_to_name.get(dev, 'NA')}" + else: + dev_str = f"{name_to_res.get(dev, 'NA')} / {dev}" + + if dev in final_config_dev_list or res_to_name.get(dev, None) in final_config_dev_list or name_to_res.get(dev, None) in final_config_dev_list: + success_logs.append(f"{dev_str} -> Successfully Configured") + else: + failed_logs.append(f"{dev_str} -> Configuration Failed") + logger.info("Configuration Results:") + for log in success_logs: + logger.info(log) + for log in failed_logs: + logger.error(log) + logger.info(f"Devices set for configuration {prev_dev_list}") + logger.info(f"Devices Configured {final_config_dev_list}") + else: + if args.create_file: + if not os.path.exists(args.file_name + '.csv'): + if args.file_name == '': + logger.info("--file_name argument is parser") + else: + obj.initiate_group() + displayed_dataframe = obj.display_groups(obj.groups) + df = pd.DataFrame(obj.all_available_devices).T + logger.warning("\n%s", df.to_string()) - if args.create_file: - if not os.path.exists(args.file_name + '.csv'): + edit_inp = obj.take_input("add") + device_tracker = defaultdict(list) + + for group_name, devices in edit_inp.items(): + for device in devices: + device_tracker[device].append(group_name) + + duplicates = {dev: groups for dev, groups in device_tracker.items() if len(groups) > 1} + + if duplicates: + logger.warning("⚠️ Duplicate device(s) found in multiple groups:") + for device, groups in duplicates.items(): + logger.warning(f" - Device '{device}' found in groups: {', '.join(groups)}") + sys.exit("❌ Exiting script due to duplicate group assignments.") + else: + logger.info("✅ All device assignments are unique across groups.") + obj.update_groups_file(edit_inp, "add", extra_obj=displayed_dataframe) + else: + logger.warning(f"'{args.file_name}' The csv file name already exists.") + elif args.create_group: if args.file_name == '': - logger.info("--file_name argument is required") + logger.info("--file_name argument is parser") else: obj.initiate_group() displayed_dataframe = obj.display_groups(obj.groups) df = pd.DataFrame(obj.all_available_devices).T logger.warning("\n%s", df.to_string()) - edit_inp = obj.take_input("add") device_tracker = defaultdict(list) for group_name, devices in edit_inp.items(): for device in devices: device_tracker[device].append(group_name) - duplicates = {dev: groups for dev, groups in device_tracker.items() if len(groups) > 1} - if duplicates: logger.warning("⚠️ Duplicate device(s) found in multiple groups:") for device, groups in duplicates.items(): logger.warning(f" - Device '{device}' found in groups: {', '.join(groups)}") sys.exit("❌ Exiting script due to duplicate group assignments.") - else: - logger.info("✅ All device assignments are unique across groups.") + obj.update_groups_file(edit_inp, "add", extra_obj=displayed_dataframe) - else: - logger.warning(f"'{args.file_name}' The csv file name already exists.") - elif args.create_group: - if args.file_name == '': - logger.info("--file_name argument is required") - else: + elif args.remove_group: obj.initiate_group() displayed_dataframe = obj.display_groups(obj.groups) - df = pd.DataFrame(obj.all_available_devices).T - logger.warning("\n%s", df.to_string()) - edit_inp = obj.take_input("add") - device_tracker = defaultdict(list) - - for group_name, devices in edit_inp.items(): - for device in devices: - device_tracker[device].append(group_name) - duplicates = {dev: groups for dev, groups in device_tracker.items() if len(groups) > 1} - if duplicates: - logger.warning("⚠️ Duplicate device(s) found in multiple groups:") - for device, groups in duplicates.items(): - logger.warning(f" - Device '{device}' found in groups: {', '.join(groups)}") - sys.exit("❌ Exiting script due to duplicate group assignments.") - - obj.update_groups_file(edit_inp, "add", extra_obj=displayed_dataframe) - elif args.remove_group: - obj.initiate_group() - displayed_dataframe = obj.display_groups(obj.groups) - edit_inp = obj.take_input("remove") - obj.update_groups_file(edit_inp, "remove", extra_obj=displayed_dataframe) - elif args.update_group: - obj.initiate_group() - logger.info(pd.DataFrame(obj.all_available_devices).T) - displayed_dataframe = obj.display_groups(obj.groups) - edit_inp = obj.take_input("edit") - obj.update_groups_file(edit_inp, "edit", extra_obj=displayed_dataframe) - elif args.get_groups: - obj.initiate_group() - obj.display_groups(obj.groups) - elif args.create_profile: - obj.create_profile(data=args.profile_config, delete_profiles=args.delete_profiles.strip().split(",")) - elif args.connect_profile: - input_dict = {} - obj.initiate_group() - grp_data = obj.display_groups(obj.groups) - prof_data = obj.display_profiles() - grp_profile = input("Enter the group and profile ex(group_name:profile_name,group2:prof2) :") - group_names = grp_profile.split(',') - for i in group_names: - key, value = i.split(':') - input_dict[key] = value - asyncio.run(obj.connectivity(config=input_dict, flag=1)) - elif args.create_csv: - if args.csv_name == '': - obj.device_csv_file() - else: - obj.device_csv_file(args.csv_name) + edit_inp = obj.take_input("remove") + obj.update_groups_file(edit_inp, "remove", extra_obj=displayed_dataframe) + elif args.update_group: + obj.initiate_group() + logger.info(pd.DataFrame(obj.all_available_devices).T) + displayed_dataframe = obj.display_groups(obj.groups) + edit_inp = obj.take_input("edit") + obj.update_groups_file(edit_inp, "edit", extra_obj=displayed_dataframe) + elif args.get_groups: + obj.initiate_group() + obj.display_groups(obj.groups) + elif args.create_profile: + obj.create_profile(data=args.profile_config, delete_profiles=args.delete_profiles.strip().split(",")) + elif args.connect_profile: + input_dict = {} + obj.initiate_group() + grp_data = obj.display_groups(obj.groups) + prof_data = obj.display_profiles() + grp_profile = input("Enter the group and profile ex(group_name:profile_name,group2:prof2) :") + group_names = grp_profile.split(',') + for i in group_names: + key, value = i.split(':') + input_dict[key] = value + asyncio.run(obj.connectivity(config=input_dict, flag=1)) + elif args.create_csv: + if args.csv_name == '': + obj.device_csv_file() + else: + obj.device_csv_file(args.csv_name)