diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9f8b6d9cea..87d87ef593 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -37,4 +37,4 @@ repos: rev: v1.19.1 hooks: - id: mypy - exclude: ^libensemble/utils/(launcher|loc_stack|runners|pydantic|output_directory)\.py$|^libensemble/tests/regression_tests/support\.py$|^libensemble/tests/functionality_tests/ + exclude: ^libensemble/utils/(launcher|loc_stack|runners|pydantic|output_directory)\.py$|^libensemble/tests/regression_tests/support\.py$|^libensemble/tests/functionality_tests/|^libensemble/tests/unit_tests/ diff --git a/docs/FAQ.rst b/docs/FAQ.rst index b8a3ea2ce5..0339dbb681 100644 --- a/docs/FAQ.rst +++ b/docs/FAQ.rst @@ -13,11 +13,6 @@ We recommend using the following options to help debug workflows:: logger.set_level("DEBUG") libE_specs["safe_mode"] = True -To make it easier to debug a generator try setting the **libE_specs** option ``gen_on_manager``. -To do so, add the following to your calling script:: - - libE_specs["gen_on_manager"] = True - With this, ``pdb`` breakpoints can be set as usual in the generator. For more debugging options see "How can I debug specific libEnsemble processes?" below. diff --git a/docs/data_structures/libE_specs.rst b/docs/data_structures/libE_specs.rst index bc2df3473a..1a31b41b8d 100644 --- a/docs/data_structures/libE_specs.rst +++ b/docs/data_structures/libE_specs.rst @@ -9,7 +9,7 @@ libEnsemble is primarily customized by setting options within a ``LibeSpecs`` in from libensemble.specs import LibeSpecs - specs = LibeSpecs(gen_on_manager=True, save_every_k_gens=100, sim_dirs_make=True, nworkers=4) + specs = LibeSpecs(save_every_k_gens=100, sim_dirs_make=True, nworkers=4) .. dropdown:: Settings by Category :open: @@ -26,9 +26,8 @@ libEnsemble is primarily customized by setting options within a ``LibeSpecs`` in **nworkers** [int]: Number of worker processes in ``"local"``, ``"threads"``, or ``"tcp"``. - **gen_on_manager** [bool] = False - Instructs Manager process to run generator functions. - This generator function can access/modify user objects by reference. + **gen_on_worker** [bool] = False + Instructs Worker process to run generator instead of Manager. **mpi_comm** [MPI communicator] = ``MPI.COMM_WORLD``: libEnsemble MPI communicator. diff --git a/docs/overview_usecases.rst b/docs/overview_usecases.rst index 7d5733d919..e31d6c8990 100644 --- a/docs/overview_usecases.rst +++ b/docs/overview_usecases.rst @@ -53,8 +53,8 @@ to support): .. dropdown:: **Click Here for Use-Cases** * A user wants to optimize a simulation calculation. The simulation may - already be using parallel resources but not a large fraction of a - computer. libEnsemble can coordinate concurrent evaluations of the + already be using parallel resources but not a large fraction of a + computer. libEnsemble can coordinate concurrent evaluations of the simulation ``sim_f`` at multiple parameter values based on candidate parameter values produced by ``gen_f`` (possibly after each ``sim_f`` output). @@ -117,7 +117,7 @@ its capabilities. * **User function**: A generator, simulator, or allocation function. These Python functions govern the libEnsemble workflow. They must conform to the libEnsemble API for each respective user function, but otherwise can - be created or modified by the user. + be created or modified by the user. libEnsemble includes many examples of each type. * **Executor**: The executor provides a simple, portable interface for @@ -138,14 +138,14 @@ its capabilities. allowing them to maintain and update data structures efficiently. These calculations and their assigned workers are referred to as *persistent*. - * **Resource Manager**: libEnsemble includes a built-in resource manager that can detect + * **Resource Manager**: libEnsemble includes a built-in resource manager that can detect (or be provided with) available resources (e.g., a node list). Resources are divided among workers using *resource sets* and can be dynamically reassigned. * **Resource Set**: The smallest unit of resources that can be assigned (and dynamically reassigned) to workers. By default this is the provisioned resources - divided by the number of workers (excluding any workers listed in the + divided by the number of workers (excluding any workers listed in the ``zero_resource_workers`` ``libE_specs`` option). It can also be set explicitly using the ``num_resource_sets`` ``libE_specs`` option. diff --git a/docs/platforms/aurora.rst b/docs/platforms/aurora.rst index af2e7cc160..4865ba0c18 100644 --- a/docs/platforms/aurora.rst +++ b/docs/platforms/aurora.rst @@ -57,7 +57,7 @@ simulations for each worker: .. code-block:: python # Instruct libEnsemble to exit after this many simulations - ensemble.exit_criteria = ExitCriteria(sim_max=nsim_workers*2) + ensemble.exit_criteria = ExitCriteria(sim_max=nsim_workers * 2) Now grab an interactive session on two nodes (or use the batch script at ``../submission_scripts/submit_pbs_aurora.sh``):: @@ -115,26 +115,6 @@ will use one GPU tile):: python run_libe_forces.py -n 25 -Running generator on the manager --------------------------------- - -An alternative is to run the generator on a thread on the manager. The -number of workers can then be set to the number of simulation workers. - -Change the ``libE_specs`` in **run_libe_forces.py** as follows: - -.. code-block:: python - - nsim_workers = ensemble.nworkers - - # Persistent gen does not need resources - ensemble.libE_specs = LibeSpecs( - gen_on_manager=True, - -then we can run with 12 (instead of 13) workers:: - - python run_libe_forces.py -n 12 - Dynamic resource assignment --------------------------- diff --git a/docs/platforms/perlmutter.rst b/docs/platforms/perlmutter.rst index bc8f1af5ed..a2768e1d26 100644 --- a/docs/platforms/perlmutter.rst +++ b/docs/platforms/perlmutter.rst @@ -105,27 +105,6 @@ To see GPU usage, ssh into the node you are on in another window and run:: watch -n 0.1 nvidia-smi -Running generator on the manager -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -An alternative is to run the generator on a thread on the manager. The -number of workers can then be set to the number of simulation workers. - -Change the ``libE_specs`` in **run_libe_forces.py** as follows. - - .. code-block:: python - - nsim_workers = ensemble.nworkers - - # Persistent gen does not need resources - ensemble.libE_specs = LibeSpecs( - gen_on_manager=True, - ) - -and run with:: - - python run_libe_forces.py -n 4 - To watch video ^^^^^^^^^^^^^^ diff --git a/docs/platforms/platforms_index.rst b/docs/platforms/platforms_index.rst index 79285aa7b0..391cfb6c67 100644 --- a/docs/platforms/platforms_index.rst +++ b/docs/platforms/platforms_index.rst @@ -24,35 +24,21 @@ simulation worker, and libEnsemble will distribute user applications across the node allocation. This is the **most common approach** where each simulation runs an MPI application. -The generator will run on a worker by default, but if running a single generator, -the :ref:`libE_specs` option **gen_on_manager** is recommended, -which runs the generator on the manager (using a thread) as below. +.. image:: ../images/centralized_gen_on_manager.png + :alt: centralized + :scale: 55 -.. list-table:: - :widths: 60 40 +A SLURM batch script may include: - * - .. image:: ../images/centralized_gen_on_manager.png - :alt: centralized - :scale: 55 +.. code-block:: bash - - In calling script: + #SBATCH --nodes 3 - .. code-block:: python - :linenos: + python run_libe_forces.py --nworkers 3 - ensemble.libE_specs = LibeSpecs( - gen_on_manager=True, - ) - - A SLURM batch script may include: - - .. code-block:: bash - - #SBATCH --nodes 3 - - python run_libe_forces.py --nworkers 3 - -When using **gen_on_manager**, set ``nworkers`` to the number of workers desired for running simulations. +If running multiple generator processes instead, then set the +:ref:`libE_specs` option **gen_on_worker** so that multiple +worker processes can run multiple generator instances. Dedicated Mode ^^^^^^^^^^^^^^ @@ -62,32 +48,29 @@ True, the MPI executor will not launch applications on nodes where libEnsemble P processes (manager and workers) are running. Workers launch applications onto the remaining nodes in the allocation. -.. list-table:: - :widths: 60 40 - - * - .. image:: ../images/centralized_dedicated.png - :alt: centralized dedicated mode - :scale: 30 - - In calling script: +.. image:: ../images/centralized_dedicated.png + :alt: centralized dedicated mode + :scale: 30 - .. code-block:: python - :linenos: +In calling script: - ensemble.libE_specs = LibeSpecs( - num_resource_sets=2, - dedicated_mode=True, - ) +.. code-block:: python + :linenos: - A SLURM batch script may include: + ensemble.libE_specs = LibeSpecs( + gen_on_worker=True, + num_resource_sets=2, + dedicated_mode=True, + ) - .. code-block:: bash +A SLURM batch script may include: - #SBATCH --nodes 3 +.. code-block:: bash - python run_libe_forces.py --nworkers 3 + #SBATCH --nodes 3 -Note that **gen_on_manager** is not set in the above example. + python run_libe_forces.py --nworkers 3 Distributed Running ------------------- @@ -137,8 +120,7 @@ Zero-resource workers --------------------- Users with persistent ``gen_f`` functions may notice that the persistent workers -are still automatically assigned system resources. This can be resolved by using -the ``gen_on_manager`` option or by +are still automatically assigned system resources. This can be resolved by :ref:`fixing the number of resource sets`. Assigning GPUs diff --git a/docs/running_libE.rst b/docs/running_libE.rst index 7b8b0532d8..50e58afbe5 100644 --- a/docs/running_libE.rst +++ b/docs/running_libE.rst @@ -12,13 +12,6 @@ determine the parameters/inputs for simulations. Simulator functions run and manage simulations, which often involve running a user application (see :doc:`Executor`). -.. note:: - As of version 1.3.0, the generator can be run as a thread on the manager, - using the :ref:`libE_specs` option **gen_on_manager**. - When using this option, set the number of workers desired for running - simulations. See :ref:`Running generator on the manager` - for more details. - To use libEnsemble, you will need a calling script, which in turn will specify generator and simulator functions. Many :doc:`examples` are available. @@ -161,29 +154,6 @@ If this example was run as:: No simulations will be able to run. -.. _gen-on-manager: - -Running generator on the manager --------------------------------- - -The majority of libEnsemble use cases run a single generator. The -:ref:`libE_specs` option **gen_on_manager** will cause -the generator function to run on a thread on the manager. This can run -persistent user functions, sharing data structures with the manager, and avoids -additional communication to a generator running on a worker. When using this -option, the number of workers specified should be the (maximum) number of -concurrent simulations. - -If modifying a workflow to use ``gen_on_manager`` consider the following. - -* Set ``nworkers`` to the number of workers desired for running simulations. -* If using :meth:`add_unique_random_streams()` - to seed random streams, the default generator seed will be zero. -* If you have a line like ``libE_specs["nresource_sets"] = nworkers -1``, this - line should be removed. -* If the generator does use resources, ``nresource_sets`` can be increased as needed - so that the generator and all simulations are resourced. - Environment Variables --------------------- diff --git a/docs/tutorials/executor_forces_tutorial.rst b/docs/tutorials/executor_forces_tutorial.rst index e01496734b..a083aa2a82 100644 --- a/docs/tutorials/executor_forces_tutorial.rst +++ b/docs/tutorials/executor_forces_tutorial.rst @@ -336,44 +336,6 @@ These may require additional browsing of the documentation to complete. ... -Running the generator on the manager ------------------------------------- - -As of version 1.3.0, the generator can be run on a thread on the manager, -using the :ref:`libE_specs` option **gen_on_manager**. - -Change the libE_specs as follows. - - .. code-block:: python - :linenos: - :lineno-start: 28 - - nsim_workers = ensemble.nworkers - - # Persistent gen does not need resources - ensemble.libE_specs = LibeSpecs( - gen_on_manager=True, - sim_dirs_make=True, - ensemble_dir_path="./test_executor_forces_tutorial", - ) - -When running set ``nworkers`` to the number of workers desired for running simulations. -E.g., Instead of: - -.. code-block:: bash - - python run_libe_forces.py --nworkers 5 - -use: - -.. code-block:: bash - - python run_libe_forces.py --nworkers 4 - -Note that as the generator random number seed will be zero instead of one, the checksum will change. - -For more information see :ref:`Running generator on the manager`. - Running forces application with input file ------------------------------------------ diff --git a/docs/tutorials/gpcam_tutorial.rst b/docs/tutorials/gpcam_tutorial.rst index a013c1b67e..096d5584ca 100644 --- a/docs/tutorials/gpcam_tutorial.rst +++ b/docs/tutorials/gpcam_tutorial.rst @@ -30,6 +30,7 @@ This version (and others) of the gpCAM generator can be found at `libensemble/ge from libensemble.message_numbers import EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, PERSIS_STOP, STOP_TAG from libensemble.tools.persistent_support import PersistentSupport + def persistent_gpCAM(H_in, persis_info, gen_specs, libE_info): """Run a batched gpCAM model to create a surrogate""" @@ -156,6 +157,7 @@ For running applications using parallel resources in the simulator see the `forc # Define our simulation function import numpy as np + def six_hump_camel(H, persis_info, sim_specs, _): """Six-Hump Camel sim_f.""" @@ -189,6 +191,8 @@ First we will create a cleanup script so we can easily re-run. # To rerun this notebook, we need to delete the ensemble directory. import shutil + + def cleanup(): try: shutil.rmtree("ensemble") @@ -218,31 +222,30 @@ If you wish to make your own functions based on the above, those can be imported nworkers = 4 - # When using gen_on_manager, nworkers is number of concurrent sims. # final_gen_send means the last evaluated points are returned to the generator to update the model. - libE_specs = LibeSpecs(nworkers=nworkers, gen_on_manager=True, final_gen_send=True) + libE_specs = LibeSpecs(nworkers=nworkers, final_gen_send=True) n = 2 # Input dimensions batch_size = 4 num_batches = 6 gen_specs = GenSpecs( - gen_f=persistent_gpCAM, # Generator function - persis_in=["f"], # Objective, defined in sim, is returned to gen + gen_f=persistent_gpCAM, # Generator function + persis_in=["f"], # Objective, defined in sim, is returned to gen outputs=[("x", float, (n,))], # Parameters (name, type, size) user={ "batch_size": batch_size, "lb": np.array([-2, -1]), # lower boundaries for n dimensions - "ub": np.array([2, 1]), # upper boundaries for n dimensions - "ask_max_iter": 5, # Number of iterations for ask (default 20) + "ub": np.array([2, 1]), # upper boundaries for n dimensions + "ask_max_iter": 5, # Number of iterations for ask (default 20) "rng_seed": 0, }, ) sim_specs = SimSpecs( - sim_f=six_hump_camel, # Simulator function - inputs=["x"], # Input field names. "x" defined in gen - outputs=[("f", float)], # Objective + sim_f=six_hump_camel, # Simulator function + inputs=["x"], # Input field names. "x" defined in gen + outputs=[("f", float)], # Objective ) # Starts one persistent generator. Simulated values are returned in batch. @@ -251,7 +254,7 @@ If you wish to make your own functions based on the above, those can be imported user={"async_return": False}, # False = batch returns ) - exit_criteria = ExitCriteria(sim_max=num_batches*batch_size) + exit_criteria = ExitCriteria(sim_max=num_batches * batch_size) # Initialize and run the ensemble. ensemble = Ensemble( @@ -272,7 +275,7 @@ At the end of our calling script we run the ensemble. H, persis_info, flag = ensemble.run() # Start the ensemble. Blocks until completion. ensemble.save_output("H_array", append_attrs=False) # Save H (history of all evaluated points) to file - pprint(H[["sim_id", "x", "f"]][:16]) # See first 16 results + pprint(H[["sim_id", "x", "f"]][:16]) # See first 16 results Rerun and test model at known points ------------------------------------ @@ -312,15 +315,21 @@ values at the test points. markersize = 10 plt.figure(figsize=(10, 5)) plt.plot( - num_sims, mse, marker="^", markeredgecolor="black", markeredgewidth=2, - markersize=markersize, linewidth=2, label="Mean squared error" + num_sims, + mse, + marker="^", + markeredgecolor="black", + markeredgewidth=2, + markersize=markersize, + linewidth=2, + label="Mean squared error", ) plt.xticks(num_sims) # Labeling the axes and the legend - plt.title('Mean Squared Error at test points') + plt.title("Mean Squared Error at test points") plt.xlabel("Number of simulations") - plt.ylabel('Mean squared error (rad$^2$)') + plt.ylabel("Mean squared error (rad$^2$)") legend = plt.legend(framealpha=1, edgecolor="black") # Increase edge width here plt.grid(True) plt.show() diff --git a/libensemble/ensemble.py b/libensemble/ensemble.py index 05ddd73941..fe079c99a9 100644 --- a/libensemble/ensemble.py +++ b/libensemble/ensemble.py @@ -161,11 +161,11 @@ class Ensemble: def __init__( self, - sim_specs: SimSpecs | None = SimSpecs(), - gen_specs: GenSpecs | None = GenSpecs(), - exit_criteria: ExitCriteria | None = {}, - libE_specs: LibeSpecs | None = LibeSpecs(), - alloc_specs: AllocSpecs | None = AllocSpecs(), + sim_specs: SimSpecs | dict | None = SimSpecs(), + gen_specs: GenSpecs | dict | None = GenSpecs(), + exit_criteria: ExitCriteria | dict | None = {}, + libE_specs: LibeSpecs | dict | None = LibeSpecs(), + alloc_specs: AllocSpecs | dict | None = AllocSpecs(), persis_info: dict | None = {}, executor: Executor | None = None, H0: npt.NDArray | None = None, @@ -174,7 +174,11 @@ def __init__( self.sim_specs = sim_specs self.gen_specs = gen_specs self.exit_criteria = exit_criteria - self._libE_specs = libE_specs + self._libE_specs: LibeSpecs | None = None + if isinstance(libE_specs, dict): + self._libE_specs = LibeSpecs(**libE_specs) + else: + self._libE_specs = libE_specs self.alloc_specs = alloc_specs self.persis_info = persis_info self.executor = executor @@ -188,11 +192,10 @@ def __init__( if parse_args: self._parse_args() self.parsed = True - self._known_comms = self._libE_specs.comms + if self._libE_specs: + self._known_comms = self._libE_specs.comms if not self._known_comms and self._libE_specs is not None: - if isinstance(self._libE_specs, dict): - self._libE_specs = LibeSpecs(**self._libE_specs) self._known_comms = self._libE_specs.comms if self._known_comms == "local": @@ -202,9 +205,10 @@ def __init__( elif self._known_comms == "mpi" and not parse_args: # Set internal _nworkers - not libE_specs (avoid "nworkers will be ignored" warning) - self._nworkers, self.is_manager = mpi_init(self._libE_specs.mpi_comm) + if self._libE_specs: + self._nworkers, self.is_manager = mpi_init(self._libE_specs.mpi_comm) - def _parse_args(self) -> (int, bool, LibeSpecs): + def _parse_args(self) -> tuple[int, bool, LibeSpecs]: # Set internal _nworkers - not libE_specs (avoid "nworkers will be ignored" warning) self._nworkers, self.is_manager, libE_specs_parsed, self.extra_args = parse_args_f() @@ -220,7 +224,7 @@ def ready(self) -> bool: return all([i for i in [self.exit_criteria, self._libE_specs, self.sim_specs]]) @property - def libE_specs(self) -> LibeSpecs: + def libE_specs(self) -> LibeSpecs | None: return self._libE_specs @libE_specs.setter @@ -257,7 +261,7 @@ def libE_specs(self, new_specs): def _refresh_executor(self): Executor.executor = self.executor or Executor.executor - def run(self) -> (npt.NDArray, dict, int): + def run(self) -> tuple[npt.NDArray, dict, int]: """ Initializes libEnsemble. @@ -297,10 +301,10 @@ def run(self) -> (npt.NDArray, dict, int): """ self._refresh_executor() - - if self._libE_specs.comms != self._known_comms: + if self._libE_specs and self._libE_specs.comms != self._known_comms: raise ValueError(CHANGED_COMMS_WARN) + assert self._libE_specs is not None self.H, self.persis_info, self.flag = libE( self.sim_specs, self.gen_specs, @@ -365,6 +369,7 @@ def save_output(self, basename: str, append_attrs: bool = True): """ if self.is_manager: if self._get_option("libE_specs", "workflow_dir_path"): + assert self.libE_specs is not None save_libE_output( self.H, self.persis_info, diff --git a/libensemble/libE.py b/libensemble/libE.py index abde5423c0..f66ec3ed6f 100644 --- a/libensemble/libE.py +++ b/libensemble/libE.py @@ -155,9 +155,9 @@ def libE( exit_criteria: ExitCriteria, persis_info: dict = {}, alloc_specs: AllocSpecs = AllocSpecs(), - libE_specs: LibeSpecs = {}, + libE_specs: LibeSpecs | dict = {}, H0=None, -) -> (np.ndarray, dict, int): +) -> tuple[np.ndarray, dict, int]: """ Parameters ---------- @@ -243,16 +243,22 @@ def libE( exit_criteria = specs_dump(ensemble.exit_criteria, by_alias=True, exclude_none=True) # Restore objects that don't survive serialization via model_dump - if hasattr(ensemble.gen_specs, "generator") and ensemble.gen_specs.generator is not None: - gen_specs["generator"] = ensemble.gen_specs.generator - if hasattr(ensemble.gen_specs, "vocs") and ensemble.gen_specs.vocs is not None: - gen_specs["vocs"] = ensemble.gen_specs.vocs + if hasattr(ensemble.sim_specs, "simulator") and ensemble.sim_specs.simulator is not None: + sim_specs["simulator"] = ensemble.sim_specs.simulator + if hasattr(ensemble.sim_specs, "vocs") and ensemble.sim_specs.vocs is not None: + sim_specs["vocs"] = ensemble.sim_specs.vocs + + if ensemble.gen_specs is not None: + if hasattr(ensemble.gen_specs, "generator") and ensemble.gen_specs.generator is not None: + gen_specs["generator"] = ensemble.gen_specs.generator + if hasattr(ensemble.gen_specs, "vocs") and ensemble.gen_specs.vocs is not None: + gen_specs["vocs"] = ensemble.gen_specs.vocs # Extract platform info from settings or environment platform_info = get_platform(libE_specs) if libE_specs["dry_run"]: - logger.manager_warning("Dry run. All libE() inputs validated. Exiting.") + logger.manager_warning("Dry run. All libE() inputs validated. Exiting.") # type: ignore[attr-defined] sys.exit() libE_funcs = {"mpi": libE_mpi, "tcp": libE_tcp, "local": libE_local, "threads": libE_local} @@ -287,7 +293,7 @@ def manager( if "out" in gen_specs and ("sim_id", int) in gen_specs["out"]: if hasattr(gen_specs["gen_f"], "__module__") and "libensemble.gen_funcs" not in gen_specs["gen_f"].__module__: - logger.manager_warning(_USER_SIM_ID_WARNING) + logger.manager_warning(_USER_SIM_ID_WARNING) # type: ignore[attr-defined] try: try: @@ -496,6 +502,7 @@ def libE_local(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, li wcomms = start_proc_team(libE_specs["nworkers"], sim_specs, gen_specs, libE_specs) # Set manager resources after the forkpoint. + # if libE_specs["gen_on_worker"] == True, -n reflects the exact number of workers if resources is not None: resources.set_resource_manager(libE_specs["nworkers"]) diff --git a/libensemble/manager.py b/libensemble/manager.py index c2763b7177..3ee2433202 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -57,7 +57,7 @@ class LoggedException(Exception): """Raise exception for handling without re-logging""" -def report_worker_exc(wrk_exc: Exception = None) -> None: +def report_worker_exc(wrk_exc: Exception | None = None) -> None: """Write worker exception to log""" if wrk_exc is not None: from_line, msg, exc = wrk_exc.args @@ -75,7 +75,7 @@ def manager_main( exit_criteria: dict, persis_info: dict, wcomms: list = [], -) -> (dict, int, int): +) -> tuple[dict, int, int]: """Manager routine to coordinate the generation and simulation evaluations Parameters @@ -216,7 +216,7 @@ def __init__( self.elapsed = lambda: timer.elapsed self.wcomms = wcomms self.WorkerExc = False - self.persis_pending = [] + self.persis_pending: list[int] = [] self.live_data = libE_specs.get("live_data") dyn_keys = ("resource_sets", "num_procs", "num_gpus") @@ -232,19 +232,20 @@ def __init__( (1, "stop_val", self.term_test_stop_val), ] - gen_on_manager = self.libE_specs.get("gen_on_manager", False) + gen_on_worker = self.libE_specs.get("gen_on_worker", False) + len_W = len(self.wcomms) + 1 - gen_on_worker # if gen_on_worker, len_W = len(self.wcomms) - self.W = np.zeros(len(self.wcomms) + gen_on_manager, dtype=Manager.worker_dtype) - if gen_on_manager: + self.W = np.zeros(len_W, dtype=Manager.worker_dtype) + if gen_on_worker: + self.W["worker_id"] = np.arange(len(self.wcomms)) + 1 # [1, 2, 3, ...] + else: self.W["worker_id"] = np.arange(len(self.wcomms) + 1) # [0, 1, 2, ...] self.W[0]["gen_worker"] = True local_worker_comm = self._run_additional_worker(hist, sim_specs, gen_specs, libE_specs) self.wcomms = [local_worker_comm] + self.wcomms - else: - self.W["worker_id"] = np.arange(len(self.wcomms)) + 1 # [1, 2, 3, ...] - self.W = _WorkerIndexer(self.W, gen_on_manager) - self.wcomms = _WorkerIndexer(self.wcomms, gen_on_manager) + self.W = _WorkerIndexer(self.W, 1 - gen_on_worker) # if gen on worker, then no additional worker + self.wcomms = _WorkerIndexer(self.wcomms, 1 - gen_on_worker) temp_EnsembleDirectory = EnsembleDirectory(libE_specs=libE_specs) self.resources = Resources.resources @@ -263,7 +264,7 @@ def __init__( try: temp_EnsembleDirectory.make_copyback() except AssertionError as e: # Ensemble dir exists and isn't empty. - logger.manager_warning(_USER_CALC_DIR_WARNING.format(temp_EnsembleDirectory.ensemble_dir)) + logger.manager_warning(_USER_CALC_DIR_WARNING.format(temp_EnsembleDirectory.ensemble_dir)) # type: ignore self._kill_workers() raise ManagerException( "Manager errored on initialization", @@ -394,22 +395,22 @@ def _set_resources(self, Work: dict, w: int) -> None: If rsets are not assigned, then assign using default mapping """ - resource_manager = self.resources.resource_manager + resource_manager = self.resources.resource_manager # type: ignore rset_req = Work["libE_info"].get("rset_team") if rset_req is None: rset_team = [] - default_rset = resource_manager.index_list[w - 1] + default_rset = resource_manager.index_list[w - 1] # type: ignore if default_rset is not None: rset_team.append(default_rset) Work["libE_info"]["rset_team"] = rset_team - resource_manager.assign_rsets(Work["libE_info"]["rset_team"], w) + resource_manager.assign_rsets(Work["libE_info"]["rset_team"], w) # type: ignore def _freeup_resources(self, w: int) -> None: """Free up resources assigned to the worker""" if self.resources: - self.resources.resource_manager.free_rsets(w) + self.resources.resource_manager.free_rsets(w) # type: ignore def _ensure_sim_id_in_persis_in(self, D: npt.NDArray) -> None: """Add sim_id to gen_specs persis_in if generator output contains sim_id (gest-api style generators only)""" @@ -554,7 +555,7 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int) -> None: self._kill_workers() raise WorkerException(f"Received error message from worker {w}", D_recv.msg, D_recv.exc) elif isinstance(D_recv, logging.LogRecord): - logger.vdebug(f"Manager received a log message from worker {w}") + logger.vdebug(f"Manager received a log message from worker {w}") # type: ignore[attr-defined] logging.getLogger(D_recv.name).handle(D_recv) else: logger.debug(f"Manager received data message from worker {w}") @@ -585,7 +586,7 @@ def _kill_cancelled_sims(self) -> None: # --- Handle termination - def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int): + def _final_receive_and_kill(self, persis_info: dict) -> tuple[dict, int, int]: """ Tries to receive from any active workers. @@ -623,9 +624,9 @@ def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int): # Elapsed Wallclock has expired if not any(self.W["persis_state"]): if any(self.W["active"]): - logger.manager_warning(_WALLCLOCK_MSG_ACTIVE) + logger.manager_warning(_WALLCLOCK_MSG_ACTIVE) # type: ignore else: - logger.manager_warning(_WALLCLOCK_MSG_ALL_RETURNED) + logger.manager_warning(_WALLCLOCK_MSG_ALL_RETURNED) # type: ignore exit_flag = 2 if self.WorkerExc: exit_flag = 1 @@ -662,7 +663,7 @@ def _get_alloc_libE_info(self) -> dict: "use_resource_sets": self.use_resource_sets, "gen_num_procs": self.gen_num_procs, "gen_num_gpus": self.gen_num_gpus, - "gen_on_manager": self.libE_specs.get("gen_on_manager", False), + "gen_on_worker": self.libE_specs.get("gen_on_worker", False), } def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict: @@ -699,7 +700,7 @@ def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict: # --- Main loop - def run(self, persis_info: dict) -> (dict, int, int): + def run(self, persis_info: dict) -> tuple[dict, int, int]: """Runs the manager""" logger.debug(f"Manager initiated on node {socket.gethostname()}") logger.info(f"Manager exit_criteria: {self.exit_criteria}") @@ -731,7 +732,7 @@ def run(self, persis_info: dict) -> (dict, int, int): finally: # Return persis_info, exit_flag, elapsed time result = self._final_receive_and_kill(persis_info) - self.wcomms = None + self.wcomms = [] sys.stdout.flush() sys.stderr.flush() return result diff --git a/libensemble/resources/resources.py b/libensemble/resources/resources.py index 105f8a836e..5ba54cc09b 100644 --- a/libensemble/resources/resources.py +++ b/libensemble/resources/resources.py @@ -63,12 +63,12 @@ def init_resources(cls, libE_specs: dict, platform_info: dict = {}) -> None: libE_specs=libE_specs, platform_info=platform_info, top_level_dir=top_level_dir ) - def __init__(self, libE_specs: dict, platform_info: dict = {}, top_level_dir: str = None) -> None: + def __init__(self, libE_specs: dict, platform_info: dict = {}, top_level_dir: str = "") -> None: """Initiate a new resources object""" self.top_level_dir = top_level_dir or os.getcwd() - self.glob_resources = GlobalResources(libE_specs=libE_specs, platform_info=platform_info, top_level_dir=None) - self.resource_manager = None # For Manager - self.worker_resources = None # For Workers + self.glob_resources = GlobalResources(libE_specs=libE_specs, platform_info=platform_info, top_level_dir="") + self.resource_manager: ResourceManager | None = None # For Manager + self.worker_resources: WorkerResources | None = None # For Workers def set_worker_resources(self, num_workers: int, workerid: int) -> None: """Initiate the worker resources component of resources""" @@ -101,7 +101,7 @@ class GlobalResources: :ivar int num_resource_sets: Number of resource sets, if supplied by the user. """ - def __init__(self, libE_specs: dict, platform_info: dict = {}, top_level_dir: str = None) -> None: + def __init__(self, libE_specs: dict, platform_info: dict = {}, top_level_dir: str = "") -> None: """Initializes a new Resources instance Determines the compute resources available for current allocation, including diff --git a/libensemble/resources/rset_resources.py b/libensemble/resources/rset_resources.py index d35cdaee8b..1ef4cc60ba 100644 --- a/libensemble/resources/rset_resources.py +++ b/libensemble/resources/rset_resources.py @@ -130,7 +130,7 @@ def get_rsets_on_a_node(num_rsets, resources): def get_workers2assign2(num_workers, resources): """Returns workers to assign resources to""" zero_resource_list = resources.zero_resource_workers - return num_workers - len(zero_resource_list) + return num_workers - len(zero_resource_list) if resources.zero_resource_workers != [0] else num_workers @staticmethod def even_assignment(nnodes, nworkers): diff --git a/libensemble/resources/worker_resources.py b/libensemble/resources/worker_resources.py index 5033b2aeee..4faa674e93 100644 --- a/libensemble/resources/worker_resources.py +++ b/libensemble/resources/worker_resources.py @@ -105,7 +105,7 @@ def free_rsets(self, worker=None): def get_index_list(num_workers: int, num_rsets: int, zero_resource_list: list[int | Any]) -> list[int | None]: """Map WorkerID to index into a nodelist""" index = 0 - index_list = [] + index_list: list[int | None] = [] for i in range(1, num_workers + 1): if i in zero_resource_list: index_list.append(None) @@ -116,6 +116,11 @@ def get_index_list(num_workers: int, num_rsets: int, zero_resource_list: list[in else: index_list.append(index) index += 1 + + for i in zero_resource_list: + if i >= num_workers: + logger.warning(f"Worker index {i} from zero_resource_workers is out of range (0-{num_workers - 1})") + return index_list @@ -364,7 +369,7 @@ def get_local_nodelist( local_nodelist = list(OrderedDict.fromkeys(team_list)) # Maintain order of nodes logger.debug(f"Worker's local_nodelist is {local_nodelist}") - slots = {} + slots: dict[str, list[int]] = {} for node in local_nodelist: slots[node] = [] diff --git a/libensemble/sim_funcs/run_line_check.py b/libensemble/sim_funcs/run_line_check.py index 530b1e8d9b..d30d10ec38 100644 --- a/libensemble/sim_funcs/run_line_check.py +++ b/libensemble/sim_funcs/run_line_check.py @@ -22,7 +22,7 @@ def exp_nodelist_for_worker(exp_list, workerID, nodes_per_worker, persis_gens): node_list = comp.split(",") for node in node_list: node_name, node_num = node.split("-") - offset = workerID - (1 + persis_gens) + offset = workerID - 1 new_num = int(node_num) + int(nodes_per_worker * offset) new_node = "-".join([node_name, str(new_num)]) new_node_list.append(new_node) diff --git a/libensemble/specs.py b/libensemble/specs.py index 70c18bddd2..bdec38f4db 100644 --- a/libensemble/specs.py +++ b/libensemble/specs.py @@ -390,9 +390,9 @@ class LibeSpecs(BaseModel): nworkers: int | None = 0 """ Number of worker processes in ``"local"``, ``"threads"``, or ``"tcp"``.""" - gen_on_manager: bool | None = False - """ Instructs Manager process to run generator functions. - This generator function can access/modify user objects by reference. + gen_on_worker: bool = False + """ Instructs libEnsemble to run generator functions on a worker rank. + By default, the generator runs on the manager process as a thread (Worker 0). """ mpi_comm: object | None = None @@ -681,10 +681,11 @@ class LibeSpecs(BaseModel): libEnsemble processes (manager and workers) are running. """ - zero_resource_workers: list[int] | None = [] + zero_resource_workers: list[int] | None = [0] """ list of workers that require no resources. For when a fixed mapping of workers to resources is required. Otherwise, use ``num_resource_sets``. + By default, Worker 0 (manager thread) is a zero-resource worker. For use with supported allocation functions. """ diff --git a/libensemble/tests/functionality_tests/test_1d_splitcomm.py b/libensemble/tests/functionality_tests/test_1d_splitcomm.py index 467afe613a..07959d5cc0 100644 --- a/libensemble/tests/functionality_tests/test_1d_splitcomm.py +++ b/libensemble/tests/functionality_tests/test_1d_splitcomm.py @@ -35,6 +35,7 @@ libE_specs["H_file_prefix"] = "splitcomm_" + str(sub_comm_number) libE_specs["safe_mode"] = False libE_specs["disable_log_files"] = True + libE_specs["gen_on_worker"] = True sim_specs = { "sim_f": sim_f, diff --git a/libensemble/tests/functionality_tests/test_GPU_gen_resources.py b/libensemble/tests/functionality_tests/test_GPU_gen_resources.py index 8feb36ea77..5429c9a69d 100644 --- a/libensemble/tests/functionality_tests/test_GPU_gen_resources.py +++ b/libensemble/tests/functionality_tests/test_GPU_gen_resources.py @@ -49,8 +49,6 @@ if __name__ == "__main__": nworkers, is_manager, libE_specs, _ = parse_args() - libE_specs["num_resource_sets"] = nworkers # Persistent gen DOES need resources - # Mock GPU system / uncomment to detect GPUs libE_specs["sim_dirs_make"] = True # Will only contain files if dry_run is False libE_specs["gen_dirs_make"] = True # Will only contain files if dry_run is False @@ -82,7 +80,8 @@ "give_all_with_same_priority": False, "async_return": False, "user": { - "max_procs": nworkers - 1, # Any sim created can req. 1 worker up to all. + "initial_batch_size": "set_in_loop", + "max_procs": "set_in_loop", # Any sim created can req. 1 worker up to all. "lb": np.array([-3, -2]), "ub": np.array([3, 2]), "dry_run": dry_run, @@ -90,33 +89,48 @@ } exit_criteria = {"sim_max": 20} - libE_specs["resource_info"] = {"cores_on_node": (nworkers * 2, nworkers * 4), "gpus_on_node": nworkers} base_libE_specs = libE_specs.copy() - for gen_on_manager in [False, True]: + for gen_on_worker in [False, True]: for run in range(5): # reset libE_specs = base_libE_specs.copy() - libE_specs["gen_on_manager"] = gen_on_manager - persis_info = add_unique_random_streams({}, nworkers + 1) + libE_specs["gen_on_worker"] = gen_on_worker + libE_specs["zero_resource_workers"] = [] # perhaps the generator needs GPUs + + resourced_workers = ( + nworkers if gen_on_worker else nworkers + 1 + ) # note this "nworkers" decided before the extra worker starts + sim_workers = nworkers - 1 if gen_on_worker else nworkers + + gen_specs["user"]["initial_batch_size"] = sim_workers + gen_specs["user"]["max_procs"] = sim_workers + + libE_specs["num_resource_sets"] = resourced_workers + libE_specs["resource_info"] = { + "cores_on_node": (resourced_workers * 2, resourced_workers * 4), + "gpus_on_node": resourced_workers, + } + + persis_info = add_unique_random_streams({}, resourced_workers) if run == 0: libE_specs["gen_num_procs"] = 2 elif run == 1: - if gen_on_manager: - print("SECOND LIBE CALL WITH GEN ON MANAGER") + if gen_on_worker: + print("SECOND LIBE CALL WITH GEN ON WORKER INSTEAD OF MANAGER") libE_specs["gen_num_gpus"] = 1 elif run == 2: persis_info["gen_num_gpus"] = 1 elif run == 3: # Two GPUs per resource set - libE_specs["resource_info"]["gpus_on_node"] = nworkers * 2 + libE_specs["resource_info"]["gpus_on_node"] = resourced_workers * 2 persis_info["gen_num_gpus"] = 1 elif run == 4: # Two GPUs requested for gen persis_info["gen_num_procs"] = 2 persis_info["gen_num_gpus"] = 2 - gen_specs["user"]["max_procs"] = max(nworkers - 2, 1) + gen_specs["user"]["max_procs"] = max(sim_workers - 1, 1) # Perform the run H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs) diff --git a/libensemble/tests/functionality_tests/test_asktell_sampling.py b/libensemble/tests/functionality_tests/test_asktell_sampling.py index fccb9cd6be..83db331c8d 100644 --- a/libensemble/tests/functionality_tests/test_asktell_sampling.py +++ b/libensemble/tests/functionality_tests/test_asktell_sampling.py @@ -33,7 +33,6 @@ def sim_f(In): if __name__ == "__main__": nworkers, is_manager, libE_specs, _ = parse_args() - libE_specs["gen_on_manager"] = True sim_specs = { "sim_f": sim_f, diff --git a/libensemble/tests/functionality_tests/test_asktell_sampling_external_gen.py b/libensemble/tests/functionality_tests/test_asktell_sampling_external_gen.py index 318cb49ecf..4d99255e6a 100644 --- a/libensemble/tests/functionality_tests/test_asktell_sampling_external_gen.py +++ b/libensemble/tests/functionality_tests/test_asktell_sampling_external_gen.py @@ -44,7 +44,7 @@ def sim_f_scalar(In): if __name__ == "__main__": - libE_specs = LibeSpecs(gen_on_manager=True) + libE_specs = LibeSpecs() for test in range(1): # 2 diff --git a/libensemble/tests/functionality_tests/test_evaluate_existing_plus_gen.py b/libensemble/tests/functionality_tests/test_evaluate_existing_plus_gen.py index b470ccb5a5..fe066ccf54 100644 --- a/libensemble/tests/functionality_tests/test_evaluate_existing_plus_gen.py +++ b/libensemble/tests/functionality_tests/test_evaluate_existing_plus_gen.py @@ -1,6 +1,6 @@ """ Test libEnsemble's capability to evaluate existing points and then generate -new samples via gen_on_manager. +new samples. Execute via one of the following commands (e.g. 3 workers): mpiexec -np 4 python test_evaluate_existing_sample.py @@ -44,7 +44,6 @@ def create_H0(persis_info, gen_specs, H0_size): if __name__ == "__main__": sampling = Ensemble(parse_args=True) - sampling.libE_specs.gen_on_manager = True sampling.sim_specs = SimSpecs(sim_f=sim_f, inputs=["x"], out=[("f", float)]) gen_specs = { diff --git a/libensemble/tests/functionality_tests/test_mpi_gpu_settings.py b/libensemble/tests/functionality_tests/test_mpi_gpu_settings.py index 203a1ca459..7985ff0fff 100644 --- a/libensemble/tests/functionality_tests/test_mpi_gpu_settings.py +++ b/libensemble/tests/functionality_tests/test_mpi_gpu_settings.py @@ -39,7 +39,7 @@ # Do not change these lines - they are parsed by run-tests.sh # TESTSUITE_COMMS: mpi local -# TESTSUITE_NPROCS: 3 6 +# TESTSUITE_NPROCS: 4 7 import os import sys @@ -65,7 +65,7 @@ # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": nworkers, is_manager, libE_specs, _ = parse_args() - libE_specs["num_resource_sets"] = nworkers - 1 # Persistent gen does not need resources + libE_specs["num_resource_sets"] = nworkers # Persistent gen does not need resources libE_specs["use_workflow_dir"] = True # Only a place for Open MPI machinefiles if libE_specs["comms"] == "tcp": @@ -88,9 +88,9 @@ "out": [("priority", float), ("resource_sets", int), ("x", float, n)], "give_all_with_same_priority": False, "async_return": False, - "initial_batch_size": nworkers - 1, + "initial_batch_size": nworkers, "user": { - "max_resource_sets": nworkers - 1, # Any sim created can req. 1 worker up to all. + "max_resource_sets": nworkers, # Any sim created can req. 1 worker up to all. "lb": np.array([-3, -2]), "ub": np.array([3, 2]), }, diff --git a/libensemble/tests/functionality_tests/test_mpi_gpu_settings_env.py b/libensemble/tests/functionality_tests/test_mpi_gpu_settings_env.py index 30c737cbfe..51578a55f2 100644 --- a/libensemble/tests/functionality_tests/test_mpi_gpu_settings_env.py +++ b/libensemble/tests/functionality_tests/test_mpi_gpu_settings_env.py @@ -42,7 +42,7 @@ # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": nworkers, is_manager, libE_specs, _ = parse_args() - libE_specs["num_resource_sets"] = nworkers - 1 # Persistent gen does not need resources + libE_specs["num_resource_sets"] = nworkers # Persistent gen does not need resources libE_specs["use_workflow_dir"] = True # Only a place for Open MPI machinefiles # Optional for organization of output scripts @@ -72,7 +72,7 @@ "give_all_with_same_priority": False, "async_return": False, "user": { - "max_resource_sets": nworkers - 1, # Any sim created can req. 1 worker up to all. + "max_resource_sets": nworkers, # Any sim created can req. 1 worker up to all. "lb": np.array([-3, -2]), "ub": np.array([3, 2]), }, diff --git a/libensemble/tests/functionality_tests/test_mpi_runners_subnode_uneven.py b/libensemble/tests/functionality_tests/test_mpi_runners_subnode_uneven.py index 26844bcc1d..c28b95b9f9 100644 --- a/libensemble/tests/functionality_tests/test_mpi_runners_subnode_uneven.py +++ b/libensemble/tests/functionality_tests/test_mpi_runners_subnode_uneven.py @@ -27,7 +27,7 @@ # Do not change these lines - they are parsed by run-tests.sh # TESTSUITE_COMMS: mpi local -# TESTSUITE_NPROCS: 4 6 +# TESTSUITE_NPROCS: 5 7 # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": diff --git a/libensemble/tests/functionality_tests/test_mpi_runners_zrw_subnode_uneven.py b/libensemble/tests/functionality_tests/test_mpi_runners_zrw_subnode_uneven.py index aa2a1a8ebe..c9f5047e40 100644 --- a/libensemble/tests/functionality_tests/test_mpi_runners_zrw_subnode_uneven.py +++ b/libensemble/tests/functionality_tests/test_mpi_runners_zrw_subnode_uneven.py @@ -43,6 +43,7 @@ comms = libE_specs["comms"] libE_specs["dedicated_mode"] = True + libE_specs["zero_resource_workers"] = [0] libE_specs["enforce_worker_core_bounds"] = True # To allow visual checking - log file not used in test @@ -51,7 +52,7 @@ # For varying size test - relate node count to nworkers n_gens = 1 - nsim_workers = nworkers - n_gens + nsim_workers = nworkers # - n_gens if nsim_workers % 2 == 0: sys.exit( @@ -136,10 +137,17 @@ exp_srun.append(srun_p1 + str(nodename) + srun_p2 + str(ntasks) + srun_p3 + str(ntasks) + srun_p4) test_list = test_list_base - exp_list = exp_srun + exp_list_dynamic = exp_srun.copy() + if nworkers == 5: + # Iteration 0 (Dynamic): Workers 1, 2 -> node-2; 3, 4, 5 -> node-1 + n1 = srun_p1 + "node-1" + srun_p2 + "5" + srun_p3 + "5" + srun_p4 + n2 = srun_p1 + "node-2" + srun_p2 + "8" + srun_p3 + "8" + srun_p4 + exp_list_dynamic = [n2, n2, n1, n1, n1] + # Iteration 1 (Static): Worker 1 is gen. Workers 2, 3 -> node-1; 4, 5 -> node-2 + exp_list_static = [n1, n1, n2, n2] + sim_specs["user"] = { "tests": test_list, - "expect": exp_list, "persis_gens": n_gens, } @@ -147,15 +155,21 @@ for prob_id in range(iterations): if prob_id == 0: # Uses dynamic scheduler - will find node 2 slots first (as fewer) - libE_specs["num_resource_sets"] = nworkers - 1 # Any worker can be the gen - sim_specs["user"]["offset_for_scheduler"] = True # Changes expected values + libE_specs["gen_on_worker"] = False + libE_specs["num_resource_sets"] = nworkers + sim_specs["user"]["expect"] = exp_list_dynamic + sim_specs["user"]["offset_for_scheduler"] = False + sim_specs["user"]["persis_gens"] = 0 persis_info = add_unique_random_streams({}, nworkers + 1) else: # Uses static scheduler - will find node 1 slots first + libE_specs["gen_on_worker"] = True + sim_specs["user"]["expect"] = exp_list_static del libE_specs["num_resource_sets"] libE_specs["zero_resource_workers"] = [1] # Gen must be worker 1 sim_specs["user"]["offset_for_scheduler"] = False + sim_specs["user"]["persis_gens"] = 1 persis_info = add_unique_random_streams({}, nworkers + 1) # Perform the run diff --git a/libensemble/tests/functionality_tests/test_mpi_runners_zrw_supernode_uneven.py b/libensemble/tests/functionality_tests/test_mpi_runners_zrw_supernode_uneven.py index f52aced21c..8eacafb23b 100644 --- a/libensemble/tests/functionality_tests/test_mpi_runners_zrw_supernode_uneven.py +++ b/libensemble/tests/functionality_tests/test_mpi_runners_zrw_supernode_uneven.py @@ -32,9 +32,10 @@ sim_app = "/path/to/fakeapp.x" comms = libE_specs["comms"] - libE_specs["zero_resource_workers"] = [1] + libE_specs["zero_resource_workers"] = [0, 1] libE_specs["dedicated_mode"] = True libE_specs["enforce_worker_core_bounds"] = True + libE_specs["gen_on_worker"] = True # To allow visual checking - log file not used in test log_file = "ensemble_mpi_runners_zrw_supernode_uneven_comms_" + str(comms) + "_wrks_" + str(nworkers) + ".log" @@ -44,7 +45,7 @@ # For varying size test - relate node count to nworkers in_place = libE_specs["zero_resource_workers"] - n_gens = len(in_place) + n_gens = len([w for w in in_place if w != 0]) nsim_workers = nworkers - n_gens comms = libE_specs["comms"] node_file = "nodelist_mpi_runners_zrw_supernode_uneven_comms_" + str(comms) + "_wrks_" + str(nworkers) diff --git a/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py b/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py index d9e8594f2e..ed744a9501 100644 --- a/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py +++ b/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py @@ -13,7 +13,7 @@ # Do not change these lines - they are parsed by run-tests.sh # TESTSUITE_COMMS: mpi local -# TESTSUITE_NPROCS: 5 7 +# TESTSUITE_NPROCS: 3 5 # TESTSUITE_OS_SKIP: WIN import sys @@ -35,6 +35,8 @@ n = 2 init_batch_size = nworkers - ngens + libE_specs["gen_on_worker"] = True + if ngens >= nworkers: sys.exit("The number of generators must be less than the number of workers -- aborting...") diff --git a/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py b/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py index 50c9fd9ce4..782ed4233c 100644 --- a/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py +++ b/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py @@ -84,9 +84,9 @@ sim_specs["in"] = ["x", "obj_component"] # sim_specs["out"] = [("f", float), ("grad", float, n)] elif run == 3: - libE_specs["gen_on_manager"] = True + libE_specs["gen_on_worker"] = True elif run == 4: - libE_specs["gen_on_manager"] = False + libE_specs["gen_on_worker"] = False libE_specs["gen_workers"] = [2] # Perform the run diff --git a/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_nonblocking.py b/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_nonblocking.py index c9e92a6c4f..05ae2ef8e4 100644 --- a/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_nonblocking.py +++ b/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_nonblocking.py @@ -65,4 +65,4 @@ assert len(np.unique(H["gen_ended_time"])) == 2 save_libE_output(H, persis_info, __file__, nworkers) - assert persis_info[1]["spin_count"] > 0, "This should have been a nonblocking receive" + assert persis_info[0]["spin_count"] > 0, "This should have been a nonblocking receive" diff --git a/libensemble/tests/functionality_tests/test_runlines_adaptive_workers_persistent_oversubscribe_rsets.py b/libensemble/tests/functionality_tests/test_runlines_adaptive_workers_persistent_oversubscribe_rsets.py index a317d90433..0547da4f3f 100644 --- a/libensemble/tests/functionality_tests/test_runlines_adaptive_workers_persistent_oversubscribe_rsets.py +++ b/libensemble/tests/functionality_tests/test_runlines_adaptive_workers_persistent_oversubscribe_rsets.py @@ -30,9 +30,9 @@ # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": nworkers, is_manager, libE_specs, _ = parse_args() - nsim_workers = nworkers - 1 + nsim_workers = nworkers - libE_specs["zero_resource_workers"] = [1] + libE_specs["zero_resource_workers"] = [0] rsets = nsim_workers * 2 libE_specs["num_resource_sets"] = rsets @@ -86,7 +86,7 @@ "node_file": node_file, } # Name of file containing a node-list - persis_info = add_unique_random_streams({}, nworkers + 1) + persis_info = add_unique_random_streams({}, nworkers) exit_criteria = {"sim_max": 40, "wallclock_max": 300} # Perform the run diff --git a/libensemble/tests/functionality_tests/test_sim_dirs_per_worker.py b/libensemble/tests/functionality_tests/test_sim_dirs_per_worker.py index 59cbe15214..3756bff47c 100644 --- a/libensemble/tests/functionality_tests/test_sim_dirs_per_worker.py +++ b/libensemble/tests/functionality_tests/test_sim_dirs_per_worker.py @@ -24,14 +24,14 @@ from libensemble.tests.regression_tests.support import write_sim_func as sim_f from libensemble.tools import add_unique_random_streams, parse_args -nworkers, is_manager, libE_specs, _ = parse_args() +n_simworkers, is_manager, libE_specs, _ = parse_args() # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": sim_input_dir = "./sim_input_dir" dir_to_copy = sim_input_dir + "/copy_this" dir_to_symlink = sim_input_dir + "/symlink_this" - w_ensemble = "./ensemble_workdirs_w" + str(nworkers) + "_" + libE_specs.get("comms") + w_ensemble = "./ensemble_workdirs_w" + str(n_simworkers) + "_" + libE_specs.get("comms") print("creating ensemble dir: ", w_ensemble, flush=True) for dir in [sim_input_dir, dir_to_copy, dir_to_symlink]: @@ -61,11 +61,9 @@ }, } - alloc_specs = { - "alloc_f": give_sim_work_first, - } + persis_info = add_unique_random_streams({}, n_simworkers) - persis_info = add_unique_random_streams({}, nworkers + 1) + alloc_specs = {"alloc_f": give_sim_work_first} exit_criteria = {"sim_max": 21} @@ -76,9 +74,9 @@ if is_manager: assert os.path.isdir(w_ensemble), f"Ensemble directory {w_ensemble} not created." worker_dir_sum = sum(["worker" in i for i in os.listdir(w_ensemble)]) - assert worker_dir_sum == nworkers, "Number of worker dirs ({}) does not match nworkers ({}).".format( - worker_dir_sum, nworkers - ) + assert ( + worker_dir_sum == n_simworkers + 1 + ), "Number of worker dirs ({}) does not match n_simworkers ({}).".format(worker_dir_sum, n_simworkers) input_copied = [] sim_dir_sum = 0 diff --git a/libensemble/tests/functionality_tests/test_zero_resource_workers.py b/libensemble/tests/functionality_tests/test_zero_resource_workers.py index a1b3e8e7cd..d0be427360 100644 --- a/libensemble/tests/functionality_tests/test_zero_resource_workers.py +++ b/libensemble/tests/functionality_tests/test_zero_resource_workers.py @@ -1,16 +1,14 @@ """ Runs libEnsemble testing the zero_resource_workers argument. -Execute via one of the following commands (e.g. 3 workers): - mpiexec -np 4 python test_zero_resource_workers.py - python test_zero_resource_workers.py --nworkers 3 - python test_zero_resource_workers.py --nworkers 3 --comms tcp +Execute via one of the following commands (e.g. 4 workers): + mpiexec -np 5 python test_zero_resource_workers.py + python test_zero_resource_workers.py --nworkers 4 + python test_zero_resource_workers.py --nworkers 4 --comms tcp -The number of concurrent evaluations of the objective function will be 4-1=3. +The number of concurrent evaluations of the objective function will be 4. """ -import sys - import numpy as np from libensemble import logger @@ -22,11 +20,11 @@ from libensemble.tools import add_unique_random_streams, parse_args # logger.set_level("DEBUG") # For testing the test -logger.set_level("INFO") +logger.set_level("DEBUG") # Do not change these lines - they are parsed by run-tests.sh # TESTSUITE_COMMS: mpi local -# TESTSUITE_NPROCS: 3 4 +# TESTSUITE_NPROCS: 3 5 # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": @@ -35,7 +33,7 @@ sim_app = "/path/to/fakeapp.x" comms = libE_specs["comms"] - libE_specs["zero_resource_workers"] = [1] + libE_specs["zero_resource_workers"] = [0] libE_specs["dedicated_mode"] = True libE_specs["enforce_worker_core_bounds"] = True @@ -46,9 +44,10 @@ nodes_per_worker = 2 # For varying size test - relate node count to nworkers - in_place = libE_specs["zero_resource_workers"] - n_gens = len(in_place) - nsim_workers = nworkers - n_gens + # With gen-on-manager (default), all user workers are sim workers. + # Worker 0 (gen) is hidden and in zero_resource_workers. + n_gens = 0 + nsim_workers = nworkers comms = libE_specs["comms"] nodes_per_worker = 2 @@ -78,8 +77,8 @@ exctr = MPIExecutor(custom_info=mpi_customizer) exctr.register_app(full_path=sim_app, calc_type="sim") - if nworkers < 2: - sys.exit("Cannot run with a persistent worker if only one worker -- aborting...") + # if nworkers < 2: + # sys.exit("Cannot run with a persistent worker if only one worker -- aborting...") n = 2 sim_specs = { diff --git a/libensemble/tests/functionality_tests/test_zero_resource_workers_subnode.py b/libensemble/tests/functionality_tests/test_zero_resource_workers_subnode.py index 978a992d01..7ce60921af 100644 --- a/libensemble/tests/functionality_tests/test_zero_resource_workers_subnode.py +++ b/libensemble/tests/functionality_tests/test_zero_resource_workers_subnode.py @@ -2,13 +2,13 @@ Runs libEnsemble testing the zero_resource_workers argument with 2 workers per node. -This test must be run on an odd number of workers >= 3 (e.g. even number of +This test must be run on an even number of workers >= 2 (e.g. odd number of procs when using mpi4py). Execute via one of the following commands (e.g. 3 workers): - mpiexec -np 4 python test_zero_resource_workers_subnode.py - python test_zero_resource_workers_subnode.py --nworkers 3 - python test_zero_resource_workers_subnode.py --nworkers 3 --comms tcp + mpiexec -np 5 python test_zero_resource_workers_subnode.py + python test_zero_resource_workers_subnode.py --nworkers 4 + python test_zero_resource_workers_subnode.py --nworkers 4 --comms tcp """ import sys @@ -28,7 +28,7 @@ # Do not change these lines - they are parsed by run-tests.sh # TESTSUITE_COMMS: mpi local -# TESTSUITE_NPROCS: 4 +# TESTSUITE_NPROCS: 5 # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": @@ -38,7 +38,7 @@ sim_app = "/path/to/fakeapp.x" comms = libE_specs["comms"] - libE_specs["zero_resource_workers"] = [1] + libE_specs["zero_resource_workers"] = [0] libE_specs["dedicated_mode"] = True libE_specs["enforce_worker_core_bounds"] = True @@ -49,9 +49,10 @@ nodes_per_worker = 0.5 # For varying size test - relate node count to nworkers - in_place = libE_specs["zero_resource_workers"] - n_gens = len(in_place) - nsim_workers = nworkers - n_gens + # With gen-on-manager (default), all user workers are sim workers. + # Worker 0 (gen) is hidden and in zero_resource_workers. + n_gens = 0 + nsim_workers = nworkers if not (nsim_workers * nodes_per_worker).is_integer(): sys.exit(f"Sim workers ({nsim_workers}) must divide evenly into nodes") diff --git a/libensemble/tests/regression_tests/common.py b/libensemble/tests/regression_tests/common.py index cb174d09c9..ebf45cdaac 100644 --- a/libensemble/tests/regression_tests/common.py +++ b/libensemble/tests/regression_tests/common.py @@ -5,6 +5,7 @@ import glob import os import os.path +import sys import time @@ -72,6 +73,8 @@ def build_simfunc(): # Build simfunc # buildstring='mpif90 -o my_simtask.x my_simtask.f90' # On cray need to use ftn buildstring = "mpicc -o my_simtask.x ../unit_tests/simdir/my_simtask.c" + if sys.platform == "darwin": + buildstring = "mpicc -cc=clang -o my_simtask.x ../unit_tests/simdir/my_simtask.c" # subprocess.run(buildstring.split(),check=True) #Python3.5+ subprocess.check_call(buildstring.split()) diff --git a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py index ec7615fdb7..09064eea58 100644 --- a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py +++ b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py @@ -62,7 +62,7 @@ def six_hump_camel_func(x): n = 2 - workflow.libE_specs.gen_on_manager = True + workflow.libE_specs.gen_on_worker = False vocs = VOCS( variables={"core": [-3, 3], "edge": [-2, 2], "core_on_cube": [-3, 3], "edge_on_cube": [-2, 2]}, diff --git a/libensemble/tests/regression_tests/test_evaluate_mixed_sample.py b/libensemble/tests/regression_tests/test_evaluate_mixed_sample.py index 481db84191..60e43fa57e 100644 --- a/libensemble/tests/regression_tests/test_evaluate_mixed_sample.py +++ b/libensemble/tests/regression_tests/test_evaluate_mixed_sample.py @@ -44,7 +44,6 @@ H0["sim_ended"][:500] = True sampling = Ensemble(parse_args=True) - sampling.libE_specs.gen_on_manager = True sampling.H0 = H0 sampling.sim_specs = SimSpecs(sim_f=sim_f, inputs=["x"], out=[("f", float)]) sampling.alloc_specs = AllocSpecs(alloc_f=alloc_f) diff --git a/libensemble/tests/regression_tests/test_optimas_ax_mf.py b/libensemble/tests/regression_tests/test_optimas_ax_mf.py index 5b9f4dca08..fb5b75c321 100644 --- a/libensemble/tests/regression_tests/test_optimas_ax_mf.py +++ b/libensemble/tests/regression_tests/test_optimas_ax_mf.py @@ -41,7 +41,7 @@ def eval_func_mf(input_params): n = 2 batch_size = 2 - libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + libE_specs = LibeSpecs(nworkers=batch_size) vocs = VOCS( variables={"x0": [-50.0, 5.0], "x1": [-5.0, 15.0], "res": [1.0, 8.0]}, diff --git a/libensemble/tests/regression_tests/test_optimas_ax_multitask.py b/libensemble/tests/regression_tests/test_optimas_ax_multitask.py index e7031bb226..08d97ed6b6 100644 --- a/libensemble/tests/regression_tests/test_optimas_ax_multitask.py +++ b/libensemble/tests/regression_tests/test_optimas_ax_multitask.py @@ -56,7 +56,7 @@ def eval_func_multitask(input_params): n = 2 batch_size = 2 - libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + libE_specs = LibeSpecs(nworkers=batch_size) vocs = VOCS( variables={ diff --git a/libensemble/tests/regression_tests/test_optimas_ax_sf.py b/libensemble/tests/regression_tests/test_optimas_ax_sf.py index 1a23edf290..b9fbbb34b6 100644 --- a/libensemble/tests/regression_tests/test_optimas_ax_sf.py +++ b/libensemble/tests/regression_tests/test_optimas_ax_sf.py @@ -40,7 +40,7 @@ def eval_func_sf(input_params): n = 2 batch_size = 2 - libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + libE_specs = LibeSpecs(nworkers=batch_size) vocs = VOCS( variables={ diff --git a/libensemble/tests/regression_tests/test_optimas_grid_sample.py b/libensemble/tests/regression_tests/test_optimas_grid_sample.py index c390be8ca1..5ec670aa95 100644 --- a/libensemble/tests/regression_tests/test_optimas_grid_sample.py +++ b/libensemble/tests/regression_tests/test_optimas_grid_sample.py @@ -41,7 +41,7 @@ def eval_func(input_params: dict): n = 2 batch_size = 4 - libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + libE_specs = LibeSpecs(nworkers=batch_size) # Create varying parameters. lower_bounds = [-3.0, 2.0] diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_dfols.py b/libensemble/tests/regression_tests/test_persistent_aposmm_dfols.py index c810db561e..ed6f9be504 100644 --- a/libensemble/tests/regression_tests/test_persistent_aposmm_dfols.py +++ b/libensemble/tests/regression_tests/test_persistent_aposmm_dfols.py @@ -103,7 +103,7 @@ def combine_component(x): H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs) if is_manager: - assert persis_info[1].get("run_order"), "Run_order should have been given back" + assert persis_info[0].get("run_order"), "Run_order should have been given back" assert flag == 0 assert np.min(H["f"][H["sim_ended"]]) <= 3000, "Didn't find a value below 3000" diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_ibcdfo_pounders.py b/libensemble/tests/regression_tests/test_persistent_aposmm_ibcdfo_pounders.py index ed9dd71a39..ff3e2404ef 100644 --- a/libensemble/tests/regression_tests/test_persistent_aposmm_ibcdfo_pounders.py +++ b/libensemble/tests/regression_tests/test_persistent_aposmm_ibcdfo_pounders.py @@ -130,7 +130,7 @@ def synthetic_beamline_mapping(H, _, sim_specs): H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs) if is_manager: - assert persis_info[1].get("run_order"), "Run_order should have been given back" + assert persis_info[0].get("run_order"), "Run_order should have been given back" assert flag == 0 save_libE_output(H, persis_info, __file__, nworkers) diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_periodic.py b/libensemble/tests/regression_tests/test_persistent_aposmm_periodic.py index d99e8802a0..8cc215800d 100644 --- a/libensemble/tests/regression_tests/test_persistent_aposmm_periodic.py +++ b/libensemble/tests/regression_tests/test_persistent_aposmm_periodic.py @@ -89,7 +89,7 @@ H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs) if is_manager: - assert persis_info[1].get("run_order"), "Run_order should have been given back" + assert persis_info[0].get("run_order"), "Run_order should have been given back" min_ids = np.where(H["local_min"]) # The minima are known on this test problem. If the above [lb, ub] domain is diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_timeout.py b/libensemble/tests/regression_tests/test_persistent_aposmm_timeout.py index a31dd01878..111887c2cd 100644 --- a/libensemble/tests/regression_tests/test_persistent_aposmm_timeout.py +++ b/libensemble/tests/regression_tests/test_persistent_aposmm_timeout.py @@ -88,6 +88,6 @@ if is_manager: assert flag == 2, "Test should have timed out" - assert persis_info[1].get("run_order"), "Run_order should have been given back" + assert persis_info[0].get("run_order"), "Run_order should have been given back" min_ids = np.where(H["local_min"]) save_libE_output(H, persis_info, __file__, nworkers) diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_with_grad.py b/libensemble/tests/regression_tests/test_persistent_aposmm_with_grad.py index a0364e1c14..1562e7f69b 100644 --- a/libensemble/tests/regression_tests/test_persistent_aposmm_with_grad.py +++ b/libensemble/tests/regression_tests/test_persistent_aposmm_with_grad.py @@ -122,9 +122,9 @@ H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0=H0) if is_manager: - assert persis_info[1].get("run_order"), "Run_order should have been given back" + assert persis_info[0].get("run_order"), "Run_order should have been given back" assert ( - len(persis_info[1]["run_order"]) >= gen_specs["user"]["stop_after_k_minima"] + len(persis_info[0]["run_order"]) >= gen_specs["user"]["stop_after_k_minima"] ), "This test should have many runs started." assert len(H) < exit_criteria["sim_max"], "Test should have stopped early due to 'stop_after_k_minima'" diff --git a/libensemble/tests/regression_tests/test_persistent_fd_param_finder.py b/libensemble/tests/regression_tests/test_persistent_fd_param_finder.py index ac01d5683b..de97470dc2 100644 --- a/libensemble/tests/regression_tests/test_persistent_fd_param_finder.py +++ b/libensemble/tests/regression_tests/test_persistent_fd_param_finder.py @@ -70,6 +70,6 @@ if fd_test.is_manager: assert len(H) < fd_test.exit_criteria.gen_max, "Problem didn't stop early, which should have been the case." - assert np.all(persis_info[1]["Fnoise"] > 0), "gen_f didn't find noise for all F_i components." + assert np.all(persis_info[0]["Fnoise"] > 0), "gen_f didn't find noise for all F_i components." fd_test.save_output(__file__) diff --git a/libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py b/libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py index 38b6140fcd..942c57e9a3 100644 --- a/libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py +++ b/libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py @@ -2,8 +2,6 @@ Example of multi-fidelity optimization using a persistent GP gen_func (calling Ax). -This test uses the gen_on_manager option (persistent generator runs on -a thread). Therefore nworkers is the number of simulation workers. Execute via one of the following commands: mpiexec -np 4 python test_persistent_gp_multitask_ax.py @@ -63,7 +61,6 @@ def run_simulation(H, persis_info, sim_specs, libE_info): # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": nworkers, is_manager, libE_specs, _ = parse_args() - libE_specs["gen_on_manager"] = True mt_params = { "name_hifi": "expensive_model", diff --git a/libensemble/tests/regression_tests/test_xopt_EI.py b/libensemble/tests/regression_tests/test_xopt_EI.py index 69eea46dc1..7fb158b58b 100644 --- a/libensemble/tests/regression_tests/test_xopt_EI.py +++ b/libensemble/tests/regression_tests/test_xopt_EI.py @@ -52,7 +52,7 @@ def xtest_sim(H, persis_info, sim_specs, _): n = 2 batch_size = 4 - libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + libE_specs = LibeSpecs(nworkers=batch_size) libE_specs.reuse_output_dir = True vocs = VOCS( diff --git a/libensemble/tests/regression_tests/test_xopt_EI_xopt_sim.py b/libensemble/tests/regression_tests/test_xopt_EI_xopt_sim.py index 9efffb58e0..13939169f7 100644 --- a/libensemble/tests/regression_tests/test_xopt_EI_xopt_sim.py +++ b/libensemble/tests/regression_tests/test_xopt_EI_xopt_sim.py @@ -46,7 +46,7 @@ def xtest_callable(input_dict: dict, a=0) -> dict: n = 2 batch_size = 4 - libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + libE_specs = LibeSpecs(nworkers=batch_size) libE_specs.reuse_output_dir = True vocs = VOCS( diff --git a/libensemble/tests/regression_tests/test_xopt_nelder_mead.py b/libensemble/tests/regression_tests/test_xopt_nelder_mead.py index 4f0a4d285a..30d0952077 100644 --- a/libensemble/tests/regression_tests/test_xopt_nelder_mead.py +++ b/libensemble/tests/regression_tests/test_xopt_nelder_mead.py @@ -37,7 +37,7 @@ def rosenbrock_callable(input_dict: dict) -> dict: batch_size = 1 - libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + libE_specs = LibeSpecs(nworkers=batch_size) libE_specs.reuse_output_dir = True vocs = VOCS( diff --git a/libensemble/tests/run_tests.py b/libensemble/tests/run_tests.py index 1168261fdb..72ef5633ee 100755 --- a/libensemble/tests/run_tests.py +++ b/libensemble/tests/run_tests.py @@ -264,7 +264,10 @@ def build_forces(root_dir): """Build forces.x using mpicc.""" cprint("Building forces.x before running regression tests...", style="yellow", newline=True) forces_app_dir = Path(root_dir) / "libensemble/tests/scaling_tests/forces/forces_app" - subprocess.run(["mpicc", "-O3", "-o", "forces.x", "forces.c", "-lm"], cwd=forces_app_dir, check=True) + build_cmd = ["mpicc", "-O3", "-o", "forces.x", "forces.c", "-lm"] + if platform.system() == "Darwin": + build_cmd = ["mpicc", "-cc=clang", "-O3", "-o", "forces.x", "forces.c", "-lm"] + subprocess.run(build_cmd, cwd=forces_app_dir, check=True) destination_dir = Path(root_dir) / "libensemble/tests/forces_app" os.makedirs(destination_dir, exist_ok=True) shutil.copy(forces_app_dir / "forces.x", destination_dir) diff --git a/libensemble/tests/scaling_tests/forces/forces_app/build_forces.sh b/libensemble/tests/scaling_tests/forces/forces_app/build_forces.sh index b8b379e0ee..8dd599ffed 100755 --- a/libensemble/tests/scaling_tests/forces/forces_app/build_forces.sh +++ b/libensemble/tests/scaling_tests/forces/forces_app/build_forces.sh @@ -4,8 +4,12 @@ # Building flat MPI # ------------------------------------------------- -# GCC -mpicc -O3 -o forces.x forces.c -lm +# macOS (Apple Silicon with pixi) / GCC +if [[ "$OSTYPE" == "darwin"* ]]; then + mpicc -cc=clang -O3 -o forces.x forces.c -lm +else + mpicc -O3 -o forces.x forces.c -lm +fi # Intel # mpiicc -O3 -o forces.x forces.c diff --git a/libensemble/tests/scaling_tests/forces/forces_simple_xopt/run_libe_forces.py b/libensemble/tests/scaling_tests/forces/forces_simple_xopt/run_libe_forces.py index 2a7f8f7018..2d23c83a88 100644 --- a/libensemble/tests/scaling_tests/forces/forces_simple_xopt/run_libe_forces.py +++ b/libensemble/tests/scaling_tests/forces/forces_simple_xopt/run_libe_forces.py @@ -32,7 +32,6 @@ # Persistent gen does not need resources ensemble.libE_specs = LibeSpecs( - gen_on_manager=True, sim_dirs_make=True, ) diff --git a/libensemble/tests/standalone_tests/kill_test/build.sh b/libensemble/tests/standalone_tests/kill_test/build.sh index 8e47571e8c..0094221036 100755 --- a/libensemble/tests/standalone_tests/kill_test/build.sh +++ b/libensemble/tests/standalone_tests/kill_test/build.sh @@ -1,2 +1,7 @@ -mpicc -g -o burn_time.x burn_time.c -mpicc -g -o sleep_and_print.x sleep_and_print.c +if [[ "$OSTYPE" == "darwin"* ]]; then + mpicc -cc=clang -g -o burn_time.x burn_time.c + mpicc -cc=clang -g -o sleep_and_print.x sleep_and_print.c +else + mpicc -g -o burn_time.x burn_time.c + mpicc -g -o sleep_and_print.x sleep_and_print.c +fi diff --git a/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py b/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py index 6d056b1e01..9d1d45fece 100644 --- a/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py +++ b/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py @@ -34,6 +34,25 @@ ], ) +W_gen_mgr = np.array( + [ + (0, True, 0, 0, False, False), + (1, False, 0, 0, False, False), + (2, False, 0, 0, False, False), + (3, False, 0, 0, False, False), + (4, False, 0, 0, False, False), + ], + dtype=[ + ("worker_id", " 0: try: - rset_team = self.sched.assign_resources(rsets_req, use_gpus=False, user_params=user_params) + rset_team = self.sched.assign_resources(rsets_req, use_gpus=True, user_params=user_params) return rset_team - except (InsufficientFreeResources, InsufficientResourcesError): - pass + except InsufficientResourcesError: + pass # More rsets requested than GPU rsets exist - fall back to any rset_team = self.sched.assign_resources(rsets_req, use_gpus, user_params) return rset_team @@ -132,7 +136,7 @@ def fltr_gen_workers(): wrks = [] for wrk in self.W: - if fltr_recving() and fltr_persis() and fltr_zrw() and fltr_gen_workers(): + if all((fltr_recving(), fltr_persis(), fltr_zrw(), fltr_gen_workers())): wrks.append(wrk["worker_id"]) return wrks @@ -160,11 +164,6 @@ def _req_resources_sim(self, libE_info, user_params, H, H_rows): ) else: num_rsets_req = 1 - if "use_gpus" in H.dtype.names: - if np.any(H[H_rows]["use_gpus"]): - use_gpus = True - else: - use_gpus = False if "num_gpus" in H.dtype.names: gpus_per_rset = self.resources.resource_manager.gpus_per_rset num_rsets_req_for_gpus = AllocSupport._convert_rows_to_rsets( diff --git a/libensemble/utils/pydantic_bindings.py b/libensemble/utils/pydantic_bindings.py index 6ae28efe8b..2177c197e6 100644 --- a/libensemble/utils/pydantic_bindings.py +++ b/libensemble/utils/pydantic_bindings.py @@ -7,6 +7,7 @@ from libensemble import specs from libensemble.resources import platforms from libensemble.utils.validators import ( + check_adjust_zrw_on_gen_on_worker, check_any_workers_and_disable_rm_if_tcp, check_exit_criteria, check_gpu_setting_type, @@ -95,6 +96,7 @@ "set_default_comms": set_default_comms, "set_workflow_dir": set_workflow_dir, "set_calc_dirs_on_input_dir": set_calc_dirs_on_input_dir, + "check_adjust_zrw_on_gen_on_worker": check_adjust_zrw_on_gen_on_worker, }, ) diff --git a/libensemble/utils/specs_checkers.py b/libensemble/utils/specs_checkers.py index b8e793fa51..ff73118855 100644 --- a/libensemble/utils/specs_checkers.py +++ b/libensemble/utils/specs_checkers.py @@ -100,3 +100,10 @@ def _check_logical_cores(values): scg(values, "logical_cores_per_node") % scg(values, "cores_per_node") == 0 ), "Logical cores doesn't divide evenly into cores" return values + + +def _check_adjust_zrw_on_gen_on_worker(values): + """When gen_on_worker is set the default zero_resource_worker value complicates resources""" + if scg(values, "gen_on_worker") and scg(values, "zero_resource_workers") == [0]: + scs(values, "zero_resource_workers", []) + return values diff --git a/libensemble/utils/validators.py b/libensemble/utils/validators.py index 2164bf2f40..e4aef42810 100644 --- a/libensemble/utils/validators.py +++ b/libensemble/utils/validators.py @@ -7,6 +7,7 @@ from libensemble.resources.platforms import Platform from libensemble.utils.specs_checkers import ( + _check_adjust_zrw_on_gen_on_worker, _check_any_workers_and_disable_rm_if_tcp, _check_exit_criteria, _check_H0, @@ -117,6 +118,11 @@ def check_mpi_runner_type(cls, value): check_mpi_runner_type = field_validator("mpi_runner")(classmethod(check_mpi_runner_type)) +@model_validator(mode="after") +def check_adjust_zrw_on_gen_on_worker(self): + return _check_adjust_zrw_on_gen_on_worker(self) + + @model_validator(mode="after") def check_any_workers_and_disable_rm_if_tcp(self): return _check_any_workers_and_disable_rm_if_tcp(self) diff --git a/libensemble/worker.py b/libensemble/worker.py index 3574f74678..3a472899e6 100644 --- a/libensemble/worker.py +++ b/libensemble/worker.py @@ -53,7 +53,7 @@ def worker_main( sim_specs: dict, gen_specs: dict, libE_specs: dict, - workerID: int = None, + workerID: int | None = None, log_comm: bool = True, resources: Resources = None, executor: Executor = None, @@ -111,9 +111,10 @@ def worker_main( worker_logging_config(comm, workerID) LS = LocationStack() - LS.register_loc("workflow", Path(libE_specs.get("workflow_dir_path"))) + LS.register_loc("workflow", Path(libE_specs.get("workflow_dir_path", "."))) # Set up and run worker + assert workerID is not None worker = Worker(comm, dtypes, workerID, sim_specs, gen_specs, libE_specs) with LS.loc("workflow"): if Executor.executor is not None: @@ -122,7 +123,7 @@ def worker_main( if libE_specs.get("profile"): pr.disable() - profile_state_fname = "worker_%d.prof" % (workerID) + profile_state_fname = "worker_%d.prof" % workerID pr.dump_stats(profile_state_fname) @@ -132,7 +133,7 @@ def worker_main( class WorkerErrMsg: - def __init__(self, msg, exc): + def __init__(self, msg: str, exc: str) -> None: self.msg = msg self.exc = exc @@ -180,7 +181,7 @@ def __init__( self.runners = {EVAL_SIM_TAG: self.sim_runner.run, EVAL_GEN_TAG: self.gen_runner.run} self.calc_iter = {EVAL_SIM_TAG: 0, EVAL_GEN_TAG: 0} Worker._set_executor(self.workerID, self.comm) - Worker._set_resources(self.workerID, self.comm) + Worker._set_resources(self.workerID, self.comm, self.libE_specs) self.EnsembleDirectory = EnsembleDirectory(libE_specs=libE_specs) @staticmethod @@ -218,11 +219,11 @@ def _set_executor(workerID: int, comm: Comm) -> bool: return False @staticmethod - def _set_resources(workerID, comm: Comm) -> bool: + def _set_resources(workerID, comm: Comm, libE_specs) -> bool: """Sets worker ID in the resources, return True if set""" resources = Resources.resources if isinstance(resources, Resources): - resources.set_worker_resources(comm.get_num_workers(), workerID) + resources.set_worker_resources(comm.get_num_workers() + 1, workerID) return True else: logger.debug(f"No resources set on worker {workerID}") @@ -240,7 +241,7 @@ def _extract_debug_data(self, calc_type, Work): calc_id = calc_id.rjust(5, " ") return enum_desc, calc_id - def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict, int): + def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> tuple[npt.NDArray | None, dict, int | str]: """Runs a calculation on this worker object. This routine calls the user calculations. Exceptions are caught, @@ -282,7 +283,7 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict, logger.debug(f"Returned from user function for {enum_desc} {calc_id}") - calc_status = UNSET_TAG + calc_status: int | str = UNSET_TAG # Check for buffered receive if self.comm.recv_buffer: tag, message = self.comm.recv() @@ -316,7 +317,7 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict, logging.getLogger(LogConfig.config.stats_name).info(calc_msg) - def _get_calc_msg(self, enum_desc: str, calc_id: int, calc_type: int, timer: Timer, status: str) -> str: + def _get_calc_msg(self, enum_desc: str, calc_id: str, calc_type: str, timer: Timer, status: int | str) -> str: """Construct line for libE_stats.txt file""" calc_msg = f"{enum_desc} {calc_id}: {calc_type} {timer}" @@ -332,7 +333,7 @@ def _get_calc_msg(self, enum_desc: str, calc_id: int, calc_type: int, timer: Tim return calc_msg - def _recv_H_rows(self, Work: dict) -> (dict, int, npt.NDArray): + def _recv_H_rows(self, Work: dict) -> tuple[dict, int, npt.NDArray]: """Unpacks Work request and receives any history rows""" libE_info = Work["libE_info"] calc_type = Work["tag"] @@ -346,7 +347,7 @@ def _recv_H_rows(self, Work: dict) -> (dict, int, npt.NDArray): return libE_info, calc_type, calc_in - def _handle(self, Work: dict) -> dict: + def _handle(self, Work: dict) -> dict | None: """Handles a work request from the manager""" # Check work request and receive second message (if needed) libE_info, calc_type, calc_in = self._recv_H_rows(Work) diff --git a/pixi.lock b/pixi.lock index 3357902824..0647ac6202 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:61c5432ee07721317765d0bd57cc8802f96ec9376005b4bedc1bb26f39dc116f -size 1020189 +oid sha256:af6db44054dd36ecf322829eafdcde16863284fb381111b03c9d7712ca7fe62b +size 1018583 diff --git a/pyproject.toml b/pyproject.toml index 871e651525..cf65092a70 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -187,7 +187,7 @@ gest-api = ">=0.1,<0.2" # macOS dependencies [tool.pixi.target.osx-arm64.dependencies] -clang_osx-arm64 = ">=21.1.7,<22" +clang_osx-arm64 = ">=22.1.0,<23" # Linux dependencies [tool.pixi.target.linux-64.dependencies] @@ -208,7 +208,7 @@ docs = ["pyenchant", "enchant>=0.0.1,<0.0.2", "sphinx-lfs-content>=1.1.10,<2"] # Various config from here onward [tool.black] line-length = 120 -target-version = ['py310', 'py311', 'py312', 'py313'] +target-version = ["py311", "py312", "py313", "py314"] force-exclude = ''' ( /(