diff --git a/bin/wfbench b/bin/wfbench index 4bc03cba..68c73a02 100755 --- a/bin/wfbench +++ b/bin/wfbench @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Copyright (c) 2021-2025 The WfCommons Team. @@ -12,19 +12,23 @@ import os import pathlib import subprocess import time -import sys import signal -import queue +import sys import argparse import re import json import logging -import pandas as pd +import psutil from io import StringIO from filelock import FileLock from pathos.helpers import mp as multiprocessing -from typing import List, Optional + +from abc import ABC, abstractmethod +from typing import List, Optional, IO + +int32_max = 2147483647 +this_dir = pathlib.Path(__file__).resolve().parent # Configure logging @@ -35,10 +39,6 @@ logging.basicConfig( handlers=[logging.StreamHandler()] ) - -this_dir = pathlib.Path(__file__).resolve().parent - - def log_info(msg: str): """ Log an info message to stderr @@ -66,6 +66,227 @@ def log_error(msg: str): """ logging.error(msg) +# Utility process class +####################### + +class ProcessHandle: + def __init__(self, proc: multiprocessing.Process | subprocess.Popen): + self._proc = proc + + @property + def pid(self): + return self._proc.pid + + def terminate(self): + self._proc.terminate() + + def terminate_along_with_children(self): + if isinstance(self._proc, multiprocessing.Process): + self._proc.terminate() + return + try: + pgid = os.getpgid(self._proc.pid) + os.killpg(pgid, signal.SIGKILL) + except ProcessLookupError: + pass # group leader already gone, try children directly + except PermissionError: + pass + finally: + # Catch any re-parented children (ppid=1) that psutil can still see + try: + for child in psutil.Process(self._proc.pid).children(recursive=True): + try: + child.kill() + except psutil.NoSuchProcess: + pass + except psutil.NoSuchProcess: + pass + + def wait(self): + if isinstance(self._proc, multiprocessing.Process): + self._proc.join() + else: + self._proc.wait() + + def is_alive(self): + if isinstance(self._proc, multiprocessing.Process): + return self._proc.is_alive() + else: + return self._proc.poll() is None + +# Benchmark classes +################### + +class Benchmark(ABC): + @abstractmethod + def run(self) -> multiprocessing.Process: + pass + +class IOReadBenchmark: + def __init__(self): + self.to_read : dict[str, (IO, int)] = {} + + def add_read_operation(self, filepath: str, opened_file: IO, num_bytes: int): + self.to_read[filepath] = (opened_file, num_bytes) + + def run(self) -> ProcessHandle | None: + if len(self.to_read) <= 0: + return None + p = multiprocessing.Process(target=self.benchmark_function, args=()) + p.start() + return ProcessHandle(p) + + def benchmark_function(self): + for filepath, (opened_file, bytes_to_read) in self.to_read.items(): + log_debug(f"Reading {bytes_to_read} bytes from {filepath}...") + opened_file.read(bytes_to_read) + + +class IOWriteBenchmark: + def __init__(self): + self.to_write : dict[str, (IO, int)] = {} + + def add_write_operation(self, filepath: str, opened_file: IO, num_bytes: int): + self.to_write[filepath] = (opened_file, num_bytes) + + def run(self) -> ProcessHandle | None: + if len(self.to_write) <= 0: + return None + p = multiprocessing.Process(target=self.benchmark_function, args=()) + p.start() + return ProcessHandle(p) + + def benchmark_function(self): + for filepath, (opened_file, bytes_to_write) in self.to_write.items(): + log_debug(f"Writing {bytes_to_write} bytes to {filepath}...") + opened_file.write(os.urandom(int(bytes_to_write))) + opened_file.flush() + + +class CPUBenchmark: + def __init__(self, cpu_threads: Optional[int] = 5, + mem_threads: Optional[int] = 5, + core: Optional[int] = None, + total_mem: Optional[int] = None): + self.cpu_threads = cpu_threads + self.mem_threads = mem_threads + self.core = core + self.total_mem = total_mem + self.work = None + + def set_work(self, work: int): + self.work = work + + def set_infinite_work(self): + self.work = int32_max # "infinite" + + def run(self) -> list[ProcessHandle | None]: + if self.work is None or self.work <= 0: + return [None, None] + + total_mem = f"{self.total_mem}B" if self.total_mem else f"{100.0 / os.cpu_count()}%" + cpu_work_per_thread = int(1000000 * self.work / (16384 * self.cpu_threads)) if self.cpu_threads != 0 else int32_max ** 2 + cpu_samples = min(cpu_work_per_thread, int32_max) + cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max + if cpu_ops > int32_max: + log_info("Exceeded maximum allowed value of cpu work.") + cpu_ops = int32_max + + + # Start CPU benchmark, if need be + cpu_proc_handle = None + if self.cpu_threads > 0: + log_debug(f"Running CPU benchmark with {self.cpu_threads} threads for {self.work if self.work < int32_max else 'infinite'} units of work...") + cpu_prog = ["stress-ng", "--monte-carlo", f"{self.cpu_threads}", + "--monte-carlo-method", "pi", + "--monte-carlo-rand", "lcg", + "--monte-carlo-samples", f"{cpu_samples}", + "--monte-carlo-ops", f"{cpu_ops}", + "--quiet"] + cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid) + cpu_proc_handle = ProcessHandle(cpu_proc) + + # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) + if self.core: + os.sched_setaffinity(cpu_proc.pid, {self.core}) + + # Start Memory benchmark, if need be + mem_proc_handle = None + if self.mem_threads > 0: + # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows + log_debug(f"Running memory benchmark with {self.mem_threads} threads...") + mem_prog = ["stress-ng", "--vm", f"{self.mem_threads}", + "--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"] + mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) + mem_proc_handle = ProcessHandle(mem_proc) + if self.core: + os.sched_setaffinity(mem_proc.pid, {self.core}) + + return [cpu_proc_handle, mem_proc_handle] + + +class GPUBenchmark: + + @staticmethod + def get_available_gpus(): + proc = subprocess.Popen(["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"], stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, _ = proc.communicate() + df = pd.read_csv(StringIO(stdout.decode("utf-8")), sep=" ") + return df[df["utilization.gpu"] <= 5].index.to_list() + + def __init__(self): + self.work = None + self.duration = None + self.device = None + + def set_device(self): + available_gpus = self.get_available_gpus() # checking for available GPUs + if not available_gpus: + log_error("No GPU available") + sys.exit(1) + self.device = available_gpus[0] + log_debug(f"GPU benchmark instantiated for device {self.device}") + + def set_work(self, work: int): + self.work = work + + def set_time(self, duration: float): + self.duration = duration + + def run(self) -> ProcessHandle | None: + if self.work is None and self.duration is None: + return None + + if self.duration is not None: + log_debug(f"Running GPU benchmark for {self.duration} seconds") + gpu_prog = [ + f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={self.device} {this_dir.joinpath('./gpu_benchmark')} {self.work} {self.duration}"] + else: + log_debug(f"Running GPU benchmark for {self.work} units of work") + gpu_prog = [ + f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={self.device} {this_dir.joinpath('./gpu_benchmark')} {self.work}"] + + p = subprocess.Popen(gpu_prog, shell=True) + return ProcessHandle(p) + +# def kill_process_and_children(proc): +# if proc is None: +# return +# try: +# parent = psutil.Process(proc.pid) +# children = parent.children(recursive=True) +# for child in children: +# child.kill() +# parent.kill() +# +# except psutil.NoSuchProcess: +# pass # Process is already dead + + + +# Utility code functions +######################## def lock_core(path_locked: pathlib.Path, path_cores: pathlib.Path) -> int: @@ -128,187 +349,6 @@ def unlock_core(path_locked: pathlib.Path, finally: lock.release() -def monitor_progress(proc, cpu_queue): - """Monitor progress from the CPU benchmark process.""" - for line in iter(proc.stdout.readline, ""): # No decode needed - line = line.strip() - if line.startswith("Progress:"): - try: - progress = float(line.split()[1].strip('%')) - cpu_queue.put(progress) - except (ValueError, IndexError): - pass - -def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, - cpu_threads: Optional[int] = 5, - mem_threads: Optional[int] = 5, - cpu_work: Optional[int] = 100, - core: Optional[int] = None, - total_mem: Optional[int] = None) -> List: - """ - Run CPU and memory benchmark. - - :param cpu_queue: Queue to push CPU benchmark progress as a float. - :type cpu_queue: multiprocessing.Queue - :param cpu_threads: Number of threads for CPU benchmark. - :type cpu_threads: Optional[int] - :param mem_threads: Number of threads for memory benchmark. - :type mem_threads: Optional[int] - :param cpu_work: Total work units for CPU benchmark. - :type cpu_work: Optional[int] - :param core: Core to pin the benchmark processes to. - :type core: Optional[int] - :param total_mem: Total memory to use for memory benchmark. - :type total_mem: Optional[float] - - :return: Lists of CPU and memory subprocesses. - :rtype: List - """ - total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" - cpu_work_per_thread = int(cpu_work / cpu_threads) - - cpu_procs = [] - mem_procs = [] - cpu_prog = [f"{this_dir.joinpath('cpu-benchmark')}", f"{cpu_work_per_thread}"] - mem_prog = ["stress-ng", "--vm", f"{mem_threads}", - "--vm-bytes", f"{total_mem}", "--vm-keep"] - - for i in range(cpu_threads): - cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - - # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) - if core: - os.sched_setaffinity(cpu_proc.pid, {core}) - cpu_procs.append(cpu_proc) - - # Start a thread to monitor the progress of each CPU benchmark process - monitor_thread = multiprocessing.Process(target=monitor_progress, args=(cpu_proc, cpu_queue)) - monitor_thread.start() - - if mem_threads > 0: - # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows - mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) - if core: - os.sched_setaffinity(mem_proc.pid, {core}) - mem_procs.append(mem_proc) - - return cpu_procs, mem_procs - - -def io_read_benchmark_user_input_data_size(inputs, - rundir=None, - memory_limit=None): - if memory_limit is None: - memory_limit = -1 - memory_limit = int(memory_limit) - log_debug("Starting IO Read Benchmark...") - for file, size in inputs.items(): - with open(rundir.joinpath(file), "rb") as fp: - log_debug(f"Reading '{file}'") - chunk_size = min(size, memory_limit) - while fp.read(chunk_size): - pass - log_debug("Completed IO Read Benchmark!") - - -def io_write_benchmark_user_input_data_size(outputs, - rundir=None, - memory_limit=None): - if memory_limit is None: - memory_limit = sys.maxsize - memory_limit = int(memory_limit) - for file_name, file_size in outputs.items(): - log_debug(f"Writing output file '{file_name}'") - file_size_todo = file_size - while file_size_todo > 0: - with open(rundir.joinpath(file_name), "ab") as fp: - chunk_size = min(file_size_todo, memory_limit) - file_size_todo -= fp.write(os.urandom(int(chunk_size))) - - -def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit=None, rundir=None, event=None): - """Alternate between reading and writing to a file, ensuring read only happens after write.""" - - if memory_limit is None: - memory_limit = 10 * 1024 * 1024 # sys.maxsize - memory_limit = int(memory_limit) - - # queue will have messages in the form (cpu_percent_completed) - # Get the last message and trash the rest - - # Create empty files - for name in outputs: - open(rundir.joinpath(name), "wb").close() - - io_completed = 0 - bytes_read = { - name: 0 - for name in inputs - } - bytes_written = { - name: 0 - for name in outputs - } - - # get size of inputs - inputs = { - name: os.path.getsize(rundir.joinpath(name)) - for name in inputs - } - - while io_completed < 100: - cpu_percent = max(io_completed, cpu_queue.get()) - while True: # Get the last message - try: - cpu_percent = max(io_completed, cpu_queue.get_nowait()) - except queue.Empty: - break - - log_debug(f"CPU Percent: {cpu_percent}") - if cpu_percent: - bytes_to_read = { - name: int(size * (cpu_percent / 100) - bytes_read[name]) - for name, size in inputs.items() - } - bytes_to_write = { - name: int(size * (cpu_percent / 100) - bytes_written[name]) - for name, size in outputs.items() - } - io_read_benchmark_user_input_data_size(bytes_to_read, rundir, memory_limit=memory_limit) - io_write_benchmark_user_input_data_size(bytes_to_write, rundir, memory_limit=memory_limit) - - bytes_read = { - name: bytes_read[name] + bytes_to_read[name] - for name in bytes_to_read - } - bytes_written = { - name: bytes_written[name] + bytes_to_write[name] - for name in bytes_to_write - } - - log_debug(f"Bytes Read: {bytes_read}") - log_debug(f"Bytes Written: {bytes_written}") - - io_completed = cpu_percent - - if io_completed >= 100: - break - -def get_available_gpus(): - proc = subprocess.Popen(["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, _ = proc.communicate() - df = pd.read_csv(StringIO(stdout.decode("utf-8")), sep=" ") - return df[df["utilization.gpu"] <= 5].index.to_list() - - -def gpu_benchmark(time: int = 100, - work: int = 100, - device: int = 0): #work, device - - gpu_prog = [f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={device} {this_dir.joinpath('./gpu_benchmark')} {work} {time}"] - log_debug(f"Running GPU Benchmark: {gpu_prog}") - subprocess.Popen(gpu_prog, shell=True) - def get_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser() @@ -319,22 +359,28 @@ def get_parser() -> argparse.ArgumentParser: parser.add_argument("--path-lock", default=None, help="Path to lock file.") parser.add_argument("--path-cores", default=None, help="Path to cores file.") parser.add_argument("--cpu-work", default=None, help="Amount of CPU work.") + parser.add_argument("--num-chunks", default=10, help="Number of chunks used for pipelining I/O and " + "computation throughout the execution (fewer chunks may be used " + "if amounts of work and or input/output file sizes are too small).") parser.add_argument("--gpu-work", default=None, help="Amount of GPU work.") - parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete the task (overrides CPU and GPU works).") - parser.add_argument("--mem", type=float, default=None, help="Max amount (in MB) of memory consumption.") - parser.add_argument("--output-files", help="output file names with sizes in bytes as a JSON dictionary " + parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete " + "the computational portion of the benchmark (overrides CPU and GPU works). " + "Is only approximate since I/O time may make the overall time longer.") + parser.add_argument("--mem", type=float, default=None, help="Maximum memory consumption (in MB).") + parser.add_argument("--output-files", help="Output file names with sizes in bytes as a JSON dictionary " "(e.g., --output-files {\\\"file1\\\": 1024, \\\"file2\\\": 2048}).") - parser.add_argument("--input-files", help="input files names as a JSON array " + parser.add_argument("--input-files", help="Input files names as a JSON array " "(e.g., --input-files [\\\"file3\\\", \\\"file4\\\"]).") - parser.add_argument("--debug", action="store_true", help="Enable debug messages.") + parser.add_argument("--silent", action="store_true", help="Disable all log messages.") + parser.add_argument("--debug", action="store_true", help="Enable debug log messages.") parser.add_argument("--with-flowcept", action="store_true", default=False, help="Enable Flowcept monitoring.") parser.add_argument("--workflow_id", default=None, help="Id to group tasks in a workflow.") return parser - + def begin_flowcept(args): - log_info("Running with Flowcept.") + log_debug("Running with Flowcept.") from flowcept import Flowcept, FlowceptTask # TODO: parametrize to allow storing individual tasks f = Flowcept(workflow_id=args.workflow_id, @@ -350,6 +396,35 @@ def end_flowcept(flowcept, flowcept_task): flowcept.stop() +def compute_num_chunks(args): + # Compute the (feasible number of chunks) + min_chunk_size_time = 1.0 # At least 1 second per chunk, if we're doing time-based + # TODO: Pick reasonable factors below so that a chunk takes about min_chunk_size_time sec on a reasonable machine + min_chunk_size_cpu_work = 3000000 * min_chunk_size_time # 1s on my MacBook Pro + min_chunk_size_gpu_work = 30000000 * min_chunk_size_time # unknown..... + + if args.time: + num_chunks = min(int(args.num_chunks), int(float(args.time) / min_chunk_size_time)) + else: + if args.cpu_work: + num_chunks_cpu = min(int(args.num_chunks), int(float(args.cpu_work) / min_chunk_size_cpu_work)) + else: + num_chunks_cpu = 1 + if args.gpu_work: + num_chunks_gpu = min(int(args.num_chunks), int(float(args.gpu_work) / min_chunk_size_gpu_work)) + else: + num_chunks_gpu = 1 + num_chunks = min(num_chunks_cpu, num_chunks_gpu) + + num_chunks = max(num_chunks, 1) # The above computations may say "zero" + return num_chunks + +def kill_current_handles(handles: list[ProcessHandle]): + for handle in handles: + if handle is not None and handle.is_alive(): + handle.terminate_along_with_children() + + def main(): """Main program.""" parser = get_parser() @@ -359,6 +434,8 @@ def main(): if args.with_flowcept: flowcept, flowcept_task = begin_flowcept(args) + if args.silent: + logging.getLogger().setLevel(logging.NOTSET) if args.debug: logging.getLogger().setLevel(logging.DEBUG) @@ -372,132 +449,164 @@ def main(): path_cores = pathlib.Path(args.path_cores) core = lock_core(path_locked, path_cores) - log_info(f"Starting {args.name} Benchmark") - - mem_bytes = args.mem * 1024 * 1024 if args.mem else None - - procs = [] - io_proc = None - outputs_dict = {} - - cpu_queue = multiprocessing.Queue() - - log_debug(f"Working directory: {os.getcwd()}") - - # Deal with input/output files if any + if not args.time and (not args.cpu_work and not args.gpu_work): + log_error("At least one of --time, --cpu-work, or --gpu-work should be provided.") + sys.exit(1) + + # Compute the (feasible) number of chunks based on the arguments + num_chunks = compute_num_chunks(args) + log_debug(f"Executing benchmark with {num_chunks} chunks.") + + # At this point we know the number of chunks, and we can just iterate as follows (N = num_chunks + 2) + # step 0 sep 1 step 2 step N-3 step N-2 step N-1 + # READ READ READ ... READ - - + # - COMPUTE_CPU COMPUTE_CPU ... COMPUTE_CPU COMPUTE_CPU - + # - COMPUTE_GPU COMPUTE_GPU ... COMPUTE_GPU COMPUTE_GPU - + # - - WRITE ... WRITE WRITE WRITE + # (Intermediate READ and WRITE steps may do nothing for some files if there is too little data) + + # Construct a list of benchmark steps, where each step is a list of IO benchmarks (Read or Write) + # and a list of non-IO benchmarks (CPU, GPU). Initially these are all "do nothing" benchmarks + N = num_chunks + 2 + steps = [{"io_read_benchmark": IOReadBenchmark(), + "io_write_benchmark": IOWriteBenchmark(), + "cpu_benchmark": CPUBenchmark(cpu_threads=int(10 * args.percent_cpu), + mem_threads=int(10 - 10 * args.percent_cpu), + core=core, + total_mem=args.mem * 1000 * 1000 if args.mem else None), + "gpu_benchmark": GPUBenchmark()} for i in range(N)] + + min_chunk_size_data = 1000 # 1KB per chunk at a minimum for each input / output file, otherwise the file + # is read/written all at once at the beginning/end + + # Augment I/O read benchmarks for each input file cleaned_input = "{}" if args.input_files is None else re.sub(r'\\+', '', args.input_files) + try: + input_files = json.loads(cleaned_input) + except json.JSONDecodeError as e: + log_error(f"Failed to decode --input-files JSON string argument: {e}") + sys.exit(1) + + for file_path in input_files: + file_size = os.path.getsize(file_path) + # If file is zero-size, do nothing + if file_size == 0: + continue + opened_file = open(rundir / file_path, "rb") + # If file is "small" only read it at the beginning + if file_size < num_chunks * min_chunk_size_data: + steps[0]["io_read_benchmark"].add_read_operation(file_path, opened_file, file_size) + continue + # Otherwise, read it in chunks + for step in range(0, N-2): + num_bytes = file_size // num_chunks + (file_size % num_chunks > step) + steps[step]["io_read_benchmark"].add_read_operation(file_path, opened_file, num_bytes) + + # Augment I/O write benchmarks for each output file cleaned_output = "{}" if args.output_files is None else re.sub(r'\\+', '', args.output_files) - # print("CLEANED INPUT", cleaned_input) - # print("CLEANED OUTPUT", cleaned_output) - - if cleaned_input or cleaned_output: - log_debug("Starting IO benchmark...") - - # Attempt to parse the cleaned string - try: - outputs_dict = json.loads(cleaned_output) - except json.JSONDecodeError as e: - log_error(f"Failed to decode --output-files JSON string argument: {e}") - sys.exit(1) - - try: - inputs_array = json.loads(cleaned_input) - except json.JSONDecodeError as e: - log_error(f"Failed to decode --input-files JSON string argument: {e}") - sys.exit(1) - - # print("OUTPUT", outputs_dict) - # print("INPUTS", inputs_array) - - # Create a multiprocessing event that in the first run is set to True - write_done_event = multiprocessing.Event() - # Set this to True to allow the first read to happen - write_done_event.set() - # Print the value of the event - # print("Event Value:", write_done_event.is_set()) - - io_proc = multiprocessing.Process( - target=io_alternate, - args=(inputs_array, outputs_dict, cpu_queue, mem_bytes, rundir, write_done_event) - ) - io_proc.start() - procs.append(io_proc) - - if args.gpu_work: - log_info(f"Starting GPU Benchmark for {args.name}...") - available_gpus = get_available_gpus() #checking for available GPUs - - if not available_gpus: - log_error("No GPU available") - sys.exit(1) - else: - device = available_gpus[0] - log_debug(f"Running on GPU {device}") - - if args.time: - log_debug(f" Time:{args.time}, Work:{args.gpu_work}, Device:{device}") - gpu_benchmark(time=int(args.time), work=int(args.gpu_work), device=device) - else: - gpu_benchmark(work=int(args.gpu_work), device=device) - + try: + output_files = json.loads(cleaned_output) + except json.JSONDecodeError as e: + log_error(f"Failed to decode --output-files JSON string argument: {e}") + sys.exit(1) + + for file_path, file_size in output_files.items(): + # Open the file for writing no matter what (it should be created) + opened_file = open(rundir / file_path, "wb") + # If file is zero-size, do nothing + if file_size == 0: + continue + # If file is "small" only write it at the end + if file_size < num_chunks * min_chunk_size_data: + steps[N-1]["io_write_benchmark"].add_write_operation(file_path, opened_file, file_size) + continue + # Otherwise, write it in chunks + for step in range(2, N): + num_bytes = file_size // num_chunks + (file_size % num_chunks > (step - 2)) + steps[step]["io_write_benchmark"].add_write_operation(file_path, opened_file, num_bytes) + + # Augment CPU benchmark with computation (if need be) if args.cpu_work: - log_info(f"Starting CPU and Memory Benchmarks for {args.name}...") - if core: - log_debug(f"{args.name} acquired core {core}") - - mem_threads=int(10 - 10 * args.percent_cpu) - cpu_procs, mem_procs = cpu_mem_benchmark(cpu_queue=cpu_queue, - cpu_threads=int(10 * args.percent_cpu), - mem_threads=mem_threads, - cpu_work=sys.maxsize if args.time else int(args.cpu_work), - core=core, - total_mem=mem_bytes) - - procs.extend(cpu_procs) if args.time: - time.sleep(int(args.time)) - for proc in procs: - if isinstance(proc, multiprocessing.Process): - if proc.is_alive(): - proc.terminate() - elif isinstance(proc, subprocess.Popen): - proc.terminate() + for step in range(1, N-1): + steps[step]["cpu_benchmark"].set_infinite_work() else: - for proc in procs: - if isinstance(proc, subprocess.Popen): - proc.wait() - if io_proc is not None and io_proc.is_alive(): - # io_proc.terminate() - io_proc.join() - - for mem_proc in mem_procs: - try: - os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails - except subprocess.TimeoutExpired: - log_debug("Memory process did not terminate; force-killing.") - # As a fallback, use pkill if any remaining instances are stuck - subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() - - log_debug("Completed CPU and Memory Benchmarks!") - - # NOTE: If you would like to run only IO add time.sleep(2) - # Check if all procs are done, if not, kill them - log_debug("Checking if all processes are done...") - for proc in procs: - if isinstance(proc, multiprocessing.Process): - if proc.is_alive(): - proc.terminate() - proc.join() - if isinstance(proc, subprocess.Popen): - proc.wait() + for step in range(1, N-1): + chunk_work = int(args.cpu_work) // num_chunks + (int(args.cpu_work) % num_chunks > step - 1) + steps[step]["cpu_benchmark"].set_work(chunk_work) + # Augment GPU benchmark with computation (if need be) + if args.gpu_work: + if args.time: + for step in range(1, N - 1): + steps[step]["gpu_benchmark"].set_device() + steps[step]["gpu_benchmark"].set_work(int(args.gpu_work)) + steps[step]["gpu_benchmark"].set_time(float(args.time)) + else: + for step in range(1, N - 1): + chunk_work = int(args.gpu_work) // num_chunks + (int(args.gpu_work) % num_chunks > step - 1) + steps[step]["gpu_benchmark"].set_device() + steps[step]["gpu_benchmark"].set_work(chunk_work) + + # All benchmarks have been specified, we can just go through the steps blindly + # log_info(f"Starting {args.name} Benchmark") + + current_proc_handles = [] + try: + for step_index, step in enumerate(steps): + log_debug(f"**** STEP {step_index} ***") + io_read_process = step["io_read_benchmark"].run() + current_proc_handles += [io_read_process] + io_write_process = step["io_write_benchmark"].run() + [cpu_benchmark_process, memory_benchmark_process] = step["cpu_benchmark"].run() + current_proc_handles += [cpu_benchmark_process, memory_benchmark_process] + gpu_benchmark_process = step["gpu_benchmark"].run() + current_proc_handles += [gpu_benchmark_process] + current_proc_handles[:] = [io_read_process, cpu_benchmark_process, memory_benchmark_process, gpu_benchmark_process] + + # If time based, sleep the required amount of time and kill the process + if args.time: + if cpu_benchmark_process is not None or gpu_benchmark_process is not None: + time.sleep(float(args.time) / num_chunks) + if cpu_benchmark_process is not None: + cpu_benchmark_process.terminate_along_with_children() + if gpu_benchmark_process is not None: + gpu_benchmark_process.terminate() + + # Wait for the I/O processes to be done + if io_read_process is not None: + io_read_process.wait() + if io_write_process is not None: + io_write_process.wait() + + # Wait for the CPU process to be done + if cpu_benchmark_process is not None: + cpu_benchmark_process.wait() + + # Kill the Memory process + if memory_benchmark_process is not None: + memory_benchmark_process.terminate_along_with_children() + memory_benchmark_process.wait() + + # Wait for the GPU Process to be done + if gpu_benchmark_process is not None: + gpu_benchmark_process.wait() + except KeyboardInterrupt: + log_debug("Detected Keyboard interrupt: cleaning up processes...") + kill_current_handles(current_proc_handles) + finally: + log_debug("Aborting: cleaning up processes...") + kill_current_handles(current_proc_handles) + + + # Cleanups if core: unlock_core(path_locked, path_cores, core) if args.with_flowcept: end_flowcept(flowcept, flowcept_task) - log_info(f"Benchmark {args.name} completed!") + log_info(f"{args.name} benchmark completed") if __name__ == "__main__": main() diff --git a/pyproject.toml b/pyproject.toml index 47a72e57..4e3179b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,11 +30,11 @@ dependencies = [ "networkx", "numpy", "pandas", + "psutil", "python-dateutil", "requests", "scipy>=1.16.1", "pyyaml", - "pandas", "shortuuid", "stringcase", "filelock", diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 724425f8..4e824d12 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -96,7 +96,8 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command= user="wfcommons", privileged=True, tty=True, - detach=True + detach=True, + init=True # For zombies ) # Installing WfCommons on container diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 2eccca46..0ccc010c 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -130,7 +130,7 @@ def run_workflow_dask(container, num_tasks, str_dirpath): exit_code, output = container.exec_run("python ./dask_workflow.py", user="wfcommons", stdout=True, stderr=True) # Check sanity assert (exit_code == 0) - assert (output.decode().count("completed!") == num_tasks) + assert (output.decode().count("benchmark completed") == num_tasks) # TODO: Look at the (I think) generated run.json file on the container? def run_workflow_parsl(container, num_tasks, str_dirpath): @@ -163,7 +163,7 @@ def run_workflow_bash(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(cmd="/bin/bash ./run_workflow.sh", user="wfcommons", stdout=True, stderr=True) # Check sanity assert (exit_code == 0) - assert (output.decode().count("completed") == num_tasks) + assert (output.decode().count("benchmark completed") == num_tasks) def run_workflow_taskvine(container, num_tasks, str_dirpath): # Run the workflow! @@ -187,7 +187,6 @@ def run_workflow_cwl(container, num_tasks, str_dirpath): # Note that the input file is hardcoded and Blast-specific exit_code, output = container.exec_run(cmd="cwltool ./main.cwl --split_fasta_00000001_input ./data/workflow_infile_0001 ", user="wfcommons", stdout=True, stderr=True) - # print(output.decode()) # Check sanity assert (exit_code == 0) # this below is ugly (the 3 is for "workflow", "compile_output_files" and "compile_log_files", @@ -199,7 +198,6 @@ def run_workflow_streamflow(container, num_tasks, str_dirpath): # Note that the input file is hardcoded and Blast-specific exit_code, output = container.exec_run(cmd="streamflow run ./streamflow.yml", user="wfcommons", stdout=True, stderr=True) - # print(output.decode()) # Check sanity assert (exit_code == 0) # 2 extra "COMPLETED Step" ("COMPLETED Step /compile_output_files", "COMPLETED Step /compile_log_files") diff --git a/tests/wfbench/test_wfbench.py b/tests/wfbench/test_wfbench.py index ea86809f..03dd13d3 100644 --- a/tests/wfbench/test_wfbench.py +++ b/tests/wfbench/test_wfbench.py @@ -54,7 +54,6 @@ def _workflow_as_expected(dirpath: pathlib.Path, with json_path.open("r") as f: generated_json = json.load(f) - # Check the number of tasks assert(len(workflow.tasks) == len(generated_json['workflow']['specification']['tasks'])) @@ -170,7 +169,7 @@ def test_create_from_recipe(self) -> None: # Run the workflow sys.stderr.write("Running workflow...\n") exit_code, output = container.exec_run(cmd="/bin/bash ./run_workflow.sh", stdout=True, stderr=True) - + print(output.decode()) # Kill the container _shutdown_docker_container_and_remove_image(container) diff --git a/wfcommons/wfbench/bench.py b/wfcommons/wfbench/bench.py index 2f013546..7bee4eb0 100644 --- a/wfcommons/wfbench/bench.py +++ b/wfcommons/wfbench/bench.py @@ -86,6 +86,7 @@ def create_benchmark_from_synthetic_workflow( percent_cpu: Union[float, Dict[str, float]] = 0.6, cpu_work: Union[int, Dict[str, int]] = None, gpu_work: Union[int, Dict[str, int]] = None, + num_chunks: Optional[int] = 10, time: Optional[int] = None, mem: Optional[float] = None, lock_files_folder: Optional[pathlib.Path] = None, @@ -102,6 +103,8 @@ def create_benchmark_from_synthetic_workflow( :type cpu_work: Union[int, Dict[str, int]] :param gpu_work: Maximum GPU work per workflow task. :type gpu_work: Union[int, Dict[str, int]] + :param num_chunks: Number of chunks for pipelining I/O and computation for each task execution. + :type num_chunks: Optional[int] :param time: Time limit for running each task (in seconds). :type time: Optional[int] :param mem: Maximum amount of memory consumption per task (in MB). @@ -164,6 +167,7 @@ def create_benchmark_from_synthetic_workflow( task_percent_cpu, task_cpu_work, task_gpu_work, + num_chunks, time, task_memory, lock_files_folder, @@ -252,6 +256,7 @@ def create_benchmark(self, percent_cpu: Union[float, Dict[str, float]] = 0.6, cpu_work: Union[int, Dict[str, int]] = None, gpu_work: Union[int, Dict[str, int]] = None, + num_chunks: Optional[int] = 10, time: Optional[int] = None, data: Optional[int] = 0, mem: Optional[float] = None, @@ -269,6 +274,8 @@ def create_benchmark(self, :type cpu_work: Union[int, Dict[str, int]] :param gpu_work: GPU work per workflow task. :type gpu_work: Union[int, Dict[str, int]] + :param num_chunks: Number of chunks for pipelining I/O and computation for each task execution. + :type num_chunks: Optional[int] :param time: Time limit for running each task (in seconds). :type time: Optional[int] :param data: Total workflow data footprint (in MB). @@ -308,6 +315,7 @@ def create_benchmark(self, cpu_work, gpu_work, time, + num_chunks, mem, lock_files_folder, cores, @@ -367,6 +375,7 @@ def _set_argument_parameters(self, cpu_work: Union[int, Dict[str, int]], gpu_work: Union[int, Dict[str, int]], time: Optional[int], + num_chunks: Optional[int], mem: Optional[float], lock_files_folder: Optional[pathlib.Path], cores: Optional[pathlib.Path], @@ -381,6 +390,7 @@ def _set_argument_parameters(self, params.extend(cpu_params) gpu_params = self._generate_task_gpu_params(task, gpu_work) params.extend(gpu_params) + params.extend([f"--num-chunks {num_chunks}"]) if mem: params.extend([f"--mem {mem}"])