diff --git a/python/analyzer.py b/python/analyzer.py index 906d11a..437e3f4 100644 --- a/python/analyzer.py +++ b/python/analyzer.py @@ -1,3 +1,4 @@ +from calendar import c import os import sys import traceback @@ -10,17 +11,28 @@ import python.l1THistos as Histos import python.tree_reader as treereader from python import collections, timecounter +import dask +import random +import time +from dask.distributed import Client, progress # @profile analyze_counter = 1 +import threading def analyze(params, batch_idx=-1): params.print() debug = int(params.debug) - input_files = [] range_ev = (0, params.maxEvents) + + # ------------------------- PRINT KEYS ------------------------------ + + for key, value in params.items(): + print("KEY:", key , " VALUE: ", value) + + # ------------------------- READ FILES ------------------------------ if params.events_per_job == -1: pprint('This is interactive processing...') @@ -47,22 +59,25 @@ def analyze(params, batch_idx=-1): pprint('') files_with_protocol = [fm.get_eos_protocol(file_name) + file_name for file_name in input_files] + + client = Client(threads_per_worker=4, n_workers=1) + + # -------------------------CALIBRATIONS ------------------------------ calib_manager = calibs.CalibManager() calib_manager.set_calibration_version(params.calib_version) if params.rate_pt_wps: calib_manager.set_pt_wps_version(params.rate_pt_wps) + + # -------------------------BOOK HISTOS------------------------------ output = up.recreate(params.output_filename) hm = Histos.HistoManager() hm.file = output - - # instantiate all the plotters + plotter_collection = [] plotter_collection.extend(params.plotters) - - # -------------------------BOOK HISTOS------------------------------ - + for plotter in plotter_collection: plotter.book_histos() @@ -73,52 +88,66 @@ def analyze(params, batch_idx=-1): # -------------------------EVENT LOOP-------------------------------- - tree_reader = treereader.TreeReader(range_ev, params.maxEvents) pprint('') 
pprint(f"{'events_per_job':<15}: {params.events_per_job}") pprint(f"{'maxEvents':<15}: {params.maxEvents}") pprint(f"{'range_ev':<15}: {range_ev}") pprint('') - for tree_file_name in files_with_protocol: - tree_file = up.open(tree_file_name, num_workers=1) - pprint(f'opening file: {tree_file_name}') - pprint(f' . tree name: {params.tree_name}') + print("Creating dask-delayed objects for file reading...") + files_dask_delayed = [] + tree_reader_instances = [] + for file in files_with_protocol: + single_tree_reader = treereader.TreeReader(range_ev, params.maxEvents) + single_file_dd = dask.delayed(process_file(file, params, single_tree_reader, debug, collection_manager, plotter_collection, hm)) + files_dask_delayed.append(single_file_dd) + tree_reader_instances.append(single_tree_reader) + + start_time_reading_files = time.time() + print("Reading .ROOT files in parallel...") + dask.compute(files_dask_delayed) + finish_time_reading_files = time.time() + print("Finished reading .ROOT files in parallel! Took: ", finish_time_reading_files - start_time_reading_files, " s.") - ttree = tree_file[params.tree_name] + # ------------------------- WRITING HISTOGRAMS -------------------------------- - tree_reader.setTree(ttree) + pprint(f'Writing histos to file {params.output_filename}') + start_time_writing_histograms = time.time() - while tree_reader.next(debug): - try: - collection_manager.read(tree_reader, debug) + hm.writeHistos() + output.close() + + finish_time_writing_histograms = time.time() + print("Writing histos to file FINISHED! 
def process_file(file_name, params, tree_reader, debug, collection_manager, plotter_collection, hm):
    """Read one ROOT file and fill the histograms for every event it contains.

    Opens *file_name* with uproot, attaches the tree named ``params.tree_name``
    to *tree_reader*, then loops over events: each event is decoded through
    *collection_manager* and handed to every plotter in *plotter_collection*.

    Args:
        file_name: full path (including protocol) of the ROOT file to open.
        params: job configuration; only ``tree_name`` is read here.
        tree_reader: per-file TreeReader instance driving the event loop.
        debug: debug verbosity level forwarded to the reader and plotters.
        collection_manager: decodes the raw tree entries into collections.
        plotter_collection: plotters whose histograms are filled per event.
        hm: histogram manager — not used in this function body; presumably
            kept so the dask-delayed call site captures it (TODO confirm
            whether it can be dropped from the signature).

    Returns:
        0 on success, 1 if processing any event raised an exception.
    """
    tree_file = up.open(file_name, num_workers=1)
    pprint(f'[process_file] opening file: {file_name}')
    pprint(f'[process_file] . tree name: {params.tree_name}')

    try:
        ttree = tree_file[params.tree_name]
        tree_reader.setTree(ttree)

        while tree_reader.next(debug):
            try:
                collection_manager.read(tree_reader, debug)

                for plotter in plotter_collection:
                    plotter.fill_histos_event(tree_reader.file_entry, debug=debug)

            except Exception as inst:
                pprint(f'[EXCEPTION OCCURRED:] {inst!s}')
                pprint('Unexpected error:', sys.exc_info()[0])
                return 1
    finally:
        # Close on both the success and the error path: the previous early
        # `return 1` leaked the open file handle.
        tree_file.close()

    return 0
