Skip to content

Commit

Permalink
fix: manifold memory leak (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
mccraigmccraig authored Apr 13, 2021
1 parent 480d461 commit 7f098c1
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 106 deletions.
4 changes: 4 additions & 0 deletions .clj-kondo/config.edn
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{:lint-as {qbits.alia/when-opt clojure.core/when
manifold.deferred/loop clojure.core/loop}
:linters {:unresolved-symbol
{:exclude [(qbits.alia/when-opt)]}}}
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ pom.xml
.idea
.lein-env
/bench.clj
/.clj-kondo/.cache
/.clj-kondo/cache
149 changes: 90 additions & 59 deletions modules/alia-manifold/src/qbits/alia/manifold.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,78 @@
[manifold.deferred :as d]
[manifold.stream :as s]
[qbits.alia :as alia]
[qbits.alia.completable-future :as cf]
[qbits.alia.error :as err]
[qbits.alia.result-set :as result-set])
(:import
[com.datastax.oss.driver.api.core.session Session]
[com.datastax.oss.driver.api.core CqlSession]
[java.util.concurrent CompletionStage]))



(defn handle-page-completion-stage
[^CompletionStage completion-stage
{stream :stream
:as opts}]
(cf/handle-completion-stage
completion-stage

(fn [{current-page :current-page
:as async-result-set-page}]

(if (some? async-result-set-page)

(d/chain
(s/put! stream current-page)
(fn [put?]
(cond

;; last page put ok and there is another
(and put?
(true? (result-set/has-more-pages? async-result-set-page)))
(handle-page-completion-stage
(result-set/fetch-next-page async-result-set-page)
opts)

;; last page put ok and was the last
put?
(s/close! stream)

;; bork! last page did not put.
;; maybe the stream was closed?
:else
(throw
(ex-info
"qbits.alia.manifold/stream-put!-fail"
(merge val (select-keys opts [:statement :values])))))))

;; edge-case - when :page-size lines up with result size,
;; the final page is empty, resulting in a nil async-result-set
(s/close! stream)))

(fn [err]
(d/finally
(s/put! stream err)
(fn [] (s/close! stream))))

opts))
{stream :stream :as opts}]

(-> completion-stage

(d/chain

(fn [async-result-set-page]

(d/loop [{current-page :current-page
:as async-result-set-page} async-result-set-page]

(if (some? async-result-set-page)

(d/chain

(do
;; (prn "handle-page-completion-stage - put!-page" stream)
(s/put! stream current-page))

(fn [put?]
(cond

;; last page put ok and there is another
(and put?
(true? (result-set/has-more-pages? async-result-set-page)))
(do
;; (prn "handle-page-completion-stage - go-again" stream)
(d/chain
(result-set/fetch-next-page async-result-set-page)
(fn [next-async-result-set-page]
(d/recur next-async-result-set-page))))

;; last page put ok and was the last
put?
(do
;; (prn "handle-page-completion-stage - close!" stream)
(s/close! stream))

;; bork! last page did not put.
;; maybe the stream was closed?
:else
(do
;; (prn "handle-page-completion-stage - bork!" stream)
(throw
(ex-info
"qbits.alia.manifold/stream-put!-fail"
(merge val (select-keys opts [:statement :values]))))))))

;; edge-case - when :page-size lines up with result size,
;; the final page is empty, resulting in a nil async-result-set
(do
;; (prn "handle-page-completion-stage - lined-up close!" stream)
(s/close! stream))))))

(d/catch
(fn [err]
(d/finally
(s/put!
stream
(err/ex->ex-info
err
(select-keys opts [:statement :values])))
(fn [] (s/close! stream)))))))

(defn execute-stream-pages
"similar to `qbits.alia/execute`, but executes async and returns a
Expand All @@ -78,39 +95,53 @@
:as opts}]
(let [stream (or stream
;; fetch one page ahead by default
(s/stream (or page-buffer-size 1)))
(s/stream (or page-buffer-size 1)))]

page-cs (alia/execute-async session query opts)]
(try
(let [page-cs (alia/execute-async session query opts)]

(handle-page-completion-stage
page-cs
(merge opts
{:stream stream
:statement query}))
(handle-page-completion-stage
page-cs
(merge opts
{:stream stream
:statement query})))
(catch Exception e
(s/close! stream)
(throw e)))

stream))

([^CqlSession session query]
(execute-stream-pages session query {})))


