From 4d7e4af62555e2fb5ca79ddf41defbf8e394cb03 Mon Sep 17 00:00:00 2001 From: David Alonso Date: Wed, 16 Aug 2023 22:23:21 +0100 Subject: [PATCH 1/3] first working glamdring version --- mbatch/mbatch.py | 88 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 71 insertions(+), 17 deletions(-) diff --git a/mbatch/mbatch.py b/mbatch/mbatch.py index 9e4b69c..33d23c1 100644 --- a/mbatch/mbatch.py +++ b/mbatch/mbatch.py @@ -89,7 +89,8 @@ def gitcheck(config,cname,package): def get_command(global_vals, stage_configs, stage): stage_config = stage_configs[stage] execution = stage_config['exec'] - if not(execution in ['python','python3']): + if (not(execution in ['python','python3']) and + not('python' in execution)): # Execution may sometimes be an absolute path raise_exception("Only Python execution " "is currently supported. " "Shell support will be " @@ -119,11 +120,11 @@ def run_local(cmds,dry_run=False,verbose=True): cmds = [] for c in icmds: cmds = cmds + shlex.split(c) - if verbose: print(f"Running {cmds} locally...") if dry_run: if verbose: print(' '.join(cmds)) return str(random.randint(1,32768)) else: + if verbose: print(f"Running {cmds} locally...") sp = subprocess.run(cmds,stderr=sys.stderr, stdout=subprocess.PIPE) output = sp.stdout.decode("utf-8") if verbose: print(output) @@ -198,7 +199,44 @@ def get_tpc(sbatch_config,constraint,partition): fprint(HTML(f"WARNING: Number of threads per core not specified. Assuming hyperthreading by factor 2x ...")) tpc = 2 return tpc - + +def submit_glamdring(stage, parallel_config, execution, + script, pargs, dry_run, output_dir, + project, root_dir, depstr): + command = ' '.join([execution,script,pargs]) + f' --output-dir {output_dir}' + + nproc = parallel_config.get('nproc', 1) + ncores = parallel_config.get('ncores_per_node', 1) + openmp = parallel_config.get('openmp', False) + queue = parallel_config.get('queue', 'cmb') + memory_gb = parallel_config.get('memory_gb', 0.1*ncores*nproc) + mem = "%.1lf" % (memory_gb/(nproc*ncores)) + out_file_root = get_out_file_root(root_dir,stage,project,'glamdring') + out_file = out_file_root+'_%j.txt' + + # Construct glamdring launch command + cmd = f'./addqueue -q {queue} ' + if openmp: + cmd += '-s ' + cmd += f'-n {nproc}x{ncores} ' + cmd += f'-m {mem} ' + if depstr: + cmd += depstr + ' ' + cmd += '--sbatch ' + cmd += f'-o {out_file} ' + cmd += command + cmd = cmd.split() + + # Submit job + out = run_local(cmd, dry_run=dry_run, verbose=True) + if dry_run: + return out + # Retrieve jobid + jobid = out.split()[3] + print("JOB ID is ", jobid) + return jobid + + def submit_slurm(stage,sbatch_config,parallel_config,execution, script,pargs,dry_run,output_dir,site,project,root_dir, depstr=None,account=None,qos=None,partition=None,constraint=None): @@ -532,6 +570,7 @@ def main(): parser.add_argument("-q", "--qos", type=str, default=None,help="QOS name") parser.add_argument("-p", "--partition", type=str, default=None,help="Partition name") parser.add_argument("-c", "--constraint", type=str, default=None,help="Constraint name") + parser.add_argument('--glamdring', action='store_true', help='Use if running on Oxford\'s Glamdring cluster') args = parser.parse_args() if args.force_local and args.force_slurm: raise_exception("You can\'t force both local and SLURM.") @@ -543,8 +582,8 @@ def main(): root_dir = config['root_dir'] # Check if we have SLURM - have_slurm = check_slurm() - if not(have_slurm): + have_slurm = check_slurm() if not args.glamdring else False + if not(have_slurm) and not(args.glamdring): print("No SLURM detected. We will be locally " "executing commands serially.") if args.dry_run: @@ -627,7 +666,10 @@ def main(): site = detect_site() if args.site is None else args.site sbatch_config = load_template(site) else: - site = 'local' + if args.glamdring: + site = 'glamdring' + else: + site = 'local' ########################### reuse_stages = [] @@ -651,9 +693,9 @@ def main(): else: last_job = max([sint(re.search(rf'{root}(.*?){suffix}', f).group(1)) for f in fs]) output = run_local(['sacct', '-j', - str(last_job), '--format=State', - '--parsable2'], - verbose=False).split('\n') + str(last_job), '--format=State', + '--parsable2'], + verbose=False).split('\n') completed = True for i,line in enumerate(output): if i==0: @@ -722,10 +764,11 @@ def main(): ########################### - is_sbatch = (have_slurm or args.force_slurm) and not(args.force_local) - is_local = not(have_slurm) or args.force_local - if sum([int(x) for x in [is_sbatch,is_local]])!=1: raise_exception("Inconsistency in submission vs. local. Report bug.") - + is_sbatch = (have_slurm or args.force_slurm) and not(args.force_local) and not(args.glamdring) + is_glam = args.glamdring and not (args.force_local or args.force_slurm) + is_local = (not(have_slurm) and not args.glamdring) or args.force_local + if sum([int(x) for x in [is_sbatch,is_local,is_glam]])!=1: raise_exception("Inconsistency in submission vs. local. Report bug.") + # Check if any reused stages have dependencies that are not reused # If so we will not reuse those stages # Algorithm is linear since `stages` is already sorted @@ -750,7 +793,7 @@ def main(): if not(reply): sys.exit(0) - + # Make project directory proj_dir = get_project_dir(root_dir,args.project) os.makedirs(proj_dir, exist_ok=True) @@ -782,13 +825,17 @@ def main(): if not(s in args.skip) and not(s in reuse_stages): jlist.append(jobids[s]) if len(jlist)>=1: - depstr = ':'.join(jlist) - depstr = "--dependency=afterok:" + depstr + if is_glam: + depstr = ','.join(jlist) + depstr = '--runafter '+depstr + else: + depstr = ':'.join(jlist) + depstr = "--dependency=afterok:" + depstr else: depstr = None else: depstr = None - + if is_sbatch: jobid = submit_slurm(stage,sbatch_config, copy.deepcopy(ostages[stage]).get('parallel',None), @@ -801,6 +848,13 @@ def main(): qos=args.qos, partition=args.partition, constraint=args.constraint) + if is_glam: + jobid = submit_glamdring(stage, + copy.deepcopy(ostages[stage]).get('parallel',{}), + execution, script, pargs, dry_run=args.dry_run, + output_dir=output_dir, + project=args.project, + root_dir=root_dir, depstr=depstr) if is_local: if pargs=='': cmds = [execution,script, '--output-dir',output_dir] From a779ce4a23d1bf06b25a8fc67c25fb9b0aefd734 Mon Sep 17 00:00:00 2001 From: David Alonso Date: Wed, 16 Aug 2023 22:31:02 +0100 Subject: [PATCH 2/3] cleanup --- mbatch/mbatch.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mbatch/mbatch.py b/mbatch/mbatch.py index 33d23c1..58f0410 100644 --- a/mbatch/mbatch.py +++ b/mbatch/mbatch.py @@ -215,7 +215,7 @@ def submit_glamdring(stage, parallel_config, execution, out_file = out_file_root+'_%j.txt' # Construct glamdring launch command - cmd = f'./addqueue -q {queue} ' + cmd = f'addqueue -q {queue} ' if openmp: cmd += '-s ' cmd += f'-n {nproc}x{ncores} ' @@ -233,7 +233,6 @@ def submit_glamdring(stage, parallel_config, execution, return out # Retrieve jobid jobid = out.split()[3] - print("JOB ID is ", jobid) return jobid From 7c551b4c5a284decfd4ce7e12f2601f3c7d022e8 Mon Sep 17 00:00:00 2001 From: David Alonso Date: Thu, 17 Aug 2023 09:14:13 +0100 Subject: [PATCH 3/3] runafter job concatenation --- mbatch/mbatch.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/mbatch/mbatch.py b/mbatch/mbatch.py index 58f0410..4ef2200 100644 --- a/mbatch/mbatch.py +++ b/mbatch/mbatch.py @@ -215,7 +215,7 @@ def submit_glamdring(stage, parallel_config, execution, out_file = out_file_root+'_%j.txt' # Construct glamdring launch command - cmd = f'addqueue -q {queue} ' + cmd = f'./addqueue -q {queue} -c {stage} ' if openmp: cmd += '-s ' cmd += f'-n {nproc}x{ncores} ' @@ -824,11 +824,10 @@ def main(): if not(s in args.skip) and not(s in reuse_stages): jlist.append(jobids[s]) if len(jlist)>=1: + depstr = ':'.join(jlist) if is_glam: - depstr = ','.join(jlist) depstr = '--runafter '+depstr else: - depstr = ':'.join(jlist) depstr = "--dependency=afterok:" + depstr else: depstr = None