-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpulp_pipeunit.py
More file actions
1859 lines (1704 loc) · 90.2 KB
/
pulp_pipeunit.py
File metadata and controls
1859 lines (1704 loc) · 90.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
###################################################################
#
# Class PipeUnit - aka Processing unit per Beam
# and other derivative classes
#
import os, sys, glob, time, re, os.path
import math
import numpy as np
import cPickle
import subprocess, shlex
from subprocess import PIPE, STDOUT, Popen
import psr_utils as pu
from pulp_parset import Observation, radial_distance, find_pulsars
from pulp_usercmd import CMDLine, check_pulsars
from pulp_sysinfo import CEP2Info
from pulp_logging import PulpLogger
from pulp_feedback import FeedbackUnit
# return list of dirs in the current base directory
def getDirs(base):
return [x for x in glob.iglob(os.path.join(base, '*')) if os.path.isdir(x)]
# recursive glob
def rglob(base, pattern, maxdepth=0):
flist=[]
flist.extend(glob.glob(os.path.join(base, pattern)))
dirs = getDirs(base)
if maxdepth <= 0: return flist
if len(dirs):
for d in dirs:
flist.extend(rglob(os.path.join(base, d), pattern, maxdepth-1))
return flist
# common postprocessing routines after DSPSR calls for different Units and both DAL and non_DAL parts
# root - class instance to execute
# ref - class instance to get values of chans, etc.
def dspsr_postproc(root, ref, cmdline, obs, psr, total_chan, nsubs_eff, curdir, output_prefix):
# removing corrupted freq channels
if ref.nrChanPerSub > 1:
root.log.info("Zapping every %d channel..." % (ref.nrChanPerSub))
cmd="paz -z \"%s\" -m %s_%s.ar" % \
(" ".join([str(jj) for jj in range(0, total_chan, ref.nrChanPerSub)]), psr, output_prefix)
root.execute(cmd, workdir=curdir)
# rfi zapping
if not cmdline.opts.is_norfi:
# first we check how large the dataset is (product of number of channels and subints)
# this is necessary as clean.py uses a lot of memory. If som then we use old-fashioned paz -r
if (obs.duration/cmdline.opts.tsubint) * total_chan >= 512000:
root.log.info("Zapping channels using median smoothed difference algorithm...")
cmd="paz -r -e paz.ar %s_%s.ar" % (psr, output_prefix)
root.execute(cmd, workdir=curdir)
else:
root.log.info("Running Patrick Lazarus's COAST_GUARD RFI cleaner using surgical approach...")
cmd="clean.py -F surgical -o %s_%s.paz.ar %s_%s.ar" % (psr, output_prefix, psr, output_prefix)
try:
root.execute(cmd, workdir=curdir)
except:
root.log.info("COAST_GUARD RFI cleaner failed! Will try to use paz -r...")
root.log.info("Zapping channels using median smoothed difference algorithm...")
cmd="paz -r -e paz.ar %s_%s.ar" % (psr, output_prefix)
root.execute(cmd, workdir=curdir)
# dedispersing
# checking if there was already an option -K. That means we do not need to run dedispersion as all sub-integrations
# have been aligned already
if re.match("^\-K$", cmdline.opts.dspsr_extra_opts) or re.match("^.*\s+\-K$", cmdline.opts.dspsr_extra_opts) or re.match("^.*\s+\-K\s+.*$", cmdline.opts.dspsr_extra_opts) or re.match("^\-K\s+.*$", cmdline.opts.dspsr_extra_opts):
cmd="mv %s_%s.ar %s_%s.dd" % (psr, output_prefix, psr, output_prefix)
root.execute(cmd, workdir=curdir)
else:
root.log.info("Dedispersing...")
if not cmdline.opts.is_norfi or os.path.exists("%s/%s_%s.paz.ar" % (curdir, psr, output_prefix)):
cmd="pam -D -m %s_%s.paz.ar" % (psr, output_prefix)
root.execute(cmd, workdir=curdir)
cmd="pam -D -e dd %s_%s.ar" % (psr, output_prefix)
root.execute(cmd, workdir=curdir)
# scrunching in frequency
root.log.info("Scrunching in frequency to have %d channels in the output ar-file..." % (nsubs_eff))
if ref.nrChanPerSub > 1:
# first, running fscrunch on zapped archive
if not cmdline.opts.is_norfi or os.path.exists("%s/%s_%s.paz.ar" % (curdir, psr, output_prefix)):
cmd="pam --setnchn %d -e fscr.AR %s_%s.paz.ar" % (nsubs_eff, psr, output_prefix)
root.execute(cmd, workdir=curdir)
# remove non-scrunched zapped archive (we will always have unzapped non-scrunched version)
cmd="rm -f %s_%s.paz.ar" % (psr, output_prefix)
root.execute(cmd, workdir=curdir)
# running fscrunching on non-zapped archive
cmd="pam --setnchn %d -e fscr.AR %s_%s.dd" % (nsubs_eff, psr, output_prefix)
root.execute(cmd, workdir=curdir)
# remove non-scrunched dedispersed archive (we will always have unzapped non-dedispersed non-scrunched version)
cmd="rm -f %s_%s.dd" % (psr, output_prefix)
root.execute(cmd, workdir=curdir)
else: # if number of chans == number of subs, we will just rename .paz.ar to .paz.fscr.AR and .dd to .fscr.AR
if not cmdline.opts.is_norfi or os.path.exists("%s/%s_%s.paz.ar" % (curdir, psr, output_prefix)):
cmd="mv -f %s_%s.paz.ar %s_%s.paz.fscr.AR" % (psr, output_prefix, psr, output_prefix)
root.execute(cmd, workdir=curdir)
cmd="mv -f %s_%s.dd %s_%s.fscr.AR" % (psr, output_prefix, psr, output_prefix)
root.execute(cmd, workdir=curdir)
# running pav's to make DSPSR diagnostic plots
# root - class instance to execute
# ref - class instance to get values of chans, etc.
def make_dspsr_plots(root, ref, cmdline, obs, nsubs_eff, curdir, output_prefix, is_pdmp=False):
# first, calculating the proper max divisor for the number of subbands
root.log.info("Getting proper value of nchans in pav -f between %d and %d..." % (1, min(nsubs_eff, 63)))
# calculating the greatest common denominator of self.tab.nrSubbands starting from 63 down
pav_nchans = ref.hcd(1, min(nsubs_eff, 63), nsubs_eff)
fscrunch_factor = nsubs_eff / pav_nchans
if is_pdmp: suffix=".pdmp.AR"
else: suffix=".AR"
root.log.info("Creating diagnostic plots...")
for psr in ref.psrs: # pulsar list is empty if --nofold is used
if not cmdline.opts.is_norfi or os.path.exists("%s/%s_%s.paz.fscr%s" % (curdir, psr, output_prefix, suffix)):
output_stem=".paz.fscr%s" % (suffix)
else: output_stem=".fscr%s" % (suffix)
# creating DSPSR diagnostic plots
if obs.CV: plot_type = "SFTd"
else: plot_type = "DFTpd"
cmd="pav -%s -C -g %s_%s_%s.ps/cps %s_%s%s" % (plot_type, psr, output_prefix, plot_type, psr, output_prefix, output_stem)
root.execute(cmd, workdir=curdir)
cmd="pav -GTpdf%d -C -g %s_%s_GTpdf%d.ps/cps %s_%s%s" % (fscrunch_factor, psr, output_prefix, fscrunch_factor, psr, output_prefix, output_stem)
root.execute(cmd, workdir=curdir)
cmd="pav -YFpd -C -g %s_%s_YFpd.ps/cps %s_%s%s" % (psr, output_prefix, psr, output_prefix, output_stem)
root.execute(cmd, workdir=curdir)
cmd="pav -j -g %s_%s_j.ps/cps %s_%s%s" % (psr, output_prefix, psr, output_prefix, output_stem)
root.execute(cmd, workdir=curdir)
if not cmdline.opts.is_skip_rmfit and obs.CV:
try:
# running rmfit for negative and positive RMs
cmd="rmfit -m -100,0,100 -D -K %s_%s.negRM.ps/cps %s_%s%s" % (psr, output_prefix, psr, output_prefix, output_stem)
root.execute(cmd, workdir=curdir)
cmd="rmfit -m 0,100,100 -D -K %s_%s.posRM.ps/cps %s_%s%s" % (psr, output_prefix, psr, output_prefix, output_stem)
root.execute(cmd, workdir=curdir)
cmd="convert \( %s_%s_GTpdf%d.ps %s_%s_j.ps %s_%s.posRM.ps +append \) \( %s_%s_%s.ps %s_%s_YFpd.ps %s_%s.negRM.ps +append \) -append -rotate 90 -background white -flatten %s_%s_%s.png" % \
(psr, output_prefix, fscrunch_factor, psr, output_prefix, psr, output_prefix, psr, output_prefix, plot_type, \
psr, output_prefix, psr, output_prefix, psr, output_prefix, is_pdmp and "diag_pdmp" or "diag")
root.execute(cmd, workdir=curdir)
except Exception:
root.log.warning("***** Warning! Rmfit has failed. Diagnostic plots will be made without rmfit plots. *****")
cmd="convert \( %s_%s_GTpdf%d.ps %s_%s_j.ps +append \) \( %s_%s_%s.ps %s_%s_YFpd.ps +append \) -append -rotate 90 -background white -flatten %s_%s_%s.png" % \
(psr, output_prefix, fscrunch_factor, psr, output_prefix, psr, output_prefix, plot_type, \
psr, output_prefix, psr, output_prefix, is_pdmp and "diag_pdmp" or "diag")
root.execute(cmd, workdir=curdir)
else:
cmd="convert \( %s_%s_GTpdf%d.ps %s_%s_j.ps +append \) \( %s_%s_%s.ps %s_%s_YFpd.ps +append \) -append -rotate 90 -background white -flatten %s_%s_%s.png" % \
(psr, output_prefix, fscrunch_factor, psr, output_prefix, psr, output_prefix, plot_type, \
psr, output_prefix, psr, output_prefix, is_pdmp and "diag_pdmp" or "diag")
root.execute(cmd, workdir=curdir)
# start pdmp calls
# root - class instance to execute
# ref - class instance to get values of chans, etc.
def start_pdmp(root, ref, cmdline, obs, nsubs_eff, curdir, output_prefix):
# now running pdmp without waiting...
root.log.info("Running pdmp...")
pdmp_popens=[] # list of pdmp Popen objects
for psr in ref.psrs:
if not cmdline.opts.is_norfi or os.path.exists("%s/%s_%s.paz.fscr.AR" % (curdir, psr, output_prefix)):
output_stem=".paz.fscr.AR"
else: output_stem=".fscr.AR"
# getting the number of bins in the ar-file (it can be different from self.get_best_nbins, because
# we still provide our own number of bins in --dspsr-extra-opts
try:
cmd="psredit -q -Q -c nbin %s/%s_%s%s" % (curdir, psr, output_prefix, output_stem)
binsline=os.popen(cmd).readlines()
if np.size(binsline) > 0:
best_nbins=int(binsline[0][:-1])
cmd="pdmp -mc %d -mb %d -g %s_%s_pdmp.ps/cps %s_%s%s" % (nsubs_eff, min(128, best_nbins), psr, output_prefix, psr, output_prefix, output_stem)
pdmp_popen = root.start_and_go(cmd, workdir=curdir)
pdmp_popens.append(pdmp_popen)
except Exception: pass
return pdmp_popens
# finish pdmps
# root - class instance to execute
# ref - class instance to get values of chans, etc.
def finish_pdmp(root, ref, pdmp_popens, cmdline, obs, curdir, output_prefix):
# waiting for pdmp to finish
root.waiting_list("pdmp", pdmp_popens)
# when pdmp is finished do extra actions with files...
for psr in ref.psrs:
if not cmdline.opts.is_norfi or os.path.exists("%s/%s_%s.paz.fscr.AR" % (curdir, psr, output_prefix)):
output_stem=".paz.fscr.AR"
else: output_stem=".fscr.AR"
cmd="grep %s %s/pdmp.per > %s/%s_%s_pdmp.per" % (psr, curdir, curdir, psr, output_prefix)
root.execute(cmd, is_os=True)
cmd="grep %s %s/pdmp.posn > %s/%s_%s_pdmp.posn" % (psr, curdir, curdir, psr, output_prefix)
root.execute(cmd, is_os=True)
# getting new topo period
logf = open(root.log.get_logfile(), 'r')
newp0s = [str(float(ff.split()[5])/1000.) for ff in logf.read().splitlines() if re.search("^Best TC", ff) is not None]
logf.close()
if np.size(newp0s) > 1: newp0 = newp0s[-1]
else: newp0 = newp0s[0]
# reading new DM from the *.per file
newdm = np.loadtxt("%s/%s_%s_pdmp.per" % (curdir, psr, output_prefix), comments='#', usecols=(3,3), dtype=float, unpack=True)[0]
if np.size(newdm) > 1: cmd="pam -e pdmp.AR -d %f --period %s -D %s_%s%s" % (newdm[-1], newp0, psr, output_prefix, output_stem)
else: cmd="pam -e pdmp.AR -d %f --period %s -D %s_%s%s" % (newdm, newp0, psr, output_prefix, output_stem)
root.execute(cmd, workdir=curdir)
# Running spectar.py to calculate pulsar spectra
def calc_psr_spectra(root, ref, cmdline, obs, sapid, tabid, curdir, output_prefix):
if not cmdline.opts.is_cobalt:
root.log.info("Calculating pulsar(s) spectra...")
try:
for psr in ref.psrs:
psrar=glob.glob("%s/%s_%s.paz.fscr.pdmp.AR" % (curdir, psr, output_prefix))
if len(psrar) == 0:
psrar=glob.glob("%s/%s_%s.paz.fscr.AR" % (curdir, psr, output_prefix))
if len(psrar) == 0:
psrar=glob.glob("%s/%s_%s.fscr.AR" % (curdir, psr, output_prefix))
if len(psrar) == 0: continue
cmd="spectar.py -f %s -p %s -o %s/%s_%s_%s --sap %d --tab %d" % (psrar[0], obs.parset, curdir, psr, output_prefix, "spectra.txt", sapid, tabid)
root.execute(cmd, workdir=curdir)
except: pass
else:
root.log.info("Calculating pulsar(s) spectra...skipped. No observation parset anymore.")
# patch to fill in source coordinates to the header of output ar-file
def fix_header_coords(root, ref, psr, curdir, output_prefix):
# getting coordinates string of the pulsar
psr_ra=pu.coord_to_string(*pu.rad_to_hms(ref.tab.rarad))
psr_dec=pu.coord_to_string(*pu.rad_to_dms(ref.tab.decrad))
if ref.tab.decrad < 0: psr_coords=psr_ra+psr_dec
else: psr_coords=psr_ra+"+"+psr_dec
cmd="psredit -m -c coord=%s %s_%s.ar" % (psr_coords, psr, output_prefix)
root.execute(cmd, workdir=curdir)
# running single-pulse analysis for CV data (prepdata + single_pulse_search.py) for all specified pulsars
# for CV data means, that we already ran digifil with further rfifind on a .fil file
def run_prepdata_CV(root, ref, cmdline, outdir, curdir, output_prefix):
root.log.info("Running single-pulse analysis (prepdata + single_pulse_search.py for a pulsar(s) DM")
try:
for psr in ref.psrs:
psr2=re.sub(r'^[BJ]', '', psr)
if cmdline.opts.is_nofold:
psrdm = 0.0
else:
if os.path.exists("%s/%s.par" % (outdir, psr2)):
psrdm=ref.get_psr_dm("%s/%s.par" % (outdir, psr2))
else: # this check is necessary because when combining parts the parfile is in the curdir, not in the outdir
psrdm=ref.get_psr_dm("%s/%s.par" % (curdir, psr2))
# running prepdata with mask (if --norfi was not set)
if not cmdline.opts.is_norfi or os.path.exists("%s/%s_%s_rfifind.mask" % (curdir, psr, output_prefix)):
cmd="prepdata -noclip -nobary -dm %f -mask %s_%s_rfifind.mask -o %s_%s %s %s_%s.fil" % \
(psrdm, psr, output_prefix, psr, output_prefix, cmdline.opts.prepdata_extra_opts, psr, output_prefix)
else: # running prepdata without mask
cmd="prepdata -noclip -nobary -dm %f -o %s_%s %s %s_%s.fil" % \
(psrdm, psr, output_prefix, cmdline.opts.prepdata_extra_opts, psr, output_prefix)
root.execute(cmd, workdir=curdir)
# running single_pulse_search.py on .dat file (created either with mask or not)
cmd="single_pulse_search_lotaas.py --no-width-label %s_%s.dat" % (psr, output_prefix)
root.execute(cmd, workdir=curdir)
except: pass
# base class for the single processing (a-ka beam)
class PipeUnit:
def __init__(self, obs, cep2, cmdline, tab, log, curstokes, part=0):
self.code = "" # 2-letter code, CS, IS, CV
self.stokes = "" # Stokes I, IQUV, or XXYY
self.stokes_index = curstokes
self.sapid = tab.parent_sapid
self.tabid = tab.tabid
self.tab = tab
self.parent = None # parent Popen project
self.procs = [] # list of open processes
# process.pid - pid, process.returncode - status
# if returncode == None, it means that process is still running
self.rawdir = "" # actual rawdir for this beam (where the processing is happening, e.g. either /data1 or /data2)
self.processed_dir_root = "" # root directory relative to cep2.processed_dir_prefix, i.e. where LOFAR_PULSAR_ARCHIVE resides
# because it can be different from cep2.rawdir
self.outdir = "" # root directory with processed data
self.curdir = "" # current processing directory
self.beams_root_dir = "" # 'stokes' for CS, 'incoherentstokes' for IS
self.summary_node = "" # summary node, for CS - locus092, for IS - locus094
self.summary_node_dir_suffix = "" # for CS this is "_CSplots" and for IS - "_redIS"
self.archive_suffix = "" # "_plotsCS.tar.gz" for CS and "_plotsIS.tar.gz" for IS
self.outdir_suffix = "" # for CS this is "_red" (if user did not specify) and "_redIS" for IS
self.raw2fits_extra_options = "" # extra options for 2bf2fits: -CS -H for CS, -CS -H -IS for IS
self.raw_8bit_dir = "raw-8bit" # directory with raw 8-bit data (when requested)
self.nrChanPerSub = 0
self.sampling = 0
self.downsample_factor = 0
self.log = None
self.logfile = "" # basename of the logfile
self.start_time = 0 # start time of the processing (in s)
self.end_time = 0 # end time (in s)
self.total_time = 0 # total time in s
# extensions of the files to copy to summary node
self.extensions=["*.log", "*.txt", "*.pdf", "*.ps", "*.bestprof", "*.rfirep", "*png", "*_DM*.inf", "*.singlepulse"]
self.procdir = "BEAM%d" % (self.tabid)
# extensions for the full archive, e.g. LTA
self.full_archive_exts=["*.log", "*.txt", "*.pdf", "*.ps", "*.pfd", "*.bestprof", "*.polycos", "*.inf", "*.rfirep", "*png", "*.ar", "*.AR", "*pdmp*", "*_rfifind*", "*.dat", "*.singlepulse", "*.h5", "*.fil", "*.rv", "*.out", "*.raw", "*.fits"]
# feedback unit
self.fbunit = None
# bandwidth part index
self.part = part
# pulsars to fold for this unit
self.psrs = []
if not cmdline.opts.is_nofold:
self.psrs = self.get_pulsars_to_fold(obs, cep2, cmdline, log)
# to be sure that we have unique list of pulsars (especially relevant for tabfind+ option)
self.psrs = np.unique(self.psrs)
# dspsr folding options
# making choice between -L %d and "-s"
# by default -L is used, but if -s is given in the dspsr_extra_opts, then we should get rid of -L
self.dspsr_folding_options="-L %d" % (cmdline.opts.tsubint)
if re.match("^\-s$", cmdline.opts.dspsr_extra_opts) or re.match("^.*\s+\-s$", cmdline.opts.dspsr_extra_opts) or re.match("^.*\s+\-s\s+.*$", cmdline.opts.dspsr_extra_opts) or re.match("^\-s\s+.*$", cmdline.opts.dspsr_extra_opts):
self.dspsr_folding_options=""
# function to set outdir and curdir directories
def set_outdir(self, obs, cep2, cmdline):
if len(self.tab.location) == 0: # when raw data are erased but still want to run only summaries
self.location=cep2.hoover_nodes[0] # basically there is no need in this, as only summaries will be done
elif len(self.tab.location) > 1:
if (self.tab.is_coherent and obs.stokesCS == "IQUV" and obs.nsplitsCS == 1) or (self.tab.is_coherent == False and obs.stokesIS == "IQUV" and obs.nsplitsIS == 1):
self.location=[key for key in self.tab.rawfiles.keys() if any("_S%d_" % (self.stokes_index) in ff for ff in self.tab.rawfiles[key])][0]
else:
if not cmdline.opts.is_nohoover: # are not using hoover nodes
self.location=cep2.hoover_nodes[0]
# otherwise self.location is already defined in the PartUnit
else:
self.location=self.tab.location[0]
# setting output directory
if cmdline.opts.is_auto: # auto pipeline mode
if cep2.cluster_headnode[:5] == "head0" or cep2.cluster_headnode[:3] == "cpu": # CEP4
self.outdir = "%s%s" % (cep2.processed_dir, self.outdir_suffix)
else: # not CEP4
self.outdir = "%s" % (cep2.processed_dir)
if cep2.rawdir[:5] == "/data":
if cep2.cluster_headnode[:5] == "head0" or cep2.cluster_headnode[:3] == "cpu": # CEP4
self.rawdir = "%s%s" % (cep2.rawdir, "/".join(cep2.processed_dir.split("/data")[-1].split("/")[:-1]))
self.processed_dir_root = cep2.processed_dir_root
else: # CEP2
self.rawdir = "%s%s" % (cep2.rawdir, cep2.processed_dir.split("/data")[-1].split("/")[0])
self.processed_dir_root = "%s%s" % (cep2.processed_dir_root, cep2.processed_dir.split("/data")[-1].split("/")[0])
else:
if cep2.cluster_headnode[:5] == "head0" or cep2.cluster_headnode[:3] == "cpu": # CEP4
self.rawdir = "/".join(cep2.processed_dir.split("/")[:-1])
self.processed_dir_root = ""
else: # CEP2
self.rawdir = cep2.rawdir
self.processed_dir_root = cep2.processed_dir_root
else:
if cep2.cluster_headnode[:5] == "head0" or cep2.cluster_headnode[:3] == "cpu": # CEP4
if cep2.rawdir[:5] == "/data":
myfiles=[ff for ff in self.tab.rawfiles[self.location] if "_P%03d_" % (self.part) in ff]
self.rawdir = "%s%s" % (cep2.rawdir, "/".join(myfiles[0].split("/data")[-1].split("/")[:-3]))
#self.processed_dir_root = "%s%s" % (cep2.processed_dir_root, "/".join(myfiles[0].split("/data")[-1].split("/")[:-3]))
else:
self.rawdir = cep2.rawdir
self.processed_dir_root = cep2.processed_dir_root
else: # not CEP4
if cep2.rawdir[:5] == "/data":
myfiles=[ff for ff in self.tab.rawfiles[self.location] if "_P%03d_" % (self.part) in ff]
self.rawdir = "%s%s" % (cep2.rawdir, myfiles[0].split("/data")[-1].split("/")[0])
self.processed_dir_root = "%s%s" % (cep2.processed_dir_root, myfiles[0].split("/data")[-1].split("/")[0])
else:
self.rawdir = cep2.rawdir
self.processed_dir_root = cep2.processed_dir_root
# if user specified output dir (relative to /data/LOFAR_PULSAR_....)
if cmdline.opts.outdir != "":
if not cmdline.opts.is_globalfs:
self.outdir = "%s/%s_%s/%s" % (self.processed_dir_root, cep2.processed_dir_prefix, self.location, cmdline.opts.outdir)
else:
self.outdir = "%s/%s/%s" % (self.processed_dir_root, cep2.processed_dir_prefix, cmdline.opts.outdir)
else: # if output dir was not set
if not cmdline.opts.is_globalfs:
self.outdir = "%s/%s_%s/%s%s" % (self.processed_dir_root, cep2.processed_dir_prefix, self.location, obs.id, self.outdir_suffix)
else:
self.outdir = "%s/%s/%s%s" % (self.processed_dir_root, cep2.processed_dir_prefix, obs.id, self.outdir_suffix)
self.curdir = "%s/%s/SAP%d/%s" % (self.outdir, self.beams_root_dir, self.sapid, self.procdir)
# function to get feedback index for the feedback unit
def get_feedback_index(self, obs, cmdline):
fbindex=0
for sap in sorted(obs.saps, key=lambda x: x.sapid):
for tab in sorted(sap.tabs, key=lambda x: x.tabid):
if self.sapid == sap.sapid and self.tabid == tab.tabid:
if tab.is_coherent:
if obs.stokesCS == "IQUV":
fbindex += 4 * (self.part - cmdline.opts.first_freq_splitCS)
fbindex += self.stokes_index
else: fbindex += (self.part - cmdline.opts.first_freq_splitCS)
else:
if obs.stokesIS == "IQUV":
fbindex += 4 * (self.part - cmdline.opts.first_freq_splitIS)
fbindex += self.stokes_index
else: fbindex += (self.part - cmdline.opts.first_freq_splitIS)
return fbindex
else:
if tab.is_coherent:
if obs.stokesCS == "IQUV":
fbindex += 4 * cmdline.opts.nsplitsCS
else: fbindex += cmdline.opts.nsplitsCS
else:
if obs.stokesIS == "IQUV":
fbindex += 4 * cmdline.opts.nsplitsIS
else: fbindex += cmdline.opts.nsplitsIS
return fbindex
# function to initialize feedback unit (should be called after self.outdir is set)
def feedback_init(self, obs, cep2, cmdline):
fbindex=self.get_feedback_index(obs, cmdline)
self.fbunit = FeedbackUnit(fbindex, self.location, self.outdir, self.sapid, self.tabid, self.stokes_index, self.part)
if (self.tab.is_coherent and obs.stokesCS == "IQUV") or (self.tab.is_coherent == False and obs.stokesIS == "IQUV"):
self.fbunit.update("%s/" % (self.outdir),
"%s/.%s_SAP%03d_B%03d_S%d_P%03d.fb" % (cep2.get_logdir(), cep2.pipeid, self.sapid, self.tabid, self.stokes_index, self.part), self.code, obs)
else:
self.fbunit.update("%s/" % (self.outdir),
"%s/.%s_SAP%03d_B%03d_P%03d.fb" % (cep2.get_logdir(), cep2.pipeid, self.sapid, self.tabid, self.part), self.code, obs)
# writing initial feedback file to disk, so we have something in case pipeline will fail
self.fbunit.flush(0, cep2, False)
# function to get the list of pulsars to fold for this TAB (unit)
def get_pulsars_to_fold(self, obs, cep2, cmdline, log):
try:
# get pulsar name from the parset
# if pulsar is not given in the command line, I also have to find pulsar if parset entry is empty
if len(cmdline.psrs) == 0 or cmdline.psrs[0] == "parset":
for sap in obs.saps:
if self.sapid == sap.sapid and sap.source != "" and check_pulsars(sap.source, cmdline, cep2, None):
self.psrs.append(sap.source)
# if --pulsar is not given and source field in parset is empty
if len(cmdline.psrs) == 0 and len(self.psrs) == 0:
for sap in obs.saps:
if self.sapid == sap.sapid:
self.psrs[:] = sap.psrs
break
if len(self.psrs)>0: self.psrs = self.psrs[:1] # leave only one pulsar
# if special word "tabfind" or "tabfind+" is given
# if it is "tabfind+", then we take pulsar from the parset (if exist), then one pulsar from the SAP (if different from the parset)
# and then also search for another pulsar in the TAB
# In case of "tabfind" we only search for pulsars in the TAB
if len(cmdline.psrs) != 0 and (cmdline.psrs[0] == "tabfind" or cmdline.psrs[0] == "tabfind+"):
# in the special case of "tabfind+"...
if cmdline.psrs[0] == "tabfind+":
for sap in obs.saps:
if self.sapid == sap.sapid:
if sap.source != "" and check_pulsars(sap.source, cmdline, cep2, None):
self.psrs.append(sap.source)
if len(sap.psrs) > 0: self.psrs.append(sap.psrs[0])
log.info("Searching for best pulsar for folding in SAP=%d TAB=%d..." % (self.sapid, self.tabid))
tabpsrs = find_pulsars(self.tab.rarad, self.tab.decrad, cmdline, cmdline.opts.fwhm_CS/2.)
if len(tabpsrs) > 0:
self.psrs.append(tabpsrs[0]) # use only one pulsar from those found in a TAB
log.info("%s" % (tabpsrs[0]))
# using pulsars from SAP
if len(cmdline.psrs) != 0 and (cmdline.psrs[0] == "sapfind" or cmdline.psrs[0] == "sapfind3"):
for sap in obs.saps:
if self.sapid == sap.sapid:
self.psrs[:] = sap.psrs
break
if cmdline.psrs[0] == "sapfind" and len(self.psrs)>0: self.psrs = self.psrs[:1] # leave only one pulsar
# if --pulsar is used but no special word
if len(cmdline.psrs) != 0 and cmdline.psrs[0] != "parset" and cmdline.psrs[0] != "tabfind" and \
cmdline.psrs[0] != "sapfind" and cmdline.psrs[0] != "sapfind3" and cmdline.psrs[0] != "tabfind+":
self.psrs[:] = cmdline.psrs # copying all items
# checking if pulsars are in ATNF catalog, or if not par-files do exist fo them, if not - exit
for psr in self.psrs:
if not check_pulsars(psr, cmdline, cep2, log):
log.info("*** No parfile found for pulsar %s for SAP=%d TAB=%d. Exiting..." % (psr, self.sapid, self.tabid))
raise Exception
# if pulsar list is still empty, and we did not set --nofold then set is_nofold flag to True
if len(self.psrs) == 0:
log.warning("*** No pulsar found to fold for SAP=%d TAB=%d. Setting --nofold flag for this beam..." % (self.sapid, self.tabid))
return self.psrs
except Exception:
self.log.exception("Oops... get_pulsars_to_fold has crashed!")
self.kill() # killing all open processes
raise
# getting proper parfile in the processing directory
def get_parfile(self, cmdline, cep2):
# local function to check the existence of the parfile
def check_parfile (parfile, psrstem):
if os.path.exists(parfile):
try:
cmd="rsync -a %s %s/%s.par" % (parfile, self.outdir, psrstem)
self.execute(cmd)
# also converting them to UNIX format (if were prepared on Macs for example)
# if they are in UNIX format already, nothing happens
cmd="dos2unix %s/%s.par" % (self.outdir, psrstem)
self.execute(cmd)
except: pass
return True
return False
for psr in self.psrs:
psr2=re.sub(r'^[BJ]', '', psr)
if cmdline.opts.parfile != "":
if os.path.exists(cmdline.opts.parfile):
self.log.info("Copying user-specified parfile '%s' to %s/%s.par" % \
(cmdline.opts.parfile, self.outdir, psr2))
try:
cmd="rsync -a %s %s/%s.par" % (cmdline.opts.parfile, self.outdir, psr2)
self.execute(cmd)
except: pass
continue
else:
self.log.error("Can't find user-specified parfile '%s'. Exiting..." % (cmdline.opts.parfile))
self.kill()
raise Exception
# checking repository dir withiout ^[BJ] in the name of the parfile
parfile="%s/%s.par" % (cep2.parfile_dir, psr2)
if check_parfile(parfile, psr2): continue
# with the B or J in the name
parfile="%s/%s.par" % (cep2.parfile_dir, psr)
if check_parfile(parfile, psr2): continue
# checking extra directory withiout ^[BJ] in the name of the parfile
parfile="%s/%s.par" % (cep2.parfile_dir_extra, psr2)
if check_parfile(parfile, psr2): continue
# checking extra directory with the B or J in the name
parfile="%s/%s.par" % (cep2.parfile_dir_extra, psr)
if check_parfile(parfile, psr2): continue
self.log.info("Parfile does not exist. Creating parfile base on pulsar ephemeris from ATNF catalog...")
# for -e option, we need to use pulsar name with leading B or J, otherwise it is possible (I cama across this!)
# that there are two pulsars with the same name, one with leading J, another with leading B,
# in this case psrcat returns records for both pulsars, and output parfile gets messed up
if not cmdline.opts.is_globalfs:
cmd="psrcat -db_file %s -e %s > %s/%s.par" % (cep2.psrcatdb, psr, self.outdir, psr2)
self.execute(cmd, is_os=True)
else:
import tempfile
tmpdir = tempfile.mkdtemp()
parf = os.path.join(tmpdir, "%s.par" % (psr2))
cmd="psrcat -db_file %s -e %s > %s" % (cep2.psrcatdb, psr, parf)
self.execute(cmd, is_os=True)
cmd="rsync -a %s %s" % (parf, self.outdir)
self.execute(cmd)
cmd="rm -rf %s" % (tmpdir)
os.system(cmd)
# To avoid potential simultaneous access (as probably happens on CEP4 with global file system)
# each PipeUnit will have also copies of the parfiles inside the temporary directories
# which will be deleted at the end
import tempfile
tmpdir = tempfile.mkdtemp()
# Now we check the par-files if they have non-appropriate flags that can cause prepfold to crash
toremove_psrs=[] # list of pulsars to remove from folding in case there is a problem with the parfile (e.g. PEPOCH is absent)
for psr in self.psrs:
psr2=re.sub(r'^[BJ]', '', psr)
parfile="%s/%s.par" % (self.outdir, psr2)
cmd="rsync -a %s %s" % (parfile, tmpdir)
self.execute(cmd)
parf="%s/%s" % (tmpdir, parfile.split("/")[-1])
# check first that PEPOCH is in the parfile
cmd="grep 'PEPOCH' %s" % (parf,)
status=os.popen(cmd).readlines()
if np.size(status)==0:
self.log.warning("WARNING: Par-file %s has no PEPOCH keyword, so this pulsar will be excluded from folding." % (parfile,))
toremove_psrs.append(psr)
continue
# check first for PSRB name. It should be changed to PSRJ
cmd="grep 'PSRB' %s" % (parf,)
status=os.popen(cmd).readlines()
if np.size(status)>0:
self.log.warning("WARNING: Par-file %s has PSRB keyword that can cause prepfold to crash!\n\
If your pipeline run calls prepfold you might need to change PSRB to PSRJ." % (parfile,))
# checking CLK flag
cmd="grep 'CLK' %s" % (parf,)
status=os.popen(cmd).readlines()
if np.size(status)>0:
self.log.warning("WARNING: Par-file %s has CLK keyword that can cause prepfold to crash!\n\
CLK line will be removed from the parfile!" % (parfile,))
cmd="sed -i '/^CLK/d' %s" % (parf,)
self.execute(cmd, self.log, is_os=True)
cmd="rsync -a %s %s" % (parf, self.outdir)
self.execute(cmd)
# checking for UNITS flag
cmd="grep 'UNITS' %s" % (parf,)
status=os.popen(cmd).readlines()
if np.size(status)>0:
self.log.warning("WARNING: Par-file %s has UNITS keyword that can cause prepfold to crash!\n\
UNITS line will be removed from the parfile!" % (parfile,))
cmd="sed -i '/^UNITS/d' %s" % (parf,)
self.execute(cmd, self.log, is_os=True)
cmd="rsync -a %s %s" % (parf, self.outdir)
self.execute(cmd)
# removing pulsars from folding if their parfile is bad (e.g. no PEPOCH)
if len(toremove_psrs) > 0:
indices_to_remove=set()
for psr in np.unique(toremove_psrs):
for ii in xrange(len(self.psrs)):
if psr == self.psrs[ii]: indices_to_remove.add(ii)
indices_to_keep=sorted(set(np.arange(len(self.psrs)))-indices_to_remove)
self.psrs = self.psrs[indices_to_keep]
# return tmpdir
return tmpdir
def execute(self, cmd, workdir=None, shell=False, is_os=False):
"""
Execute the command 'cmd' after logging the command
and the wall-clock amount of time the command took to execute.
This function waits for process to finish
"""
try:
self.log.info(re.sub("\n", "\\\\n", cmd)) # also escaping \n to show it as it is
job_start = time.time()
self.log.info("Start at UTC %s" % (time.asctime(time.gmtime())))
status = 1
if is_os: status = os.system(cmd)
else:
process = Popen(shlex.split(cmd), stdout=PIPE, stderr=STDOUT, cwd=workdir, shell=shell, bufsize=1048576)
self.procs.append(process)
self.log.process2log(process)
process.communicate()
status=process.poll()
self.procs.remove(process)
job_end = time.time()
job_total_time = job_end - job_start
self.log.info("Finished at UTC %s, status=%s, Total running time: %.1f s (%.2f hrs)" % (time.asctime(time.gmtime()), status, job_total_time, job_total_time/3600.))
self.log.info("")
# if job is not successful
if status != 0:
raise Exception
except Exception:
self.log.exception("Oops... job has crashed!\n%s\nStatus=%s" % (re.sub("\n", "\\\\n", cmd), status))
raise Exception
def start_and_go(self, cmd, workdir=None, shell=False, immediate_status_check=False):
"""
Execute the command 'cmd' after logging the command
This function start the cmd and leaves the function
returning the Popen object, it does not wait for process to finish
"""
status=1
try:
self.log.info(re.sub("\n", "\\\\n", cmd))
self.log.info("Start at UTC %s" % (time.asctime(time.gmtime())))
if immediate_status_check:
process = Popen(shlex.split(cmd), cwd=workdir, shell=shell, bufsize=1048576)
time.sleep(10) # waiting 10 secs to see if process crashes right away
if process.poll() is not None and process.poll() != 0:
raise Exception
else: process.kill() # if process is still running, it means that cmd is good, so we kill it in order to
# restart it with proper stdout/stderr and add it to the list
process = Popen(shlex.split(cmd), stdout=PIPE, stderr=STDOUT, cwd=workdir, shell=shell, bufsize=1048576)
status=process.returncode
self.procs.append(process)
self.log.info("Job pid=%d, not waiting for it to finish." % (process.pid))
return process
except Exception:
self.log.exception("Oops... job has crashed!\n%s\nStatus=%s" % (re.sub("\n", "\\\\n", cmd), status))
raise Exception
def waiting(self, prg, popen):
"""
Waiting for process to finish
"""
try:
job_start = time.time()
self.log.info("Waiting for %s to finish, pid=%d" % (prg, popen.pid))
(sout, serr) = popen.communicate()
# we pipe serr to sout, so no need to log serr
# self.log.stdout2log(sout)
self.log.info(sout)
job_end = time.time()
job_total_time = job_end - job_start
self.log.info("Process pid=%d (%s) has finished at UTC %s, status=%d, Waiting time: %.1f s (%.2f hrs)" % \
(popen.pid, prg, time.asctime(time.gmtime()), popen.returncode, job_total_time, job_total_time/3600.))
self.log.info("")
self.procs.remove(popen)
# if job is not successful
if popen.poll() != 0:
raise Exception
except Exception:
self.log.exception("Oops... %s has crashed!\npid=%d, Status=%s" % (prg, popen.pid, popen.returncode))
raise Exception
def waiting_list(self, prg, popen_list):
"""
Waiting for list of processes to finish
"""
try:
job_start = time.time()
self.log.info("Waiting for %s processes to finish..." % (prg))
run_units = [u.pid for u in popen_list if u.poll() is None]
self.log.info("Still running [%d]: %s" % (len(run_units), run_units))
for unit in popen_list:
self.waiting(prg, unit)
run_units = [u.pid for u in popen_list if u.poll() is None]
finished_units = [u for u in popen_list if u.poll() is not None]
for fu in finished_units:
if fu.returncode != 0:
self.log.exception("Oops... %s has crashed!\npid=%d, Status=%s" % (prg, fu.pid, fu.returncode))
if len(run_units) > 0: self.log.info("Still running [%d]: %s" % (len(run_units), run_units))
job_end = time.time()
job_total_time = job_end - job_start
self.log.info("Processes of %s have finished at UTC %s, Waiting time: %.1f s (%.2f hrs)" % (prg, time.asctime(time.gmtime()), job_total_time, job_total_time/3600.))
self.log.info("")
except Exception:
self.log.exception("Oops... %s has crashed!\npids = %s" % (prg, ",".join(["%d" % (fu.pid) for fu in popen_list if fu.poll() is not None])))
raise Exception
"""
def waiting_list(self, prg, popen_list):
"""
"""
Waiting for list of processes to finish
"""
"""
try:
job_start = time.time()
self.log.info("Waiting for %s processes to finish..." % (prg))
run_popens = popen_list
while True:
if len(run_popens) == 0: break
finished_units = [u for u in run_popens if u.poll() is not None]
run_units = [u.pid for u in run_popens if u.poll() is None]
run_popens = [u for u in run_popens if u.poll() is None]
for fu in finished_units:
if fu.returncode != 0:
self.log.exception("Oops... %s has crashed!\npid=%d, Status=%s" % (prg, fu.pid, fu.returncode))
else: self.waiting(prg, fu)
if len(run_units) > 0 and len(finished_units) > 0:
self.log.info("Still running [%d]: %s" % (len(run_units), run_units))
job_end = time.time()
job_total_time = job_end - job_start
self.log.info("Processes of %s have finished at UTC %s, Waiting time: %.1f s (%.2f hrs)" % \
(prg, time.asctime(time.gmtime()), job_total_time, job_total_time/3600.))
self.log.info("")
except Exception:
self.log.exception("Oops... %s has crashed!\npids = %s" % (prg, ",".join(["%d" % (fu.pid) for fu in popen_list if fu.poll() is not None])))
raise Exception
"""
def power_of_two(self, value):
"""
Returns the closest power of two value to the input value (from the low side)
"""
return int(math.pow(2, math.floor(math.log(value)/math.log(2))))
def get_best_nbins(self, parf):
"""
Calculates the best number of bins for folding based on the period value from the parfile (parf)
and sampling interval (tsamp)
"""
try:
f = open(parf, 'r')
parlines = f.read().splitlines()
f.close()
res=[ii for ii in parlines if re.search("F0", ii) is not None]
if len(res) > 0:
f0=float(re.sub("\s+", " ", res[0]).split(" ")[1])
nbins=self.power_of_two(int(math.ceil((1000.0/f0)/self.sampling)))
else:
res=[ii for ii in parlines if re.search("P0", ii) is not None]
if len(res) > 0:
p0=float(re.sub("\s+", " ", res[0]).split(" ")[1])
nbins=self.power_of_two(int(math.ceil(p0*1000.0/self.sampling)))
else:
nbins=1024
if nbins > 1024: return 1024
else: return nbins
except: return 1024
def get_psr_dm(self, parf):
"""
Reads parfile and returns pulsar DM
"""
dm=0
try:
f = open(parf, 'r')
parlines = f.read().splitlines()
f.close()
res=[ii for ii in parlines if re.search("DM", ii) is not None and re.search("DMEPOCH", ii) is None]
if len(res) > 0:
dm=float(re.sub("\s+", " ", res[0]).split(" ")[1])
except: pass
return dm
def lcd(self, low, high):
"""
Calculates the lowest common denominator of 'high' value between 'low' and 'high'
"""
for ii in range(low, high+1):
if high % ii == 0: return ii
return high
def hcd(self, low, high, value):
"""
Calculates the highest common denominator of 'value' value between 'low' and 'high'
"""
for ii in range(high, low-1, -1):
if value % ii == 0: return ii
return 1
# function that checks all processes in the list and kill them if they are still running
def kill(self):
if self.log != None: self.log.info("Killing all open processes for SAP=%d TAB=%d..." % (self.sapid, self.tabid))
for proc in self.procs:
if proc.poll() is None: # process is still running
proc.kill()
if proc != None: proc.communicate()
if proc != None: proc.poll()
self.procs = []
# refresh NFS mounting of locus node (loc) on hoover node
# by doing 'ls' command
def hoover_mounting(self, cep2, firstfile, loc):
uniqdir="/".join(firstfile.split("/")[0:-1]).split("/data/")[-1]
input_dir="%s/%s_data/%s" % (cep2.hoover_data_dir, loc, uniqdir)
process = Popen(shlex.split("ls %s" % (input_dir)), stdout=PIPE, stderr=STDOUT)
process.communicate()
return input_dir
# running prepfold(s) for all specified pulsars
# returning the Popen instances of prepfold calls
def start_prepfold(self, cmdline, total_chan):
if total_chan >= 256:
prepfold_nsubs = 512
else: prepfold_nsubs = total_chan
# prepfold_nsubs = total_chan
self.log.info("Getting proper value of nsubs in prepfold between %d and %d..." % (prepfold_nsubs, total_chan))
# calculating the least common denominator of total_chan starting from prepfold_nsubs
prepfold_nsubs = self.lcd(prepfold_nsubs, total_chan)
self.log.info("Number of subbands, -nsubs, for prepfold is %d" % (prepfold_nsubs))
self.log.info("Running prepfolds...")
prepfold_popens=[] # list of prepfold Popen objects
for psr in self.psrs: # pulsar list is empty if --nofold is used
psr2=re.sub(r'^[BJ]', '', psr)
prepfold_nbins=self.get_best_nbins("%s/%s.par" % (self.outdir, psr2))
# first running prepfold with mask (if --norfi was not set)
if not cmdline.opts.is_norfi or os.path.exists("%s/%s_rfifind.mask" % (self.curdir, self.output_prefix)):
# we use ../../../ instead of self.outdir for the full-name of the parfile, because in this case prepfold crashes
# I suppose it happens because name of the file is TOO long for Tempo
cmd="prepfold -noscales -nooffsets -noxwin -psr %s -par ../../../%s.par -n %d -nsub %d -fine -nopdsearch -mask %s_rfifind.mask -o %s_%s %s %s.fits" % \
(psr, psr2, prepfold_nbins, prepfold_nsubs, self.output_prefix, psr, self.output_prefix, cmdline.opts.prepfold_extra_opts, self.output_prefix)
try: # if prepfold fails right away (sometimes happens with error like this:
# Read 0 set(s) of polycos for PSR 1022+1001 at 56282.138888888891 (DM = 3.6186e-317)
# MJD 56282.139 out of range ( 0.000 to 0.000)
# isets = 0
# I guess something to do with how Tempo treats parfile. When this happens, we try to rerun prepfold with the same
# command but without using -par option
prepfold_popen = self.start_and_go(cmd, workdir=self.curdir, immediate_status_check=True)
except Exception:
self.log.warning("***** Prepfold failed when using par-file. Will try the same command but without using -par option *****")
cmd="prepfold -noscales -nooffsets -noxwin -psr %s -n %d -nsub %d -fine -nopdsearch -mask %s_rfifind.mask -o %s_%s %s %s.fits" % \
(psr, prepfold_nbins, prepfold_nsubs, self.output_prefix, psr, self.output_prefix, cmdline.opts.prepfold_extra_opts, self.output_prefix)
prepfold_popen = self.start_and_go(cmd, workdir=self.curdir)
prepfold_popens.append(prepfold_popen)
time.sleep(5) # will sleep for 5 secs, in order to give prepfold enough time to finish
# with temporary files lile resid2.tmp otherwise it can interfere with next prepfold call
# running prepfold without mask
if not cmdline.opts.is_norfi or os.path.exists("%s/%s_rfifind.mask" % (self.curdir, self.output_prefix)):
output_stem="_nomask"
else: output_stem=""
# we use ../../../ instead of self.outdir for the full-name of the parfile, because in this case prepfold crashes
# I suppose it happens because name of the file is TOO long for Tempo
cmd="prepfold -noscales -nooffsets -noxwin -psr %s -par ../../../%s.par -n %d -nsub %d -fine -nopdsearch -o %s_%s%s %s %s.fits" % \
(psr, psr2, prepfold_nbins, prepfold_nsubs, psr, self.output_prefix, output_stem, cmdline.opts.prepfold_extra_opts, self.output_prefix)
try: # same reasoning as above
prepfold_popen = self.start_and_go(cmd, workdir=self.curdir, immediate_status_check=True)
except Exception:
self.log.warning("***** Prepfold failed when using par-file. Will try the same command but without using -par option *****")
cmd="prepfold -noscales -nooffsets -noxwin -psr %s -n %d -nsub %d -fine -nopdsearch -o %s_%s%s %s %s.fits" % \
(psr, prepfold_nbins, prepfold_nsubs, psr, self.output_prefix, output_stem, cmdline.opts.prepfold_extra_opts, self.output_prefix)
prepfold_popen = self.start_and_go(cmd, workdir=self.curdir)
prepfold_popens.append(prepfold_popen)
time.sleep(5) # again will sleep for 5 secs, in order to give prepfold enough time to finish
# with temporary files like resid2.tmp otherwise it can interfere with next prepfold call
return prepfold_popens
# making prepfold diagnostic plots
def make_prepfold_plots(self, obs):
# running convert on prepfold ps to pdf and png
self.log.info("Running convert on ps to pdf and png of the plots...")
prepfold_ps=glob.glob("%s/*.pfd.ps" % (self.curdir))
for psfile in prepfold_ps:
base=psfile.split("/")[-1].split(".ps")[0]
cmd="convert %s.ps %s.pdf" % (base, base)
# have separate try/except blocks for each convert command to allow to continue processing
# in case something is wrong with ps-file
try:
self.execute(cmd, workdir=self.curdir)
except: pass
cmd="convert -rotate 90 %s.ps %s.png" % (base, base)
try:
self.execute(cmd, workdir=self.curdir)
except : pass
cmd="convert -rotate 90 -crop 200x140-0 %s.ps %s.th.png" % (base, base)
try:
self.execute(cmd, workdir=self.curdir)
except: pass
# getting the list of *.pfd.bestprof files and read chi-sq values for all folded pulsars
psr_bestprofs=rglob(self.outdir, "*.pfd.bestprof", 3)
if len(psr_bestprofs) > 0:
self.log.info("Reading chi-squared values and adding to chi-squared.txt...")
# also preparing montage command to create combined plot
montage_cmd="montage -background none -pointsize 10.2 "
# reading log file to get RFI fraction from rfifind
logf = open(self.log.get_logfile(), 'r')
rfifracs = [ff.split("(")[1].split("%")[0].lstrip() for ff in logf.read().splitlines() if re.search("Number of bad", ff) is not None]
logf.close()
if np.size(rfifracs) == 0: rfifrac="" # in case we did not run rfifind
elif np.size(rfifracs) > 1: rfifrac = rfifracs[-1]
else: rfifrac = rfifracs[0]
chif=open("%s/%s_sap%03d_tab%04d_stokes%d_part%03d_chi-squared.txt" % (self.outdir, obs.id, self.sapid, self.tabid, self.stokes_index, self.part), 'w')
thumbs=[] # list of thumbnail files
# check first if all available *bestprof files are those created without mask. If so, then allow to make
# a diagnostic combined plot using prepfold plots without a mask
good_bestprofs=[file for file in psr_bestprofs if re.search("_nomask_", file) is None]
if len(good_bestprofs) == 0:
good_bestprofs=[file for file in psr_bestprofs]
for bp in good_bestprofs:
psr=bp.split("/")[-1].split("_")[0]
thumbfile=bp.split(self.outdir+"/")[-1].split(".pfd.bestprof")[0] + ".pfd.th.png"
# getting current number for SAP and TA beam
try: # we need this try block in case User manually creates sub-directories with some test bestprof files there
# which will also be found by rglob function. So, we need to exclude them by catching an Exception
# on a wrong-formed string applying int()
cursapid=int(thumbfile.split("_SAP")[-1].split("_")[0])
except: continue
curprocdir=thumbfile.split("_SAP")[-1].split("_")[1]
if self.sapid != cursapid or self.procdir != curprocdir:
continue
thumbs.append(thumbfile)
# calculating the S/N (profile significance) from the bestprof file
snr=0.0
try:
snrlog="snr-presto.log"
if not os.path.exists("%s/%s" % (self.curdir, snrlog)):
cmd="snr.py --presto --snrmethod=Off --auto-off --plot --saveonly %s | tee %s/%s" % (bp, self.curdir, snrlog)
self.execute(cmd, workdir=self.curdir, is_os=True)
tmp = np.genfromtxt("%s/%s" % (self.curdir, snrlog), skip_header=13, skip_footer=2, usecols=(4,4), dtype=float, unpack=True)[0]
snr = float(tmp[0])
except:
if self.sapid == cursapid and self.procdir == curprocdir:
self.log.warning("Warning: can't read file %s or calculate S/N of the profile" % (bp))
chif.write("file=%s obs=%s_SAP%d_%s_%s S/N=%g%s\n" % (thumbfile, self.code, cursapid, curprocdir, psr, snr, rfifrac != "" and " RFI=%s" % (rfifrac) or ""))
montage_cmd += "-label '%s SAP%d %s\n%s\nS/N = %g' %s " % (self.code, cursapid, curprocdir, psr, snr, thumbfile)
chif.close()
cmd="mv %s_sap%03d_tab%04d_stokes%d_part%03d_chi-squared.txt chi-squared.txt" % (obs.id, self.sapid, self.tabid, self.stokes_index, self.part)
self.execute(cmd, workdir=self.outdir)
# creating combined plots
# only creating combined plo % (snrtmpdir)ts when ALL corresponding thumbnail files exist. It is possible, when there are 2+ beams on
# the same node, that bestprof files do exist, but thumbnails were not created yet at the time when chi-squared.txt is
# getting created for another beam. And this will cause "montage" command to fail
# At the end combined plot will eventually be created for this node during the procesing of the last beam of this node
if len(thumbs) > 0 and len([ff for ff in thumbs if os.path.exists(ff)]) == len(thumbs):
# creating combined plots
self.log.info("Combining all pfd.th.png files in a single combined plot...")
montage_cmd += "combined.png"
self.execute(montage_cmd, workdir=self.outdir)
# making thumbnail version of the combined plot
cmd="convert -resize 200x140 -bordercolor none -border 150 -gravity center -crop 200x140-0-0 +repage combined.png combined.th.png"
self.execute(cmd, workdir=self.outdir)
# running single-pulse analysis (prepdata + single_pulse_search.py) for all specified pulsars
def run_prepdata(self, cmdline):
self.log.info("Running single-pulse analysis (prepdata + single_pulse_search.py for a pulsar(s) DM")
for psr in self.psrs: # pulsar list is empty if --nofold is used
psr2=re.sub(r'^[BJ]', '', psr)
if cmdline.opts.is_nofold:
psrdm = 0.0
else:
psrdm=self.get_psr_dm("%s/%s.par" % (self.outdir, psr2))
# running prepdata with mask (if --norfi was not set)
if not cmdline.opts.is_norfi or os.path.exists("%s/%s_rfifind.mask" % (self.curdir, self.output_prefix)):
cmd="prepdata -noscales -nooffsets -noclip -nobary -dm %f -mask %s_rfifind.mask -o %s_%s %s %s.fits" % \
(psrdm, self.output_prefix, psr, self.output_prefix, cmdline.opts.prepdata_extra_opts, self.output_prefix)
else: # running prepdata without mask
cmd="prepdata -noscales -nooffsets -noclip -nobary -dm %f -o %s_%s %s %s.fits" % \
(psrdm, psr, self.output_prefix, cmdline.opts.prepdata_extra_opts, self.output_prefix)
self.execute(cmd, workdir=self.curdir)
# running single_pulse_search.py on .dat file (created either with mask or not)
cmd="single_pulse_search_lotaas.py --no-width-label %s_%s.dat" % (psr, self.output_prefix)
self.execute(cmd, workdir=self.curdir)