forked from diegomcarvalho/biocomp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmultiple_submissions.py
More file actions
87 lines (82 loc) · 4.35 KB
/
multiple_submissions.py
File metadata and controls
87 lines (82 loc) · 4.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import time, os, glob, re, argparse, subprocess
from datetime import datetime
TIME_PATTERN = '%Y-%m-%d %H:%M:%S.%f' # strptime format of the timestamp that starts each parsl.log line
SLEEP_TIME = 60*30 # polling interval passed to time.sleep(): 60*30 s = 30 min
NUMBER_EXEC = 3 # number of times each configuration is submitted
MAX_TASKS = 24 # not referenced anywhere in the visible part of this file
MIN_TASKS = 1 # not referenced anywhere in the visible part of this file
def check_runing(user="rafael.terra"):
    """Report whether ``user`` has at least one Slurm job in the RUNNING state.

    ``squeue`` is asked directly for running jobs (``-t RUNNING``) with the
    header line suppressed (``-h``), so any non-blank output means a job is
    running.  This replaces piping the full listing through ``grep " R "``
    in a shell, where a non-zero exit status was the normal "no match" path
    and could not be told apart from a real failure.

    Args:
        user: Slurm user name to query.  Defaults to the account this
            script was originally hard-coded for.

    Returns:
        True if squeue lists a running job for ``user``.  False if it lists
        none, or if squeue could not be started or exited with an error.
    """
    try:
        listing = subprocess.check_output(
            ["squeue", "-h", "-u", user, "-t", "RUNNING"]
        )
    except (OSError, subprocess.CalledProcessError) as err:
        # Same best-effort contract as before (a failed query counts as
        # "nothing running"), but the failure is no longer silent.
        print(f'Could not query squeue for running jobs: {err}')
        return False
    return bool(listing.strip())
def check_waiting(user="rafael.terra"):
    """Report whether ``user`` has at least one Slurm job in the PENDING state.

    ``squeue`` is asked directly for pending jobs (``-t PENDING``) with the
    header line suppressed (``-h``), so any non-blank output means a job is
    waiting in the queue.  This replaces piping the full listing through
    ``grep " PD "`` in a shell, where a non-zero exit status was the normal
    "no match" path and could not be told apart from a real failure.

    Args:
        user: Slurm user name to query.  Defaults to the account this
            script was originally hard-coded for.

    Returns:
        True if squeue lists a pending job for ``user``.  False if it lists
        none, or if squeue could not be started or exited with an error.
    """
    try:
        listing = subprocess.check_output(
            ["squeue", "-h", "-u", user, "-t", "PENDING"]
        )
    except (OSError, subprocess.CalledProcessError) as err:
        # Same best-effort contract as before (a failed query counts as
        # "nothing waiting"), but the failure is no longer silent.
        print(f'Could not query squeue for pending jobs: {err}')
        return False
    return bool(listing.strip())
def _log_first_and_last_line(log_path):
    """Return the first and last lines of ``log_path`` as decoded strings."""
    with open(log_path, "rb") as log_file:
        first_line = log_file.readline().decode(errors="replace")
        try:
            # Step backwards from the end of the file until the newline
            # that precedes the final line has just been consumed.
            log_file.seek(-2, os.SEEK_END)
            while log_file.read(1) != b'\n':
                log_file.seek(-2, os.SEEK_CUR)
        except OSError:
            # Seeking before the start of the file: the log has at most
            # one line, so the last line is the first one.
            log_file.seek(0)
        last_line = log_file.readline().decode(errors="replace")
    return first_line, last_line


def _log_elapsed_seconds(first_line, last_line):
    """Return the seconds between the leading timestamps of two log lines, or None."""
    stamp = re.compile(r'\d+-\d+-\d+\s\d+:\d+:\d+\.\d+')
    begin = stamp.match(first_line)
    end = stamp.match(last_line)
    if begin is None or end is None:
        return None
    try:
        elapsed = (datetime.strptime(end.group(0), TIME_PATTERN)
                   - datetime.strptime(begin.group(0), TIME_PATTERN))
    except ValueError:
        # Digits matched the regex but do not form a valid date/time.
        return None
    return elapsed.total_seconds()


