diff --git a/docs/source/howto/run_httomo.rst b/docs/source/howto/run_httomo.rst index f47305860..dd6d1f631 100644 --- a/docs/source/howto/run_httomo.rst +++ b/docs/source/howto/run_httomo.rst @@ -74,16 +74,22 @@ Once the appropriate step has been done, you will have access to the HTTomo CLI: Commands: check Check a YAML pipeline file for errors. + memory-check Estimate CPU memory requirements for processing input... run Run a processing pipeline defined in YAML on input data. -As can be seen from the output above, there are two HTTomo commands available: -:code:`check` and :code:`run`. +As can be seen from the output above, there are three HTTomo commands +available: :code:`check`, :code:`memory-check`, and :code:`run`. The :code:`check` command is used for checking a YAML process list file for errors, and is highly recommended to be run before attempting to run the pipeline. Please see :ref:`utilities_yamlchecker` for more information about the checks being performed, the help information that is printed, etc. +The :code:`memory-check` command is for estimating the CPU memory requirements +for processing the input data with a given pipeline and number of processes. +The underlying functionality was primarily developed for use with the DLS +HTTomo launcher, and has been exposed as a CLI command for convenience. + The :code:`run` command is used for running HTTomo with a pipeline on the given HDF5 input data. @@ -147,6 +153,66 @@ Options/flags The :code:`check` command has *no* options/flags. +The :code:`memory-check` command +++++++++++++++++++++++++++++++++ + +.. code-block:: console + + $ python -m httomo memory-check --help + Usage: python -m httomo memory-check [OPTIONS] IN_DATA_FILE PIPELINE NPROCS + + Estimate CPU memory requirements for processing input data with a given + pipeline and number of processes + + Options: + --help Show this message and exit. + + +Arguments +######### + +For :code:`memory-check` there are three *required* arguments: +:code:`IN_DATA_FILE`, :code:`PIPELINE`, and :code:`NPROCS`, and zero *optional* +arguments. + +:code:`IN_DATA_FILE` (required) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +This is the filepath to the HDF5 input data that is intended to be processed. +This is required primarily for querying the size of the input data. + +:code:`PIPELINE` (required) +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +This is the filepath to the YAML process list file that defines the processing +to be applied to the input data. + +This is required for several reasons: + +- any cropping of the data via the loader's :code:`preview` parameter will + affect the size of the data being processed +- any methods requiring padding will affect the size of the data being + processed +- any :ref:`info_reslice` in the pipeline will affect the amount of memory + required + +:code:`NPROCS` (required) +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +This is the number of processes the input data is intended to be processed +with. + +This is required primarily due to the number of processes affecting how the +input data is split up, and thus affects the allocations required for +processing the subsets of data. + +.. note:: The value of :code:`NPROCS` must be >= 1. + +Options/flags +############# + +The :code:`memory-check` command has *zero* options/flags. + The :code:`run` command +++++++++++++++++++++++ diff --git a/httomo/cli.py b/httomo/cli.py index 9817763fa..7df0cb7d2 100644 --- a/httomo/cli.py +++ b/httomo/cli.py @@ -8,6 +8,7 @@ import yaml import click +import h5py import shutil from mpi4py import MPI from loguru import logger @@ -16,9 +17,12 @@ from httomo.cli_utils import is_sweep_pipeline from httomo.logger import setup_logger from httomo.monitors import MONITORS_MAP, make_monitors +from httomo.runner.dataset_store_backing import estimate_section_memory from httomo.runner.pipeline import Pipeline +from httomo.runner.section import sectionize from httomo.sweep_runner.param_sweep_runner import ParamSweepRunner from httomo.transform_layer import TransformLayer +from httomo.transform_loader_params import parse_config, parse_preview from httomo.utils import log_exception, log_once, mpi_abort_excepthook from httomo.yaml_checker import ( validate_yaml_config, @@ -71,6 +75,32 @@ def main(): pass +@main.command() +@click.argument( + "in_data_file", + type=click.Path(exists=True, dir_okay=False, path_type=Path), +) +@click.argument( + "pipeline", + type=PipelineFilePathOrString( + types=[click.Path(exists=True, dir_okay=False, path_type=Path), click.STRING] + ), +) +@click.argument( + "nprocs", + type=click.IntRange(1), +) +def memory_check(in_data_file: Path, pipeline: Union[Path, str], nprocs: int): + """ + Estimate CPU memory requirements for processing input data with a given pipeline and number + of processes + """ + memory_peak = estimate_cpu_memory(in_data_file, pipeline, nprocs) + print( + f"Estimated peak CPU memory usage for {nprocs} process run: {(memory_peak * nprocs) / (1024 ** 3):.3f} GB" + ) + + @main.command() @click.argument( "pipeline", @@ -500,3 +530,42 @@ def execute_high_throughput_run( def execute_sweep_run(pipeline: Pipeline, global_comm: MPI.Comm) -> None: ParamSweepRunner(pipeline, global_comm).execute() + + +def estimate_cpu_memory(in_data_file: Path, pipeline_file: Path, nprocs: int) -> int: + pipeline = generate_pipeline( + in_data_file, pipeline_file, False, MPI.COMM_WORLD, PipelineFormat.Yaml + ) + sections = sectionize(pipeline) + config = yaml_loader(pipeline_file) + data_config, _, _, _, _ = parse_config(in_data_file, config[0]["parameters"]) + with h5py.File(in_data_file, "r") as f: + dataset = f[data_config.data_path] + dtype = dataset.dtype + full_shape = dataset.shape + + preview_config = parse_preview( + config[0]["parameters"].get("preview", None), full_shape + ) + previewed_shape = ( + preview_config.angles.stop - preview_config.angles.start, + preview_config.detector_y.stop - preview_config.detector_y.start, + preview_config.detector_x.stop - preview_config.detector_x.start, + ) + + section_memory_peak = 0 + for idx in range(len(sections)): + section_memory_peak = max( + section_memory_peak, + estimate_section_memory( + nprocs, + 0, + None, + dtype, + previewed_shape, + sections, + idx, + ), + ) + + return section_memory_peak diff --git a/httomo/data/dataset_store.py b/httomo/data/dataset_store.py index f8a65917c..be18196c0 100644 --- a/httomo/data/dataset_store.py +++ b/httomo/data/dataset_store.py @@ -287,7 +287,7 @@ def __init__( self._data = self._reslice(source.slicing_dim, slicing_dim, source_data) end = time.perf_counter() log_once( - f"reslice_memory_estimator: {reslice_memory_estimator(source_data.shape, source_data.dtype, source.slicing_dim, slicing_dim, self._comm)}", + f"reslice_memory_estimator: {reslice_memory_estimator(source_data.shape, source_data.dtype, source.slicing_dim, slicing_dim, self._comm.size, self._comm.rank, self._comm.allgather)}", level=logging.DEBUG, ) if slicing_dim == 1: diff --git a/httomo/data/hdf/_utils/reslice.py b/httomo/data/hdf/_utils/reslice.py index b7d88c4cb..648022c80 100644 --- a/httomo/data/hdf/_utils/reslice.py +++ b/httomo/data/hdf/_utils/reslice.py @@ -1,5 +1,6 @@ import logging -from typing import Tuple +from collections.abc import Callable +from typing import Any, Optional, Tuple, TypeAlias import numpy from mpi4py.MPI import Comm @@ -70,15 +71,18 @@ def reslice( return new_data, next_slice_dim, start_idx +AllGatherFunc: TypeAlias = Optional[Callable[[Any], list[Any]]] + + def reslice_memory_estimator( data_shape: Tuple[int, int, int], dtype: numpy.dtype, current_slice_dim: int, next_slice_dim: int, - comm: Comm, + nprocs: int, + rank: int, + allgather_func: AllGatherFunc, ) -> Tuple[int, int]: - rank = comm.rank - nprocs = comm.size itemsize = numpy.dtype(dtype).itemsize split_sizes = [] @@ -93,7 +97,10 @@ def reslice_memory_estimator( split_sizes.append(numpy.prod(split_shape) * itemsize) prev_idx = next_idx - all_split_sizes = comm.allgather(split_sizes) + if allgather_func is not None: + all_split_sizes = allgather_func(split_sizes) + else: + all_split_sizes = [split_sizes] * nprocs recv_sizes = [all_split_sizes[p][rank] for p in range(nprocs)] output_shape = list(data_shape) diff --git a/httomo/runner/dataset_store_backing.py b/httomo/runner/dataset_store_backing.py index 7c69701ad..b31ce08ce 100644 --- a/httomo/runner/dataset_store_backing.py +++ b/httomo/runner/dataset_store_backing.py @@ -5,13 +5,14 @@ from numpy.typing import DTypeLike from mpi4py import MPI -from httomo.data.hdf._utils.reslice import reslice_memory_estimator +from httomo.data.hdf._utils.reslice import AllGatherFunc, reslice_memory_estimator from httomo.runner.section import Section, determine_section_padding from httomo.utils import _get_slicing_dim, make_3d_shape_from_shape def calculate_section_input_chunk_shape( - comm: MPI.Comm, + nprocs: int, + rank: int, global_shape: Tuple[int, int, int], slicing_dim: int, padding: Tuple[int, int], @@ -19,8 +20,8 @@ def calculate_section_input_chunk_shape( """ Calculate the shape of the section input chunk w/ or w/o padding. """ - start = round((global_shape[slicing_dim] / comm.size) * comm.rank) - stop = round((global_shape[slicing_dim] / comm.size) * (comm.rank + 1)) + start = round((global_shape[slicing_dim] / nprocs) * rank) + stop = round((global_shape[slicing_dim] / nprocs) * (rank + 1)) section_slicing_dim_len = stop - start shape = list(global_shape) shape[slicing_dim] = section_slicing_dim_len + padding[0] + padding[1] @@ -58,19 +59,21 @@ class DataSetStoreBacking(Enum): File = 2 -def determine_store_backing( - comm: MPI.Comm, - sections: List[Section], - memory_limit_bytes: int, +def estimate_section_memory( + nprocs: int, + rank: int, + allgather_func: AllGatherFunc, dtype: DTypeLike, global_shape: Tuple[int, int, int], + sections: List[Section], section_idx: int, -) -> DataSetStoreBacking: +) -> int: # Get chunk shape created by reader of section `n` (the current section) that will account # for padding. This chunk shape is based on the chunk shape written by the writer of # section `n - 1` (the previous section) padded_input_chunk_shape = calculate_section_input_chunk_shape( - comm=comm, + nprocs=nprocs, + rank=rank, global_shape=global_shape, slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1, padding=determine_section_padding(sections[section_idx]), @@ -82,7 +85,8 @@ def determine_store_backing( # Get unpadded chunk shape input to current section (for calculation of bytes in output # chunk for the current section) input_chunk_shape = calculate_section_input_chunk_shape( - comm=comm, + nprocs=nprocs, + rank=rank, global_shape=global_shape, slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1, padding=(0, 0), @@ -102,7 +106,7 @@ def determine_store_backing( # input to it (which would be the output chunk of the current section) reslice_bytes = 0 if ( - comm.size > 1 + nprocs > 1 and section_idx < len(sections) - 1 and sections[section_idx].pattern != sections[section_idx + 1].pattern ): @@ -111,18 +115,55 @@ def determine_store_backing( dtype, _get_slicing_dim(sections[section_idx].pattern), _get_slicing_dim(sections[section_idx + 1].pattern), - comm, + nprocs, + rank, + allgather_func, ) reslice_bytes += ring_algorithm_bytes + reslice_output_bytes + # TODO: The nature of the pinned memory allocations by cupy is currently under + # investigation, so a more precise calculation for its size is not yet known. + # + # It's known that this can grow quite large via allocations exceeding the current + # allocation being bumped to the next power of 2 (ie, a 16GiB allocation that is exceeded + # by 1 byte will have a 32GiB allocation made in addition to the original 16GiB). + # + # Taking half the input data size seems to be in the ballpark for what has been observed + # with larger datasets (ie, an 84GB dataset being processed took ~520GB of memory, and with + # this arbitrary choice of 0.5 as a multiplicative factor gets the estimated value to + # ~514GB) + CUPY_PINNED_CPU_MEMORY = int(0.5 * np.prod(global_shape) * np.dtype(dtype).itemsize) + + return ( + padded_input_chunk_bytes + + output_chunk_bytes + + reslice_bytes + + CUPY_PINNED_CPU_MEMORY + ) + + +def determine_store_backing( + comm: MPI.Comm, + sections: List[Section], + memory_limit_bytes: int, + dtype: DTypeLike, + global_shape: Tuple[int, int, int], + section_idx: int, +) -> DataSetStoreBacking: + section_memory = estimate_section_memory( + comm.size, + comm.rank, + comm.allgather, + dtype, + global_shape, + sections, + section_idx, + ) + send_buffer = np.zeros(1, dtype=bool) recv_buffer = np.zeros(1, dtype=bool) - if ( - memory_limit_bytes > 0 - and padded_input_chunk_bytes + output_chunk_bytes + reslice_bytes - >= memory_limit_bytes - ): + if memory_limit_bytes > 0 and section_memory >= memory_limit_bytes: send_buffer[0] = True # do a logical OR of all the enum variants across the processes diff --git a/tests/runner/test_dataset_store_backing.py b/tests/runner/test_dataset_store_backing.py index bb4123585..14a85c2c9 100644 --- a/tests/runner/test_dataset_store_backing.py +++ b/tests/runner/test_dataset_store_backing.py @@ -48,14 +48,9 @@ def test_calculate_section_chunk_shape( rank: int, section_slicing_dim: int, section_padding: Tuple[int, int], - mocker: MockerFixture, ): GLOBAL_SHAPE = (1801, 2160, 2560) - # Define mock communicator that reflects the desired data splitting/distribution to be - # tested - mock_global_comm = mocker.create_autospec(spec=MPI.Comm, size=nprocs, rank=rank) - # The chunk shape for the section should reflect the padding needed for that section expected_chunk_shape: List[int] = list(GLOBAL_SHAPE) start = round(GLOBAL_SHAPE[section_slicing_dim] / nprocs * rank) @@ -65,7 +60,8 @@ def test_calculate_section_chunk_shape( slicing_dim_len + section_padding[0] + section_padding[1] ) section_chunk_shape = calculate_section_input_chunk_shape( - comm=mock_global_comm, + nprocs=nprocs, + rank=rank, global_shape=GLOBAL_SHAPE, slicing_dim=section_slicing_dim, padding=section_padding, @@ -186,10 +182,10 @@ def test_calculate_section_chunk_bytes_output_dims_change_and_swap( @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ - (6 * 1024**2, DataSetStoreBacking.File), - (7 * 1024**2, DataSetStoreBacking.RAM), + (8 * 1024**2, DataSetStoreBacking.File), + (9 * 1024**2, DataSetStoreBacking.RAM), ], - ids=["6MB-limit-file-backing", "7MB-limit-ram-backing"], + ids=["8MB-limit-file-backing", "9MB-limit-ram-backing"], ) def test_determine_store_backing_reslice_single_proc( mocker: MockerFixture, @@ -204,6 +200,7 @@ def test_determine_store_backing_reslice_single_proc( # - the write chunk ~3.4MB # - the read chunk also ~3.4MB # - reslice shouldn't occur due to running with a single process + # - the cupy pinned memory allocations is ~1.7MB DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) @@ -233,10 +230,10 @@ def test_determine_store_backing_reslice_single_proc( @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ - (6 * 1024**2, DataSetStoreBacking.File), - (7 * 1024**2, DataSetStoreBacking.RAM), + (8 * 1024**2, DataSetStoreBacking.File), + (9 * 1024**2, DataSetStoreBacking.RAM), ], - ids=["6MB-limit-file-backing", "7MB-limit-ram-backing"], + ids=["8MB-limit-file-backing", "9MB-limit-ram-backing"], ) def test_determine_store_backing_no_reslice_single_proc( mocker: MockerFixture, @@ -250,6 +247,7 @@ def test_determine_store_backing_no_reslice_single_proc( # The dtype and shape combined makes: # - the write chunk ~3.4MB # - the read chunk also ~3.4MB + # - the cupy pinned memory allocations is ~1.7MB DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) @@ -288,10 +286,10 @@ def test_determine_store_backing_no_reslice_single_proc( @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ - (6 * 1024**2, DataSetStoreBacking.File), - (7 * 1024**2, DataSetStoreBacking.RAM), + (8 * 1024**2, DataSetStoreBacking.File), + (9 * 1024**2, DataSetStoreBacking.RAM), ], - ids=["6MB-limit-file-backing", "7MB-limit-ram-backing"], + ids=["8MB-limit-file-backing", "9MB-limit-ram-backing"], ) def test_determine_store_backing_reslice_two_procs( mocker: MockerFixture, @@ -307,6 +305,7 @@ def test_determine_store_backing_reslice_two_procs( # - the read chunk also ~1.7MB # - the output of reslice also ~1.7MB # - the intermediate data created by reslice algorithm ~1.7MB + # - the cupy pinned memory allocations is ~1.7MB DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) @@ -340,10 +339,10 @@ def test_determine_store_backing_reslice_two_procs( @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ - (3 * 1024**2, DataSetStoreBacking.File), - (4 * 1024**2, DataSetStoreBacking.RAM), + (5 * 1024**2, DataSetStoreBacking.File), + (6 * 1024**2, DataSetStoreBacking.RAM), ], - ids=["3MB-limit-file-backing", "4MB-limit-ram-backing"], + ids=["5MB-limit-file-backing", "6MB-limit-ram-backing"], ) def test_determine_store_backing_no_reslice_two_procs( mocker: MockerFixture, @@ -357,6 +356,7 @@ def test_determine_store_backing_no_reslice_two_procs( # The dtype and shape combined makes: # - the write chunk ~1.7MB # - the read chunk also ~1.7MB + # - the cupy pinned memory allocations is ~1.7MB DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) @@ -391,10 +391,10 @@ def test_determine_store_backing_no_reslice_two_procs( @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ - (41 * 1024**2, DataSetStoreBacking.File), - (42 * 1024**2, DataSetStoreBacking.RAM), + (42 * 1024**2, DataSetStoreBacking.File), + (43 * 1024**2, DataSetStoreBacking.RAM), ], - ids=["41MB-limit-file-backing", "42MB-limit-ram-backing"], + ids=["42MB-limit-file-backing", "43MB-limit-ram-backing"], ) def test_determine_store_backing_large_padding_reslice_single_proc( mocker: MockerFixture, @@ -409,6 +409,7 @@ def test_determine_store_backing_large_padding_reslice_single_proc( # - the write chunk ~3.4MB # - the padded input chunk ~37.7MB (110 * 300 * 300 * 4 / (1024 ** 2)) # - reslice shouldn't occur due to running with a single process + # - the cupy pinned memory allocations is ~1.7MB DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) PADDING = (50, 50) @@ -446,10 +447,10 @@ def test_determine_store_backing_large_padding_reslice_single_proc( @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ - (41 * 1024**2, DataSetStoreBacking.File), - (42 * 1024**2, DataSetStoreBacking.RAM), + (42 * 1024**2, DataSetStoreBacking.File), + (43 * 1024**2, DataSetStoreBacking.RAM), ], - ids=["41MB-limit-file-backing", "42MB-limit-ram-backing"], + ids=["42MB-limit-file-backing", "43MB-limit-ram-backing"], ) def test_determine_store_backing_large_padding_no_reslice_single_proc( mocker: MockerFixture, @@ -464,6 +465,7 @@ def test_determine_store_backing_large_padding_no_reslice_single_proc( # - the unpadded input chunk ~3.4MB (10 * 300 * 300 * 4 / (1024 ** 2)) # - the padded input chunk ~37.7MB (110 * 300 * 300 * 4 / (1024 ** 2)) # - the output chunk ~3.4MB (10 * 300 * 300 * 4 / (1024 ** 2)) + # - the cupy pinned memory allocations is ~1.7MB DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) PADDING = (50, 50) @@ -511,10 +513,10 @@ def test_determine_store_backing_large_padding_no_reslice_single_proc( @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ - (41 * 1024**2, DataSetStoreBacking.File), - (42 * 1024**2, DataSetStoreBacking.RAM), + (42 * 1024**2, DataSetStoreBacking.File), + (43 * 1024**2, DataSetStoreBacking.RAM), ], - ids=["41MB-limit-file-backing", "42MB-limit-ram-backing"], + ids=["42MB-limit-file-backing", "43MB-limit-ram-backing"], ) def test_determine_store_backing_large_padding_reslice_two_procs( mocker: MockerFixture, @@ -530,6 +532,7 @@ def test_determine_store_backing_large_padding_reslice_two_procs( # - the padded input chunk ~36.0MB (105 * 300 * 300 * 4 / (1024 ** 2)) # - the output of reslice also ~1.7MB # - the intermediate data created by reslice algorithm ~1.7MB + # - the cupy pinned memory allocations is ~1.7MB DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) PADDING = (50, 50) @@ -571,10 +574,10 @@ def test_determine_store_backing_large_padding_reslice_two_procs( @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ - (37 * 1024**2, DataSetStoreBacking.File), - (38 * 1024**2, DataSetStoreBacking.RAM), + (39 * 1024**2, DataSetStoreBacking.File), + (40 * 1024**2, DataSetStoreBacking.RAM), ], - ids=["37MB-limit-file-backing", "38MB-limit-ram-backing"], + ids=["39MB-limit-file-backing", "40MB-limit-ram-backing"], ) def test_determine_store_backing_large_padding_no_reslice_two_procs( mocker: MockerFixture, @@ -589,6 +592,7 @@ def test_determine_store_backing_large_padding_no_reslice_two_procs( # - the unpadded input chunk ~1.7MB (5 * 300 * 300 * 4 / (1024 ** 2)) # - the padded input chunk ~36.0MB (105 * 300 * 300 * 4 / (1024 ** 2)) # - the output chunk ~1.7MB (5 * 300 * 300 * 4 / (1024 ** 2)) + # - the cupy pinned memory allocations is ~1.7MB DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) PADDING = (50, 50)