(defn ^:private safe-identity
(defn ^:private coerce-seq
"if the page should happen to be an Exception, wrap
it in a vector so that it can be concatenated to the
value stream"
[v]
(if (sequential? v) v [v]))

(defn concat-seq-s
"concat a stream of seqs onto a stream of plain values"
[stream-s]
(let [out (s/stream)]

(s/connect-via
stream-s
#(s/put-all! out (coerce-seq %))
out)

out))

(defn execute-stream
"like `execute-stream-pages`, but returns a `Stream<row>`
supports all the args of `execute-stream-pages`"
([^CqlSession session query {:as opts}]
(let [stream (execute-stream-pages session query opts)]
(let [page-s (execute-stream-pages session query opts)]

(s/transform
(mapcat safe-identity)
stream)))
(concat-seq-s page-s)))

([^CqlSession session query]
(execute-stream session query {})))
Expand Down
44 changes: 7 additions & 37 deletions modules/alia/src/qbits/alia/completable_future.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
(ns qbits.alia.completable-future
(:require
[qbits.alia.error :as err])
(:import
[java.util.concurrent
Executor
CompletionStage
CompletableFuture
ExecutionException
CompletionException]
CompletableFuture]
[java.util.function BiFunction]))

(defn completed-future
Expand All @@ -19,25 +19,17 @@
(.completeExceptionally f e)
f))

(defn ex-unwrap
"Unwraps exceptions if we have a valid ex-cause present"
[ex]
(if (or (instance? ExecutionException ex)
(instance? CompletionException ex))
(or (ex-cause ex) ex)
ex))

(defn handle-completion-stage*
(defn handle-completion-stage
"java incantation to handle both branches of a completion-stage"
([^CompletionStage completion-stage
on-success
on-error
{^Executor executor :executor
:as opts}]
:as _opts}]
(let [handler-bifn (reify BiFunction
(apply [_ r ex]
(if (some? ex)
(on-error (ex-unwrap ex))
(on-error (err/ex-unwrap ex))
(on-success r))))]

(if (some? executor)
Expand All @@ -46,26 +38,4 @@
([^CompletionStage completion-stage
on-success
on-error]
(handle-completion-stage* completion-stage on-success on-error {})))

(defmacro handle-completion-stage
"handle both branches of a completion stage
it's a macro to capture any exceptions from the completion-stage
form and also send them through the `on-error` handler"
([completion-stage
on-success
on-error
opts]
`(try
(handle-completion-stage* ~completion-stage ~on-success ~on-error ~opts)
(catch Exception e#
(handle-completion-stage*
(failed-future (ex-unwrap e#))
~on-success
~on-error
~opts))))
([completion-stage
on-success
on-error]
`(handle-completion-stage ~completion-stage ~on-success ~on-error {})))
(handle-completion-stage completion-stage on-success on-error {})))
23 changes: 14 additions & 9 deletions modules/alia/src/qbits/alia/error.clj
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
(ns qbits.alia.error
(:import
[java.util.concurrent ExecutionException]
[java.util.concurrent ExecutionException CompletionException]
[com.datastax.oss.driver.api.core.cql
Statement
SimpleStatement
PreparedStatement
BoundStatement
BatchStatement]))

(defn ex-unwrap
"Unwraps exceptions from j.u.c wrappers if there is an ex-cause"
[ex]
(if (or (instance? ExecutionException ex)
(instance? CompletionException ex))
(or (ex-cause ex) ex)
ex))

(defprotocol IStatementQuery
(statement-query [stmt]))

(extend-protocol IStatementQuery
SimpleStatement
SimpleStatement
(statement-query [stmt]
[:simple (.getQuery stmt)])
PreparedStatement
Expand All @@ -23,7 +30,7 @@
[:bound (-> stmt .getPreparedStatement .getQuery)])
BatchStatement
(statement-query [stmts]
[:batch
[:batch
{:type (str (.getBatchType stmts))}
(for [stmt stmts] (statement-query stmt))])
Object
Expand All @@ -35,17 +42,15 @@

(defn ^:no-doc ex->ex-info
"wrap an exception with some context information"
([^Exception ex
([^Exception ex
{stmt :statement
:as data}
:as data}
msg]
(let [stmt-query (statement-query stmt)]
(ex-info msg
(merge {:type :qbits.alia/execute
:query stmt-query}
data)
(if (instance? ExecutionException ex)
(ex-cause ex)
ex))))
(ex-unwrap ex))))
([ex data]
(ex->ex-info ex data "Query execution failed")))
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject cc.qbits/alia-all "5.0.0-alpha4-SNAPSHOT"
(defproject cc.qbits/alia-all "5.0.0-alpha8-SNAPSHOT"
:description "Cassandra CQL3 client for Clojure - datastax/java-driver wrapper"
:url "https://github.com/mpenet/alia"
:scm {:name "git"
Expand Down

0 comments on commit 7f098c1

Please sign in to comment.