def multiple_workloads():
    """Submit every workload configuration NUMBER_EXEC times and log run times.

    For each submission the function waits until no job is running, submits
    ``submit.sh`` through ``sbatch`` with ``SCRATCH`` and ``WF`` exported,
    waits until no job is running or pending, and then appends
    ``<config>,<elapsed seconds>`` to ``output.txt``.  The elapsed time is the
    difference between the timestamps on the first and last lines of
    ``parsl.log`` in the lexicographically last directory under ``runinfo/``.

    A failed submission, a missing run directory, or a log without
    parseable timestamps is reported and skipped instead of recording a
    stale time or aborting the remaining runs.

    Raises:
        KeyError: if the ``SCRATCH`` environment variable is not set.
    """
    workflows = ['iqtree_phylonet', 'iqtree_snaq', 'raxml_phylonet',
                 'raxml_snaq', 'mrbayes_snaq', 'all']
    # Earlier runs also used the '300g6t' and '1000g6t' datasets; append
    # them here to queue those configurations after the '100g6t' ones.
    datasets = ['100g6t']
    configs = [f'{dataset}_{workflow}.config'
               for dataset in datasets for workflow in workflows]
    print('Workload:')
    print(configs)

    # Looked up before the first wait so a missing variable fails at once.
    scratch = os.environ["SCRATCH"]
    base_dir = os.getcwd()
    runinfo_pattern = os.path.join(base_dir, 'runinfo/*')

    # Same submission order as cycling through the list NUMBER_EXEC times.
    for _ in range(NUMBER_EXEC):
        for config in configs:
            print(f'Submitting {config}')
            while check_runing():
                print('There is a job running. Sleeping...')
                time.sleep(SLEEP_TIME)

            workflow_path = os.path.join(base_dir, "config", config)
            # An argument list avoids shell quoting problems in the paths.
            status = subprocess.call([
                'sbatch',
                f'--export=ALL,SCRATCH={scratch},WF={workflow_path}',
                'submit.sh',
            ])
            if status != 0:
                # Without a new run the newest runinfo directory belongs to
                # an earlier submission; timing it would log a wrong value.
                print(f'sbatch exited with status {status} for {config}; no time recorded.')
                continue

            while check_runing() or check_waiting():
                print('There is a job waiting or running. Sleeping...')
                time.sleep(SLEEP_TIME)

            run_dirs = sorted(glob.glob(runinfo_pattern))
            if not run_dirs:
                print(f'No run directory found under runinfo/ for {config}; no time recorded.')
                continue
            log = os.path.join(run_dirs[-1], 'parsl.log')
            try:
                first_line, last_line = _log_first_and_last_line(log)
            except OSError as err:
                print(f'Could not read {log}: {err}; no time recorded.')
                continue

            seconds = _log_elapsed_seconds(first_line, last_line)
            if seconds is None:
                print(f'Could not parse timestamps in {log}; no time recorded.')
                continue
            with open("output.txt", 'a') as output:
                output.write(f'{config},{seconds}\n')
def multiple_tasks():
    """Submit each configuration NUMBER_EXEC times per worker count.

    For every (configuration, worker count) pair the function waits until no
    job is running, submits ``submit.sh`` through ``sbatch`` with ``SCRATCH``,
    ``WF`` (path of the configuration file) and ``MW`` (the worker count)
    exported, and then waits until no job is running or pending before the
    next submission.

    Raises:
        KeyError: if the ``SCRATCH`` environment variable is not set.
    """
    configs = ['1000g6t_mrbayes_snaq.config']
    # Earlier runs swept all six '100g6t_*' configurations over the worker
    # counts [1, 4, 8, 16, 24]; only the largest count is enabled now.
    worker_counts = [24]

    # Looked up before the first wait so a missing variable fails at once.
    scratch = os.environ["SCRATCH"]
    config_dir = os.path.join(os.getcwd(), "config")

    for config in configs:
        workflow_path = os.path.join(config_dir, config)
        for task in worker_counts:
            for i in range(NUMBER_EXEC):
                print(f"Submitting Workload: {config} Max workers: {task} Execution {i}")
                while check_runing():
                    print('There is a job running. Sleeping...')
                    time.sleep(SLEEP_TIME)

                # An argument list avoids shell quoting problems in the paths.
                status = subprocess.call([
                    'sbatch',
                    f'--export=ALL,SCRATCH={scratch},WF={workflow_path},MW={task}',
                    'submit.sh',
                ])
                if status != 0:
                    # Report the failure; the loop still moves on as before.
                    print(f'sbatch exited with status {status} for {config} (MW={task}).')

                while check_runing() or check_waiting():
                    print('There is a job waiting or running. Sleeping...')
                    time.sleep(SLEEP_TIME)
# Script entry point: when executed directly, only multiple_tasks() is run;
# multiple_workloads() is defined in this file but is not called here.
if __name__ == "__main__":
    multiple_tasks()