Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async_stream + Stream.run is blocking #8445

Closed
virtual-light opened this issue Dec 1, 2018 · 5 comments
Closed

async_stream + Stream.run is blocking #8445

virtual-light opened this issue Dec 1, 2018 · 5 comments

Comments

@virtual-light
Copy link

Just wondering why such code:

items = Repo.stream(Something)

stream = Task.Supervisor.async_stream_nolink(TaskSupervisor, items, fn item -> 
  do_some_io_with(item)
end)

Repo.transaction(fn -> Stream.run(stream) end)

blocks caller?

Stream.run(stream) is assuming that I don't care about results so there is no need to wait while all tasks will be finished. So why it act the same as Enum.to_list(stream)? Is there some way to do it without blocking except starting a separate process?

@josevalim
Copy link
Member

Yes, Stream.run always blocks the caller. The difference to Enum.to_list is that it doesn't have to build a list because you don't care about results. Stream is always about laziness, it never starts processes for you. In your case, you can start a separate process yourself.

Just an example, if Stream.run started another process, then your code sample wouldn't work, because the new process wouldn't be inside the transaction (which is per process).

@virtual-light
Copy link
Author

virtual-light commented Dec 1, 2018

@josevalim thank you,

Stream is always about laziness

Yes, but async_stream about asynchronous processing, so I hoped to have it advantages. But it looks like it will always wait for results even if I don't care about them.

@josevalim
Copy link
Member

Ah, I see. If you want it to fully run as a separate tree of processes, then you can look at GenStage and Flow.

@virtual-light
Copy link
Author

virtual-light commented Dec 1, 2018

you can look at GenStage and Flow.

@josevalim As I know they don't work well with Repo.stream

elixir-lang/gen_stage#150
https://elixirforum.com/t/repo-stream-flow-from-enumerable/16477/3

Does this still relevant?

We will support this likely on Ecto 3.1

@josevalim
Copy link
Member

Ah, yes, you are correct. So you will have to start the process yourself for now but, as I said, if we started the process automatically, which is what Flow/GenStage does, then it wouldn't work with Ecto either. :D

The feature you request was not yet implemented, so Ecto 3.1 it is indeed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

No branches or pull requests

2 participants