def fill_1Dhist(hist, array, weights=None):
    """Fill a 1D histogram from a (possibly jagged) awkward array.

    The array is flattened and ``None`` entries are dropped before filling.

    Args:
        hist: 1D hist-style histogram exposing a ``fill`` method.
        array: awkward array of values (may be jagged / contain None).
        weights: optional per-entry weights; assumed to already align
            one-to-one with the flattened values — TODO confirm callers
            pass flat weights.
    """
    flar = ak.drop_none(ak.flatten(array))
    if weights is None:
        hist.fill(flar, threads=None)
    else:
        # `weight` must be passed by keyword: positionally it would be
        # interpreted as the values of a second (non-existent) axis.
        hist.fill(flar, weight=weights)


def fill_2Dhist(hist, arrayX, arrayY, weights=None):
    """Fill a 2D histogram from two (possibly jagged) awkward arrays.

    Both coordinate arrays are flattened and ``None`` entries are dropped
    before filling; they must have identical layouts.

    Args:
        hist: 2D hist-style histogram exposing a ``fill`` method.
        arrayX: awkward array of X values (may be jagged / contain None).
        arrayY: awkward array of Y values, same layout as *arrayX*.
        weights: optional per-entry weights; assumed to already align
            one-to-one with the flattened values — TODO confirm callers
            pass flat weights.
    """
    flar_x = ak.drop_none(ak.flatten(arrayX))
    flar_y = ak.drop_none(ak.flatten(arrayY))

    if weights is None:
        hist.fill(flar_x, flar_y, threads=None)
    else:
        # `weight` must be passed by keyword: positionally it would be
        # interpreted as the values of a third (non-existent) axis.
        hist.fill(flar_x, flar_y, weight=weights)
'TH2' in writeable_hist.__class__.__name__: @@ -93,7 +96,7 @@ def write(self, upfile): # print('ok') else: up_writeable_hist = up.to_writable(writeable_hist) - upfile[f'{dir_name}/{writeable_hist.label}'] = up_writeable_hist + upfile[f'{dir_name}/{name}'] = up_writeable_hist # def normalize(self, norm): # className = self.__class__.__name__ @@ -590,7 +593,7 @@ def __init__(self, name, root_file=None, debug=False): self.h_pfIsoPV = bh.TH1F(f'{name}_pfIsoPV', 'Iso; rel-iso^{PV}_{pf}', 100, 0, 2) self.h_n = bh.TH1F(f'{name}_n', '# objects per event', 100, 0, 100) self.h_compBdt = bh.TH1F(f'{name}_compBdt', 'BDT Score Comp ID', 50, 0, 1) - + BaseHistos.__init__(self, name, root_file, debug) def fill(self, egs): diff --git a/python/selections.py b/python/selections.py index cfe958a..afadec6 100644 --- a/python/selections.py +++ b/python/selections.py @@ -612,7 +612,8 @@ def compare_selections(sel1, sel2): wps = working_points_histomax[version] labels = ['LE', 'HE'] -wls = zip(wps, labels, strict=False) + +wls = zip(wps, labels) # for i, tphgc_egbdt_sel = [] diff --git a/python/tree_reader.py b/python/tree_reader.py index 871b3ed..3cc8ad6 100644 --- a/python/tree_reader.py +++ b/python/tree_reader.py @@ -5,6 +5,9 @@ import awkward as ak import vector +import coffea +from coffea.nanoevents import NanoEventsFactory, NanoAODSchema, BaseSchema + vector.register_awkward() class TreeReader: @@ -94,7 +97,7 @@ def getDataFrame(self, prefix, entry_block, fallback=None): if br.startswith(f'{prefix}_') and br != f'{prefix}_n'] names = ['_'.join(br.split('_')[1:]) for br in branches] - name_map = dict(zip(names, branches, strict=False)) + name_map = dict(zip(names, branches)) if len(branches) == 0: if fallback is not None: return self.getDataFrame(prefix=fallback, entry_block=entry_block) @@ -102,20 +105,32 @@ def getDataFrame(self, prefix, entry_block, fallback=None): print(f'stored branch prefixes are: {prefs}') raise ValueError(f'[TreeReader::getDataFrame] No branches with 
prefix: {prefix}') - akarray = self.tree.arrays(names, - library='ak', - aliases=name_map, - entry_start=self.file_entry, - entry_stop=self.file_entry+entry_block) + dask_akarray = NanoEventsFactory.from_root( + self.tree, + schemaclass=NanoAODSchema).events() + + #akarray = self.tree.arrays(names, + #library='ak', + #aliases=name_map, + #entry_start=self.file_entry, + #entry_stop=self.file_entry+entry_block) + #print("[0] prefix to select: ", prefix) + + dask_akarray = dask_akarray[prefix] + #print("[1] Selected fields from prefix", dask_akarray.fields) + + dask_akarray = dask_akarray[names] + + #print("[2] specific fields with names", dask_akarray.fields) + dask_akarray = dask_akarray[self.file_entry : self.file_entry + entry_block] - # print(akarray) records = {} - for field in akarray.fields: - records[field] = akarray[field] + for field in dask_akarray.fields: + records[field] = dask_akarray[field] if 'pt' in names and 'eta' in names and 'phi' in names: if 'mass' not in names and 'energy' not in names: - records['mass'] = 0.*akarray['pt'] + records['mass'] = 0.*dask_akarray['pt'] return vector.zip(records) return ak.zip(records) @@ -124,5 +139,4 @@ def getDataFrame(self, prefix, entry_block, fallback=None): # ele_rec = ak.zip({'pt': tkele.pt, 'eta': tkele.eta, 'phi': tkele.phi}, with_name="pippo") # this would allow to handle the records and assign behaviours.... - # return akarray - + # return akarray \ No newline at end of file