
GenStage and Ecto streams? #150

Closed · mtwilliams opened this issue Feb 27, 2017 · 12 comments

@mtwilliams

During my work, I've discovered an "impedance mismatch" between GenStage and Ecto. Specifically, we drive a lot of work using Repo.stream/2 and its lower-level equivalent, Ecto.Adapters.SQL.stream/4:

alias Ecto.Adapters.SQL

# We have a complicated query that produces millions of rows.
q = "SELECT n FROM generate_series(1, 1000000) n;"

chunks = SQL.stream(My.Repo, q, [], log: false)

stream = Stream.flat_map(chunks, fn
  %{num_rows: 0} ->
    []
  %{rows: rows} ->
    Enum.map(rows, fn row -> ... end)
end)

# We use those rows to drive a bunch of (parallelizable) work.
{:ok, producer} = GenStage.from_enumerable(stream, ...)
:ok = GenStage.async_subscribe(self(), to: producer, ...)

We use the above pattern a lot. Unfortunately, Ecto requires streams to be run inside a transaction, which makes GenStage.from_enumerable/2 unusable here.
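For anyone unfamiliar with the constraint, here is a minimal illustration, reusing q from above (the exact error wording may vary across Ecto versions):

stream = Ecto.Adapters.SQL.stream(My.Repo, q, [])
Enum.to_list(stream)
# ** (RuntimeError) cannot reduce stream outside of transaction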

To work around this, we spawn a forwarding process that sends chunks of events whenever our GenStage producer requests them:

forward_upon_request = fn chunk ->
  receive do
    :more ->
      send(..., {:supply, chunk})
  end
end

My.Repo.transaction(fn ->
  stream |> Stream.chunk_every(n) |> Stream.each(forward_upon_request) |> Stream.run()
end)
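The producer half of the hack looks roughly like this (a sketch only: ForwardingProducer and the :more / {:supply, chunk} message shapes are illustrative names, and blocking inside handle_demand/2 while ignoring the exact demanded count is part of why this is a hack):

defmodule ForwardingProducer do
  use GenStage

  # The forwarding process owns the transaction; we only hold its pid.
  def init(forwarder_pid), do: {:producer, forwarder_pid}

  def handle_demand(_demand, forwarder_pid) do
    # Ask the forwarder for the next chunk and block until it arrives.
    send(forwarder_pid, :more)

    receive do
      {:supply, chunk} -> {:noreply, chunk, forwarder_pid}
    end
  end
end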

We also tried to write our own producer that reduces the stream inside a transaction, in a similar fashion to GenStage.Streamer. It didn't work because, as far as I could tell, the continuations reuse the connection from the first transaction.

While the aforementioned hack works, it is sub-optimal.

Do you see any way GenStage.Streamer can support such a use case through some sort of generalized functionality?

If not, should Ecto or another library provide a GenStage producer that produces events from a query?

@josevalim (Member) commented Feb 27, 2017 via email

@mtwilliams (Author)

Thanks for the quick response.

Unfortunately, GenStage fits our use case better than Task.async_stream.

@josevalim (Member)

@mtwilliams unfortunately Repo.stream won't work with GenStage.from_enumerable because both need the process inbox to work. The fact that those can't work together is one of the reasons we ended up creating GenStage in the first place, so the best option would be for Postgrex to support GenStage directly in the connection. Outside of that, there is not much GenStage itself can do. For now you will have to use one of the alternative solutions mentioned here.
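To make the conflict concrete: GenStage.from_enumerable/2 enumerates the stream in a separate producer process, while an Ecto transaction (and therefore the stream) is bound to the process that called Repo.transaction/2. So something along these lines cannot work:

My.Repo.transaction(fn ->
  stream = Ecto.Adapters.SQL.stream(My.Repo, q, [])
  # from_enumerable/2 reduces `stream` in a separate producer process,
  # which has no transaction or connection of its own, so this fails.
  {:ok, _producer} = GenStage.from_enumerable(stream)
end)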

@narrowtux commented Oct 9, 2017

I have made a module that hacks an Ecto stream into a GenStage producer. It's easy to use and seems quite performant. It also respects the demanded amount: it won't send more than was demanded, and it waits until more items have been demanded.

https://gist.github.com/narrowtux/286666711864246d3dbb6859dda0d694

Might be useful for anybody who stumbles upon this issue in the future.
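For readers who cannot follow the link, the general shape of such a producer is roughly this (a simplified sketch, not the gist's actual code: the module and message names are made up, and termination handling after the stream is exhausted is elided):

defmodule StreamProducer do
  use GenStage

  def start_link(repo, queryable) do
    GenStage.start_link(__MODULE__, {repo, queryable})
  end

  def init({repo, queryable}) do
    producer = self()

    # A linked process owns the transaction and hands over one row
    # whenever the producer asks for it.
    pid =
      spawn_link(fn ->
        repo.transaction(fn ->
          queryable
          |> repo.stream()
          |> Enum.each(fn row ->
            receive do
              :next -> send(producer, {:row, row})
            end
          end)
        end, timeout: :infinity)

        send(producer, :done)
      end)

    {:producer, pid}
  end

  def handle_demand(demand, pid) when demand > 0 do
    {:noreply, take(pid, demand, []), pid}
  end

  defp take(_pid, 0, acc), do: Enum.reverse(acc)

  defp take(pid, n, acc) do
    send(pid, :next)

    receive do
      {:row, row} -> take(pid, n - 1, [row | acc])
      :done -> Enum.reverse(acc)
    end
  end
end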

@fishcakez (Member) commented Oct 9, 2017

@narrowtux nice that you got it working! With this approach the forwarder will fetch max_rows rows at a time and then push them based on demand. So even though it only sends what is demanded, it will always fetch 500 rows (the default) from the database.

There is work in progress to support transactions over multiple callbacks during a process's lifecycle (e.g. begin in init/1, commit in terminate/2) and to support lower-level access to the cursors, so that each fetch would only fetch the number of demanded rows at a time: elixir-ecto/postgrex#321 and xerions/mariaex#196. It has stalled on my end, but I hope to return to it soon.

@narrowtux commented Oct 9, 2017 via email

@fishcakez (Member)

> Do you think that my module does not actually fetch a lot of rows at a time from the DB?

Your module will fetch rows from the database in chunks of 500 (or whatever the max_rows option is set to). If the demand is less than the number of rows fetched, the remaining rows are buffered inside the stream. Similarly, if the demand is more, it fetches chunks of rows until it has at least the demanded amount, with any leftovers getting buffered.
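For concreteness, a worked example reusing the stream from the first comment (with max_rows written out at its default of 500):

chunks = Ecto.Adapters.SQL.stream(My.Repo, q, [], max_rows: 500)

# demand = 100 -> one fetch of 500 rows; 100 emitted, 400 buffered
# demand = 600 -> two fetches of 500 rows; 600 emitted, 400 buffered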

@narrowtux

Ah, OK. I just skimmed your response earlier and wasn't sure whose code you were talking about :)

@fishcakez (Member)

I edited my previous comment into two paragraphs to make it clearer.

@msmykowski

Any update on a good approach to this problem? I am currently dealing with the same issue.

@sobolevn commented Mar 6, 2019

I have found https://github.com/mtwilliams/bourne.
It looks like it solves the problem, but I have not tested it in production yet.
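Usage would presumably look something like this (a hypothetical sketch: Bourne.start_link/2 and its arguments are my guess, and producer/consumer are assumed to be started elsewhere, so check the library's README for the real API):

# hypothetical: a producer that owns the transaction and streams on demand
{:ok, producer} = Bourne.start_link(My.Repo, query)
:ok = GenStage.async_subscribe(consumer, to: producer, max_demand: 100)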

@mtwilliams (Author)

@sobolevn I wrote it exactly for this use case.
