From 701b5a2e3cf2d06c6a1576958a422753f621017b Mon Sep 17 00:00:00 2001 From: MilesCranmer Date: Tue, 13 Aug 2024 09:38:40 +0100 Subject: [PATCH 01/10] refactor: extensions to install at one time --- pysr/julia_extensions.py | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/pysr/julia_extensions.py b/pysr/julia_extensions.py index ac4714d48..7248beb78 100644 --- a/pysr/julia_extensions.py +++ b/pysr/julia_extensions.py @@ -4,6 +4,14 @@ from .julia_import import Pkg, jl +UUIDs = { + "LoopVectorization": "bdcacae8-1622-11e9-2a5c-532679323890", + "Bumper": "8ce10254-0962-460f-a3d8-1f77fea1446e", + "Zygote": "e88e6eb3-aa80-5325-afca-941959d7151f", + "MPIClusterManagers": "e7922434-ae4b-11e9-05c5-9780451d2c66", + "ClusterManagers": "34f1f09b-3a8b-5176-ab39-66d58a4d544e", +} + def load_required_packages( *, @@ -13,33 +21,36 @@ def load_required_packages( cluster_manager: Optional[str] = None, ): if turbo: - load_package("LoopVectorization", "bdcacae8-1622-11e9-2a5c-532679323890") + load_package("LoopVectorization") if bumper: - load_package("Bumper", "8ce10254-0962-460f-a3d8-1f77fea1446e") + load_package("Bumper") if enable_autodiff: - load_package("Zygote", "e88e6eb3-aa80-5325-afca-941959d7151f") + load_package("Zygote") if cluster_manager is not None: - load_package("ClusterManagers", "34f1f09b-3a8b-5176-ab39-66d58a4d544e") + if cluster_manager == "mpi": + load_package("MPIClusterManagers") + else: + load_package("ClusterManagers") def load_all_packages(): """Install and load all Julia extensions available to PySR.""" - load_required_packages( - turbo=True, bumper=True, enable_autodiff=True, cluster_manager="slurm" - ) + specs = [Pkg.PackageSpec(name=key, uuid=value) for key, value in UUIDs.items()] + Pkg.add(specs) + Pkg.resolve() # TODO: Refactor this file so we can install all packages at once using `juliapkg`, # ideally parameterizable via the regular Python extras API -def isinstalled(uuid_s: str): - return 
jl.haskey(Pkg.dependencies(), jl.Base.UUID(uuid_s)) +def isinstalled(package_name: str): + return jl.haskey(Pkg.dependencies(), jl.Base.UUID(UUIDs[package_name])) -def load_package(package_name: str, uuid_s: str) -> None: - if not isinstalled(uuid_s): - Pkg.add(name=package_name, uuid=uuid_s) +def load_package(package_name: str) -> None: + if not isinstalled(package_name): + Pkg.add(name=package_name, uuid=UUIDs[package_name]) Pkg.resolve() # TODO: Protect against loading the same symbol from two packages, From c00e26e1ae4dc5c41b58e40ad3b4b683d3f4aa70 Mon Sep 17 00:00:00 2001 From: MilesCranmer Date: Tue, 13 Aug 2024 09:39:04 +0100 Subject: [PATCH 02/10] feat: add mpi cluster manager backend --- pysr/julia_helpers.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pysr/julia_helpers.py b/pysr/julia_helpers.py index 18d4a6cf3..6f3e25234 100644 --- a/pysr/julia_helpers.py +++ b/pysr/julia_helpers.py @@ -28,8 +28,15 @@ def _escape_filename(filename): def _load_cluster_manager(cluster_manager: str): - jl.seval(f"using ClusterManagers: addprocs_{cluster_manager}") - return jl.seval(f"addprocs_{cluster_manager}") + if cluster_manager == "mpi": + jl.seval("using MPIClusterManagers: MPIWorkerManager") + return jl.seval( + "__pysr_mpi_addprocs(np; exeflags=``, kws...) 
= " + + "addprocs(MPIWorkerManager(np); exeflags=`$exeflags --project=$(Base.active_project())`, kws...)" + ) + else: + jl.seval(f"using ClusterManagers: addprocs_{cluster_manager}") + return jl.seval(f"addprocs_{cluster_manager}") def jl_array(x, dtype=None): From 5ee2d0ade182e13622481c4463fbd4d24d1e06e0 Mon Sep 17 00:00:00 2001 From: MilesCranmer Date: Tue, 13 Aug 2024 09:39:28 +0100 Subject: [PATCH 03/10] docs: describe MPI backend in docstring --- pysr/julia_helpers.py | 4 +++- pysr/sr.py | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pysr/julia_helpers.py b/pysr/julia_helpers.py index 6f3e25234..31813b1bd 100644 --- a/pysr/julia_helpers.py +++ b/pysr/julia_helpers.py @@ -49,7 +49,9 @@ def jl_array(x, dtype=None): def jl_is_function(f) -> bool: - return cast(bool, jl.seval("op -> op isa Function")(f)) + # We name it so we only compile it once + is_function = jl.seval("__pysr_jl_is_function(op) = op isa Function") + return cast(bool, is_function(f)) def jl_serialize(obj: Any) -> NDArray[np.uint8]: diff --git a/pysr/sr.py b/pysr/sr.py index 0054ce502..63c74d82c 100644 --- a/pysr/sr.py +++ b/pysr/sr.py @@ -495,8 +495,8 @@ class PySRRegressor(MultiOutputMixin, RegressorMixin, BaseEstimator): Using procs=0 will turn off both. Default is `True`. cluster_manager : str For distributed computing, this sets the job queue system. Set - to one of "slurm", "pbs", "lsf", "sge", "qrsh", "scyld", or - "htc". If set to one of these, PySR will run in distributed + to one of "slurm", "pbs", "lsf", "sge", "qrsh", "scyld", + "htc", or "mpi". If set to one of these, PySR will run in distributed mode, and use `procs` to figure out how many processes to launch. Default is `None`. 
heap_size_hint_in_bytes : int @@ -773,7 +773,7 @@ def __init__( procs: int = cpu_count(), multithreading: Optional[bool] = None, cluster_manager: Optional[ - Literal["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc"] + Literal["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc", "mpi"] ] = None, heap_size_hint_in_bytes: Optional[int] = None, batching: bool = False, From 8e92bf12755a77da2ff76254982737ca15c63af1 Mon Sep 17 00:00:00 2001 From: MilesCranmer Date: Tue, 13 Aug 2024 09:42:43 +0100 Subject: [PATCH 04/10] docs: mention `cluster_manager="mpi"` --- docs/tuning.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/tuning.md b/docs/tuning.md index 455c21da4..6f73d3a16 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -6,7 +6,13 @@ First, my general tips would be to avoid using redundant operators, like how `po When running PySR, I usually do the following: -I run from IPython (Jupyter Notebooks don't work as well[^1]) on the head node of a slurm cluster. Passing `cluster_manager="slurm"` will make PySR set up a run over the entire allocation. I set `procs` equal to the total number of cores over my entire allocation. +I run from IPython (Jupyter Notebooks don't work as well[^1]) on the head node of a slurm cluster. +Passing `cluster_manager="slurm"` will make PySR set up a run over the entire allocation (another common option is `cluster_manager="mpi"`, which will use MPI). +I set `procs` equal to the total number of cores over my entire allocation. + +> [!NOTE] +> When running on a cluster, you should only launch the search from a single task on the head node, rather than starting PySR on every node simultaneously. +> ClusterManagers.jl will automatically call the correct command (such as `srun` for slurm) to spread the processing out over the topology of nodes. 
[^1]: Jupyter Notebooks are supported by PySR, but miss out on some useful features available in IPython and Python: the progress bar, and early stopping with "q". In Jupyter you cannot interrupt a search once it has started; you have to restart the kernel. See [this issue](https://github.com/MilesCranmer/PySR/issues/260) for updates. From bfbd7751716754d32f3d9b4f1471e076aebfc131 Mon Sep 17 00:00:00 2001 From: MilesCranmer Date: Tue, 13 Aug 2024 09:47:36 +0100 Subject: [PATCH 05/10] fix: loading of all packages --- pysr/julia_extensions.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pysr/julia_extensions.py b/pysr/julia_extensions.py index 7248beb78..0fac35436 100644 --- a/pysr/julia_extensions.py +++ b/pysr/julia_extensions.py @@ -2,6 +2,7 @@ from typing import Optional +from .julia_helpers import jl_array from .julia_import import Pkg, jl UUIDs = { @@ -36,8 +37,9 @@ def load_required_packages( def load_all_packages(): """Install and load all Julia extensions available to PySR.""" specs = [Pkg.PackageSpec(name=key, uuid=value) for key, value in UUIDs.items()] - Pkg.add(specs) + Pkg.add(jl_array(specs)) Pkg.resolve() + jl.seval("import " + ", ".join(UUIDs.keys())) # TODO: Refactor this file so we can install all packages at once using `juliapkg`, @@ -55,5 +57,5 @@ def load_package(package_name: str) -> None: # TODO: Protect against loading the same symbol from two packages, # maybe with a @gensym here. 
- jl.seval(f"using {package_name}: {package_name}") + jl.seval(f"import {package_name}") return None From 5365c6b0a3cafb6dd2373a5408f5614a80ffb757 Mon Sep 17 00:00:00 2001 From: MilesCranmer Date: Tue, 13 Aug 2024 09:49:18 +0100 Subject: [PATCH 06/10] test: include test of MPIWorkerManager --- pysr/test/test.py | 61 ++++++++++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/pysr/test/test.py b/pysr/test/test.py index c641e9f66..17fbadfb8 100644 --- a/pysr/test/test.py +++ b/pysr/test/test.py @@ -86,36 +86,37 @@ def test_linear_relation_weighted_bumper(self): ) def test_multiprocessing_turbo_custom_objective(self): - rstate = np.random.RandomState(0) - y = self.X[:, 0] - y += rstate.randn(*y.shape) * 1e-4 - model = PySRRegressor( - **self.default_test_kwargs, - # Turbo needs to work with unsafe operators: - unary_operators=["sqrt"], - procs=2, - multithreading=False, - turbo=True, - early_stop_condition="stop_if(loss, complexity) = loss < 1e-10 && complexity == 1", - loss_function=""" - function my_objective(tree::Node{T}, dataset::Dataset{T}, options::Options) where T - prediction, flag = eval_tree_array(tree, dataset.X, options) - !flag && return T(Inf) - abs3(x) = abs(x) ^ 3 - return sum(abs3, prediction .- dataset.y) / length(prediction) - end - """, - ) - model.fit(self.X, y) - print(model.equations_) - best_loss = model.equations_.iloc[-1]["loss"] - self.assertLessEqual(best_loss, 1e-10) - self.assertGreaterEqual(best_loss, 0.0) - - # Test options stored: - self.assertEqual( - jl.seval("((::Val{x}) where x) -> x")(model.julia_options_.turbo), True - ) + for cluster_manager in (None, "mpi"): + rstate = np.random.RandomState(0) + y = self.X[:, 0] + y += rstate.randn(*y.shape) * 1e-4 + model = PySRRegressor( + **self.default_test_kwargs, + # Turbo needs to work with unsafe operators: + unary_operators=["sqrt"], + procs=2, + multithreading=False, + turbo=True, + early_stop_condition="stop_if(loss, complexity) = 
loss < 1e-10 && complexity == 1", + loss_function=""" + function my_objective(tree::Node{T}, dataset::Dataset{T}, options::Options) where T + prediction, flag = eval_tree_array(tree, dataset.X, options) + !flag && return T(Inf) + abs3(x) = abs(x) ^ 3 + return sum(abs3, prediction .- dataset.y) / length(prediction) + end + """, + ) + model.fit(self.X, y) + print(model.equations_) + best_loss = model.equations_.iloc[-1]["loss"] + self.assertLessEqual(best_loss, 1e-10) + self.assertGreaterEqual(best_loss, 0.0) + + # Test options stored: + self.assertEqual( + jl.seval("((::Val{x}) where x) -> x")(model.julia_options_.turbo), True + ) def test_multiline_seval(self): # The user should be able to run multiple things in a single seval call: From 5e9b6790afe92ae1fbbec6b5b61bda1dd4cb2de3 Mon Sep 17 00:00:00 2001 From: MilesCranmer Date: Tue, 13 Aug 2024 09:51:19 +0100 Subject: [PATCH 07/10] refactor: clean up tests of mpi --- pysr/test/test.py | 67 +++++++++++++++++++++++++---------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/pysr/test/test.py b/pysr/test/test.py index 17fbadfb8..76fa3ca65 100644 --- a/pysr/test/test.py +++ b/pysr/test/test.py @@ -85,38 +85,43 @@ def test_linear_relation_weighted_bumper(self): jl.seval("((::Val{x}) where x) -> x")(model.julia_options_.bumper), True ) + def _multiprocessing_turbo_custom_objective(self, cluster_manager): + rstate = np.random.RandomState(0) + y = self.X[:, 0] + y += rstate.randn(*y.shape) * 1e-4 + model = PySRRegressor( + **self.default_test_kwargs, + # Turbo needs to work with unsafe operators: + unary_operators=["sqrt"], + procs=2, + multithreading=False, + turbo=True, + early_stop_condition="stop_if(loss, complexity) = loss < 1e-10 && complexity == 1", + loss_function=""" + function my_objective(tree::Node{T}, dataset::Dataset{T}, options::Options) where T + prediction, flag = eval_tree_array(tree, dataset.X, options) + !flag && return T(Inf) + abs3(x) = abs(x) ^ 3 + return sum(abs3, 
prediction .- dataset.y) / length(prediction) + end + """, + ) + model.fit(self.X, y) + print(model.equations_) + best_loss = model.equations_.iloc[-1]["loss"] + self.assertLessEqual(best_loss, 1e-10) + self.assertGreaterEqual(best_loss, 0.0) + + # Test options stored: + self.assertEqual( + jl.seval("((::Val{x}) where x) -> x")(model.julia_options_.turbo), True + ) + def test_multiprocessing_turbo_custom_objective(self): - for cluster_manager in (None, "mpi"): - rstate = np.random.RandomState(0) - y = self.X[:, 0] - y += rstate.randn(*y.shape) * 1e-4 - model = PySRRegressor( - **self.default_test_kwargs, - # Turbo needs to work with unsafe operators: - unary_operators=["sqrt"], - procs=2, - multithreading=False, - turbo=True, - early_stop_condition="stop_if(loss, complexity) = loss < 1e-10 && complexity == 1", - loss_function=""" - function my_objective(tree::Node{T}, dataset::Dataset{T}, options::Options) where T - prediction, flag = eval_tree_array(tree, dataset.X, options) - !flag && return T(Inf) - abs3(x) = abs(x) ^ 3 - return sum(abs3, prediction .- dataset.y) / length(prediction) - end - """, - ) - model.fit(self.X, y) - print(model.equations_) - best_loss = model.equations_.iloc[-1]["loss"] - self.assertLessEqual(best_loss, 1e-10) - self.assertGreaterEqual(best_loss, 0.0) - - # Test options stored: - self.assertEqual( - jl.seval("((::Val{x}) where x) -> x")(model.julia_options_.turbo), True - ) + self._multiprocessing_turbo_custom_objective(None) + + def test_multiprocessing_turbo_custom_objective_mpi(self): + self._multiprocessing_turbo_custom_objective("mpi") def test_multiline_seval(self): # The user should be able to run multiple things in a single seval call: From 09c2db502a238042734ece9ab04dcf31f6df25ff Mon Sep 17 00:00:00 2001 From: MilesCranmer Date: Tue, 13 Aug 2024 09:55:27 +0100 Subject: [PATCH 08/10] fix: issue with addprocs function undefined --- pysr/julia_helpers.py | 4 +++- pysr/test/test.py | 1 + 2 files changed, 4 insertions(+), 1 
deletion(-) diff --git a/pysr/julia_helpers.py b/pysr/julia_helpers.py index 31813b1bd..2591ec330 100644 --- a/pysr/julia_helpers.py +++ b/pysr/julia_helpers.py @@ -29,9 +29,11 @@ def _escape_filename(filename): def _load_cluster_manager(cluster_manager: str): if cluster_manager == "mpi": + jl.seval("using Distributed: addprocs") jl.seval("using MPIClusterManagers: MPIWorkerManager") + return jl.seval( - "__pysr_mpi_addprocs(np; exeflags=``, kws...) = " + "(np; exeflags=``, kws...) -> " + "addprocs(MPIWorkerManager(np); exeflags=`$exeflags --project=$(Base.active_project())`, kws...)" ) else: diff --git a/pysr/test/test.py b/pysr/test/test.py index 76fa3ca65..51b962d5a 100644 --- a/pysr/test/test.py +++ b/pysr/test/test.py @@ -95,6 +95,7 @@ def _multiprocessing_turbo_custom_objective(self, cluster_manager): unary_operators=["sqrt"], procs=2, multithreading=False, + cluster_manager=cluster_manager, turbo=True, early_stop_condition="stop_if(loss, complexity) = loss < 1e-10 && complexity == 1", loss_function=""" From b3f28d59329285925163f2e1a87d2b5011c7b70e Mon Sep 17 00:00:00 2001 From: MilesCranmer Date: Tue, 13 Aug 2024 10:08:25 +0100 Subject: [PATCH 09/10] fix: install `cluster_manager` at correct step --- pysr/sr.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pysr/sr.py b/pysr/sr.py index 63c74d82c..adb4fa95a 100644 --- a/pysr/sr.py +++ b/pysr/sr.py @@ -1690,9 +1690,6 @@ def _run( if not ALREADY_RAN and update_verbosity != 0: print("Compiling Julia backend...") - if cluster_manager is not None: - cluster_manager = _load_cluster_manager(cluster_manager) - # TODO(mcranmer): These functions should be part of this class. 
binary_operators, unary_operators = _maybe_create_inline_operators( binary_operators=binary_operators, @@ -1753,6 +1750,9 @@ def _run( cluster_manager=cluster_manager, ) + if cluster_manager is not None: + cluster_manager = _load_cluster_manager(cluster_manager) + mutation_weights = SymbolicRegression.MutationWeights( mutate_constant=self.weight_mutate_constant, mutate_operator=self.weight_mutate_operator, From dca76eeb4518a61401f80301ceee193a390d63f0 Mon Sep 17 00:00:00 2001 From: MilesCranmer Date: Wed, 14 Aug 2024 22:15:10 +0100 Subject: [PATCH 10/10] feat: create `mpiflags` option --- pysr/julia_helpers.py | 12 ++++++++++-- pysr/param_groupings.yml | 1 + pysr/sr.py | 8 +++++++- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/pysr/julia_helpers.py b/pysr/julia_helpers.py index 2591ec330..b9bf6c5a5 100644 --- a/pysr/julia_helpers.py +++ b/pysr/julia_helpers.py @@ -27,14 +27,22 @@ def _escape_filename(filename): return str_repr -def _load_cluster_manager(cluster_manager: str): +def _load_cluster_manager(cluster_manager: str, mpi_flags: str): if cluster_manager == "mpi": jl.seval("using Distributed: addprocs") jl.seval("using MPIClusterManagers: MPIWorkerManager") return jl.seval( "(np; exeflags=``, kws...) 
-> " - + "addprocs(MPIWorkerManager(np); exeflags=`$exeflags --project=$(Base.active_project())`, kws...)" + + "addprocs(MPIWorkerManager(np);" + + ",".join( + [ + "exeflags=`$exeflags --project=$(Base.active_project())`", + f"mpiflags=`{mpi_flags}`", + "kws...", + ] + ) + + ")" ) else: jl.seval(f"using ClusterManagers: addprocs_{cluster_manager}") diff --git a/pysr/param_groupings.yml b/pysr/param_groupings.yml index 0ff9d63da..fcec5a6fc 100644 --- a/pysr/param_groupings.yml +++ b/pysr/param_groupings.yml @@ -70,6 +70,7 @@ - multithreading - cluster_manager - heap_size_hint_in_bytes + - mpi_flags - batching - batch_size - precision diff --git a/pysr/sr.py b/pysr/sr.py index adb4fa95a..63d4c9553 100644 --- a/pysr/sr.py +++ b/pysr/sr.py @@ -499,6 +499,10 @@ class PySRRegressor(MultiOutputMixin, RegressorMixin, BaseEstimator): "htc", or "mpi". If set to one of these, PySR will run in distributed mode, and use `procs` to figure out how many processes to launch. Default is `None`. + mpi_flags : str + (Experimental API) String of options to pass to `mpiexec`. + For example, `"-host worker1,worker2"`. + Default is `""`. heap_size_hint_in_bytes : int For multiprocessing, this sets the `--heap-size-hint` parameter for new Julia processes. 
This can be configured when using @@ -775,6 +779,7 @@ def __init__( cluster_manager: Optional[ Literal["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc", "mpi"] ] = None, + mpi_flags: str = "", heap_size_hint_in_bytes: Optional[int] = None, batching: bool = False, batch_size: int = 50, @@ -872,6 +877,7 @@ def __init__( self.procs = procs self.multithreading = multithreading self.cluster_manager = cluster_manager + self.mpi_flags = mpi_flags self.heap_size_hint_in_bytes = heap_size_hint_in_bytes self.batching = batching self.batch_size = batch_size @@ -1751,7 +1757,7 @@ def _run( ) if cluster_manager is not None: - cluster_manager = _load_cluster_manager(cluster_manager) + cluster_manager = _load_cluster_manager(cluster_manager, self.mpi_flags) mutation_weights = SymbolicRegression.MutationWeights( mutate_constant=self.weight_mutate_constant,