-
Notifications
You must be signed in to change notification settings - Fork 22
Generalize data-dependent processes for simulations #1413
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0175b8e
d305941
4b9ac4a
61e1e2a
5fca3c8
87c9662
e29df6f
d6183d7
e239588
d6ef354
20eaa20
b8a241b
3adba77
cace604
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -549,7 +549,8 @@ def load_and_preprocess(obs_id, configs, context=None, dets=None, meta=None, | |
| def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc, | ||
| dets=None, meta=None, no_signal=None, | ||
| logger=None, init_only=False, | ||
| ignore_cfg_check=False): | ||
| ignore_cfg_check=False, | ||
| stop_for_sims=False): | ||
| """Loads the saved information from the preprocessing pipeline from a | ||
| reference and a dependent database and runs the processing section of | ||
| the pipeline for each. | ||
|
|
@@ -583,6 +584,11 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc, | |
| ignore_cfg_check : bool | ||
| If True, do not attempt to validate that configs_init is the same as | ||
| the config used to create the existing init db. | ||
| stop_for_sims: bool | ||
| Optinal. If True, will stop before each step of the pipeline | ||
| with the flag `use_data_aman` set to True. The intended use is | ||
| to prepare all necessary data products that cannot be stored in | ||
| the preprocessing database, to process simulations. | ||
|
Comment on lines
+587
to
+591
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The data aman holds in the axismanager a full copy of the data for every preprocess step which has been specified with
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I only tested it for a couple of stops in the pipe. I think this is related to your other comments about only keeping what's necessary in the AxisManagers, I will propose something more memory efficient |
||
|
|
||
| Returns | ||
| ------- | ||
|
|
@@ -600,6 +606,21 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc, | |
| configs_proc, context_proc = get_preprocess_context(configs_proc) | ||
| meta_proc = context_proc.get_meta(obs_id, dets=dets, meta=meta) | ||
|
|
||
| # Count number of stops | ||
| if stop_for_sims: | ||
| num_stops = 0 | ||
| for process in configs_init["process_pipe"]: | ||
| if process.get("use_data_aman", False): | ||
| num_stops += 1 | ||
| for process in configs_proc["process_pipe"]: | ||
| if process.get("use_data_aman", False): | ||
| num_stops += 1 | ||
| logger.warning( | ||
| "Currently running with `stop_for_sims=True`. " | ||
| f"It will generate {num_stops} additional copies " | ||
| "of the data AxisManager with a higher memory usage." | ||
| ) | ||
|
|
||
| group_by_init = np.atleast_1d(configs_init['subobs'].get('use', 'detset')) | ||
| group_by_proc = np.atleast_1d(configs_proc['subobs'].get('use', 'detset')) | ||
|
|
||
|
|
@@ -611,7 +632,9 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc, | |
| return None | ||
| else: | ||
| pipe_init = Pipeline(configs_init["process_pipe"], logger=logger) | ||
| aman_cfgs_ref = get_pcfg_check_aman(pipe_init) | ||
|
|
||
| if not ignore_cfg_check: | ||
| aman_cfgs_ref = get_pcfg_check_aman(pipe_init) | ||
|
|
||
| if ( | ||
| ignore_cfg_check or | ||
|
|
@@ -637,27 +660,97 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc, | |
|
|
||
| aman = context_init.get_obs(meta_init, no_signal=no_signal) | ||
| logger.info("Running initial pipeline") | ||
| pipe_init.run(aman, aman.preprocess, select=False) | ||
| if init_only: | ||
| return aman | ||
| if stop_for_sims: | ||
| out_amans_init = run_pipeline_stepgroups( | ||
| pipe_init, | ||
| aman, | ||
| run_last_step=not(init_only) | ||
| ) | ||
| if init_only: | ||
| return out_amans_init | ||
| else: | ||
| pipe_init.run(aman, aman.preprocess, select=False) | ||
| if init_only: | ||
| return aman | ||
|
|
||
| logger.info("Running dependent pipeline") | ||
| if stop_for_sims: | ||
| aman = out_amans_init[(len(pipe_init), 'last')] | ||
| proc_aman = context_proc.get_meta(obs_id, meta=aman) | ||
|
|
||
| if 'valid_data' in aman.preprocess: | ||
| aman.preprocess.move('valid_data', None) | ||
| aman.preprocess.merge(proc_aman.preprocess) | ||
| pipe_proc.run(aman, aman.preprocess, select=False) | ||
| if stop_for_sims: | ||
| out_amans = run_pipeline_stepgroups( | ||
| pipe_proc, | ||
| out_amans_init[(len(pipe_init), 'last')], | ||
| ) | ||
| out_amans.update({ | ||
| (step, name): out_amans_init[(step, name)] | ||
| for (step, name) in out_amans_init | ||
| if name != 'last' | ||
| }) | ||
| return out_amans | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this just be a dictionary of numpy arrays or restricted AxisManagers that only include the fields you need to reduce memory overhead?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking of doing this but fields to keep will depend on the filter and it will need to be defined somewhere. I find it a bit too verbose to be defined in the config files, I'm happy to take suggestions on this one. Also, this implementation follows the existing behaviour, i.e. loading a full data AxisManager in the filtering script and looping over all seeds to get filtered atomics. |
||
|
|
||
| return aman | ||
| else: | ||
| pipe_proc.run(aman, aman.preprocess, select=False) | ||
| return aman | ||
| else: | ||
| raise ValueError('Dependency check between configs failed.') | ||
|
|
||
|
|
||
| def run_pipeline_stepgroups(pipe, aman, run_last_step=False): | ||
| """ | ||
| Run a Pipeline object, grouping steps based on | ||
| the flag `use_data_aman` in the configuration | ||
| file. | ||
| Arguments | ||
| ---------- | ||
| pipe : Pipeline | ||
| Pipeline object to run. | ||
| aman : AxisManager | ||
| AxisManager to process. | ||
| run_last_step : bool | ||
| If True, will create a dict item containing the | ||
| AxisManager after run the full pipeline. | ||
| """ | ||
| batch_idx = [ | ||
| (step, process.name) | ||
| for step, process in enumerate(pipe) | ||
| if process.use_data_aman | ||
| ] | ||
| if batch_idx or run_last_step: | ||
| batch_idx = [(0, pipe[0].name)] + batch_idx | ||
| if run_last_step: | ||
| batch_idx += [(len(pipe), 'last')] | ||
| pipes = {} | ||
| for idx in range(len(batch_idx)-1): | ||
| start, start_name = batch_idx[idx] | ||
| end, end_name = batch_idx[idx+1] | ||
| # If asked to stop at the first process | ||
| # one needs to save the current state of | ||
| # the AxisManager | ||
| if end == 0: | ||
| pipes[end, end_name] = None | ||
| else: | ||
| pipes[end, end_name] = pipe[start:end] | ||
| out_amans = {} | ||
| loc_aman = aman.copy() | ||
| for (step, name), pipe in pipes.items(): | ||
| if pipe is not None: | ||
| pipe.run(loc_aman, aman.preprocess, select=False) | ||
| out_amans[step, name] = loc_aman.copy() | ||
| return out_amans | ||
| else: | ||
| return {} | ||
|
|
||
|
|
||
| def multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc, | ||
| sim_map, meta=None, | ||
| logger=None, init_only=False, | ||
| t2ptemplate_aman=None): | ||
| ignore_cfg_check=False, | ||
| data_amans=None): | ||
| """Loads the saved information from the preprocessing pipeline from a | ||
| reference and a dependent database, loads the signal from a (simulated) | ||
| map into the AxisManager and runs the processing section of the pipeline | ||
|
|
@@ -689,9 +782,14 @@ def multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc, | |
| Optional. Logger object or None will generate a new one. | ||
| init_only : bool | ||
| Optional. Whether or not to run the dependent pipeline. | ||
| t2ptemplate_aman : AxisManager | ||
| Optional. AxisManager to use as a template for t2p leakage | ||
| deprojection. | ||
| ignore_cfg_check : bool | ||
| If True, do not attempt to validate that configs_init is the same as | ||
| the config used to create the existing init db. | ||
| data_amans: dict (Optional) | ||
| A dictionary of AxisManagers with keys (step, process.name) | ||
| filled with AxisManager processed up to step-1. This is used | ||
| to pre-load all data AxisManager which could be required when | ||
| processing simulations (e.g. to provide a T2P template) | ||
|
|
||
| Returns | ||
| ------- | ||
|
|
@@ -719,10 +817,14 @@ def multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc, | |
| return None | ||
| else: | ||
| pipe_init = Pipeline(configs_init["process_pipe"], logger=logger) | ||
| aman_cfgs_ref = get_pcfg_check_aman(pipe_init) | ||
|
|
||
| if check_cfg_match(aman_cfgs_ref, meta_proc.preprocess['pcfg_ref'], | ||
| logger=logger): | ||
| if not ignore_cfg_check: | ||
| aman_cfgs_ref = get_pcfg_check_aman(pipe_init) | ||
|
|
||
| if ignore_cfg_check or check_cfg_match( | ||
| aman_cfgs_ref, | ||
| meta_proc.preprocess['pcfg_ref'], | ||
| logger=logger): | ||
| pipe_proc = Pipeline(configs_proc["process_pipe"], logger=logger) | ||
|
|
||
| logger.info("Restricting detectors on all proc pipeline processes") | ||
|
|
@@ -755,27 +857,12 @@ def multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc, | |
| if init_only: | ||
| return aman | ||
|
|
||
| if t2ptemplate_aman is not None: | ||
| # Replace Q,U with simulated timestreams | ||
| t2ptemplate_aman.wrap("demodQ", aman.demodQ, [(0, 'dets'), (1, 'samps')], overwrite=True) | ||
| t2ptemplate_aman.wrap("demodU", aman.demodU, [(0, 'dets'), (1, 'samps')], overwrite=True) | ||
|
|
||
| t2p_aman = t2pleakage.get_t2p_coeffs( | ||
| t2ptemplate_aman, | ||
| merge_stats=False | ||
| ) | ||
| t2pleakage.subtract_t2p( | ||
| aman, | ||
| t2p_aman, | ||
| T_signal=t2ptemplate_aman.dsT | ||
| ) | ||
|
|
||
| logger.info("Running dependent pipeline") | ||
| proc_aman = context_proc.get_meta(obs_id, meta=aman) | ||
| if 'valid_data' in aman.preprocess: | ||
| aman.preprocess.move('valid_data', None) | ||
| aman.preprocess.merge(proc_aman.preprocess) | ||
| pipe_proc.run(aman, aman.preprocess, sim=True) | ||
| pipe_proc.run(aman, aman.preprocess, sim=True, data_amans=data_amans) | ||
|
|
||
| return aman | ||
| else: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unclear what "step-1" means here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry! Here
stepis the index of the process in a given config file.step-1is then the process preceding it. (i.e. the step before getting the T2P template for example)