Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Discussion about the use of multiprocessing #158

Closed
Psykopear opened this issue Oct 20, 2022 · 3 comments
Closed

[FEATURE] Discussion about the use of multiprocessing #158

Psykopear opened this issue Oct 20, 2022 · 3 comments
Assignees

Comments

@Psykopear
Copy link
Contributor

Is your feature request related to a problem? Please describe.

To spawn a cluster we use the spawn_cluster function, which is defined in python and does this:

def _gen_addresses(proc_count: int) -> Iterable[str]:
    return [f"localhost:{proc_id + 2101}" for proc_id in range(proc_count)]


def spawn_cluster(
    flow: Dataflow,
    *,
    epoch_config: Optional[EpochConfig] = None,
    recovery_config: Optional[RecoveryConfig] = None,
    proc_count: int = 1,
    worker_count_per_proc: int = 1,
    mp_ctx=get_context("spawn"),
) -> List[Tuple[int, Any]]:

    addresses = _gen_addresses(proc_count)
    with mp_ctx.Pool(processes=proc_count) as pool:
        futures = [
            pool.apply_async(
                cluster_main,
                (flow,),
                {
                    "epoch_config": epoch_config,
                    "recovery_config": recovery_config,
                    "addresses": addresses,
                    "proc_id": proc_id,
                    "worker_count_per_proc": worker_count_per_proc,
                },
            )
            for proc_id in range(proc_count)
        ]
        pool.close()

        for future in futures:
            # Will re-raise exceptions from subprocesses.
            future.get()

        pool.join()

It spawns proc_count processes using multiprocess.
This works fine, but it requires all the arguments of apply_sync to be pickable, which is limiting us in some cases (for example we can't use decorators that return generators), and it requires some boilerplate on the Rust side (see the "Egregious hack").

The point is that we don't use any of the multiprocessing utilities, except in a couple of tests, so if we can find a way to avoid using multiprocessing we could remove a lot of the boilerplate, remove a dependency, and avoid the pitfalls of requiring everything to be pickable.

Describe the solution you'd like

Since we don't need to share state between the processes (not in python at least), we could make spawn_cluster spawn subprocesses in the system, so that the python file is evaluated anew for each process.

Something like this:

def spawn_cluster(
    flow: Dataflow,
    *,
    epoch_config: Optional[EpochConfig] = None,
    recovery_config: Optional[RecoveryConfig] = None,
    proc_count: int = 1,
    worker_count_per_proc: int = 1,
) -> List[Tuple[int, Any]]:
    addresses = _gen_addresses(proc_count)

    proc_id = os.getenv("BYTEWAX_PROC_ID", None)
    if proc_id is not None:
        cluster_main(
            flow,
            addresses,
            int(proc_id),
            epoch_config=epoch_config,
            recovery_config=recovery_config,
            worker_count_per_proc=worker_count_per_proc,
        )
    else:
        processes = []
        for proc_id in range(proc_count):
            env = os.environ.copy()
            env["BYTEWAX_PROC_ID"] = f"{proc_id}"
            processes.append(subprocess.Popen(["python", *sys.argv], env=env))

        while True:
            if all([process.poll() is not None for process in processes]):
                break

Describe alternatives you've considered
I don't particularly like this solution, but it's one way to solve the problem.
I opened this issue to discuss possible alternatives.

Another thing we could do is make the spawn_cluster an "external" script, so not called from python directly.

We could remove the spawn_cluster function from the python api, and let users use just cluster_main.
cluster_main could read the other parameters (proc_id, worker_count_per_proc and addresses) from args, or another env var, and the spawn_process script (which could also be written in rust, or a shell script, or another python script, it doesn't really matter) would set the variables similarly to what I proposed above:

if __name__ == '__main__':
    cluster_main(flow, epoch_config=epoch_config, recovery_config=recovery_config)

Which then you'd run with:

spawn_cluster dataflow.py -p 2 -w 3

But I still haven't properly explored this solution, it might hide some other problems

Additional context
Let me know what you think, and if you have other ideas I can spend some time building a proof of concept and see if it could work

@github-actions github-actions bot added the needs triage New issue, needs triage label Oct 20, 2022
@whoahbot whoahbot removed the needs triage New issue, needs triage label Oct 20, 2022
@davidselassie
Copy link
Contributor

I really like the idea of making spawn_cluster a script that does the coordination. I think that'll make demo straightforward.


Since we don't need to share state between the processes (not in python at least)

This is true for the "running the dataflow", but is not true for our test suite. Some of it uses multiprocessing.Manager.list to "easily" send the input and record the output in each worker and gather it back to the testing process. So I'd hope we can come up with some way of programmatically doing this in a "spawn_cluster is just a script" world. (Unclear if all the work we have to do to make pickling work is truly "easy".)

Perhaps something like UnixSocket{Input,Output}Config? Or we could read and write to temp files? I'm sure we could think of others.

In any case, this is something to think about in regards to this change.


Although it is very finicky, I still aspire for us to have "tested documentation" and if there's a big divergence between how we make multi-process testing work and how you run a demo, it might be more work to bridge the gap back to allow a documentation example that starts up a cluster to be programmatically tested.

Currently, we are just using "capture and compare stdout" so that might not actually be that hard to bridge. But the command to spawn the cluster would need to work in the doctests and markdown tests environments.

It might be that this is too much work and not worth it, but again something to think about here.

@Psykopear
Copy link
Contributor Author

This is true for the "running the dataflow", but is not true for our test suite. Some of it uses multiprocessing.Manager.list to "easily" send the input and record the output in each worker and gather it back to the testing process. So I'd hope we can come up with some way of programmatically doing this in a "spawn_cluster is just a script" world. (Unclear if all the work we have to do to make pickling work is truly "easy".)

Perhaps something like UnixSocket{Input,Output}Config? Or we could read and write to temp files? I'm sure we could think of others.

Right, what I meant is that if we don't need it in the execution, we just need to find a different way to test the behavior.
For testing purposes, maybe reading the stdoutput like we already do elsewhere could be enough.

Currently, we are just using "capture and compare stdout" so that might not actually be that hard to bridge. But the command to spawn the cluster would need to work in the doctests and markdown tests environments.

You are right about doctests and markdown tests though, that could be trickier depending on how we implement spawn_cluster.
The implementation I proposed in the first comment might still work, but it would still be a function that you import from python, while ideally I'd like the dataflow file to just use cluster_main, making spawn_cluster something external to the dataflow code.

@davidselassie
Copy link
Contributor

Another issue we had with pickling for tests https://bytewax.slack.com/archives/C028Q25AK3K/p1671138645483599

_pickle.PicklingError: Can't pickle <function make_set_closure_cell.<locals>.set_closure_cell at 0x7f0b37c18160>: it's not found as attr._compat.make_set_closure_cell.<locals>.set_closure_cell
/usr/lib/python3.10/pickle.py:1071: PicklingError

Turns out it's related to importing redirect_stdout within the docs tests. Spooky interference!

Apparently multiprocess makes no claims of compatibility with pytest. uqfoundation/multiprocess#67

So we might continue to run into strange issues.

@Psykopear Psykopear self-assigned this Feb 27, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants