diff --git a/mbatch/mbatch.py b/mbatch/mbatch.py
index 9e4b69c..4ef2200 100644
--- a/mbatch/mbatch.py
+++ b/mbatch/mbatch.py
@@ -89,7 +89,8 @@ def gitcheck(config,cname,package):
 def get_command(global_vals, stage_configs, stage):
     stage_config = stage_configs[stage]
     execution = stage_config['exec']
-    if not(execution in ['python','python3']):
+    if (not(execution in ['python','python3']) and
+        not('python' in execution)): # Execution may sometimes be an absolute path
         raise_exception("Only Python execution "
                         "is currently supported. "
                         "Shell support will be "
@@ -119,11 +120,11 @@ def run_local(cmds,dry_run=False,verbose=True):
     cmds = []
     for c in icmds:
         cmds = cmds + shlex.split(c)
-    if verbose: print(f"Running {cmds} locally...")
     if dry_run:
         if verbose: print(' '.join(cmds))
         return str(random.randint(1,32768))
     else:
+        if verbose: print(f"Running {cmds} locally...")
         sp = subprocess.run(cmds,stderr=sys.stderr, stdout=subprocess.PIPE)
         output = sp.stdout.decode("utf-8")
         if verbose: print(output)
@@ -198,7 +199,43 @@ def get_tpc(sbatch_config,constraint,partition):
         fprint(HTML(f"WARNING: Number of threads per core not specified. Assuming hyperthreading by factor 2x ..."))
         tpc = 2
     return tpc
-
+
+def submit_glamdring(stage, parallel_config, execution,
+                     script, pargs, dry_run, output_dir,
+                     project, root_dir, depstr):
+    command = ' '.join([execution,script,pargs]) + f' --output-dir {output_dir}'
+
+    nproc = parallel_config.get('nproc', 1)
+    ncores = parallel_config.get('ncores_per_node', 1)
+    openmp = parallel_config.get('openmp', False)
+    queue = parallel_config.get('queue', 'cmb')
+    memory_gb = parallel_config.get('memory_gb', 0.1*ncores*nproc)
+    mem = "%.1lf" % (memory_gb/(nproc*ncores))
+    out_file_root = get_out_file_root(root_dir,stage,project,'glamdring')
+    out_file = out_file_root+'_%j.txt'
+
+    # Construct glamdring launch command
+    cmd = f'./addqueue -q {queue} -c {stage} '
+    if openmp:
+        cmd += '-s '
+    cmd += f'-n {nproc}x{ncores} '
+    cmd += f'-m {mem} '
+    if depstr:
+        cmd += depstr + ' '
+    cmd += '--sbatch '
+    cmd += f'-o {out_file} '
+    cmd += command
+    cmd = cmd.split()
+
+    # Submit job
+    out = run_local(cmd, dry_run=dry_run, verbose=True)
+    if dry_run:
+        return out
+    # Retrieve jobid
+    jobid = out.split()[3]
+    return jobid
+
+
 def submit_slurm(stage,sbatch_config,parallel_config,execution,
                  script,pargs,dry_run,output_dir,site,project,root_dir,
                  depstr=None,account=None,qos=None,partition=None,constraint=None):
@@ -532,6 +569,7 @@ def main():
     parser.add_argument("-q", "--qos", type=str, default=None,help="QOS name")
     parser.add_argument("-p", "--partition", type=str, default=None,help="Partition name")
     parser.add_argument("-c", "--constraint", type=str, default=None,help="Constraint name")
+    parser.add_argument('--glamdring', action='store_true', help='Use if running on Oxford\'s Glamdring cluster')
     args = parser.parse_args()
     if args.force_local and args.force_slurm:
         raise_exception("You can\'t force both local and SLURM.")
@@ -543,8 +581,8 @@ def main():
     root_dir = config['root_dir']
 
     # Check if we have SLURM
-    have_slurm = check_slurm()
-    if not(have_slurm):
+    have_slurm = check_slurm() if not args.glamdring else False
+    if not(have_slurm) and not(args.glamdring):
         print("No SLURM detected. We will be locally "
               "executing commands serially.")
         if args.dry_run:
@@ -627,7 +665,10 @@ def main():
         site = detect_site() if args.site is None else args.site
         sbatch_config = load_template(site)
     else:
-        site = 'local'
+        if args.glamdring:
+            site = 'glamdring'
+        else:
+            site = 'local'
 
     ###########################
     reuse_stages = []
@@ -651,9 +692,9 @@ def main():
     else:
         last_job = max([sint(re.search(rf'{root}(.*?){suffix}', f).group(1)) for f in fs])
     output = run_local(['sacct', '-j',
-                       str(last_job), '--format=State',
-                       '--parsable2'],
-                       verbose=False).split('\n')
+                        str(last_job), '--format=State',
+                        '--parsable2'],
+                        verbose=False).split('\n')
     completed = True
     for i,line in enumerate(output):
         if i==0:
@@ -722,10 +763,11 @@ def main():
 
     ###########################
 
-    is_sbatch = (have_slurm or args.force_slurm) and not(args.force_local)
-    is_local = not(have_slurm) or args.force_local
-    if sum([int(x) for x in [is_sbatch,is_local]])!=1: raise_exception("Inconsistency in submission vs. local. Report bug.")
-
+    is_sbatch = (have_slurm or args.force_slurm) and not(args.force_local) and not(args.glamdring)
+    is_glam = args.glamdring and not (args.force_local or args.force_slurm)
+    is_local = (not(have_slurm) and not args.glamdring) or args.force_local
+    if sum([int(x) for x in [is_sbatch,is_local,is_glam]])!=1: raise_exception("Inconsistency in submission vs. local. Report bug.")
+
     # Check if any reused stages have dependencies that are not reused
     # If so we will not reuse those stages
     # Algorithm is linear since `stages` is already sorted
@@ -750,7 +792,7 @@ def main():
 
         if not(reply):
             sys.exit(0)
-
+
     # Make project directory
     proj_dir = get_project_dir(root_dir,args.project)
     os.makedirs(proj_dir, exist_ok=True)
@@ -783,12 +825,15 @@ def main():
                 jlist.append(jobids[s])
             if len(jlist)>=1:
                 depstr = ':'.join(jlist)
-                depstr = "--dependency=afterok:" + depstr
+                if is_glam:
+                    depstr = '--runafter '+depstr
+                else:
+                    depstr = "--dependency=afterok:" + depstr
             else:
                 depstr = None
         else:
             depstr = None
-
+
         if is_sbatch:
             jobid = submit_slurm(stage,sbatch_config,
                                  copy.deepcopy(ostages[stage]).get('parallel',None),
@@ -801,6 +846,13 @@ def main():
                                  qos=args.qos,
                                  partition=args.partition,
                                  constraint=args.constraint)
+        if is_glam:
+            jobid = submit_glamdring(stage,
+                                     copy.deepcopy(ostages[stage]).get('parallel',{}),
+                                     execution, script, pargs, dry_run=args.dry_run,
+                                     output_dir=output_dir,
+                                     project=args.project,
+                                     root_dir=root_dir, depstr=depstr)
         if is_local:
             if pargs=='':
                 cmds = [execution,script, '--output-dir',output_dir]
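
For reference, a minimal standalone sketch of the addqueue invocation that the new submit_glamdring assembles. The stage name ('maps'), the script, and the parallel values are hypothetical examples; only the defaults ('cmb' queue, 0.1 GB per core, the -s OpenMP switch, the '_%j.txt' output pattern) and the flag layout mirror the patch above.

# Hypothetical parallel block for an example stage named 'maps'.
parallel = {'nproc': 4, 'ncores_per_node': 8, 'openmp': True, 'memory_gb': 64}
nproc = parallel.get('nproc', 1)
ncores = parallel.get('ncores_per_node', 1)
# Memory is requested per core, as in the patch: total GB / (nproc*ncores)
mem = "%.1lf" % (parallel.get('memory_gb', 0.1*ncores*nproc)/(nproc*ncores))
cmd = f"./addqueue -q {parallel.get('queue','cmb')} -c maps "
if parallel.get('openmp', False):
    cmd += '-s '
cmd += f'-n {nproc}x{ncores} -m {mem} --sbatch -o maps_%j.txt '
cmd += 'python stage_maps.py --output-dir ./output/maps'
print(cmd)
# -> ./addqueue -q cmb -c maps -s -n 4x8 -m 2.0 --sbatch -o maps_%j.txt python stage_maps.py --output-dir ./output/maps

When a stage has dependencies, the patch prepends '--runafter <jobid:jobid:...>' in place of SLURM's '--dependency=afterok:...' before this command is split and handed to run_local.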