Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 68 additions & 16 deletions mbatch/mbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ def gitcheck(config,cname,package):
def get_command(global_vals, stage_configs, stage):
stage_config = stage_configs[stage]
execution = stage_config['exec']
if not(execution in ['python','python3']):
if (not(execution in ['python','python3']) and
not('python' in execution)): # Execution may sometimes be an absolute path
raise_exception("Only Python execution "
"is currently supported. "
"Shell support will be "
Expand Down Expand Up @@ -119,11 +120,11 @@ def run_local(cmds,dry_run=False,verbose=True):
cmds = []
for c in icmds:
cmds = cmds + shlex.split(c)
if verbose: print(f"Running {cmds} locally...")
if dry_run:
if verbose: print(' '.join(cmds))
return str(random.randint(1,32768))
else:
if verbose: print(f"Running {cmds} locally...")
sp = subprocess.run(cmds,stderr=sys.stderr, stdout=subprocess.PIPE)
output = sp.stdout.decode("utf-8")
if verbose: print(output)
Expand Down Expand Up @@ -198,7 +199,43 @@ def get_tpc(sbatch_config,constraint,partition):
fprint(HTML(f"<ansiyellow>WARNING: Number of threads per core not specified. Assuming hyperthreading by factor 2x ...</ansiyellow>"))
tpc = 2
return tpc


def submit_glamdring(stage, parallel_config, execution,
                     script, pargs, dry_run, output_dir,
                     project, root_dir, depstr):
    """Submit one stage to the Glamdring cluster via its ``addqueue`` launcher.

    Parameters
    ----------
    stage : str
        Stage name; passed to ``addqueue -c`` and used to name the log file.
    parallel_config : dict or None
        Optional per-stage settings. Recognised keys and their defaults:
        ``nproc`` (1), ``ncores_per_node`` (1), ``openmp`` (False),
        ``queue`` ('cmb') and ``memory_gb`` (0.1 * ncores_per_node * nproc).
        ``None`` is treated as an empty dict.
    execution, script, pargs : str
        Interpreter, script path and argument string of the job to run.
        ``pargs`` may contain several (possibly quoted) arguments.
    dry_run : bool
        Forwarded to ``run_local``; when true, its return value is handed
        back unchanged and no job ID is parsed.
    output_dir : str
        Appended to the job command as ``--output-dir <output_dir>``.
    project, root_dir : str
        Forwarded to ``get_out_file_root`` to build the log file root.
    depstr : str or None
        Pre-formatted dependency option (e.g. ``'--runafter 1:2'``);
        omitted from the command when falsy.

    Returns
    -------
    str
        The fourth whitespace-separated token of the ``addqueue`` output,
        which is taken to be the job ID, or the ``run_local`` return value
        when ``dry_run`` is set.

    Raises
    ------
    RuntimeError
        If the ``addqueue`` output has fewer than four tokens, so that no
        job ID can be extracted.
    """
    # The caller looks the config up with .get('parallel', {}), which still
    # yields None when the key is present but left empty.
    parallel_config = parallel_config or {}

    nproc = parallel_config.get('nproc', 1)
    ncores = parallel_config.get('ncores_per_node', 1)
    openmp = parallel_config.get('openmp', False)
    queue = parallel_config.get('queue', 'cmb')
    memory_gb = parallel_config.get('memory_gb', 0.1*ncores*nproc)
    # Value handed to -m: the total memory divided by nproc*ncores.
    mem = f"{memory_gb/(nproc*ncores):.1f}"
    out_file_root = get_out_file_root(root_dir, stage, project, 'glamdring')
    out_file = out_file_root + '_%j.txt'

    # Construct glamdring launch command as a list of fragments.
    # run_local tokenises each fragment with shlex.split, so quoting inside
    # pargs is honoured; a plain str.split() here would tear quoted
    # arguments apart before run_local ever saw them.
    cmd = ['./addqueue', '-q', str(queue), '-c', str(stage)]
    if openmp:
        cmd.append('-s')
    cmd += ['-n', f'{nproc}x{ncores}', '-m', mem]
    if depstr:
        cmd.append(depstr)
    cmd += ['--sbatch', '-o', out_file, execution, script]
    if pargs:
        cmd.append(pargs)
    cmd += ['--output-dir', str(output_dir)]

    # Submit job
    out = run_local(cmd, dry_run=dry_run, verbose=True)
    if dry_run:
        return out

    # Retrieve jobid: expected to be the fourth token of the launcher output.
    fields = out.split()
    if len(fields) < 4:
        raise RuntimeError(
            "Could not extract a job ID from the addqueue output "
            f"for stage '{stage}': {out!r}")
    return fields[3]


def submit_slurm(stage,sbatch_config,parallel_config,execution,
script,pargs,dry_run,output_dir,site,project,root_dir,
depstr=None,account=None,qos=None,partition=None,constraint=None):
Expand Down Expand Up @@ -532,6 +569,7 @@ def main():
parser.add_argument("-q", "--qos", type=str, default=None,help="QOS name")
parser.add_argument("-p", "--partition", type=str, default=None,help="Partition name")
parser.add_argument("-c", "--constraint", type=str, default=None,help="Constraint name")
parser.add_argument('--glamdring', action='store_true', help='Use if running on Oxford\'s Glamdring cluster')
args = parser.parse_args()
if args.force_local and args.force_slurm: raise_exception("You can\'t force both local and SLURM.")

Expand All @@ -543,8 +581,8 @@ def main():
root_dir = config['root_dir']

# Check if we have SLURM
have_slurm = check_slurm()
if not(have_slurm):
have_slurm = check_slurm() if not args.glamdring else False
if not(have_slurm) and not(args.glamdring):
print("No SLURM detected. We will be locally "
"executing commands serially.")
if args.dry_run:
Expand Down Expand Up @@ -627,7 +665,10 @@ def main():
site = detect_site() if args.site is None else args.site
sbatch_config = load_template(site)
else:
site = 'local'
if args.glamdring:
site = 'glamdring'
else:
site = 'local'

###########################
reuse_stages = []
Expand All @@ -651,9 +692,9 @@ def main():
else:
last_job = max([sint(re.search(rf'{root}(.*?){suffix}', f).group(1)) for f in fs])
output = run_local(['sacct', '-j',
str(last_job), '--format=State',
'--parsable2'],
verbose=False).split('\n')
str(last_job), '--format=State',
'--parsable2'],
verbose=False).split('\n')
completed = True
for i,line in enumerate(output):
if i==0:
Expand Down Expand Up @@ -722,10 +763,11 @@ def main():

