Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions sotodlib/preprocess/pcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ def __init__(self, step_cfgs):
self.save_cfgs = step_cfgs.get("save")
self.select_cfgs = step_cfgs.get("select")
self.plot_cfgs = step_cfgs.get("plot")
self.skip_on_sim = step_cfgs.get("skip_on_sim", False)
def process(self, aman, proc_aman, sim=False):
self.skip_on_sim = step_cfgs.get("skip_on_sim")
self.use_data_aman = step_cfgs.get("use_data_aman", False)
def process(self, aman, proc_aman, sim=False, data_aman=None):
""" This function makes changes to the time ordered data AxisManager.
Ex: calibrating or detrending the timestreams. This function will use
any configuration information under the ``process`` key of the
Expand All @@ -56,6 +57,9 @@ def process(self, aman, proc_aman, sim=False):
sim: Bool
False by default when analyzing data. Should be True when doing
Transfer Function simulations and determining which steps should be run.
data_aman: AxisManager (Optional)
An AxisManager containing the preprocessed data to be used by
this process.
"""
if self.process_cfgs is None:
return aman, proc_aman
Expand Down Expand Up @@ -435,8 +439,14 @@ def extend(self, index, other):
super().extend( [self._check_item(item) for item in other])
def __setitem__(self, index, item):
super().__setitem__(index, self._check_item(item))
def __getitem__(self, index):
result = super().__getitem__(index)
if isinstance(index, slice):
return Pipeline(result)
else:
return result

def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False, data_amans=None):
"""
The main workhorse function for the pipeline class. This function takes
an AxisManager TOD and successively runs the pipeline of preprocessing
Expand Down Expand Up @@ -472,6 +482,11 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
given ``proc_aman`` is ``aman.preprocess``. This assumes
``process.calc_and_save()`` has been run on this aman before and
has injested flags and other information into ``proc_aman``.
data_amans: dict (Optional)
A dictionary of AxisManagers with keys (step, process.name)
filled with AxisManager processed up to step-1. This is used
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclear what "step-1" means here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry! Here step is the index of the process in a given config file. step-1 is then the process preceding it. (i.e. the step before getting the T2P template for example)

to pre-load all data AxisManager which could be required when
processing simulations (e.g. to provide a T2P template)

Returns
-------
Expand Down Expand Up @@ -520,10 +535,21 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):

success = 'end'
for step, process in enumerate(self):
if sim and (process.skip_on_sim is None):
raise ValueError(f"Process {process.name} missing required field `skip_on_sim`")
if sim and process.skip_on_sim:
continue
self.logger.debug(f"Running {process.name}")
aman, proc_aman = process.process(aman, proc_aman, sim)
if (data_amans is not None) and process.use_data_aman:
try:
data_aman = data_amans[step, process.name]
except KeyError:
raise KeyError(f"Requested to use data AxisManager for process {process.name} but not found in data_amans")
else:
if process.use_data_aman and sim:
raise ValueError(f"Process {process.name} requested to use data_aman but none was provided to Pipeline.run()")
data_aman = None
process.process(aman, proc_aman, sim, data_aman)
if run_calc:
aman, proc_aman = process.calc_and_save(aman, proc_aman)
process.plot(aman, proc_aman, filename=os.path.join(self.plot_dir, '{ctime}/{obsid}', f'{step+1}_{{name}}.png'))
Expand Down
147 changes: 117 additions & 30 deletions sotodlib/preprocess/preprocess_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,8 @@ def load_and_preprocess(obs_id, configs, context=None, dets=None, meta=None,
def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
dets=None, meta=None, no_signal=None,
logger=None, init_only=False,
ignore_cfg_check=False):
ignore_cfg_check=False,
stop_for_sims=False):
"""Loads the saved information from the preprocessing pipeline from a
reference and a dependent database and runs the processing section of
the pipeline for each.
Expand Down Expand Up @@ -583,6 +584,11 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
ignore_cfg_check : bool
If True, do not attempt to validate that configs_init is the same as
the config used to create the existing init db.
stop_for_sims: bool
Optinal. If True, will stop before each step of the pipeline
with the flag `use_data_aman` set to True. The intended use is
to prepare all necessary data products that cannot be stored in
the preprocessing database, to process simulations.
Comment on lines +587 to +591
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data aman holds in the axismanager a full copy of the data for every preprocess step which has been specified with use_data_aman. This seems like it could balloon really quickly if the config file is improperly configured. Even 2 extra copies of the data gets pretty big. Perhaps there should be a check for if there's more than 2 or 3 steps which specify the data_aman that you warn the user and force them to acknowledge they're about to launch a job with ~2-3x in the normal memory usage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I only tested it for a couple of stops in the pipe. I think this is related to your other comments about only keeping what's necessary in the AxisManagers, I will propose something more memory efficient


