diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 515439b2b..dd88d2018 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -37,3 +37,4 @@ repos:
     rev: v1.19.1
     hooks:
       - id: mypy
+        args: [--config-file=pyproject.toml]
diff --git a/docs/FAQ.rst b/docs/FAQ.rst
index b8a3ea2ce..0339dbb68 100644
--- a/docs/FAQ.rst
+++ b/docs/FAQ.rst
@@ -13,11 +13,6 @@ We recommend using the following options to help debug workflows::
     logger.set_level("DEBUG")
     libE_specs["safe_mode"] = True
 
-To make it easier to debug a generator try setting the **libE_specs** option ``gen_on_manager``.
-To do so, add the following to your calling script::
-
-    libE_specs["gen_on_manager"] = True
-
 With this, ``pdb`` breakpoints can be set as usual in the generator.
 
 For more debugging options see "How can I debug specific libEnsemble processes?" below.
diff --git a/docs/data_structures/libE_specs.rst b/docs/data_structures/libE_specs.rst
index bc2df3473..1a31b41b8 100644
--- a/docs/data_structures/libE_specs.rst
+++ b/docs/data_structures/libE_specs.rst
@@ -9,7 +9,7 @@ libEnsemble is primarily customized by setting options within a ``LibeSpecs`` in
 
     from libensemble.specs import LibeSpecs
 
-    specs = LibeSpecs(gen_on_manager=True, save_every_k_gens=100, sim_dirs_make=True, nworkers=4)
+    specs = LibeSpecs(save_every_k_gens=100, sim_dirs_make=True, nworkers=4)
 
 .. dropdown:: Settings by Category
     :open:
@@ -26,9 +26,8 @@ libEnsemble is primarily customized by setting options within a ``LibeSpecs`` in
     **nworkers** [int]:
         Number of worker processes in ``"local"``, ``"threads"``, or ``"tcp"``.
 
-    **gen_on_manager** [bool] = False
-        Instructs Manager process to run generator functions.
-        This generator function can access/modify user objects by reference.
+    **gen_on_worker** [bool] = False
+        Instructs Worker process to run generator instead of Manager.
 
     **mpi_comm** [MPI communicator] = ``MPI.COMM_WORLD``:
         libEnsemble MPI communicator.
diff --git a/docs/overview_usecases.rst b/docs/overview_usecases.rst
index 7d5733d91..e31d6c899 100644
--- a/docs/overview_usecases.rst
+++ b/docs/overview_usecases.rst
@@ -53,8 +53,8 @@ to support):
 .. dropdown:: **Click Here for Use-Cases**
 
    * A user wants to optimize a simulation calculation. The simulation may
-     already be using parallel resources but not a large fraction of a 
-     computer. libEnsemble can coordinate concurrent evaluations of the 
+     already be using parallel resources but not a large fraction of a
+     computer. libEnsemble can coordinate concurrent evaluations of the
      simulation ``sim_f`` at multiple parameter values based on candidate
      parameter values produced by ``gen_f`` (possibly after each ``sim_f``
      output).
@@ -117,7 +117,7 @@ its capabilities.
 * **User function**: A generator, simulator, or allocation function. These
   Python functions govern the libEnsemble workflow. They must conform to the
   libEnsemble API for each respective user function, but otherwise can
-  be created or modified by the user. 
+  be created or modified by the user.
   libEnsemble includes many examples of each type.
 
 * **Executor**: The executor provides a simple, portable interface for
@@ -138,14 +138,14 @@ its capabilities.
   allowing them to maintain and update data structures efficiently. These
   calculations and their assigned workers are referred to as *persistent*.
 
-* **Resource Manager**: libEnsemble includes a built-in resource manager that can detect 
+* **Resource Manager**: libEnsemble includes a built-in resource manager that can detect
   (or be provided with) available resources (e.g., a node list). Resources are divided
   among workers using *resource sets* and can be dynamically reassigned.
 
 * **Resource Set**: The smallest unit of resources that can be assigned
   (and dynamically reassigned) to workers. By default this is the provisioned resources
-  divided by the number of workers (excluding any workers listed in the 
+  divided by the number of workers (excluding any workers listed in the
   ``zero_resource_workers`` ``libE_specs`` option). It can also be set
   explicitly using the ``num_resource_sets`` ``libE_specs`` option.
diff --git a/docs/platforms/aurora.rst b/docs/platforms/aurora.rst
index af2e7cc16..4865ba0c1 100644
--- a/docs/platforms/aurora.rst
+++ b/docs/platforms/aurora.rst
@@ -57,7 +57,7 @@ simulations for each worker:
 .. code-block:: python
 
     # Instruct libEnsemble to exit after this many simulations
-    ensemble.exit_criteria = ExitCriteria(sim_max=nsim_workers*2)
+    ensemble.exit_criteria = ExitCriteria(sim_max=nsim_workers * 2)
 
 Now grab an interactive session on two nodes (or use the batch script at
 ``../submission_scripts/submit_pbs_aurora.sh``)::
@@ -115,26 +115,6 @@ will use one GPU tile)::
 
     python run_libe_forces.py -n 25
 
-Running generator on the manager
---------------------------------
-
-An alternative is to run the generator on a thread on the manager. The
-number of workers can then be set to the number of simulation workers.
-
-Change the ``libE_specs`` in **run_libe_forces.py** as follows:
-
-.. code-block:: python
-
-    nsim_workers = ensemble.nworkers
-
-    # Persistent gen does not need resources
-    ensemble.libE_specs = LibeSpecs(
-        gen_on_manager=True,
-
-then we can run with 12 (instead of 13) workers::
-
-    python run_libe_forces.py -n 12
-
 Dynamic resource assignment
 ---------------------------
diff --git a/docs/platforms/perlmutter.rst b/docs/platforms/perlmutter.rst
index bc8f1af5e..a2768e1d2 100644
--- a/docs/platforms/perlmutter.rst
+++ b/docs/platforms/perlmutter.rst
@@ -105,27 +105,6 @@ To see GPU usage, ssh into the node you are on in another window and run::
 
     watch -n 0.1 nvidia-smi
 
-Running generator on the manager
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-An alternative is to run the generator on a thread on the manager. The
-number of workers can then be set to the number of simulation workers.
-
-Change the ``libE_specs`` in **run_libe_forces.py** as follows.
-
-.. code-block:: python
-
-    nsim_workers = ensemble.nworkers
-
-    # Persistent gen does not need resources
-    ensemble.libE_specs = LibeSpecs(
-        gen_on_manager=True,
-    )
-
-and run with::
-
-    python run_libe_forces.py -n 4
-
 To watch video
 ^^^^^^^^^^^^^^
diff --git a/docs/platforms/platforms_index.rst b/docs/platforms/platforms_index.rst
index 79285aa7b..391cfb6c6 100644
--- a/docs/platforms/platforms_index.rst
+++ b/docs/platforms/platforms_index.rst
@@ -24,35 +24,21 @@ simulation worker, and libEnsemble will distribute user applications across the
 node allocation. This is the **most common approach** where each simulation runs
 an MPI application.
 
-The generator will run on a worker by default, but if running a single generator,
-the :ref:`libE_specs` option **gen_on_manager** is recommended,
-which runs the generator on the manager (using a thread) as below.
+.. image:: ../images/centralized_gen_on_manager.png
+    :alt: centralized
+    :scale: 55
 
-.. list-table::
-   :widths: 60 40
+A SLURM batch script may include:
 
-   * - .. image:: ../images/centralized_gen_on_manager.png
-          :alt: centralized
-          :scale: 55
+.. code-block:: bash
 
-     - In calling script:
+    #SBATCH --nodes 3
 
-       .. code-block:: python
-           :linenos:
+    python run_libe_forces.py --nworkers 3
 
-           ensemble.libE_specs = LibeSpecs(
-               gen_on_manager=True,
-           )
-
-       A SLURM batch script may include:
-
-       .. code-block:: bash
-
-           #SBATCH --nodes 3
-
-           python run_libe_forces.py --nworkers 3
-
-When using **gen_on_manager**, set ``nworkers`` to the number of workers desired for running simulations.
+If running multiple generator processes instead, then set the
+:ref:`libE_specs` option **gen_on_worker** so that multiple
+worker processes can run multiple generator instances.
 
 Dedicated Mode
 ^^^^^^^^^^^^^^
@@ -62,32 +48,29 @@ True, the MPI executor will not launch applications on nodes where libEnsemble P
 processes (manager and workers) are running. Workers launch applications onto
 the remaining nodes in the allocation.
 
-.. list-table::
-   :widths: 60 40
-
-   * - .. image:: ../images/centralized_dedicated.png
-          :alt: centralized dedicated mode
-          :scale: 30
-
-     - In calling script:
+.. image:: ../images/centralized_dedicated.png
+    :alt: centralized dedicated mode
+    :scale: 30
 
-       .. code-block:: python
-           :linenos:
+In calling script:
 
-           ensemble.libE_specs = LibeSpecs(
-               num_resource_sets=2,
-               dedicated_mode=True,
-           )
+.. code-block:: python
+    :linenos:
 
-       A SLURM batch script may include:
+    ensemble.libE_specs = LibeSpecs(
+        gen_on_worker=True,
+        num_resource_sets=2,
+        dedicated_mode=True,
+    )
 
-       .. code-block:: bash
+A SLURM batch script may include:
 
-           #SBATCH --nodes 3
+.. code-block:: bash
 
-           python run_libe_forces.py --nworkers 3
+    #SBATCH --nodes 3
 
-Note that **gen_on_manager** is not set in the above example.
+    python run_libe_forces.py --nworkers 3
 
 Distributed Running
 -------------------
@@ -137,8 +120,7 @@ Zero-resource workers
 ---------------------
 
 Users with persistent ``gen_f`` functions may notice that the persistent workers
-are still automatically assigned system resources. This can be resolved by using
-the ``gen_on_manager`` option or by
+are still automatically assigned system resources. This can be resolved by
 :ref:`fixing the number of resource sets`.
 
 Assigning GPUs
diff --git a/docs/running_libE.rst b/docs/running_libE.rst
index 7b8b0532d..50e58afbe 100644
--- a/docs/running_libE.rst
+++ b/docs/running_libE.rst
@@ -12,13 +12,6 @@ determine the parameters/inputs for simulations. Simulator functions run and
 manage simulations, which often involve running a user application
 (see :doc:`Executor`).
 
-.. note::
-    As of version 1.3.0, the generator can be run as a thread on the manager,
-    using the :ref:`libE_specs` option **gen_on_manager**.
-    When using this option, set the number of workers desired for running
-    simulations. See :ref:`Running generator on the manager`
-    for more details.
-
 To use libEnsemble, you will need a calling script, which in turn will specify
 generator and simulator functions. Many :doc:`examples` are available.
@@ -161,29 +154,6 @@ If this example was run as::
 
 No simulations will be able to run.
 
-.. _gen-on-manager:
-
-Running generator on the manager
---------------------------------
-
-The majority of libEnsemble use cases run a single generator. The
-:ref:`libE_specs` option **gen_on_manager** will cause
-the generator function to run on a thread on the manager. This can run
-persistent user functions, sharing data structures with the manager, and avoids
-additional communication to a generator running on a worker. When using this
-option, the number of workers specified should be the (maximum) number of
-concurrent simulations.
-
-If modifying a workflow to use ``gen_on_manager`` consider the following.
-
-* Set ``nworkers`` to the number of workers desired for running simulations.
-* If using :meth:`add_unique_random_streams()`
-  to seed random streams, the default generator seed will be zero.
-* If you have a line like ``libE_specs["nresource_sets"] = nworkers -1``, this
-  line should be removed.
-* If the generator does use resources, ``nresource_sets`` can be increased as needed
-  so that the generator and all simulations are resourced.
-
 Environment Variables
 ---------------------
diff --git a/docs/tutorials/executor_forces_tutorial.rst b/docs/tutorials/executor_forces_tutorial.rst
index e01496734..a083aa2a8 100644
--- a/docs/tutorials/executor_forces_tutorial.rst
+++ b/docs/tutorials/executor_forces_tutorial.rst
@@ -336,44 +336,6 @@ These may require additional browsing of the documentation to complete.
 
     ...
 
-Running the generator on the manager
-------------------------------------
-
-As of version 1.3.0, the generator can be run on a thread on the manager,
-using the :ref:`libE_specs` option **gen_on_manager**.
-
-Change the libE_specs as follows.
-
-.. code-block:: python
-    :linenos:
-    :lineno-start: 28
-
-    nsim_workers = ensemble.nworkers
-
-    # Persistent gen does not need resources
-    ensemble.libE_specs = LibeSpecs(
-        gen_on_manager=True,
-        sim_dirs_make=True,
-        ensemble_dir_path="./test_executor_forces_tutorial",
-    )
-
-When running set ``nworkers`` to the number of workers desired for running simulations.
-E.g., Instead of:
-
-.. code-block:: bash
-
-    python run_libe_forces.py --nworkers 5
-
-use:
-
-.. code-block:: bash
-
-    python run_libe_forces.py --nworkers 4
-
-Note that as the generator random number seed will be zero instead of one, the checksum will change.
-
-For more information see :ref:`Running generator on the manager`.
-
 Running forces application with input file
 ------------------------------------------
diff --git a/docs/tutorials/gpcam_tutorial.rst b/docs/tutorials/gpcam_tutorial.rst
index a013c1b67..096d5584c 100644
--- a/docs/tutorials/gpcam_tutorial.rst
+++ b/docs/tutorials/gpcam_tutorial.rst
@@ -30,6 +30,7 @@ This version (and others) of the gpCAM generator can be found at `libensemble/ge
     from libensemble.message_numbers import EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, PERSIS_STOP, STOP_TAG
     from libensemble.tools.persistent_support import PersistentSupport
 
+
     def persistent_gpCAM(H_in, persis_info, gen_specs, libE_info):
         """Run a batched gpCAM model to create a surrogate"""
@@ -156,6 +157,7 @@ For running applications using parallel resources in the simulator see the `forc
     # Define our simulation function
     import numpy as np
 
+
     def six_hump_camel(H, persis_info, sim_specs, _):
         """Six-Hump Camel sim_f."""
@@ -189,6 +191,8 @@ First we will create a cleanup script so we can easily re-run.
     # To rerun this notebook, we need to delete the ensemble directory.
     import shutil
 
+
+
     def cleanup():
         try:
             shutil.rmtree("ensemble")
@@ -218,31 +222,30 @@ If you wish to make your own functions based on the above, those can be imported
 
     nworkers = 4
 
-    # When using gen_on_manager, nworkers is number of concurrent sims.
     # final_gen_send means the last evaluated points are returned to the generator to update the model.
-    libE_specs = LibeSpecs(nworkers=nworkers, gen_on_manager=True, final_gen_send=True)
+    libE_specs = LibeSpecs(nworkers=nworkers, final_gen_send=True)
 
     n = 2  # Input dimensions
     batch_size = 4
     num_batches = 6
 
     gen_specs = GenSpecs(
-        gen_f=persistent_gpCAM,        # Generator function
-        persis_in=["f"],               # Objective, defined in sim, is returned to gen
+        gen_f=persistent_gpCAM,  # Generator function
+        persis_in=["f"],  # Objective, defined in sim, is returned to gen
         outputs=[("x", float, (n,))],  # Parameters (name, type, size)
         user={
             "batch_size": batch_size,
             "lb": np.array([-2, -1]),  # lower boundaries for n dimensions
-            "ub": np.array([2, 1]),    # upper boundaries for n dimensions
-            "ask_max_iter": 5,         # Number of iterations for ask (default 20)
+            "ub": np.array([2, 1]),  # upper boundaries for n dimensions
+            "ask_max_iter": 5,  # Number of iterations for ask (default 20)
             "rng_seed": 0,
         },
     )
 
     sim_specs = SimSpecs(
-        sim_f=six_hump_camel,    # Simulator function
-        inputs=["x"],            # Input field names. "x" defined in gen
-        outputs=[("f", float)],  # Objective
+        sim_f=six_hump_camel,  # Simulator function
+        inputs=["x"],  # Input field names. "x" defined in gen
+        outputs=[("f", float)],  # Objective
     )
 
     # Starts one persistent generator. Simulated values are returned in batch.
@@ -251,7 +254,7 @@ If you wish to make your own functions based on the above, those can be imported
         user={"async_return": False},  # False = batch returns
     )
 
-    exit_criteria = ExitCriteria(sim_max=num_batches*batch_size)
+    exit_criteria = ExitCriteria(sim_max=num_batches * batch_size)
 
     # Initialize and run the ensemble.
     ensemble = Ensemble(
@@ -272,7 +275,7 @@ At the end of our calling script we run the ensemble.
 
     H, persis_info, flag = ensemble.run()  # Start the ensemble. Blocks until completion.
     ensemble.save_output("H_array", append_attrs=False)  # Save H (history of all evaluated points) to file
-    pprint(H[["sim_id", "x", "f"]][:16])   # See first 16 results
+    pprint(H[["sim_id", "x", "f"]][:16])  # See first 16 results
 
 Rerun and test model at known points
 ------------------------------------
@@ -312,15 +315,21 @@ values at the test points.
     markersize = 10
     plt.figure(figsize=(10, 5))
     plt.plot(
-        num_sims, mse, marker="^", markeredgecolor="black", markeredgewidth=2,
-        markersize=markersize, linewidth=2, label="Mean squared error"
+        num_sims,
+        mse,
+        marker="^",
+        markeredgecolor="black",
+        markeredgewidth=2,
+        markersize=markersize,
+        linewidth=2,
+        label="Mean squared error",
     )
     plt.xticks(num_sims)
 
     # Labeling the axes and the legend
-    plt.title('Mean Squared Error at test points')
+    plt.title("Mean Squared Error at test points")
     plt.xlabel("Number of simulations")
-    plt.ylabel('Mean squared error (rad$^2$)')
+    plt.ylabel("Mean squared error (rad$^2$)")
     legend = plt.legend(framealpha=1, edgecolor="black")  # Increase edge width here
     plt.grid(True)
     plt.show()
diff --git a/libensemble/ensemble.py b/libensemble/ensemble.py
index 05ddd7394..fe079c99a 100644
--- a/libensemble/ensemble.py
+++ b/libensemble/ensemble.py
@@ -161,11 +161,11 @@ class Ensemble:
 
     def __init__(
         self,
-        sim_specs: SimSpecs | None = SimSpecs(),
-        gen_specs: GenSpecs | None = GenSpecs(),
-        exit_criteria: ExitCriteria | None = {},
-        libE_specs: LibeSpecs | None = LibeSpecs(),
-        alloc_specs: AllocSpecs | None = AllocSpecs(),
+        sim_specs: SimSpecs | dict | None = SimSpecs(),
+        gen_specs: GenSpecs | dict | None = GenSpecs(),
+        exit_criteria: ExitCriteria | dict | None = {},
+        libE_specs: LibeSpecs | dict | None = LibeSpecs(),
+        alloc_specs: AllocSpecs | dict | None = AllocSpecs(),
         persis_info: dict | None = {},
         executor: Executor | None = None,
         H0: npt.NDArray | None = None,
@@ -174,7 +174,11 @@ def __init__(
         self.sim_specs = sim_specs
         self.gen_specs = gen_specs
         self.exit_criteria = exit_criteria
-        self._libE_specs = libE_specs
+        self._libE_specs: LibeSpecs | None = None
+        if isinstance(libE_specs, dict):
+            self._libE_specs = LibeSpecs(**libE_specs)
+        else:
+            self._libE_specs = libE_specs
         self.alloc_specs = alloc_specs
         self.persis_info = persis_info
         self.executor = executor
@@ -188,11 +192,10 @@ def __init__(
         if parse_args:
             self._parse_args()
             self.parsed = True
-        self._known_comms = self._libE_specs.comms
+        if self._libE_specs:
+            self._known_comms = self._libE_specs.comms
 
         if not self._known_comms and self._libE_specs is not None:
-            if isinstance(self._libE_specs, dict):
-                self._libE_specs = LibeSpecs(**self._libE_specs)
             self._known_comms = self._libE_specs.comms
 
         if self._known_comms == "local":
@@ -202,9 +205,10 @@ def __init__(
 
         elif self._known_comms == "mpi" and not parse_args:
             # Set internal _nworkers - not libE_specs (avoid "nworkers will be ignored" warning)
-            self._nworkers, self.is_manager = mpi_init(self._libE_specs.mpi_comm)
+            if self._libE_specs:
+                self._nworkers, self.is_manager = mpi_init(self._libE_specs.mpi_comm)
 
-    def _parse_args(self) -> (int, bool, LibeSpecs):
+    def _parse_args(self) -> tuple[int, bool, LibeSpecs]:
         # Set internal _nworkers - not libE_specs (avoid "nworkers will be ignored" warning)
         self._nworkers, self.is_manager, libE_specs_parsed, self.extra_args = parse_args_f()
 
@@ -220,7 +224,7 @@ def ready(self) -> bool:
         return all([i for i in [self.exit_criteria, self._libE_specs, self.sim_specs]])
 
     @property
-    def libE_specs(self) -> LibeSpecs:
+    def libE_specs(self) -> LibeSpecs | None:
         return self._libE_specs
 
     @libE_specs.setter
@@ -257,7 +261,7 @@ def libE_specs(self, new_specs):
     def _refresh_executor(self):
         Executor.executor = self.executor or Executor.executor
 
-    def run(self) -> (npt.NDArray, dict, int):
+    def run(self) -> tuple[npt.NDArray, dict, int]:
         """
         Initializes libEnsemble.
 
@@ -297,10 +301,10 @@ def run(self) -> (npt.NDArray, dict, int):
         """
 
         self._refresh_executor()
-
-        if self._libE_specs.comms != self._known_comms:
+        if self._libE_specs and self._libE_specs.comms != self._known_comms:
             raise ValueError(CHANGED_COMMS_WARN)
 
+        assert self._libE_specs is not None
         self.H, self.persis_info, self.flag = libE(
             self.sim_specs,
             self.gen_specs,
@@ -365,6 +369,7 @@ def save_output(self, basename: str, append_attrs: bool = True):
         """
         if self.is_manager:
             if self._get_option("libE_specs", "workflow_dir_path"):
+                assert self.libE_specs is not None
                 save_libE_output(
                     self.H,
                     self.persis_info,
diff --git a/libensemble/libE.py b/libensemble/libE.py
index abde5423c..71cc654e8 100644
--- a/libensemble/libE.py
+++ b/libensemble/libE.py
@@ -155,9 +155,9 @@ def libE(
     exit_criteria: ExitCriteria,
     persis_info: dict = {},
     alloc_specs: AllocSpecs = AllocSpecs(),
-    libE_specs: LibeSpecs = {},
+    libE_specs: LibeSpecs | dict = {},
     H0=None,
-) -> (np.ndarray, dict, int):
+) -> tuple[np.ndarray, dict, int]:
     """
     Parameters
     ----------
@@ -243,16 +243,22 @@ def libE(
     exit_criteria = specs_dump(ensemble.exit_criteria, by_alias=True, exclude_none=True)
 
     # Restore objects that don't survive serialization via model_dump
-    if hasattr(ensemble.gen_specs, "generator") and ensemble.gen_specs.generator is not None:
-        gen_specs["generator"] = ensemble.gen_specs.generator
-    if hasattr(ensemble.gen_specs, "vocs") and ensemble.gen_specs.vocs is not None:
-        gen_specs["vocs"] = ensemble.gen_specs.vocs
+    if hasattr(ensemble.sim_specs, "simulator") and ensemble.sim_specs.simulator is not None:
+        sim_specs["simulator"] = ensemble.sim_specs.simulator
+    if hasattr(ensemble.sim_specs, "vocs") and ensemble.sim_specs.vocs is not None:
+        sim_specs["vocs"] = ensemble.sim_specs.vocs
+
+    if ensemble.gen_specs is not None:
+        if hasattr(ensemble.gen_specs, "generator") and ensemble.gen_specs.generator is not None:
+            gen_specs["generator"] = ensemble.gen_specs.generator
+        if hasattr(ensemble.gen_specs, "vocs") and ensemble.gen_specs.vocs is not None:
+            gen_specs["vocs"] = ensemble.gen_specs.vocs
 
     # Extract platform info from settings or environment
     platform_info = get_platform(libE_specs)
 
     if libE_specs["dry_run"]:
-        logger.manager_warning("Dry run. All libE() inputs validated. Exiting.")
+        logger.manager_warning("Dry run. All libE() inputs validated. Exiting.")  # type: ignore[attr-defined]
         sys.exit()
 
     libE_funcs = {"mpi": libE_mpi, "tcp": libE_tcp, "local": libE_local, "threads": libE_local}
@@ -287,7 +293,7 @@ def manager(
 
     if "out" in gen_specs and ("sim_id", int) in gen_specs["out"]:
         if hasattr(gen_specs["gen_f"], "__module__") and "libensemble.gen_funcs" not in gen_specs["gen_f"].__module__:
-            logger.manager_warning(_USER_SIM_ID_WARNING)
+            logger.manager_warning(_USER_SIM_ID_WARNING)  # type: ignore[attr-defined]
 
     try:
         try:
@@ -379,7 +385,7 @@ def libE_mpi(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE
     # Run manager or worker code, depending
     if is_manager:
         if resources is not None:
-            resources.set_resource_manager(nworkers)
+            resources.set_resource_manager(nworkers + 1)
         return libE_mpi_manager(
             mpi_comm, sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0
         )
@@ -496,8 +502,9 @@ def libE_local(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, li
     wcomms = start_proc_team(libE_specs["nworkers"], sim_specs, gen_specs, libE_specs)
 
     # Set manager resources after the forkpoint.
+    # if libE_specs["gen_on_worker"] == True, -n reflects the exact number of workers
     if resources is not None:
-        resources.set_resource_manager(libE_specs["nworkers"])
+        resources.set_resource_manager(libE_specs["nworkers"] + 1)
 
     if not libE_specs["disable_log_files"]:
         exit_logger = manager_logging_config(specs=libE_specs)
diff --git a/libensemble/manager.py b/libensemble/manager.py
index c2763b717..e914c8098 100644
--- a/libensemble/manager.py
+++ b/libensemble/manager.py
@@ -57,7 +57,7 @@ class LoggedException(Exception):
     """Raise exception for handling without re-logging"""
 
 
-def report_worker_exc(wrk_exc: Exception = None) -> None:
+def report_worker_exc(wrk_exc: Exception | None = None) -> None:
     """Write worker exception to log"""
     if wrk_exc is not None:
         from_line, msg, exc = wrk_exc.args
@@ -75,7 +75,7 @@ def manager_main(
     exit_criteria: dict,
     persis_info: dict,
     wcomms: list = [],
-) -> (dict, int, int):
+) -> tuple[dict, int, int]:
     """Manager routine to coordinate the generation and simulation evaluations
 
     Parameters
@@ -214,9 +214,9 @@ def __init__(
         self.gen_specs = gen_specs
         self.exit_criteria = exit_criteria
         self.elapsed = lambda: timer.elapsed
-        self.wcomms = wcomms
+        self.wcomms: Any = wcomms
         self.WorkerExc = False
-        self.persis_pending = []
+        self.persis_pending: list[int] = []
         self.live_data = libE_specs.get("live_data")
 
         dyn_keys = ("resource_sets", "num_procs", "num_gpus")
@@ -232,19 +232,20 @@ def __init__(
             (1, "stop_val", self.term_test_stop_val),
         ]
 
-        gen_on_manager = self.libE_specs.get("gen_on_manager", False)
+        gen_on_worker = self.libE_specs.get("gen_on_worker", False)
+        len_W = len(self.wcomms) + 1 - gen_on_worker  # if gen_on_worker, len_W = len(self.wcomms)
 
-        self.W = np.zeros(len(self.wcomms) + gen_on_manager, dtype=Manager.worker_dtype)
-        if gen_on_manager:
+        self.W = np.zeros(len_W, dtype=Manager.worker_dtype)
+        if gen_on_worker:
+            self.W["worker_id"] = np.arange(len(self.wcomms)) + 1  # [1, 2, 3, ...]
+        else:
             self.W["worker_id"] = np.arange(len(self.wcomms) + 1)  # [0, 1, 2, ...]
             self.W[0]["gen_worker"] = True
             local_worker_comm = self._run_additional_worker(hist, sim_specs, gen_specs, libE_specs)
             self.wcomms = [local_worker_comm] + self.wcomms
-        else:
-            self.W["worker_id"] = np.arange(len(self.wcomms)) + 1  # [1, 2, 3, ...]
 
-        self.W = _WorkerIndexer(self.W, gen_on_manager)
-        self.wcomms = _WorkerIndexer(self.wcomms, gen_on_manager)
+        self.W = _WorkerIndexer(self.W, 1 - gen_on_worker)  # if gen on worker, then no additional worker
+        self.wcomms = _WorkerIndexer(self.wcomms, 1 - gen_on_worker)
 
         temp_EnsembleDirectory = EnsembleDirectory(libE_specs=libE_specs)
         self.resources = Resources.resources
@@ -263,7 +264,9 @@ def __init__(
             try:
                 temp_EnsembleDirectory.make_copyback()
             except AssertionError as e:  # Ensemble dir exists and isn't empty.
-                logger.manager_warning(_USER_CALC_DIR_WARNING.format(temp_EnsembleDirectory.ensemble_dir))
+                logger.manager_warning(  # type: ignore[attr-defined]
+                    _USER_CALC_DIR_WARNING.format(temp_EnsembleDirectory.ensemble_dir)
+                )
                 self._kill_workers()
                 raise ManagerException(
                     "Manager errored on initialization",
@@ -290,7 +293,7 @@ def term_test_stop_val(self, stop_val: Any) -> bool:
         """Checks against stop value criterion"""
         key, val = stop_val
         H = self.hist.H
-        return np.any(filter_nans(H[key][H["sim_ended"]]) <= val)
+        return bool(np.any(filter_nans(H[key][H["sim_ended"]]) <= val))
 
     def term_test(self, logged: bool = True) -> bool | int:
         """Checks termination criteria"""
@@ -399,7 +402,7 @@ def _set_resources(self, Work: dict, w: int) -> None:
 
         if rset_req is None:
             rset_team = []
-            default_rset = resource_manager.index_list[w - 1]
+            default_rset = resource_manager.index_list[w]
             if default_rset is not None:
                 rset_team.append(default_rset)
             Work["libE_info"]["rset_team"] = rset_team
@@ -554,7 +557,7 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int) -> None:
             self._kill_workers()
             raise WorkerException(f"Received error message from worker {w}", D_recv.msg, D_recv.exc)
         elif isinstance(D_recv, logging.LogRecord):
-            logger.vdebug(f"Manager received a log message from worker {w}")
+            logger.vdebug(f"Manager received a log message from worker {w}")  # type: ignore[attr-defined]
             logging.getLogger(D_recv.name).handle(D_recv)
         else:
             logger.debug(f"Manager received data message from worker {w}")
@@ -585,7 +588,7 @@ def _kill_cancelled_sims(self) -> None:
 
     # --- Handle termination
 
-    def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int):
+    def _final_receive_and_kill(self, persis_info: dict) -> tuple[dict, int, int]:
         """
         Tries to receive from any active workers.
 
@@ -623,9 +626,9 @@ def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int):
                 # Elapsed Wallclock has expired
                 if not any(self.W["persis_state"]):
                     if any(self.W["active"]):
-                        logger.manager_warning(_WALLCLOCK_MSG_ACTIVE)
+                        logger.manager_warning(_WALLCLOCK_MSG_ACTIVE)  # type: ignore[attr-defined]
                     else:
-                        logger.manager_warning(_WALLCLOCK_MSG_ALL_RETURNED)
+                        logger.manager_warning(_WALLCLOCK_MSG_ALL_RETURNED)  # type: ignore[attr-defined]
                     exit_flag = 2
         if self.WorkerExc:
             exit_flag = 1
@@ -662,7 +665,7 @@ def _get_alloc_libE_info(self) -> dict:
             "use_resource_sets": self.use_resource_sets,
             "gen_num_procs": self.gen_num_procs,
             "gen_num_gpus": self.gen_num_gpus,
-            "gen_on_manager": self.libE_specs.get("gen_on_manager", False),
+            "gen_on_worker": self.libE_specs.get("gen_on_worker", False),
         }
 
     def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict:
@@ -699,7 +702,7 @@ def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict:
 
     # --- Main loop
 
-    def run(self, persis_info: dict) -> (dict, int, int):
+    def run(self, persis_info: dict) -> tuple[dict, int, int]:
         """Runs the manager"""
         logger.debug(f"Manager initiated on node {socket.gethostname()}")
         logger.info(f"Manager exit_criteria: {self.exit_criteria}")
diff --git a/libensemble/resources/resources.py b/libensemble/resources/resources.py
index 105f8a836..5ba54cc09 100644
--- a/libensemble/resources/resources.py
+++ b/libensemble/resources/resources.py
@@ -63,12 +63,12 @@ def init_resources(cls, libE_specs: dict, platform_info: dict = {}) -> None:
             libE_specs=libE_specs, platform_info=platform_info, top_level_dir=top_level_dir
         )
 
-    def __init__(self, libE_specs: dict, platform_info: dict = {}, top_level_dir: str = None) -> None:
+    def __init__(self, libE_specs: dict, platform_info: dict = {}, top_level_dir: str = "") -> None:
         """Initiate a new resources object"""
         self.top_level_dir = top_level_dir or os.getcwd()
-        self.glob_resources = GlobalResources(libE_specs=libE_specs, platform_info=platform_info, top_level_dir=None)
-        self.resource_manager = None  # For Manager
-        self.worker_resources = None  # For Workers
+        self.glob_resources = GlobalResources(libE_specs=libE_specs, platform_info=platform_info, top_level_dir="")
+        self.resource_manager: ResourceManager | None = None  # For Manager
+        self.worker_resources: WorkerResources | None = None  # For Workers
 
     def set_worker_resources(self, num_workers: int, workerid: int) -> None:
         """Initiate the worker resources component of resources"""
@@ -101,7 +101,7 @@ class GlobalResources:
 
     :ivar int num_resource_sets:
         Number of resource sets, if supplied by the user.
""" - def __init__(self, libE_specs: dict, platform_info: dict = {}, top_level_dir: str = None) -> None: + def __init__(self, libE_specs: dict, platform_info: dict = {}, top_level_dir: str = "") -> None: """Initializes a new Resources instance Determines the compute resources available for current allocation, including diff --git a/libensemble/resources/worker_resources.py b/libensemble/resources/worker_resources.py index 5033b2aee..df89625ad 100644 --- a/libensemble/resources/worker_resources.py +++ b/libensemble/resources/worker_resources.py @@ -105,8 +105,8 @@ def free_rsets(self, worker=None): def get_index_list(num_workers: int, num_rsets: int, zero_resource_list: list[int | Any]) -> list[int | None]: """Map WorkerID to index into a nodelist""" index = 0 - index_list = [] - for i in range(1, num_workers + 1): + index_list: list[int | None] = [] + for i in range(0, num_workers): if i in zero_resource_list: index_list.append(None) else: @@ -116,6 +116,11 @@ def get_index_list(num_workers: int, num_rsets: int, zero_resource_list: list[in else: index_list.append(index) index += 1 + + for i in zero_resource_list: + if i >= num_workers: + logger.warning(f"Worker index {i} from zero_resource_workers is out of range (0-{num_workers - 1})") + return index_list @@ -364,7 +369,7 @@ def get_local_nodelist( local_nodelist = list(OrderedDict.fromkeys(team_list)) # Maintain order of nodes logger.debug(f"Worker's local_nodelist is {local_nodelist}") - slots = {} + slots: dict[str, list[int]] = {} for node in local_nodelist: slots[node] = [] diff --git a/libensemble/sim_funcs/run_line_check.py b/libensemble/sim_funcs/run_line_check.py index 530b1e8d9..d30d10ec3 100644 --- a/libensemble/sim_funcs/run_line_check.py +++ b/libensemble/sim_funcs/run_line_check.py @@ -22,7 +22,7 @@ def exp_nodelist_for_worker(exp_list, workerID, nodes_per_worker, persis_gens): node_list = comp.split(",") for node in node_list: node_name, node_num = node.split("-") - offset = workerID - (1 + 
persis_gens) + offset = workerID - 1 new_num = int(node_num) + int(nodes_per_worker * offset) new_node = "-".join([node_name, str(new_num)]) new_node_list.append(new_node) diff --git a/libensemble/specs.py b/libensemble/specs.py index 718e92734..01e8d21a6 100644 --- a/libensemble/specs.py +++ b/libensemble/specs.py @@ -344,10 +344,8 @@ class LibeSpecs(BaseModel): nworkers: int | None = 0 """ Number of worker processes in ``"local"``, ``"threads"``, or ``"tcp"``.""" - gen_on_manager: bool | None = False - """ Instructs Manager process to run generator functions. - This generator function can access/modify user objects by reference. - """ + gen_on_worker: bool = False + """ Instructs Worker process to run generator instead of Manager.""" mpi_comm: object | None = None """ libEnsemble MPI communicator. Default: ``MPI.COMM_WORLD``""" @@ -635,7 +633,7 @@ class LibeSpecs(BaseModel): libEnsemble processes (manager and workers) are running. """ - zero_resource_workers: list[int] | None = [] + zero_resource_workers: list[int] | None = [0] """ list of workers that require no resources. For when a fixed mapping of workers to resources is required. Otherwise, use ``num_resource_sets``. 
diff --git a/libensemble/tests/functionality_tests/test_GPU_gen_resources.py b/libensemble/tests/functionality_tests/test_GPU_gen_resources.py index d77088d7e..5308f12c7 100644 --- a/libensemble/tests/functionality_tests/test_GPU_gen_resources.py +++ b/libensemble/tests/functionality_tests/test_GPU_gen_resources.py @@ -50,8 +50,6 @@ if __name__ == "__main__": nworkers, is_manager, libE_specs, _ = parse_args() - libE_specs["num_resource_sets"] = nworkers # Persistent gen DOES need resources - # Mock GPU system / uncomment to detect GPUs libE_specs["sim_dirs_make"] = True # Will only contain files if dry_run is False libE_specs["gen_dirs_make"] = True # Will only contain files if dry_run is False @@ -80,8 +78,8 @@ "persis_in": ["f", "x", "sim_id"], "out": [("num_procs", int), ("num_gpus", int), ("x", float, n)], "user": { - "initial_batch_size": nworkers - 1, - "max_procs": nworkers - 1, # Any sim created can req. 1 worker up to all. + "initial_batch_size": "set_in_loop", + "max_procs": "set_in_loop", # Any sim created can req. 1 worker up to all. 
"lb": np.array([-3, -2]), "ub": np.array([3, 2]), "dry_run": dry_run, @@ -97,33 +95,48 @@ } exit_criteria = {"sim_max": 20} - libE_specs["resource_info"] = {"cores_on_node": (nworkers * 2, nworkers * 4), "gpus_on_node": nworkers} base_libE_specs = libE_specs.copy() - for gen_on_manager in [False, True]: + for gen_on_worker in [False, True]: for run in range(5): # reset libE_specs = base_libE_specs.copy() - libE_specs["gen_on_manager"] = gen_on_manager - persis_info = add_unique_random_streams({}, nworkers + 1) + libE_specs["gen_on_worker"] = gen_on_worker + libE_specs["zero_resource_workers"] = [] # perhaps the generator needs GPUs + + resourced_workers = ( + nworkers if gen_on_worker else nworkers + 1 + ) # note this "nworkers" decided before the extra worker starts + sim_workers = nworkers - 1 if gen_on_worker else nworkers + + gen_specs["user"]["initial_batch_size"] = sim_workers + gen_specs["user"]["max_procs"] = sim_workers + + libE_specs["num_resource_sets"] = resourced_workers + libE_specs["resource_info"] = { + "cores_on_node": (resourced_workers * 2, resourced_workers * 4), + "gpus_on_node": resourced_workers, + } + + persis_info = add_unique_random_streams({}, resourced_workers) if run == 0: libE_specs["gen_num_procs"] = 2 elif run == 1: - if gen_on_manager: - print("SECOND LIBE CALL WITH GEN ON MANAGER") + if gen_on_worker: + print("SECOND LIBE CALL WITH GEN ON WORKER INSTEAD OF MANAGER") libE_specs["gen_num_gpus"] = 1 elif run == 2: persis_info["gen_num_gpus"] = 1 elif run == 3: # Two GPUs per resource set - libE_specs["resource_info"]["gpus_on_node"] = nworkers * 2 + libE_specs["resource_info"]["gpus_on_node"] = resourced_workers * 2 persis_info["gen_num_gpus"] = 1 elif run == 4: # Two GPUs requested for gen persis_info["gen_num_procs"] = 2 persis_info["gen_num_gpus"] = 2 - gen_specs["user"]["max_procs"] = max(nworkers - 2, 1) + gen_specs["user"]["max_procs"] = max(sim_workers - 1, 1) # Perform the run H, persis_info, flag = libE( diff --git 
a/libensemble/tests/functionality_tests/test_asktell_sampling.py b/libensemble/tests/functionality_tests/test_asktell_sampling.py index cebb858df..7da9f1c66 100644 --- a/libensemble/tests/functionality_tests/test_asktell_sampling.py +++ b/libensemble/tests/functionality_tests/test_asktell_sampling.py @@ -34,7 +34,7 @@ def sim_f(In): if __name__ == "__main__": nworkers, is_manager, libE_specs, _ = parse_args() - libE_specs["gen_on_manager"] = True + libE_specs["gen_on_worker"] = False sim_specs = { "sim_f": sim_f, diff --git a/libensemble/tests/functionality_tests/test_asktell_sampling_external_gen.py b/libensemble/tests/functionality_tests/test_asktell_sampling_external_gen.py index 8578da720..1bbbf696f 100644 --- a/libensemble/tests/functionality_tests/test_asktell_sampling_external_gen.py +++ b/libensemble/tests/functionality_tests/test_asktell_sampling_external_gen.py @@ -44,7 +44,7 @@ def sim_f_scalar(In): if __name__ == "__main__": - libE_specs = LibeSpecs(gen_on_manager=True) + libE_specs = LibeSpecs() for test in range(1): # 2 diff --git a/libensemble/tests/functionality_tests/test_evaluate_existing_plus_gen.py b/libensemble/tests/functionality_tests/test_evaluate_existing_plus_gen.py index fe3d8dad8..7a16d7073 100644 --- a/libensemble/tests/functionality_tests/test_evaluate_existing_plus_gen.py +++ b/libensemble/tests/functionality_tests/test_evaluate_existing_plus_gen.py @@ -1,6 +1,6 @@ """ Test libEnsemble's capability to evaluate existing points and then generate -new samples via gen_on_manager. +new samples. Execute via one of the following commands (e.g. 
3 workers): mpiexec -np 4 python test_evaluate_existing_sample.py @@ -43,7 +43,6 @@ def create_H0(persis_info, gen_specs, H0_size): if __name__ == "__main__": sampling = Ensemble(parse_args=True) - sampling.libE_specs.gen_on_manager = True sampling.sim_specs = SimSpecs(sim_f=sim_f, inputs=["x"], out=[("f", float)]) gen_specs = { diff --git a/libensemble/tests/functionality_tests/test_mpi_gpu_settings.py b/libensemble/tests/functionality_tests/test_mpi_gpu_settings.py index 31e537a31..121bb08cb 100644 --- a/libensemble/tests/functionality_tests/test_mpi_gpu_settings.py +++ b/libensemble/tests/functionality_tests/test_mpi_gpu_settings.py @@ -66,7 +66,7 @@ # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": nworkers, is_manager, libE_specs, _ = parse_args() - libE_specs["num_resource_sets"] = nworkers - 1 # Persistent gen does not need resources + libE_specs["num_resource_sets"] = nworkers # Persistent gen does not need resources libE_specs["use_workflow_dir"] = True # Only a place for Open MPI machinefiles if libE_specs["comms"] == "tcp": @@ -88,8 +88,8 @@ "persis_in": ["f", "x", "sim_id"], "out": [("priority", float), ("resource_sets", int), ("x", float, n)], "user": { - "initial_batch_size": nworkers - 1, - "max_resource_sets": nworkers - 1, # Any sim created can req. 1 worker up to all. + "initial_batch_size": nworkers, + "max_resource_sets": nworkers, # Any sim created can req. 1 worker up to all. 
"lb": np.array([-3, -2]), "ub": np.array([3, 2]), }, diff --git a/libensemble/tests/functionality_tests/test_mpi_gpu_settings_env.py b/libensemble/tests/functionality_tests/test_mpi_gpu_settings_env.py index 814f5086c..4cfc431c1 100644 --- a/libensemble/tests/functionality_tests/test_mpi_gpu_settings_env.py +++ b/libensemble/tests/functionality_tests/test_mpi_gpu_settings_env.py @@ -43,7 +43,7 @@ # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": nworkers, is_manager, libE_specs, _ = parse_args() - libE_specs["num_resource_sets"] = nworkers - 1 # Persistent gen does not need resources + libE_specs["num_resource_sets"] = nworkers # Persistent gen does not need resources libE_specs["use_workflow_dir"] = True # Only a place for Open MPI machinefiles # Optional for organization of output scripts @@ -70,8 +70,8 @@ "persis_in": ["f", "x", "sim_id"], "out": [("priority", float), ("resource_sets", int), ("x", float, n)], "user": { - "initial_batch_size": nworkers - 1, - "max_resource_sets": nworkers - 1, # Any sim created can req. 1 worker up to all. + "initial_batch_size": nworkers, + "max_resource_sets": nworkers, # Any sim created can req. 1 worker up to all. "lb": np.array([-3, -2]), "ub": np.array([3, 2]), }, diff --git a/libensemble/tests/functionality_tests/test_mpi_runners_subnode_uneven.py b/libensemble/tests/functionality_tests/test_mpi_runners_subnode_uneven.py index a5145965b..9312c83b4 100644 --- a/libensemble/tests/functionality_tests/test_mpi_runners_subnode_uneven.py +++ b/libensemble/tests/functionality_tests/test_mpi_runners_subnode_uneven.py @@ -26,7 +26,7 @@ # Do not change these lines - they are parsed by run-tests.sh # TESTSUITE_COMMS: mpi local -# TESTSUITE_NPROCS: 4 6 +# TESTSUITE_NPROCS: 5 7 # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). 
if __name__ == "__main__": diff --git a/libensemble/tests/functionality_tests/test_mpi_runners_zrw_subnode_uneven.py b/libensemble/tests/functionality_tests/test_mpi_runners_zrw_subnode_uneven.py index cc73d0e42..68e9c30cd 100644 --- a/libensemble/tests/functionality_tests/test_mpi_runners_zrw_subnode_uneven.py +++ b/libensemble/tests/functionality_tests/test_mpi_runners_zrw_subnode_uneven.py @@ -44,6 +44,7 @@ comms = libE_specs["comms"] libE_specs["dedicated_mode"] = True + libE_specs["zero_resource_workers"] = [0] libE_specs["enforce_worker_core_bounds"] = True # To allow visual checking - log file not used in test @@ -52,7 +53,7 @@ # For varying size test - relate node count to nworkers n_gens = 1 - nsim_workers = nworkers - n_gens + nsim_workers = nworkers # - n_gens if nsim_workers % 2 == 0: sys.exit( @@ -138,10 +139,17 @@ exp_srun.append(srun_p1 + str(nodename) + srun_p2 + str(ntasks) + srun_p3 + str(ntasks) + srun_p4) test_list = test_list_base - exp_list = exp_srun + exp_list_dynamic = exp_srun.copy() + if nworkers == 5: + # Iteration 0 (Dynamic): Workers 1, 2 -> node-2; 3, 4, 5 -> node-1 + n1 = srun_p1 + "node-1" + srun_p2 + "5" + srun_p3 + "5" + srun_p4 + n2 = srun_p1 + "node-2" + srun_p2 + "8" + srun_p3 + "8" + srun_p4 + exp_list_dynamic = [n2, n2, n1, n1, n1] + # Iteration 1 (Static): Worker 1 is gen. 
Workers 2, 3 -> node-1; 4, 5 -> node-2 + exp_list_static = [n1, n1, n2, n2] + sim_specs["user"] = { "tests": test_list, - "expect": exp_list, "persis_gens": n_gens, } @@ -149,15 +157,21 @@ for prob_id in range(iterations): if prob_id == 0: # Uses dynamic scheduler - will find node 2 slots first (as fewer) - libE_specs["num_resource_sets"] = nworkers - 1 # Any worker can be the gen - sim_specs["user"]["offset_for_scheduler"] = True # Changes expected values + libE_specs["gen_on_worker"] = False + libE_specs["num_resource_sets"] = nworkers + sim_specs["user"]["expect"] = exp_list_dynamic + sim_specs["user"]["offset_for_scheduler"] = False + sim_specs["user"]["persis_gens"] = 0 persis_info = add_unique_random_streams({}, nworkers + 1) else: # Uses static scheduler - will find node 1 slots first + libE_specs["gen_on_worker"] = True + sim_specs["user"]["expect"] = exp_list_static del libE_specs["num_resource_sets"] libE_specs["zero_resource_workers"] = [1] # Gen must be worker 1 sim_specs["user"]["offset_for_scheduler"] = False + sim_specs["user"]["persis_gens"] = 1 persis_info = add_unique_random_streams({}, nworkers + 1) # Perform the run diff --git a/libensemble/tests/functionality_tests/test_mpi_runners_zrw_supernode_uneven.py b/libensemble/tests/functionality_tests/test_mpi_runners_zrw_supernode_uneven.py index 640d613bf..77ce05006 100644 --- a/libensemble/tests/functionality_tests/test_mpi_runners_zrw_supernode_uneven.py +++ b/libensemble/tests/functionality_tests/test_mpi_runners_zrw_supernode_uneven.py @@ -33,9 +33,10 @@ sim_app = "/path/to/fakeapp.x" comms = libE_specs["comms"] - libE_specs["zero_resource_workers"] = [1] + libE_specs["zero_resource_workers"] = [0, 1] libE_specs["dedicated_mode"] = True libE_specs["enforce_worker_core_bounds"] = True + libE_specs["gen_on_worker"] = True # To allow visual checking - log file not used in test log_file = "ensemble_mpi_runners_zrw_supernode_uneven_comms_" + str(comms) + "_wrks_" + str(nworkers) + ".log" @@ -45,7 
+46,7 @@ # For varying size test - relate node count to nworkers in_place = libE_specs["zero_resource_workers"] - n_gens = len(in_place) + n_gens = len([w for w in in_place if w != 0]) nsim_workers = nworkers - n_gens comms = libE_specs["comms"] node_file = "nodelist_mpi_runners_zrw_supernode_uneven_comms_" + str(comms) + "_wrks_" + str(nworkers) diff --git a/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py b/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py index d9b946508..7d6389ad3 100644 --- a/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py +++ b/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py @@ -13,7 +13,7 @@ # Do not change these lines - they are parsed by run-tests.sh # TESTSUITE_COMMS: mpi local -# TESTSUITE_NPROCS: 5 7 +# TESTSUITE_NPROCS: 3 5 # TESTSUITE_OS_SKIP: WIN import sys @@ -36,6 +36,8 @@ n = 2 init_batch_size = nworkers - ngens + libE_specs["gen_on_worker"] = True + if ngens >= nworkers: sys.exit("The number of generators must be less than the number of workers -- aborting...") diff --git a/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py b/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py index 81a18a528..643b4723d 100644 --- a/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py +++ b/libensemble/tests/functionality_tests/test_persistent_uniform_sampling.py @@ -87,9 +87,9 @@ sim_specs["in"] = ["x", "obj_component"] # sim_specs["out"] = [("f", float), ("grad", float, n)] elif run == 3: - libE_specs["gen_on_manager"] = True + libE_specs["gen_on_worker"] = True elif run == 4: - libE_specs["gen_on_manager"] = False + libE_specs["gen_on_worker"] = False libE_specs["gen_workers"] = [2] # Perform the run diff --git a/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_nonblocking.py 
b/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_nonblocking.py index 542557884..b62cbe6b6 100644 --- a/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_nonblocking.py +++ b/libensemble/tests/functionality_tests/test_persistent_uniform_sampling_nonblocking.py @@ -68,4 +68,4 @@ assert len(np.unique(H["gen_ended_time"])) == 2 save_libE_output(H, persis_info, __file__, nworkers) - assert persis_info[1]["spin_count"] > 0, "This should have been a nonblocking receive" + assert persis_info[0]["spin_count"] > 0, "This should have been a nonblocking receive" diff --git a/libensemble/tests/functionality_tests/test_runlines_adaptive_workers_persistent_oversubscribe_rsets.py b/libensemble/tests/functionality_tests/test_runlines_adaptive_workers_persistent_oversubscribe_rsets.py index fb730b966..94bf1a387 100644 --- a/libensemble/tests/functionality_tests/test_runlines_adaptive_workers_persistent_oversubscribe_rsets.py +++ b/libensemble/tests/functionality_tests/test_runlines_adaptive_workers_persistent_oversubscribe_rsets.py @@ -31,9 +31,9 @@ # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). 
if __name__ == "__main__": nworkers, is_manager, libE_specs, _ = parse_args() - nsim_workers = nworkers - 1 + nsim_workers = nworkers - libE_specs["zero_resource_workers"] = [1] + libE_specs["zero_resource_workers"] = [0] rsets = nsim_workers * 2 libE_specs["num_resource_sets"] = rsets @@ -64,7 +64,7 @@ "persis_in": ["f", "x", "sim_id"], "out": [("priority", float), ("resource_sets", int), ("x", float, n), ("x_on_cube", float, n)], "user": { - "initial_batch_size": nworkers - 1, + "initial_batch_size": nworkers, "max_resource_sets": max_rsets, "lb": np.array([-3, -2]), "ub": np.array([3, 2]), @@ -91,7 +91,7 @@ "node_file": node_file, } # Name of file containing a node-list - persis_info = add_unique_random_streams({}, nworkers + 1) + persis_info = add_unique_random_streams({}, nworkers) exit_criteria = {"sim_max": 40, "wallclock_max": 300} # Perform the run diff --git a/libensemble/tests/functionality_tests/test_sim_dirs_per_worker.py b/libensemble/tests/functionality_tests/test_sim_dirs_per_worker.py index 69bb34ab8..a15ede9e9 100644 --- a/libensemble/tests/functionality_tests/test_sim_dirs_per_worker.py +++ b/libensemble/tests/functionality_tests/test_sim_dirs_per_worker.py @@ -23,14 +23,14 @@ from libensemble.tests.regression_tests.support import write_sim_func as sim_f from libensemble.tools import add_unique_random_streams, parse_args -nworkers, is_manager, libE_specs, _ = parse_args() +n_simworkers, is_manager, libE_specs, _ = parse_args() # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). 
if __name__ == "__main__": sim_input_dir = "./sim_input_dir" dir_to_copy = sim_input_dir + "/copy_this" dir_to_symlink = sim_input_dir + "/symlink_this" - w_ensemble = "./ensemble_workdirs_w" + str(nworkers) + "_" + libE_specs.get("comms") + w_ensemble = "./ensemble_workdirs_w" + str(n_simworkers) + "_" + libE_specs.get("comms") print("creating ensemble dir: ", w_ensemble, flush=True) for dir in [sim_input_dir, dir_to_copy, dir_to_symlink]: @@ -60,7 +60,7 @@ }, } - persis_info = add_unique_random_streams({}, nworkers + 1) + persis_info = add_unique_random_streams({}, n_simworkers) exit_criteria = {"sim_max": 21} @@ -69,9 +69,9 @@ if is_manager: assert os.path.isdir(w_ensemble), f"Ensemble directory {w_ensemble} not created." worker_dir_sum = sum(["worker" in i for i in os.listdir(w_ensemble)]) - assert worker_dir_sum == nworkers, "Number of worker dirs ({}) does not match nworkers ({}).".format( - worker_dir_sum, nworkers - ) + assert ( + worker_dir_sum == n_simworkers + 1 + ), "Number of worker dirs ({}) does not match n_simworkers ({}).".format(worker_dir_sum, n_simworkers) input_copied = [] sim_dir_sum = 0 diff --git a/libensemble/tests/functionality_tests/test_zero_resource_workers.py b/libensemble/tests/functionality_tests/test_zero_resource_workers.py index c8f0786d0..bc03ebbb7 100644 --- a/libensemble/tests/functionality_tests/test_zero_resource_workers.py +++ b/libensemble/tests/functionality_tests/test_zero_resource_workers.py @@ -1,16 +1,14 @@ """ Runs libEnsemble testing the zero_resource_workers argument. -Execute via one of the following commands (e.g. 3 workers): - mpiexec -np 4 python test_zero_resource_workers.py - python test_zero_resource_workers.py --nworkers 3 - python test_zero_resource_workers.py --nworkers 3 --comms tcp +Execute via one of the following commands (e.g. 
4 workers): + mpiexec -np 5 python test_zero_resource_workers.py + python test_zero_resource_workers.py --nworkers 4 + python test_zero_resource_workers.py --nworkers 4 --comms tcp -The number of concurrent evaluations of the objective function will be 4-1=3. +The number of concurrent evaluations of the objective function will be 4. """ -import sys - import numpy as np from libensemble import logger @@ -23,11 +21,11 @@ from libensemble.tools import add_unique_random_streams, parse_args # logger.set_level("DEBUG") # For testing the test -logger.set_level("INFO") +logger.set_level("DEBUG") # Do not change these lines - they are parsed by run-tests.sh # TESTSUITE_COMMS: mpi local -# TESTSUITE_NPROCS: 3 4 +# TESTSUITE_NPROCS: 3 5 # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": @@ -36,7 +34,7 @@ sim_app = "/path/to/fakeapp.x" comms = libE_specs["comms"] - libE_specs["zero_resource_workers"] = [1] + libE_specs["zero_resource_workers"] = [0] libE_specs["dedicated_mode"] = True libE_specs["enforce_worker_core_bounds"] = True @@ -47,9 +45,10 @@ nodes_per_worker = 2 # For varying size test - relate node count to nworkers - in_place = libE_specs["zero_resource_workers"] - n_gens = len(in_place) - nsim_workers = nworkers - n_gens + # With gen-on-manager (default), all user workers are sim workers. + # Worker 0 (gen) is hidden and in zero_resource_workers. 
+ n_gens = 0 + nsim_workers = nworkers comms = libE_specs["comms"] nodes_per_worker = 2 @@ -79,8 +78,8 @@ exctr = MPIExecutor(custom_info=mpi_customizer) exctr.register_app(full_path=sim_app, calc_type="sim") - if nworkers < 2: - sys.exit("Cannot run with a persistent worker if only one worker -- aborting...") + # if nworkers < 2: + # sys.exit("Cannot run with a persistent worker if only one worker -- aborting...") n = 2 sim_specs = { diff --git a/libensemble/tests/functionality_tests/test_zero_resource_workers_subnode.py b/libensemble/tests/functionality_tests/test_zero_resource_workers_subnode.py index 69ea2b559..19986097c 100644 --- a/libensemble/tests/functionality_tests/test_zero_resource_workers_subnode.py +++ b/libensemble/tests/functionality_tests/test_zero_resource_workers_subnode.py @@ -2,13 +2,13 @@ Runs libEnsemble testing the zero_resource_workers argument with 2 workers per node. -This test must be run on an odd number of workers >= 3 (e.g. even number of +This test must be run on an even number of workers >= 2 (e.g. odd number of procs when using mpi4py). Execute via one of the following commands (e.g. 3 workers): - mpiexec -np 4 python test_zero_resource_workers_subnode.py - python test_zero_resource_workers_subnode.py --nworkers 3 - python test_zero_resource_workers_subnode.py --nworkers 3 --comms tcp + mpiexec -np 5 python test_zero_resource_workers_subnode.py + python test_zero_resource_workers_subnode.py --nworkers 4 + python test_zero_resource_workers_subnode.py --nworkers 4 --comms tcp """ import sys @@ -29,7 +29,7 @@ # Do not change these lines - they are parsed by run-tests.sh # TESTSUITE_COMMS: mpi local -# TESTSUITE_NPROCS: 4 +# TESTSUITE_NPROCS: 5 # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). 
if __name__ == "__main__": @@ -39,7 +39,7 @@ sim_app = "/path/to/fakeapp.x" comms = libE_specs["comms"] - libE_specs["zero_resource_workers"] = [1] + libE_specs["zero_resource_workers"] = [0] libE_specs["dedicated_mode"] = True libE_specs["enforce_worker_core_bounds"] = True @@ -50,9 +50,10 @@ nodes_per_worker = 0.5 # For varying size test - relate node count to nworkers - in_place = libE_specs["zero_resource_workers"] - n_gens = len(in_place) - nsim_workers = nworkers - n_gens + # With gen-on-manager (default), all user workers are sim workers. + # Worker 0 (gen) is hidden and in zero_resource_workers. + n_gens = 0 + nsim_workers = nworkers if not (nsim_workers * nodes_per_worker).is_integer(): sys.exit(f"Sim workers ({nsim_workers}) must divide evenly into nodes") diff --git a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py index a85771d7d..5a36e2e1e 100644 --- a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py +++ b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py @@ -64,7 +64,7 @@ def six_hump_camel_func(x): n = 2 workflow.alloc_specs = AllocSpecs(alloc_f=alloc_f) - workflow.libE_specs.gen_on_manager = True + workflow.libE_specs.gen_on_worker = False vocs = VOCS( variables={"core": [-3, 3], "edge": [-2, 2], "core_on_cube": [-3, 3], "edge_on_cube": [-2, 2]}, diff --git a/libensemble/tests/regression_tests/test_evaluate_mixed_sample.py b/libensemble/tests/regression_tests/test_evaluate_mixed_sample.py index 481db8419..60e43fa57 100644 --- a/libensemble/tests/regression_tests/test_evaluate_mixed_sample.py +++ b/libensemble/tests/regression_tests/test_evaluate_mixed_sample.py @@ -44,7 +44,6 @@ H0["sim_ended"][:500] = True sampling = Ensemble(parse_args=True) - sampling.libE_specs.gen_on_manager = True sampling.H0 = H0 sampling.sim_specs = SimSpecs(sim_f=sim_f, inputs=["x"], out=[("f", float)]) sampling.alloc_specs = AllocSpecs(alloc_f=alloc_f) diff --git 
a/libensemble/tests/regression_tests/test_optimas_ax_mf.py b/libensemble/tests/regression_tests/test_optimas_ax_mf.py index 758aa1fc2..d99bc518a 100644 --- a/libensemble/tests/regression_tests/test_optimas_ax_mf.py +++ b/libensemble/tests/regression_tests/test_optimas_ax_mf.py @@ -42,7 +42,7 @@ def eval_func_mf(input_params): n = 2 batch_size = 2 - libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + libE_specs = LibeSpecs(nworkers=batch_size) vocs = VOCS( variables={"x0": [-50.0, 5.0], "x1": [-5.0, 15.0], "res": [1.0, 8.0]}, diff --git a/libensemble/tests/regression_tests/test_optimas_ax_multitask.py b/libensemble/tests/regression_tests/test_optimas_ax_multitask.py index 43c6c444e..65c81a373 100644 --- a/libensemble/tests/regression_tests/test_optimas_ax_multitask.py +++ b/libensemble/tests/regression_tests/test_optimas_ax_multitask.py @@ -57,7 +57,7 @@ def eval_func_multitask(input_params): n = 2 batch_size = 2 - libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + libE_specs = LibeSpecs(nworkers=batch_size) vocs = VOCS( variables={ diff --git a/libensemble/tests/regression_tests/test_optimas_ax_sf.py b/libensemble/tests/regression_tests/test_optimas_ax_sf.py index e4ee9e8a7..b04753490 100644 --- a/libensemble/tests/regression_tests/test_optimas_ax_sf.py +++ b/libensemble/tests/regression_tests/test_optimas_ax_sf.py @@ -41,7 +41,7 @@ def eval_func_sf(input_params): n = 2 batch_size = 2 - libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + libE_specs = LibeSpecs(nworkers=batch_size) vocs = VOCS( variables={ diff --git a/libensemble/tests/regression_tests/test_optimas_grid_sample.py b/libensemble/tests/regression_tests/test_optimas_grid_sample.py index 57c6c8fed..555ef071f 100644 --- a/libensemble/tests/regression_tests/test_optimas_grid_sample.py +++ b/libensemble/tests/regression_tests/test_optimas_grid_sample.py @@ -42,7 +42,7 @@ def eval_func(input_params: dict): n = 2 batch_size = 4 - libE_specs = 
LibeSpecs(gen_on_manager=True, nworkers=batch_size) + libE_specs = LibeSpecs(nworkers=batch_size) # Create varying parameters. lower_bounds = [-3.0, 2.0] diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_dfols.py b/libensemble/tests/regression_tests/test_persistent_aposmm_dfols.py index 6e1993069..ed6f9be50 100644 --- a/libensemble/tests/regression_tests/test_persistent_aposmm_dfols.py +++ b/libensemble/tests/regression_tests/test_persistent_aposmm_dfols.py @@ -69,6 +69,7 @@ def combine_component(x): "gen_f": gen_f, "persis_in": ["f", "fvec"] + [n[0] for n in gen_out], "out": gen_out, + "initial_batch_size": 100, "user": { "initial_sample_size": 100, "localopt_method": "dfols", @@ -102,7 +103,7 @@ def combine_component(x): H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs) if is_manager: - assert persis_info[1].get("run_order"), "Run_order should have been given back" + assert persis_info[0].get("run_order"), "Run_order should have been given back" assert flag == 0 assert np.min(H["f"][H["sim_ended"]]) <= 3000, "Didn't find a value below 3000" diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_exception.py b/libensemble/tests/regression_tests/test_persistent_aposmm_exception.py index b197dc3f0..09544e51d 100644 --- a/libensemble/tests/regression_tests/test_persistent_aposmm_exception.py +++ b/libensemble/tests/regression_tests/test_persistent_aposmm_exception.py @@ -69,6 +69,7 @@ def assertion(passed): "gen_f": gen_f, "persis_in": ["f"] + [n[0] for n in gen_out], "out": gen_out, + "initial_batch_size": 100, "user": { "initial_sample_size": 100, "localopt_method": "LN_BOBYQA", diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_external_localopt.py b/libensemble/tests/regression_tests/test_persistent_aposmm_external_localopt.py index dd01d1069..22cf47325 100644 --- a/libensemble/tests/regression_tests/test_persistent_aposmm_external_localopt.py +++ 
b/libensemble/tests/regression_tests/test_persistent_aposmm_external_localopt.py
@@ -75,6 +75,7 @@
     "gen_f": gen_f,
     "persis_in": ["f"] + [n[0] for n in gen_out],
     "out": gen_out,
+    "initial_batch_size": 100,
     "user": {
         "initial_sample_size": 100,
         "sample_points": np.round(minima, 1),
diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_ibcdfo_pounders.py b/libensemble/tests/regression_tests/test_persistent_aposmm_ibcdfo_pounders.py
index ed9dd71a3..ff3e2404e 100644
--- a/libensemble/tests/regression_tests/test_persistent_aposmm_ibcdfo_pounders.py
+++ b/libensemble/tests/regression_tests/test_persistent_aposmm_ibcdfo_pounders.py
@@ -130,7 +130,7 @@ def synthetic_beamline_mapping(H, _, sim_specs):
 H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs)
 if is_manager:
-    assert persis_info[1].get("run_order"), "Run_order should have been given back"
+    assert persis_info[0].get("run_order"), "Run_order should have been given back"
     assert flag == 0
     save_libE_output(H, persis_info, __file__, nworkers)
diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py b/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py
index 9d4278443..bf5914573 100644
--- a/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py
+++ b/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py
@@ -64,6 +64,7 @@
     "gen_f": gen_f,
     "persis_in": ["f"] + [n[0] for n in gen_out],
     "out": gen_out,
+    "initial_batch_size": 100,
     "user": {
         "initial_sample_size": 100,
         "sample_points": np.round(minima, 1),
diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_periodic.py b/libensemble/tests/regression_tests/test_persistent_aposmm_periodic.py
index d99e8802a..8cc215800 100644
--- a/libensemble/tests/regression_tests/test_persistent_aposmm_periodic.py
+++ b/libensemble/tests/regression_tests/test_persistent_aposmm_periodic.py
@@ -89,7 +89,7 @@
 H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs)
 if is_manager:
-    assert persis_info[1].get("run_order"), "Run_order should have been given back"
+    assert persis_info[0].get("run_order"), "Run_order should have been given back"
     min_ids = np.where(H["local_min"])
     # The minima are known on this test problem. If the above [lb, ub] domain is
diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_tao_blmvm.py b/libensemble/tests/regression_tests/test_persistent_aposmm_tao_blmvm.py
index 39ff3b79f..8a3c76287 100644
--- a/libensemble/tests/regression_tests/test_persistent_aposmm_tao_blmvm.py
+++ b/libensemble/tests/regression_tests/test_persistent_aposmm_tao_blmvm.py
@@ -67,6 +67,7 @@
     "gen_f": gen_f,
     "persis_in": ["f", "grad"] + [n[0] for n in gen_out],
     "out": gen_out,
+    "initial_batch_size": 100,
     "user": {
         "initial_sample_size": 100,
         "sample_points": np.round(minima, 1),
diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_timeout.py b/libensemble/tests/regression_tests/test_persistent_aposmm_timeout.py
index e61843fd7..e6014cbee 100644
--- a/libensemble/tests/regression_tests/test_persistent_aposmm_timeout.py
+++ b/libensemble/tests/regression_tests/test_persistent_aposmm_timeout.py
@@ -87,6 +87,6 @@
 if is_manager:
     assert flag == 2, "Test should have timed out"
-    assert persis_info[1].get("run_order"), "Run_order should have been given back"
+    assert persis_info[0].get("run_order"), "Run_order should have been given back"
     min_ids = np.where(H["local_min"])
     save_libE_output(H, persis_info, __file__, nworkers)
diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_with_grad.py b/libensemble/tests/regression_tests/test_persistent_aposmm_with_grad.py
index f2d2f09cc..48b1ae2ff 100644
--- a/libensemble/tests/regression_tests/test_persistent_aposmm_with_grad.py
+++ b/libensemble/tests/regression_tests/test_persistent_aposmm_with_grad.py
@@ -121,9 +121,9 @@
 H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0=H0)
 if is_manager:
-    assert persis_info[1].get("run_order"), "Run_order should have been given back"
+    assert persis_info[0].get("run_order"), "Run_order should have been given back"
     assert (
-        len(persis_info[1]["run_order"]) >= gen_specs["user"]["stop_after_k_minima"]
+        len(persis_info[0]["run_order"]) >= gen_specs["user"]["stop_after_k_minima"]
     ), "This test should have many runs started."
     assert len(H) < exit_criteria["sim_max"], "Test should have stopped early due to 'stop_after_k_minima'"
diff --git a/libensemble/tests/regression_tests/test_persistent_fd_param_finder.py b/libensemble/tests/regression_tests/test_persistent_fd_param_finder.py
index ac01d5683..de97470dc 100644
--- a/libensemble/tests/regression_tests/test_persistent_fd_param_finder.py
+++ b/libensemble/tests/regression_tests/test_persistent_fd_param_finder.py
@@ -70,6 +70,6 @@
 if fd_test.is_manager:
     assert len(H) < fd_test.exit_criteria.gen_max, "Problem didn't stop early, which should have been the case."
-    assert np.all(persis_info[1]["Fnoise"] > 0), "gen_f didn't find noise for all F_i components."
+    assert np.all(persis_info[0]["Fnoise"] > 0), "gen_f didn't find noise for all F_i components."
     fd_test.save_output(__file__)
diff --git a/libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py b/libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py
index 990493a17..f88db4fe0 100644
--- a/libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py
+++ b/libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py
@@ -2,8 +2,6 @@
 Example of multi-fidelity optimization using a persistent GP gen_func (calling Ax).
-This test uses the gen_on_manager option (persistent generator runs on
-a thread). Therefore nworkers is the number of simulation workers.

 Execute via one of the following commands:
     mpiexec -np 4 python test_persistent_gp_multitask_ax.py
@@ -64,7 +62,6 @@ def run_simulation(H, persis_info, sim_specs, libE_info):
 # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
 if __name__ == "__main__":
     nworkers, is_manager, libE_specs, _ = parse_args()
-    libE_specs["gen_on_manager"] = True

     mt_params = {
         "name_hifi": "expensive_model",
diff --git a/libensemble/tests/regression_tests/test_xopt_EI.py b/libensemble/tests/regression_tests/test_xopt_EI.py
index a78aee60a..3c902ee9d 100644
--- a/libensemble/tests/regression_tests/test_xopt_EI.py
+++ b/libensemble/tests/regression_tests/test_xopt_EI.py
@@ -53,7 +53,7 @@ def xtest_sim(H, persis_info, sim_specs, _):
     n = 2
     batch_size = 4
-    libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size)
+    libE_specs = LibeSpecs(nworkers=batch_size)
     libE_specs.reuse_output_dir = True

     vocs = VOCS(
diff --git a/libensemble/tests/regression_tests/test_xopt_EI_xopt_sim.py b/libensemble/tests/regression_tests/test_xopt_EI_xopt_sim.py
index 07ace47fa..8082a67fa 100644
--- a/libensemble/tests/regression_tests/test_xopt_EI_xopt_sim.py
+++ b/libensemble/tests/regression_tests/test_xopt_EI_xopt_sim.py
@@ -47,7 +47,7 @@ def xtest_callable(input_dict: dict, a=0) -> dict:
     n = 2
     batch_size = 4
-    libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size)
+    libE_specs = LibeSpecs(nworkers=batch_size)
     libE_specs.reuse_output_dir = True

     vocs = VOCS(
diff --git a/libensemble/tests/regression_tests/test_xopt_nelder_mead.py b/libensemble/tests/regression_tests/test_xopt_nelder_mead.py
index fe8039b95..d8bfdb459 100644
--- a/libensemble/tests/regression_tests/test_xopt_nelder_mead.py
+++ b/libensemble/tests/regression_tests/test_xopt_nelder_mead.py
@@ -38,7 +38,7 @@ def rosenbrock_callable(input_dict: dict) -> dict:
     batch_size = 1
-    libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size)
+    libE_specs = LibeSpecs(nworkers=batch_size)
     libE_specs.reuse_output_dir = True

     vocs = VOCS(
diff --git a/libensemble/tests/scaling_tests/forces/forces_simple_xopt/run_libe_forces.py b/libensemble/tests/scaling_tests/forces/forces_simple_xopt/run_libe_forces.py
index 2a7f8f701..2d23c83a8 100644
--- a/libensemble/tests/scaling_tests/forces/forces_simple_xopt/run_libe_forces.py
+++ b/libensemble/tests/scaling_tests/forces/forces_simple_xopt/run_libe_forces.py
@@ -32,7 +32,6 @@
 # Persistent gen does not need resources
 ensemble.libE_specs = LibeSpecs(
-    gen_on_manager=True,
     sim_dirs_make=True,
 )
diff --git a/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py b/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py
index 6d056b1e0..9d1d45fec 100644
--- a/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py
+++ b/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py
@@ -34,6 +34,25 @@
     ],
 )

+W_gen_mgr = np.array(
+    [
+        (0, True, 0, 0, False, False),
+        (1, False, 0, 0, False, False),
+        (2, False, 0, 0, False, False),
+        (3, False, 0, 0, False, False),
+        (4, False, 0, 0, False, False),
+    ],
+    dtype=[
+        ("worker_id", " None:
        self.msg = msg
        self.exc = exc
@@ -180,7 +181,7 @@ def __init__(
         self.runners = {EVAL_SIM_TAG: self.sim_runner.run, EVAL_GEN_TAG: self.gen_runner.run}
         self.calc_iter = {EVAL_SIM_TAG: 0, EVAL_GEN_TAG: 0}
         Worker._set_executor(self.workerID, self.comm)
-        Worker._set_resources(self.workerID, self.comm)
+        Worker._set_resources(self.workerID, self.comm, self.libE_specs)
         self.EnsembleDirectory = EnsembleDirectory(libE_specs=libE_specs)

     @staticmethod
@@ -218,11 +219,11 @@ def _set_executor(workerID: int, comm: Comm) -> bool:
         return False

     @staticmethod
-    def _set_resources(workerID, comm: Comm) -> bool:
+    def _set_resources(workerID, comm: Comm, libE_specs) -> bool:
         """Sets worker ID in the resources, return True if set"""
         resources = Resources.resources
         if isinstance(resources, Resources):
-            resources.set_worker_resources(comm.get_num_workers(), workerID)
+            resources.set_worker_resources(comm.get_num_workers() + 1, workerID)
             return True
         else:
             logger.debug(f"No resources set on worker {workerID}")
@@ -240,7 +241,7 @@ def _extract_debug_data(self, calc_type, Work):
         calc_id = calc_id.rjust(5, " ")
         return enum_desc, calc_id

-    def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict, int):
+    def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> tuple[npt.NDArray | None, dict, int | str]:
         """Runs a calculation on this worker object.

         This routine calls the user calculations. Exceptions are caught,
@@ -282,7 +283,7 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict,
         logger.debug(f"Returned from user function for {enum_desc} {calc_id}")

-        calc_status = UNSET_TAG
+        calc_status: int | str = UNSET_TAG
         # Check for buffered receive
         if self.comm.recv_buffer:
             tag, message = self.comm.recv()
@@ -316,7 +317,7 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict,
         logging.getLogger(LogConfig.config.stats_name).info(calc_msg)

-    def _get_calc_msg(self, enum_desc: str, calc_id: int, calc_type: int, timer: Timer, status: str) -> str:
+    def _get_calc_msg(self, enum_desc: str, calc_id: str, calc_type: str, timer: Timer, status: int | str) -> str:
         """Construct line for libE_stats.txt file"""

         calc_msg = f"{enum_desc} {calc_id}: {calc_type} {timer}"
@@ -332,7 +333,7 @@ def _get_calc_msg(self, enum_desc: str, calc_id: int, calc_type: int, timer: Tim
         return calc_msg

-    def _recv_H_rows(self, Work: dict) -> (dict, int, npt.NDArray):
+    def _recv_H_rows(self, Work: dict) -> tuple[dict, int, npt.NDArray]:
         """Unpacks Work request and receives any history rows"""
         libE_info = Work["libE_info"]
         calc_type = Work["tag"]
@@ -346,7 +347,7 @@ def _recv_H_rows(self, Work: dict) -> (dict, int, npt.NDArray):
         return libE_info, calc_type, calc_in

-    def _handle(self, Work: dict) -> dict:
+    def _handle(self, Work: dict) -> dict | None:
         """Handles a work request from the manager"""

         # Check work request and receive second message (if needed)
         libE_info, calc_type, calc_in = self._recv_H_rows(Work)
diff --git a/pixi.lock b/pixi.lock
index 185e31d3a..43cdd5d0c 100644
--- a/pixi.lock
+++ b/pixi.lock
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:817f427252b67646c9eea0f0a4c5a1ce662c2859400dc5fd563f314e92914891
+oid sha256:f2dd47db40b2492eaf439dbf2bf60c96a552c7b9a1d5b8d66f286bc402d5ee69
 size 1021669
diff --git a/pyproject.toml b/pyproject.toml
index 5538457e5..225232542 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,12 +8,7 @@ authors = [
     { name = "John-Luke Navarro" },
 ]

-dependencies = [
-    "numpy",
-    "psutil",
-    "pydantic",
-    "gest-api>=0.1,<0.2",
-]
+dependencies = ["numpy", "psutil", "pydantic", "gest-api>=0.1,<0.2"]

 description = "A Python toolkit for coordinating asynchronous and dynamic ensembles of calculations."
 name = "libensemble"
@@ -212,7 +207,7 @@ docs = ["pyenchant", "enchant>=0.0.1,<0.0.2", "sphinx-lfs-content>=1.1.10,<2"]
 # Various config from here onward
 [tool.black]
 line-length = 120
-target-version = ['py310', 'py311', 'py312', 'py313']
+target-version = ["py311", "py312", "py313", "py314"]
 force-exclude = '''
 (
     /(
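Reviewer note on the recurring test change above: every `persis_info[1]` becomes `persis_info[0]`, i.e. generator state such as APOSMM's `run_order` is now returned under process ID 0 (the manager) rather than under worker 1. A minimal sketch of that keying convention follows; the `get_gen_state` helper and its `gen_on_manager` default are illustrative assumptions, not libEnsemble API.

```python
# Illustrative sketch only -- not libEnsemble code. persis_info is a dict
# keyed by process ID: 0 for the manager, 1..nworkers for workers. When
# the generator runs on the manager, its returned state lives under key 0.

def get_gen_state(persis_info: dict, gen_on_manager: bool = True) -> dict:
    """Return the generator's entry from persis_info (assumed convention)."""
    gen_id = 0 if gen_on_manager else 1  # hypothetical ID choice
    return persis_info.get(gen_id, {})

# With the generator on the manager, "run_order" is found under key 0.
persis_info = {0: {"run_order": {1: [3, 7, 9]}}, 1: {}, 2: {}}
assert get_gen_state(persis_info).get("run_order")
```

This mirrors why the asserts in the diff move from index 1 to index 0 once the generator no longer occupies worker 1.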