
MPI support #698

Open · wants to merge 10 commits into base: master
docs/tuning.md (7 additions, 1 deletion)
@@ -6,7 +6,13 @@ First, my general tips would be to avoid using redundant operators, like how `po…`

When running PySR, I usually do the following:

I run from IPython (Jupyter Notebooks don't work as well[^1]) on the head node of a slurm cluster. Passing `cluster_manager="slurm"` will make PySR set up a run over the entire allocation. I set `procs` equal to the total number of cores over my entire allocation.
I run from IPython (Jupyter Notebooks don't work as well[^1]) on the head node of a slurm cluster.
Passing `cluster_manager="slurm"` will make PySR set up a run over the entire allocation (another common option is `cluster_manager="mpi"`, which launches workers via MPI).
I set `procs` equal to the total number of cores over my entire allocation.

> [!NOTE]
> When running on a cluster, you should only launch the search from a single task on the head node, rather than starting PySR on every node simultaneously.
> ClusterManagers.jl will automatically call the correct launch command for the topology of nodes, such as `srun` for slurm.

[^1]: Jupyter Notebooks are supported by PySR, but miss out on some useful features available in IPython and Python: the progress bar, and early stopping with "q". In Jupyter you cannot interrupt a search once it has started; you have to restart the kernel. See [this issue](https://github.com/MilesCranmer/PySR/issues/260) for updates.
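To make the workflow described above concrete, here is a minimal sketch of a head-node launch. The data, operator choices, and `procs` value are placeholders; only the `cluster_manager`, `procs`, and `multithreading` keywords follow the documentation above.

```python
# Minimal sketch of a head-node launch, run from IPython on the head
# node of a slurm allocation. Data and settings are placeholders.
import numpy as np
from pysr import PySRRegressor

X = np.random.randn(100, 5)
y = X[:, 0] ** 2 - X[:, 1]

model = PySRRegressor(
    procs=64,                 # total number of cores in the allocation
    cluster_manager="slurm",  # or "mpi" to launch workers over MPI
    multithreading=False,     # distributed mode uses multiprocessing
    binary_operators=["+", "-", "*"],
    unary_operators=["square"],
)
model.fit(X, y)
```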

pysr/julia_extensions.py (26 additions, 13 deletions)
@@ -2,8 +2,17 @@

from typing import Optional

from .julia_helpers import jl_array
from .julia_import import Pkg, jl

UUIDs = {
"LoopVectorization": "bdcacae8-1622-11e9-2a5c-532679323890",
"Bumper": "8ce10254-0962-460f-a3d8-1f77fea1446e",
"Zygote": "e88e6eb3-aa80-5325-afca-941959d7151f",
"MPIClusterManagers": "e7922434-ae4b-11e9-05c5-9780451d2c66",
"ClusterManagers": "34f1f09b-3a8b-5176-ab39-66d58a4d544e",
}


def load_required_packages(
*,
@@ -13,36 +22,40 @@ def load_required_packages(
cluster_manager: Optional[str] = None,
):
if turbo:
load_package("LoopVectorization", "bdcacae8-1622-11e9-2a5c-532679323890")
load_package("LoopVectorization")
if bumper:
load_package("Bumper", "8ce10254-0962-460f-a3d8-1f77fea1446e")
load_package("Bumper")
if enable_autodiff:
load_package("Zygote", "e88e6eb3-aa80-5325-afca-941959d7151f")
load_package("Zygote")
if cluster_manager is not None:
load_package("ClusterManagers", "34f1f09b-3a8b-5176-ab39-66d58a4d544e")
if cluster_manager == "mpi":
load_package("MPIClusterManagers")
else:
load_package("ClusterManagers")


def load_all_packages():
"""Install and load all Julia extensions available to PySR."""
load_required_packages(
turbo=True, bumper=True, enable_autodiff=True, cluster_manager="slurm"
)
specs = [Pkg.PackageSpec(name=key, uuid=value) for key, value in UUIDs.items()]
Pkg.add(jl_array(specs))
Pkg.resolve()
jl.seval("import " + ", ".join(UUIDs.keys()))


# TODO: Refactor this file so we can install all packages at once using `juliapkg`,
# ideally parameterizable via the regular Python extras API


def isinstalled(uuid_s: str):
return jl.haskey(Pkg.dependencies(), jl.Base.UUID(uuid_s))
def isinstalled(package_name: str):
return jl.haskey(Pkg.dependencies(), jl.Base.UUID(UUIDs[package_name]))


def load_package(package_name: str, uuid_s: str) -> None:
if not isinstalled(uuid_s):
Pkg.add(name=package_name, uuid=uuid_s)
def load_package(package_name: str) -> None:
if not isinstalled(package_name):
Pkg.add(name=package_name, uuid=UUIDs[package_name])
Pkg.resolve()

# TODO: Protect against loading the same symbol from two packages,
# maybe with a @gensym here.
jl.seval(f"using {package_name}: {package_name}")
jl.seval(f"import {package_name}")
return None
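A hedged usage sketch of the loaders above, mirroring the new code paths rather than documenting a public API; this assumes the keyword arguments of `load_required_packages` not shown in the hunk default to `False`/`None` as in the unchanged part of the signature.

```python
# Illustrative only: exercising the new registry-based code paths.
from pysr.julia_extensions import load_all_packages, load_required_packages

# With cluster_manager="mpi", this now installs and imports
# MPIClusterManagers instead of ClusterManagers:
load_required_packages(cluster_manager="mpi")

# Installs every extension in the UUIDs registry with a single Pkg.add
# call, resolves the environment, then imports them all:
load_all_packages()
```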
pysr/julia_helpers.py (23 additions, 4 deletions)
@@ -27,9 +27,26 @@ def _escape_filename(filename):
return str_repr


def _load_cluster_manager(cluster_manager: str):
jl.seval(f"using ClusterManagers: addprocs_{cluster_manager}")
return jl.seval(f"addprocs_{cluster_manager}")
def _load_cluster_manager(cluster_manager: str, mpi_flags: str):
if cluster_manager == "mpi":
jl.seval("using Distributed: addprocs")
jl.seval("using MPIClusterManagers: MPIWorkerManager")

return jl.seval(
"(np; exeflags=``, kws...) -> "
+ "addprocs(MPIWorkerManager(np);"
+ ",".join(
[
"exeflags=`$exeflags --project=$(Base.active_project())`",
f"mpiflags=`{mpi_flags}`",
"kws...",
]
)
+ ")"
)
else:
jl.seval(f"using ClusterManagers: addprocs_{cluster_manager}")
return jl.seval(f"addprocs_{cluster_manager}")


def jl_array(x, dtype=None):
@@ -42,7 +59,9 @@ def jl_array(x, dtype=None):


def jl_is_function(f) -> bool:
return cast(bool, jl.seval("op -> op isa Function")(f))
# We name the function so that it is only compiled once
is_function = jl.seval("__pysr_jl_is_function(op) = op isa Function")
return cast(bool, is_function(f))


def jl_serialize(obj: Any) -> NDArray[np.uint8]:
pysr/param_groupings.yml (1 addition)
@@ -70,6 +70,7 @@
- multithreading
- cluster_manager
- heap_size_hint_in_bytes
- mpi_flags
- batching
- batch_size
- precision
pysr/sr.py (12 additions, 6 deletions)
@@ -495,10 +495,14 @@ class PySRRegressor(MultiOutputMixin, RegressorMixin, BaseEstimator):
Using procs=0 will turn off both. Default is `True`.
cluster_manager : str
For distributed computing, this sets the job queue system. Set
to one of "slurm", "pbs", "lsf", "sge", "qrsh", "scyld", or
"htc". If set to one of these, PySR will run in distributed
to one of "slurm", "pbs", "lsf", "sge", "qrsh", "scyld",
"htc", or "mpi". If set to one of these, PySR will run in distributed
mode, and use `procs` to figure out how many processes to launch.
Default is `None`.
mpi_flags : str
(Experimental API) String of options to pass to `mpiexec`.
For example, `"-host worker1,worker2"`.
Default is `""`.
heap_size_hint_in_bytes : int
For multiprocessing, this sets the `--heap-size-hint` parameter
for new Julia processes. This can be configured when using
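Before moving on, a minimal hedged sketch of the `mpi_flags` keyword documented above; the host names are placeholders:

```python
# Placeholder hosts; mpi_flags is forwarded to mpiexec when
# cluster_manager="mpi" (experimental API, per the docstring above).
from pysr import PySRRegressor

model = PySRRegressor(
    cluster_manager="mpi",
    mpi_flags="-host worker1,worker2",
    procs=8,
    multithreading=False,
)
```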
@@ -773,8 +777,9 @@ def __init__(
procs: int = cpu_count(),
multithreading: Optional[bool] = None,
cluster_manager: Optional[
Literal["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc"]
Literal["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc", "mpi"]
] = None,
mpi_flags: str = "",
heap_size_hint_in_bytes: Optional[int] = None,
batching: bool = False,
batch_size: int = 50,
@@ -872,6 +877,7 @@ def __init__(
self.procs = procs
self.multithreading = multithreading
self.cluster_manager = cluster_manager
self.mpi_flags = mpi_flags
self.heap_size_hint_in_bytes = heap_size_hint_in_bytes
self.batching = batching
self.batch_size = batch_size
@@ -1690,9 +1696,6 @@ def _run(
if not ALREADY_RAN and update_verbosity != 0:
print("Compiling Julia backend...")

if cluster_manager is not None:
cluster_manager = _load_cluster_manager(cluster_manager)

# TODO(mcranmer): These functions should be part of this class.
binary_operators, unary_operators = _maybe_create_inline_operators(
binary_operators=binary_operators,
@@ -1753,6 +1756,9 @@
cluster_manager=cluster_manager,
)

if cluster_manager is not None:
cluster_manager = _load_cluster_manager(cluster_manager, self.mpi_flags)

mutation_weights = SymbolicRegression.MutationWeights(
mutate_constant=self.weight_mutate_constant,
mutate_operator=self.weight_mutate_operator,
pysr/test/test.py (8 additions, 1 deletion)
@@ -85,7 +85,7 @@ def test_linear_relation_weighted_bumper(self):
jl.seval("((::Val{x}) where x) -> x")(model.julia_options_.bumper), True
)

def test_multiprocessing_turbo_custom_objective(self):
def _multiprocessing_turbo_custom_objective(self, cluster_manager):
rstate = np.random.RandomState(0)
y = self.X[:, 0]
y += rstate.randn(*y.shape) * 1e-4
@@ -95,6 +95,7 @@ def test_multiprocessing_turbo_custom_objective(self):
unary_operators=["sqrt"],
procs=2,
multithreading=False,
cluster_manager=cluster_manager,
turbo=True,
early_stop_condition="stop_if(loss, complexity) = loss < 1e-10 && complexity == 1",
loss_function="""
@@ -117,6 +118,12 @@
jl.seval("((::Val{x}) where x) -> x")(model.julia_options_.turbo), True
)

def test_multiprocessing_turbo_custom_objective(self):
self._multiprocessing_turbo_custom_objective(None)

def test_multiprocessing_turbo_custom_objective_mpi(self):
self._multiprocessing_turbo_custom_objective("mpi")
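The refactor above uses a common unittest parameterization pattern: a private helper holds the shared body, and thin named wrappers make each backend show up as its own test case. A generic sketch (not PySR code):

```python
# Generic sketch of the pattern: a shared private helper parameterized
# by backend, plus thin named wrappers for test discovery.
import unittest

class ExampleBackendTest(unittest.TestCase):
    def _check_search(self, cluster_manager):
        # Shared assertions; cluster_manager switches the launch path.
        self.assertIn(cluster_manager, (None, "mpi"))

    def test_search_default(self):
        self._check_search(None)

    def test_search_mpi(self):
        self._check_search("mpi")
```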

def test_multiline_seval(self):
# The user should be able to run multiple things in a single seval call:
num = jl.seval(