Returns
-------
Expand All @@ -600,6 +606,21 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
configs_proc, context_proc = get_preprocess_context(configs_proc)
meta_proc = context_proc.get_meta(obs_id, dets=dets, meta=meta)

# Count number of stops
if stop_for_sims:
num_stops = 0
for process in configs_init["process_pipe"]:
if process.get("use_data_aman", False):
num_stops += 1
for process in configs_proc["process_pipe"]:
if process.get("use_data_aman", False):
num_stops += 1
logger.warning(
"Currently running with `stop_for_sims=True`. "
f"It will generate {num_stops} additional copies "
"of the data AxisManager with a higher memory usage."
)

group_by_init = np.atleast_1d(configs_init['subobs'].get('use', 'detset'))
group_by_proc = np.atleast_1d(configs_proc['subobs'].get('use', 'detset'))

Expand All @@ -611,7 +632,9 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
return None
else:
pipe_init = Pipeline(configs_init["process_pipe"], logger=logger)
aman_cfgs_ref = get_pcfg_check_aman(pipe_init)

if not ignore_cfg_check:
aman_cfgs_ref = get_pcfg_check_aman(pipe_init)

if (
ignore_cfg_check or
Expand All @@ -637,27 +660,97 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,

aman = context_init.get_obs(meta_init, no_signal=no_signal)
logger.info("Running initial pipeline")
pipe_init.run(aman, aman.preprocess, select=False)
if init_only:
return aman
if stop_for_sims:
out_amans_init = run_pipeline_stepgroups(
pipe_init,
aman,
run_last_step=not(init_only)
)
if init_only:
return out_amans_init
else:
pipe_init.run(aman, aman.preprocess, select=False)
if init_only:
return aman

logger.info("Running dependent pipeline")
if stop_for_sims:
aman = out_amans_init[(len(pipe_init), 'last')]
proc_aman = context_proc.get_meta(obs_id, meta=aman)

if 'valid_data' in aman.preprocess:
aman.preprocess.move('valid_data', None)
aman.preprocess.merge(proc_aman.preprocess)
pipe_proc.run(aman, aman.preprocess, select=False)
if stop_for_sims:
out_amans = run_pipeline_stepgroups(
pipe_proc,
out_amans_init[(len(pipe_init), 'last')],
)
out_amans.update({
(step, name): out_amans_init[(step, name)]
for (step, name) in out_amans_init
if name != 'last'
})
return out_amans
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this just be a dictionary of numpy arrays or restricted AxisManagers that only include the fields you need to reduce memory overhead?

Copy link
Contributor Author

@adrien-laposta adrien-laposta Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of doing this but fields to keep will depend on the filter and it will need to be defined somewhere. I find it a bit too verbose to be defined in the config files, I'm happy to take suggestions on this one. Also, this implementation follows the existing behaviour, i.e. loading a full data AxisManager in the filtering script and looping over all seeds to get filtered atomics.


return aman
else:
pipe_proc.run(aman, aman.preprocess, select=False)
return aman
else:
raise ValueError('Dependency check between configs failed.')


def run_pipeline_stepgroups(pipe, aman, run_last_step=False):
"""
Run a Pipeline object, grouping steps based on
the flag `use_data_aman` in the configuration
file.
Arguments
----------
pipe : Pipeline
Pipeline object to run.
aman : AxisManager
AxisManager to process.
run_last_step : bool
If True, will create a dict item containing the
AxisManager after run the full pipeline.
"""
batch_idx = [
(step, process.name)
for step, process in enumerate(pipe)
if process.use_data_aman
]
if batch_idx or run_last_step:
batch_idx = [(0, pipe[0].name)] + batch_idx
if run_last_step:
batch_idx += [(len(pipe), 'last')]
pipes = {}
for idx in range(len(batch_idx)-1):
start, start_name = batch_idx[idx]
end, end_name = batch_idx[idx+1]
# If asked to stop at the first process
# one needs to save the current state of
# the AxisManager
if end == 0:
pipes[end, end_name] = None
else:
pipes[end, end_name] = pipe[start:end]
out_amans = {}
loc_aman = aman.copy()
for (step, name), pipe in pipes.items():
if pipe is not None:
pipe.run(loc_aman, aman.preprocess, select=False)
out_amans[step, name] = loc_aman.copy()
return out_amans
else:
return {}


def multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc,
sim_map, meta=None,
logger=None, init_only=False,
t2ptemplate_aman=None):
ignore_cfg_check=False,
data_amans=None):
"""Loads the saved information from the preprocessing pipeline from a
reference and a dependent database, loads the signal from a (simulated)
map into the AxisManager and runs the processing section of the pipeline
Expand Down Expand Up @@ -689,9 +782,14 @@ def multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc,
Optional. Logger object or None will generate a new one.
init_only : bool
Optional. Whether or not to run the dependent pipeline.
t2ptemplate_aman : AxisManager
Optional. AxisManager to use as a template for t2p leakage
deprojection.
ignore_cfg_check : bool
If True, do not attempt to validate that configs_init is the same as
the config used to create the existing init db.
data_amans: dict (Optional)
A dictionary of AxisManagers with keys (step, process.name)
filled with AxisManager processed up to step-1. This is used
to pre-load all data AxisManager which could be required when
processing simulations (e.g. to provide a T2P template)

Returns
-------
Expand Down Expand Up @@ -719,10 +817,14 @@ def multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc,
return None
else:
pipe_init = Pipeline(configs_init["process_pipe"], logger=logger)
aman_cfgs_ref = get_pcfg_check_aman(pipe_init)

if check_cfg_match(aman_cfgs_ref, meta_proc.preprocess['pcfg_ref'],
logger=logger):
if not ignore_cfg_check:
aman_cfgs_ref = get_pcfg_check_aman(pipe_init)

if ignore_cfg_check or check_cfg_match(
aman_cfgs_ref,
meta_proc.preprocess['pcfg_ref'],
logger=logger):
pipe_proc = Pipeline(configs_proc["process_pipe"], logger=logger)

logger.info("Restricting detectors on all proc pipeline processes")
Expand Down Expand Up @@ -755,27 +857,12 @@ def multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc,
if init_only:
return aman

if t2ptemplate_aman is not None:
# Replace Q,U with simulated timestreams
t2ptemplate_aman.wrap("demodQ", aman.demodQ, [(0, 'dets'), (1, 'samps')], overwrite=True)
t2ptemplate_aman.wrap("demodU", aman.demodU, [(0, 'dets'), (1, 'samps')], overwrite=True)

t2p_aman = t2pleakage.get_t2p_coeffs(
t2ptemplate_aman,
merge_stats=False
)
t2pleakage.subtract_t2p(
aman,
t2p_aman,
T_signal=t2ptemplate_aman.dsT
)

logger.info("Running dependent pipeline")
proc_aman = context_proc.get_meta(obs_id, meta=aman)
if 'valid_data' in aman.preprocess:
aman.preprocess.move('valid_data', None)
aman.preprocess.merge(proc_aman.preprocess)
pipe_proc.run(aman, aman.preprocess, sim=True)
pipe_proc.run(aman, aman.preprocess, sim=True, data_amans=data_amans)

return aman
else:
Expand Down
Loading