From 1a1589c70f0ac75baaf8c3ab80d77483cea92c47 Mon Sep 17 00:00:00 2001 From: Sidartha-CT Date: Mon, 25 May 2026 05:14:48 +0000 Subject: [PATCH 1/3] lf_multi_traffic.py : Update build version in comments to latest. Signed-off-by: Sidartha-CT --- py-scripts/lf_multi_traffic.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/py-scripts/lf_multi_traffic.py b/py-scripts/lf_multi_traffic.py index e187e934e..a62801e60 100644 --- a/py-scripts/lf_multi_traffic.py +++ b/py-scripts/lf_multi_traffic.py @@ -277,9 +277,9 @@ SCRIPT_CATEGORIES: Performance, Functional, Automation VERIFIED_ON: - Working date - 09/04/2026 - Build version - 5.5.2 - kernel version - 6.15.6+ + Working date - 21/04/2026 + Build version - 5.5.3 + kernel version - 6.18.14+ LICENSE : Copyright (C) 2020-2026 Candela Technologies Inc @@ -12851,9 +12851,9 @@ def parse_args(): SCRIPT_CATEGORIES: Performance, Functional, Automation VERIFIED_ON: - Working date - 09/04/2026 - Build version - 5.5.2 - kernel version - 6.15.6+ + Working date - 21/04/2026 + Build version - 5.5.3 + kernel version - 6.18.14+ LICENSE : Copyright (C) 2020-2026 Candela Technologies Inc From 90f53becaebef28683c3659d5664c04e4ea83f02 Mon Sep 17 00:00:00 2001 From: Sidartha-CT Date: Mon, 25 May 2026 05:33:51 +0000 Subject: [PATCH 2/3] lf_multi_traffic.py : Split main() into seperate helpers. Verified CLI: python3 lf_multi_traffic.py --mgr 192.168.207.78 --upstream_port eth1 --parallel_tests ping_test,qos_test,ftp_test,http_test,mcast_test,vs_test,thput_test --ping_target www.google.com --ping_interval 5 --ping_duration 1 --ping_device_list 1.4,1.11,1.12 --qos_tos VO,VI,BE,BK --qos_duration 1m --qos_device_list 1.4,1.11,1.12 --qos_traffic_type lf_tcp --qos_download 10000000 --ftp_duration 1m --ftp_file_size 5MB --ftp_device_list 1.4,1.11,1.12 --ftp_bands 5G --http_duration 1m --http_file_size 5MB --http_device_list 1.4,1.11,1.12 --http_bands 5G --mcast_tos VO --mcast_test_duration 1m --mcast_side_b_min_bps 10000000 --mcast_device_list 1.4,1.11,1.12 --mcast_endp_type mc_udp --vs_url https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8 --vs_media_source hls --vs_media_quality 4k --vs_duration 1m --vs_device_list 1.4,1.11,1.12 --thput_test_duration 1m --thput_traffic_type lf_udp --thput_device_list 1.4,1.11,1.12 --thput_upload 10000000 Signed-off-by: Sidartha-CT --- py-scripts/lf_multi_traffic.py | 544 ++++++++++++++++++--------------- 1 file changed, 305 insertions(+), 239 deletions(-) diff --git a/py-scripts/lf_multi_traffic.py b/py-scripts/lf_multi_traffic.py index a62801e60..af7787b6b 100644 --- a/py-scripts/lf_multi_traffic.py +++ b/py-scripts/lf_multi_traffic.py @@ -393,7 +393,7 @@ class MultiTraffic(Realm): """ def __init__(self, ip='localhost', port=8080, order_priority="series", result_dir="", dowebgui=False, test_name='', no_cleanup=False, - robot_test=False, robot_ip=None, coordinate=None, rotation=None, do_bandsteering=False, bssids=None, cycles=1, duration_to_skip=None): + robot_test=False, robot_ip=None, coordinate=None, rotation=None, do_bandsteering=False, bssids=None, cycles=1, duration_to_skip=None, test_map=None, args=None, args_dict=None): super().__init__(lfclient_host=ip, lfclient_port=port) self.lanforge_ip = ip @@ -460,6 +460,9 @@ def __init__(self, ip='localhost', port=8080, order_priority="series", result_di self.rotation_list = rotation.split(',') self.bssids = bssids.split(",") if bssids else [] self.rotation_enabled = True if rotation != '' else False + self.test_map = test_map + self.args = args + self.args_dict = args_dict if args else {} # Multi-threading/multiprocessing synchronization events for individual test completions self.ftp_done_event = Event() self.qos_done_event = Event() @@ -507,6 +510,217 @@ def __init__(self, ip='localhost', port=8080, order_priority="series", result_di self.robot_rotate_call_counter = Value('i', 0) self.robot_call_counter = Value('i', 0) + def start_tests(self, test_map, tests_to_run_series, tests_to_run_parallel, duration_dict, args, args_dict): + """ + Start and manage execution of configured traffic test suites. + + Args: + test_map (dict): + Mapping of test names to corresponding execution functions and labels. + + tests_to_run_series (list): + List of test names scheduled for sequential execution. + + tests_to_run_parallel (list): + List of test names scheduled for parallel execution. + + duration_dict (dict): + Dictionary containing duration values for each test. + + args (argparse.Namespace): + Parsed command-line arguments. + + args_dict (dict): + Dictionary version of parsed command-line arguments. + + Returns: + None + """ + iszoom = 'zoom_test' in tests_to_run_parallel or 'zoom_test' in tests_to_run_series + isrb = 'rb_test' in tests_to_run_parallel or 'rb_test' in tests_to_run_series + isyt = 'yt_test' in tests_to_run_parallel or 'yt_test' in tests_to_run_series + self.series_tests = tests_to_run_series + self.parallel_tests = tests_to_run_parallel + self.misc_clean_up(layer3=True, layer4=True, generic=True, port_5000=iszoom, port_5002=isyt, port_5003=isrb) + if args.series_tests or args.parallel_tests: + series_threads = [] + parallel_threads = [] + # Process series tests + if args.series_tests: + ordered_series_tests = args.series_tests.split(',') + if args.dowebgui: + gen_order = ["ping_test", "qos_test", "ftp_test", "http_test", "mcast_test", "vs_test", "thput_test", "yt_test", "rb_test", "zoom_test", "teams_test"] + temp_ord_list = [] + for test_name in gen_order: + if test_name in ordered_series_tests: + temp_ord_list.append(test_name) + ordered_series_tests = temp_ord_list.copy() + for idx, test_name in enumerate(ordered_series_tests): + test_name = test_name.strip().lower() + if test_name in test_map: + func, label = test_map[test_name] + args.current = "series" + if test_name in ['rb_test', 'zoom_test', 'yt_test', 'teams_test']: + if test_name == "rb_test": + obj_no = 1 + while f"rb_test_{obj_no}" in self.rb_obj_dict["series"]: + obj_no += 1 + obj_name = f"rb_test_{obj_no}" + self.rb_obj_dict["series"][obj_name] = manager.dict({"obj": None, "data": None}) + logging.debug("Adding rb_test object to parallel execution") + elif test_name == "yt_test": + obj_no = 1 + while f"yt_test_{obj_no}" in self.yt_obj_dict["series"]: + obj_no += 1 + obj_name = f"yt_test_{obj_no}" + self.yt_obj_dict["series"][obj_name] = manager.dict({"obj": None, "data": None}) + elif test_name == "zoom_test": + obj_no = 1 + while f"zoom_test_{obj_no}" in self.zoom_obj_dict["series"]: + obj_no += 1 + obj_name = f"zoom_test_{obj_no}" + self.zoom_obj_dict["series"][obj_name] = manager.dict({"obj": None, "data": None}) + logging.debug("Adding zoom_test object to parallel execution") + elif test_name == "teams_test": + obj_no = 1 + while f"teams_test_{obj_no}" in self.teams_obj_dict["series"]: + obj_no += 1 + obj_name = f"teams_test_{obj_no}" + self.teams_obj_dict["series"][obj_name] = manager.dict({"obj": None, "data": None}) + logging.debug("Adding teams_test object to parallel execution") + series_threads.append(multiprocessing.Process(target=run_test_safe(func, f"{label} [Series {idx + 1}]", args, self, duration_dict[test_name]))) + else: + series_threads.append(threading.Thread( + target=run_test_safe(func, f"{label} [Series {idx + 1}]", args, self, duration_dict[test_name]) + )) + else: + logging.warning(f"Unknown test '{test_name}' in --series_tests") + + # Process parallel tests + if args.parallel_tests: + """ + Process parallel test configurations, enforcing order when web GUI reporting is enabled. + Initializes appropriate threading or multiprocessing workers based on test type requirements. + """ + ordered_parallel_tests = args.parallel_tests.split(',') + + if args.dowebgui: + gen_order = ["ping_test", "qos_test", "ftp_test", "http_test", "mcast_test", "vs_test", "thput_test", "yt_test", "rb_test", "zoom_test", "teams_test"] + ordered_parallel_tests = [t for t in gen_order if t in ordered_parallel_tests] + + for idx, test_name in enumerate(ordered_parallel_tests): + test_name = test_name.strip().lower() + if test_name in test_map: + func, label = test_map[test_name] + args.current = "parallel" + if test_name in ['rb_test', 'zoom_test', 'yt_test', 'teams_test']: + if test_name == "rb_test": + self.rb_obj_dict["parallel"]["rb_test"] = manager.dict({"obj": None, "data": None}) + logging.debug("Adding rb_test object to parallel execution") + elif test_name == "yt_test": + self.yt_obj_dict["parallel"]["yt_test"] = manager.dict({"obj": None, "data": None}) + logging.debug("Adding yt_test object to parallel execution") + elif test_name == "zoom_test": + self.zoom_obj_dict["parallel"]["zoom_test"] = manager.dict({"obj": None, "data": None}) + logging.debug("Adding zoom_test object to parallel execution") + elif test_name == "teams_test": + self.teams_obj_dict["parallel"]["teams_test"] = manager.dict({"obj": None, "data": None}) + logging.debug("Adding teams_test object to parallel execution") + + parallel_threads.append(multiprocessing.Process( + target=run_test_safe(func, f"{label} [Parallel {idx + 1}]", args, self, duration_dict[test_name]) + )) + else: + parallel_threads.append(threading.Thread( + target=run_test_safe(func, f"{label} [Parallel {idx + 1}]", args, self, duration_dict[test_name]) + )) + else: + logging.warning(f"Unknown test '{test_name}' in --parallel_tests") + + if args.dowebgui: + """ + Initialize overall execution status and tracking CSV for Web GUI reporting. + """ + self.overall_status = { + "ping": "notstarted", "qos": "notstarted", "ftp": "notstarted", "http": "notstarted", + "mc": "notstarted", "vs": "notstarted", "thput": "notstarted", "rb": "notstarted", + "zoom": "notstarted", "yt": "notstarted", "teams": "notstarted", + "time": datetime.datetime.now().strftime("%Y %d %H:%M:%S"), + "status": "running", "current_mode": "tbd", "current_test_name": "tbd" + } + self.overall_csv.append(self.overall_status.copy()) + df1 = pd.DataFrame(self.overall_csv) + df1.to_csv(f'{args.result_dir}/overall_status.csv', index=False) + + """ + Execute scheduled test scenarios sequentially and/or in parallel according to priority. + """ + if args.order_priority == 'series': + self.current_exec = "series" + for t in series_threads: + t.start() + t.join() + self.series_index += 1 + + # Then run parallel tests + if parallel_threads: + self.misc_clean_up(layer3=True, layer4=True, generic=True, port_5000=iszoom, port_5002=isyt, port_5003=isrb) + logging.info('Starting parallel tests...') + time.sleep(10) + + self.current_exec = "parallel" + for t in parallel_threads: + t.start() + + self.parallel_index = 0 + for t in parallel_threads: + t.join() + self.parallel_index += 1 + + else: + self.current_exec = "parallel" + for t in parallel_threads: + t.start() + + for t in parallel_threads: + t.join() + + if series_threads: + self.misc_clean_up(layer3=True, layer4=True, generic=True, port_5000=iszoom, port_5002=isyt, port_5003=isrb) + logging.info('Starting series tests...') + time.sleep(5) + + self.current_exec = "series" + for t in series_threads: + t.start() + t.join() + else: + logger.error("Provide either --parallel_tests or --series_tests") + exit(1) + + """ + Finalize test execution: perform cleanup, save logs, generate the overall report, + and update the Web GUI status if applicable. + """ + self.misc_clean_up(layer3=True, layer4=True, generic=True, port_5000=iszoom, port_5002=isyt, port_5003=isrb) + log_file = save_logs() + logging.info(f"Logs saved to: {log_file}") + + test_results_df = pd.DataFrame(list(test_results_list)) + self.generate_overall_report(test_results_df=test_results_df, args_dict=args_dict) + + if self.dowebgui: + try: + self.overall_status["status"] = "completed" + self.overall_status["time"] = datetime.datetime.now().strftime("%Y %d %H:%M:%S") + self.overall_csv.append(self.overall_status.copy()) + df1 = pd.DataFrame(self.overall_csv) + df1.to_csv(f'{self.result_dir}/overall_status.csv', index=False) + except Exception as e: + logging.error(f"Error while writing status file for webui: {e}") + + logging.info(f"\nTest Results Summary:\n{test_results_df}") + def webgui_stop_check(self, test_name): """ Checks the running JSON file generated when a test is executed via the web GUI. @@ -7958,7 +8172,7 @@ def render_each_test(self, ce): test_setup_info = { 'Configuration': configmap, 'Website / IP': curr_ping_obj.target, - 'No of Devices': '{} (V:{}, A:{}, W:{}, L:{}, M:{})'.format(len(curr_ping_obj.sta_list), len(curr_ping_obj.sta_list) - len(curr_ping_obj.real_sta_list), curr_ping_obj.android, curr_ping_obj.windows, curr_ping_obj.linux, curr_ping_obj.mac), # noqa E501 + 'No of Devices': '{} (V:{}, A:{}, W:{}, L:{}, M:{})'.format(len(curr_ping_obj.sta_list), len(curr_ping_obj.sta_list) - len(curr_ping_obj.real_sta_list), curr_ping_obj.android, curr_ping_obj.windows, curr_ping_obj.linux, curr_ping_obj.mac), # noqa E501 'Duration (in minutes)': curr_ping_obj.duration } if self.robot_test and not self.do_bandsteering: @@ -13429,59 +13643,38 @@ def parse_args(): return args -def main(): - ''' - Main Execution Block - -------------------- - Initializes the command-line argument parser, defines argument flags, - and orchestrates test suite execution for the MULTI TRAFFIC TEST. - ''' - args = parse_args() - help_summary = '''\ -lf_multi_traffic.py is used to execute multiple testcases -either in series, parallel, or hybrid mode. -The script supports running tests sequentially (series), simultaneously (parallel), -or a combination of both, with configurable execution priority. -''' - if args.help_summary: - print(help_summary) - exit(0) - args.zoom_duration = normalise_time(args.zoom_duration) - args.ping_duration = normalise_time(args.ping_duration) - args.teams_duration = normalise_time(args.teams_duration) - args_dict = vars(args) - duration_dict = {} - multi_traffic_obj = MultiTraffic( - ip=args.mgr, - port=args.mgr_port, - order_priority=args.order_priority, - test_name=args.test_name, - result_dir=args.result_dir, - dowebgui=args.dowebgui, - no_cleanup=args.no_cleanup, - robot_test=args.robot_test, - robot_ip=args.robot_ip, - coordinate=args.coordinate, - rotation=args.rotation, - do_bandsteering=args.do_bandsteering, - bssids=args.bssids, - cycles=args.cycles, - duration_to_skip=args.duration_to_skip) - # Map available test flags to their respective execution methods - test_map = { - "ping_test": (run_ping_test, "PING TEST"), - "http_test": (run_http_test, "HTTP TEST"), - "ftp_test": (run_ftp_test, "FTP TEST"), - "qos_test": (run_qos_test, "QoS TEST"), - "vs_test": (run_vs_test, "VIDEO STREAMING TEST"), - "thput_test": (run_thput_test, "THROUGHPUT TEST"), - "mcast_test": (run_mcast_test, "MULTICAST TEST"), - "yt_test": (run_yt_test, "YOUTUBE TEST"), - "rb_test": (run_rb_test, "REAL BROWSER TEST"), - "zoom_test": (run_zoom_test, "ZOOM TEST"), - "teams_test": (run_teams_test, "TEAMS TEST"), - } +def validate_arguments(args, test_map, args_dict): + """ + Validate command-line arguments and prepare test execution metadata. + + This function: + - Ensures at least one test suite is provided. + - Validates test names against the supported test map. + - Detects duplicate test entries. + - Validates and prepares duration configuration for tests. + - Generates execution-ready series and parallel test lists. + + Args: + args (argparse.Namespace): + Parsed command-line arguments. + + test_map (dict): + Mapping of supported test names and execution handlers. + args_dict (dict): + Dictionary version of parsed command-line arguments. + + Returns: + tuple: + tests_to_run_series (list): + Validated list of sequential tests. + + tests_to_run_parallel (list): + Validated list of parallel tests. + + duration_dict (dict): + Dictionary mapping test names to execution durations. + """ # Ensure at least one test suite is provided if not args.series_tests and not args.parallel_tests: logger.error("Please provide tests cases --parallel_tests or --series_tests") @@ -13520,7 +13713,7 @@ def main(): if args.parallel_tests and (len(tests_to_run_parallel) != len(set(tests_to_run_parallel))): logger.error("in --parallel dont specify duplicate tests") exit(0) - + duration_dict = {} duration_flag = False if args.series_tests: for test in args.series_tests.split(','): @@ -13548,192 +13741,65 @@ def main(): logger.error(f"wrong duration type for {test_name}") if duration_flag: exit(1) - multi_traffic_obj.duration_dict = duration_dict.copy() - # args.current = "series" - iszoom = 'zoom_test' in tests_to_run_parallel or 'zoom_test' in tests_to_run_series - isrb = 'rb_test' in tests_to_run_parallel or 'rb_test' in tests_to_run_series - isyt = 'yt_test' in tests_to_run_parallel or 'yt_test' in tests_to_run_series - multi_traffic_obj.series_tests = tests_to_run_series - multi_traffic_obj.parallel_tests = tests_to_run_parallel - multi_traffic_obj.misc_clean_up(layer3=True, layer4=True, generic=True, port_5000=iszoom, port_5002=isyt, port_5003=isrb) - if args.series_tests or args.parallel_tests: - series_threads = [] - parallel_threads = [] - # Process series tests - if args.series_tests: - ordered_series_tests = args.series_tests.split(',') - if args.dowebgui: - gen_order = ["ping_test", "qos_test", "ftp_test", "http_test", "mcast_test", "vs_test", "thput_test", "yt_test", "rb_test", "zoom_test", "teams_test"] - temp_ord_list = [] - for test_name in gen_order: - if test_name in ordered_series_tests: - temp_ord_list.append(test_name) - ordered_series_tests = temp_ord_list.copy() - for idx, test_name in enumerate(ordered_series_tests): - test_name = test_name.strip().lower() - if test_name in test_map: - func, label = test_map[test_name] - args.current = "series" - if test_name in ['rb_test', 'zoom_test', 'yt_test', 'teams_test']: - if test_name == "rb_test": - obj_no = 1 - while f"rb_test_{obj_no}" in multi_traffic_obj.rb_obj_dict["series"]: - obj_no += 1 - obj_name = f"rb_test_{obj_no}" - multi_traffic_obj.rb_obj_dict["series"][obj_name] = manager.dict({"obj": None, "data": None}) - logging.debug("Adding rb_test object to parallel execution") - elif test_name == "yt_test": - obj_no = 1 - while f"yt_test_{obj_no}" in multi_traffic_obj.yt_obj_dict["series"]: - obj_no += 1 - obj_name = f"yt_test_{obj_no}" - multi_traffic_obj.yt_obj_dict["series"][obj_name] = manager.dict({"obj": None, "data": None}) - elif test_name == "zoom_test": - obj_no = 1 - while f"zoom_test_{obj_no}" in multi_traffic_obj.zoom_obj_dict["series"]: - obj_no += 1 - obj_name = f"zoom_test_{obj_no}" - multi_traffic_obj.zoom_obj_dict["series"][obj_name] = manager.dict({"obj": None, "data": None}) - logging.debug("Adding zoom_test object to parallel execution") - elif test_name == "teams_test": - obj_no = 1 - while f"teams_test_{obj_no}" in multi_traffic_obj.teams_obj_dict["series"]: - obj_no += 1 - obj_name = f"teams_test_{obj_no}" - multi_traffic_obj.teams_obj_dict["series"][obj_name] = manager.dict({"obj": None, "data": None}) - logging.debug("Adding teams_test object to parallel execution") - series_threads.append(multiprocessing.Process(target=run_test_safe(func, f"{label} [Series {idx + 1}]", args, multi_traffic_obj, duration_dict[test_name]))) - else: - series_threads.append(threading.Thread( - target=run_test_safe(func, f"{label} [Series {idx + 1}]", args, multi_traffic_obj, duration_dict[test_name]) - )) - else: - logging.warning(f"Unknown test '{test_name}' in --series_tests") - - # Process parallel tests - if args.parallel_tests: - """ - Process parallel test configurations, enforcing order when web GUI reporting is enabled. - Initializes appropriate threading or multiprocessing workers based on test type requirements. - """ - ordered_parallel_tests = args.parallel_tests.split(',') - - if args.dowebgui: - gen_order = ["ping_test", "qos_test", "ftp_test", "http_test", "mcast_test", "vs_test", "thput_test", "yt_test", "rb_test", "zoom_test", "teams_test"] - ordered_parallel_tests = [t for t in gen_order if t in ordered_parallel_tests] - - for idx, test_name in enumerate(ordered_parallel_tests): - test_name = test_name.strip().lower() - if test_name in test_map: - func, label = test_map[test_name] - args.current = "parallel" - if test_name in ['rb_test', 'zoom_test', 'yt_test', 'teams_test']: - if test_name == "rb_test": - multi_traffic_obj.rb_obj_dict["parallel"]["rb_test"] = manager.dict({"obj": None, "data": None}) - logging.debug("Adding rb_test object to parallel execution") - elif test_name == "yt_test": - multi_traffic_obj.yt_obj_dict["parallel"]["yt_test"] = manager.dict({"obj": None, "data": None}) - logging.debug("Adding yt_test object to parallel execution") - elif test_name == "zoom_test": - multi_traffic_obj.zoom_obj_dict["parallel"]["zoom_test"] = manager.dict({"obj": None, "data": None}) - logging.debug("Adding zoom_test object to parallel execution") - elif test_name == "teams_test": - multi_traffic_obj.teams_obj_dict["parallel"]["teams_test"] = manager.dict({"obj": None, "data": None}) - logging.debug("Adding teams_test object to parallel execution") - - parallel_threads.append(multiprocessing.Process( - target=run_test_safe(func, f"{label} [Parallel {idx + 1}]", args, multi_traffic_obj, duration_dict[test_name]) - )) - else: - parallel_threads.append(threading.Thread( - target=run_test_safe(func, f"{label} [Parallel {idx + 1}]", args, multi_traffic_obj, duration_dict[test_name]) - )) - else: - logging.warning(f"Unknown test '{test_name}' in --parallel_tests") - - if args.dowebgui: - """ - Initialize overall execution status and tracking CSV for Web GUI reporting. - """ - multi_traffic_obj.overall_status = { - "ping": "notstarted", "qos": "notstarted", "ftp": "notstarted", "http": "notstarted", - "mc": "notstarted", "vs": "notstarted", "thput": "notstarted", "rb": "notstarted", - "zoom": "notstarted", "yt": "notstarted", "teams": "notstarted", - "time": datetime.datetime.now().strftime("%Y %d %H:%M:%S"), - "status": "running", "current_mode": "tbd", "current_test_name": "tbd" - } - multi_traffic_obj.overall_csv.append(multi_traffic_obj.overall_status.copy()) - df1 = pd.DataFrame(multi_traffic_obj.overall_csv) - df1.to_csv(f'{args.result_dir}/overall_status.csv', index=False) - - """ - Execute scheduled test scenarios sequentially and/or in parallel according to priority. - """ - if args.order_priority == 'series': - multi_traffic_obj.current_exec = "series" - for t in series_threads: - t.start() - t.join() - multi_traffic_obj.series_index += 1 - - # Then run parallel tests - if parallel_threads: - multi_traffic_obj.misc_clean_up(layer3=True, layer4=True, generic=True, port_5000=iszoom, port_5002=isyt, port_5003=isrb) - logging.info('Starting parallel tests...') - time.sleep(10) - - multi_traffic_obj.current_exec = "parallel" - for t in parallel_threads: - t.start() - - multi_traffic_obj.parallel_index = 0 - for t in parallel_threads: - t.join() - multi_traffic_obj.parallel_index += 1 - - else: - multi_traffic_obj.current_exec = "parallel" - for t in parallel_threads: - t.start() - - for t in parallel_threads: - t.join() - - if series_threads: - multi_traffic_obj.misc_clean_up(layer3=True, layer4=True, generic=True, port_5000=iszoom, port_5002=isyt, port_5003=isrb) - logging.info('Starting series tests...') - time.sleep(5) - - multi_traffic_obj.current_exec = "series" - for t in series_threads: - t.start() - t.join() - else: - logger.error("Provide either --parallel_tests or --series_tests") - exit(1) - - """ - Finalize test execution: perform cleanup, save logs, generate the overall report, - and update the Web GUI status if applicable. - """ - multi_traffic_obj.misc_clean_up(layer3=True, layer4=True, generic=True, port_5000=iszoom, port_5002=isyt, port_5003=isrb) - log_file = save_logs() - logging.info(f"Logs saved to: {log_file}") - - test_results_df = pd.DataFrame(list(test_results_list)) - multi_traffic_obj.generate_overall_report(test_results_df=test_results_df, args_dict=args_dict) + return tests_to_run_series, tests_to_run_parallel, duration_dict - if multi_traffic_obj.dowebgui: - try: - multi_traffic_obj.overall_status["status"] = "completed" - multi_traffic_obj.overall_status["time"] = datetime.datetime.now().strftime("%Y %d %H:%M:%S") - multi_traffic_obj.overall_csv.append(multi_traffic_obj.overall_status.copy()) - df1 = pd.DataFrame(multi_traffic_obj.overall_csv) - df1.to_csv(f'{multi_traffic_obj.result_dir}/overall_status.csv', index=False) - except Exception as e: - logging.error(f"Error while writing status file for webui: {e}") - logging.info(f"\nTest Results Summary:\n{test_results_df}") +def main(): + ''' + Main Execution Block + -------------------- + Initializes the command-line argument parser, defines argument flags, + and orchestrates test suite execution for the MULTI TRAFFIC TEST. + ''' + args = parse_args() + help_summary = '''\ +lf_multi_traffic.py is used to execute multiple testcases +either in series, parallel, or hybrid mode. +The script supports running tests sequentially (series), simultaneously (parallel), +or a combination of both, with configurable execution priority. +''' + if args.help_summary: + print(help_summary) + exit(0) + args.zoom_duration = normalise_time(args.zoom_duration) + args.ping_duration = normalise_time(args.ping_duration) + args.teams_duration = normalise_time(args.teams_duration) + args_dict = vars(args) + test_map = { + "ping_test": (run_ping_test, "PING TEST"), + "http_test": (run_http_test, "HTTP TEST"), + "ftp_test": (run_ftp_test, "FTP TEST"), + "qos_test": (run_qos_test, "QoS TEST"), + "vs_test": (run_vs_test, "VIDEO STREAMING TEST"), + "thput_test": (run_thput_test, "THROUGHPUT TEST"), + "mcast_test": (run_mcast_test, "MULTICAST TEST"), + "yt_test": (run_yt_test, "YOUTUBE TEST"), + "rb_test": (run_rb_test, "REAL BROWSER TEST"), + "zoom_test": (run_zoom_test, "ZOOM TEST"), + "teams_test": (run_teams_test, "TEAMS TEST"), + } + multi_traffic_obj = MultiTraffic( + ip=args.mgr, + port=args.mgr_port, + order_priority=args.order_priority, + test_name=args.test_name, + result_dir=args.result_dir, + dowebgui=args.dowebgui, + no_cleanup=args.no_cleanup, + robot_test=args.robot_test, + robot_ip=args.robot_ip, + coordinate=args.coordinate, + rotation=args.rotation, + do_bandsteering=args.do_bandsteering, + bssids=args.bssids, + cycles=args.cycles, + duration_to_skip=args.duration_to_skip, + test_map=test_map) + # Map available test flags to their respective execution methods + tests_to_run_series, tests_to_run_parallel, duration_dict = validate_arguments(args, test_map, args_dict) + multi_traffic_obj.duration_dict = duration_dict.copy() + # args.current = "series" + multi_traffic_obj.start_tests(test_map, tests_to_run_series, tests_to_run_parallel, duration_dict, args, args_dict) def run_test_safe(test_func, test_name, args, multi_traffic_obj, duration): From 0cf42e228272a7cf67fb458f23ab82ff8385adff Mon Sep 17 00:00:00 2001 From: Sidartha-CT Date: Mon, 25 May 2026 06:11:12 +0000 Subject: [PATCH 3/3] lf_multi_traffic.py : Add function call support for CLI arguments Verified CLI : python3 lf_multi_traffic.py --run_specific_scenario Signed-off-by: Sidartha-CT --- py-scripts/lf_multi_traffic.py | 249 ++++++++++++++++++++++++++++++++- 1 file changed, 247 insertions(+), 2 deletions(-) diff --git a/py-scripts/lf_multi_traffic.py b/py-scripts/lf_multi_traffic.py index af7787b6b..55f523924 100644 --- a/py-scripts/lf_multi_traffic.py +++ b/py-scripts/lf_multi_traffic.py @@ -262,6 +262,23 @@ NOTE : Add traffic related args from EXAMPLE 1 + EXAMPLE-5: Import and use lf_multi_traffic in another script by passing CLI-equivalent arguments as function parameters + + from lf_multi_traffic import initialize_multitraffic_obj + + base_class_obj = initialize_multitraffic_obj( + mgr="192.168.207.78", + upstream_port="eth1", + parallel_tests="ping_test", + + ping_target="www.google.com", + ping_interval=5, + ping_duration=1, + ping_device_list="1.4,1.11,1.12", + ) + + base_class_obj.start_scenario() + NOTES: 1. Duration format: s (seconds), m (minutes), h (hours) @@ -510,6 +527,23 @@ def __init__(self, ip='localhost', port=8080, order_priority="series", result_di self.robot_rotate_call_counter = Value('i', 0) self.robot_call_counter = Value('i', 0) + def start_scenario(self): + """ + Validate configured test scenarios and initiate execution. + + This method: + - Validates all configured series and parallel test arguments. + - Prepares duration mappings for each test. + - Stores validated duration configuration. + - Starts execution of the configured traffic test scenarios. + + Returns: + None + """ + tests_to_run_series, tests_to_run_parallel, duration_dict = validate_arguments(self.args, self.test_map, self.args_dict) + self.duration_dict = duration_dict.copy() + self.start_tests(self.test_map, tests_to_run_series, tests_to_run_parallel, self.duration_dict, self.args, self.args_dict) + def start_tests(self, test_map, tests_to_run_series, tests_to_run_parallel, duration_dict, args, args_dict): """ Start and manage execution of configured traffic test suites. @@ -12782,7 +12816,7 @@ def normalise_time(value): return value # unknown suffix → unchanged -def parse_args(): +def parse_args(return_parser=False): parser = argparse.ArgumentParser( prog="lf_multi_traffic.py", formatter_class=argparse.RawTextHelpFormatter, @@ -13049,6 +13083,23 @@ def parse_args(): NOTE : Add traffic related args from EXAMPLE 1 + EXAMPLE-5: Import and use lf_multi_traffic in another script by passing CLI-equivalent arguments as function parameters + + from lf_multi_traffic import initialize_multitraffic_obj + + base_class_obj = initialize_multitraffic_obj( + mgr="192.168.207.78", + upstream_port="eth1", + parallel_tests="ping_test", + + ping_target="www.google.com", + ping_interval=5, + ping_duration=1, + ping_device_list="1.4,1.11,1.12", + ) + + base_class_obj.start_scenario() + NOTES: @@ -13096,6 +13147,7 @@ def parse_args(): parser.add_argument('--result_dir', help="Specify the result dir to store the runtime logs ", default='') parser.add_argument('--no_cleanup', help='Do not cleanup before exit', action='store_true') parser.add_argument('--help_summary', action="store_true", help='Show summary of what this script does') + parser.add_argument('--run_specific_scenario', action='store_true', help='Specify the scenario to run just by giving CLI as function call') # PING ARGS # without config parser.add_argument('--ping_test', @@ -13637,8 +13689,9 @@ def parse_args(): parser.add_argument('--cycles', type=int, default=1, help='No of cycles to perform band steering') parser.add_argument('--bssids', type=str, default='', help='hostname for where Robot server is running') parser.add_argument("--duration_to_skip", type=int, help='Specify the maximum time in seconds to skip a point if there is an obstacle', default=60) - # + if return_parser: + return parser args = parser.parse_args() return args @@ -13761,6 +13814,9 @@ def main(): if args.help_summary: print(help_summary) exit(0) + if args.run_specific_scenario: + run_specific_scenario() + exit(0) args.zoom_duration = normalise_time(args.zoom_duration) args.ping_duration = normalise_time(args.ping_duration) args.teams_duration = normalise_time(args.teams_duration) @@ -13882,6 +13938,106 @@ def save_logs(): return log_filename +def convert_kwargs_to_cli_list(kwargs): + """ + Convert keyword arguments into CLI-compatible argument list format. + + Boolean arguments are added as standalone flags when True, + while other argument types are added as key-value pairs. + + Args: + kwargs (dict): + Dictionary containing argument names and values. + + Returns: + list: + CLI-style argument list suitable for argparse parsing. + """ + cli_list = [] + for key, value in kwargs.items(): + if isinstance(value, bool): + if value: + cli_list.append("--{}".format(str(key))) + else: + cli_list.append("--{}".format(key)) + cli_list.append(str(value)) + return cli_list + + +def initialize_multitraffic_obj(**kwargs): + """ + Initialize and configure the MultiTraffic base class object. + + This function: + - Converts keyword arguments into CLI-compatible format. + - Parses and normalizes execution arguments. + - Creates test mappings for supported traffic tests. + - Initializes the MultiTraffic execution object. + - Returns a fully configured MultiTraffic instance. + + Args: + **kwargs: + Arbitrary keyword arguments representing CLI parameters + or configuration values for test execution. + + Returns: + MultiTraffic: + Configured MultiTraffic class instance ready for execution. + """ + global test_results_list + test_results_list = manager.list() + cli_list = None + cli = False + if "cli" in kwargs: + cli = kwargs["cli"] + if "cli_list" in kwargs: + cli_list = kwargs["cli_list"] + if cli and cli_list: + # TODO directly placing cli_list and validation + pass + else: + cli_list = convert_kwargs_to_cli_list(kwargs) + parser = parse_args(return_parser=True) + args = parser.parse_args(cli_list) + args.zoom_duration = normalise_time(args.zoom_duration) + args.ping_duration = normalise_time(args.ping_duration) + args.teams_duration = normalise_time(args.teams_duration) + args_dict = vars(args) + test_map = { + "ping_test": (run_ping_test, "PING TEST"), + "http_test": (run_http_test, "HTTP TEST"), + "ftp_test": (run_ftp_test, "FTP TEST"), + "qos_test": (run_qos_test, "QoS TEST"), + "vs_test": (run_vs_test, "VIDEO STREAMING TEST"), + "thput_test": (run_thput_test, "THROUGHPUT TEST"), + "mcast_test": (run_mcast_test, "MULTICAST TEST"), + "yt_test": (run_yt_test, "YOUTUBE TEST"), + "rb_test": (run_rb_test, "REAL BROWSER TEST"), + "zoom_test": (run_zoom_test, "ZOOM TEST"), + "teams_test": (run_teams_test, "TEAMS TEST"), + } + multi_traffic_obj = MultiTraffic( + ip=args.mgr, + port=args.mgr_port, + order_priority=args.order_priority, + test_name=args.test_name, + result_dir=args.result_dir, + dowebgui=args.dowebgui, + no_cleanup=args.no_cleanup, + robot_test=args.robot_test, + robot_ip=args.robot_ip, + coordinate=args.coordinate, + rotation=args.rotation, + do_bandsteering=args.do_bandsteering, + bssids=args.bssids, + cycles=args.cycles, + duration_to_skip=args.duration_to_skip, + args=args, + args_dict=args_dict, + test_map=test_map) + return multi_traffic_obj + + def run_ping_test(args, multi_traffic_obj: MultiTraffic): """Executes a ping test using the provided arguments.""" return multi_traffic_obj.run_ping_test( @@ -14364,5 +14520,94 @@ def run_teams_test(args, multi_traffic_obj: MultiTraffic): ) +def run_specific_scenario(): + """ + Example scenario runner for executing predefined multi-traffic tests. + + This function demonstrates how to: + - Import and use `initialize_multitraffic_obj` from another script. + - Pass CLI-equivalent arguments directly as keyword parameters. + - Configure multiple traffic test types in a single scenario. + - Initialize the MultiTraffic base object programmatically. + - Start execution of the configured traffic test scenario. + + Example: + from lf_multi_traffic import initialize_multitraffic_obj + + Returns: + None + """ + base_class_obj = initialize_multitraffic_obj( + mgr="192.168.207.78", + upstream_port="eth1", + parallel_tests="ping_test,qos_test,ftp_test,http_test,mcast_test,vs_test,thput_test", + + ping_target="www.google.com", + ping_interval=5, + ping_duration=1, + ping_device_list="1.4,1.11,1.12", + + qos_tos="VO,VI,BE,BK", + qos_duration="1m", + qos_device_list="1.4,1.11,1.12", + qos_traffic_type="lf_tcp", + qos_download=10000000, + + ftp_duration="1m", + ftp_file_size="5MB", + ftp_device_list="1.4,1.11,1.12", + ftp_bands="5G", + + http_duration="1m", + http_file_size="5MB", + http_device_list="1.4,1.11,1.12", + http_bands="5G", + + mcast_tos="VO", + mcast_test_duration="1m", + mcast_side_b_min_bps=10000000, + mcast_device_list="1.4,1.11,1.12", + mcast_endp_type="mc_udp", + + vs_url="https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8", + vs_media_source="hls", + vs_media_quality="4k", + vs_duration="1m", + vs_device_list="1.4,1.11,1.12", + + thput_test_duration="1m", + thput_traffic_type="lf_udp", + thput_device_list="1.4,1.11,1.12", + thput_upload=10000000, + + rb_duration="1m", + rb_device_list="1.4,1.11,1.12", + rb_webgui_incremental="no_increment", + rb_count=10, + + yt_url="https://youtu.be/BHACKCNDMW8?si=psTEUzrc77p38aU1", + yt_duration="1m", + yt_res="144p", + yt_device_list="1.4,1.11,1.12", + + zoom_signin_email="candelatech2@gmail.com", + zoom_signin_passwd="CANDELAtech1@530048", + zoom_duration=2, + zoom_host="1.4", + zoom_participants=2, + zoom_device_list="1.4,1.11,1.12", + zoom_audio=True, + zoom_video=True, + + teams_duration="2m", + teams_device_list="1.4,1.11,1.12", + teams_audio=True, + teams_video=True, + + no_cleanup=True + ) + base_class_obj.start_scenario() + + if __name__ == "__main__": main()