###########################

is_sbatch = (have_slurm or args.force_slurm) and not(args.force_local)
is_local = not(have_slurm) or args.force_local
if sum([int(x) for x in [is_sbatch,is_local]])!=1: raise_exception("Inconsistency in submission vs. local. Report bug.")

is_sbatch = (have_slurm or args.force_slurm) and not(args.force_local) and not(args.glamdring)
is_glam = args.glamdring and not (args.force_local or args.force_slurm)
is_local = (not(have_slurm) and not args.glamdring) or args.force_local
if sum([int(x) for x in [is_sbatch,is_local,is_glam]])!=1: raise_exception("Inconsistency in submission vs. local. Report bug.")

# Check if any reused stages have dependencies that are not reused
# If so we will not reuse those stages
# Algorithm is linear since `stages` is already sorted
Expand All @@ -750,7 +792,7 @@ def main():
if not(reply):
sys.exit(0)


# Make project directory
proj_dir = get_project_dir(root_dir,args.project)
os.makedirs(proj_dir, exist_ok=True)
Expand Down Expand Up @@ -783,12 +825,15 @@ def main():
jlist.append(jobids[s])
if len(jlist)>=1:
depstr = ':'.join(jlist)
depstr = "--dependency=afterok:" + depstr
if is_glam:
depstr = '--runafter '+depstr
else:
depstr = "--dependency=afterok:" + depstr
else:
depstr = None
else:
depstr = None

if is_sbatch:
jobid = submit_slurm(stage,sbatch_config,
copy.deepcopy(ostages[stage]).get('parallel',None),
Expand All @@ -801,6 +846,13 @@ def main():
qos=args.qos,
partition=args.partition,
constraint=args.constraint)
if is_glam:
jobid = submit_glamdring(stage,
copy.deepcopy(ostages[stage]).get('parallel',{}),
execution, script, pargs, dry_run=args.dry_run,
output_dir=output_dir,
project=args.project,
root_dir=root_dir, depstr=depstr)
if is_local:
if pargs=='':
cmds = [execution,script, '--output-dir',output_dir]
Expand Down