Skip to content

Commit

Permalink
extract local reducer, accumulator
Browse files Browse the repository at this point in the history
cnautze committed Sep 11, 2024
1 parent 550b1ce commit 5466b6c
Showing 2 changed files with 41 additions and 37 deletions.
53 changes: 27 additions & 26 deletions src/exoscale/vinyl/cursor.clj
Original file line number Diff line number Diff line change
@@ -15,6 +15,31 @@
(as-list [this] "Transform a cursor or cursor future to a list")
(as-iterator [this] "Transform a cursor or cursor future to an iterator"))

(defn apply-transduce-with-reducer
"Apply reducer (transducer) over cursor. Set completion? to true if final
transducing call needs to be done (stateful transducer)."
[^RecordCursor cursor completion? cont-fn reducer acc]
(.thenApply
(AsyncUtil/whileTrue
(reify Supplier
(get [_]
(-> cursor
.onNext
(.thenApply
(fn/make-fun
(fn [^RecordCursorResult result]
(when (ifn? cont-fn)
(-> result .getContinuation .toBytes cont-fn))
(let [next? (.hasNext result)
new-acc (when next? (swap! acc reducer (.get result)))]
(and (not (reduced? new-acc)) next?))))))))
(.getExecutor cursor))
(fn/make-fun (fn [_]
(unreduced
(cond-> @acc
completion?
reducer))))))

(defn apply-transduce
"A variant of `RecordCursor::reduce` that honors `reduced?` and supports
transducers.
@@ -23,36 +48,12 @@
When `cont-fn` is given, it will be called on the last seen continuation
byte array for every new element."
([^RecordCursor cursor xform f init cont-fn]
([cursor xform f init cont-fn]
(let [reducer (if (some? xform) (xform f) f)
acc (if (instance? clojure.lang.Atom init)
init
(atom (or init (f))))]
(.thenApply
(AsyncUtil/whileTrue
(reify Supplier
(get [_]
(try
(-> cursor
.onNext
(.thenApply
(fn/make-fun
(fn [^RecordCursorResult result]
(when (ifn? cont-fn)
(-> result .getContinuation .toBytes cont-fn))
(let [next? (.hasNext result)
new-acc (when next? (swap! acc reducer (.get result)))]
(and (not (reduced? new-acc)) next?))))))
(catch Exception e
(when (some? xform)
(reducer @acc))
(throw e)))))
(.getExecutor cursor))
(fn/make-fun (fn [_]
(unreduced
(cond-> @acc
(some? xform)
reducer)))))))
(apply-transduce-with-reducer cursor (some? xform) cont-fn reducer acc)))
([cursor f init cont-fn]
(apply-transduce cursor nil f init cont-fn))
([cursor f init]
25 changes: 14 additions & 11 deletions test/exoscale/vinyl/cursor_test.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns exoscale.vinyl.cursor-test
(:require [clojure.test :refer [deftest are is]]
[exoscale.vinyl.cursor :refer [apply-transforms]]
[exoscale.vinyl.cursor :refer [apply-transforms apply-transduce-with-reducer]]
[exoscale.vinyl.demostore :as ds :refer [*db*]]
[exoscale.vinyl.store :as store])
(:import [com.apple.foundationdb.record RecordCursor]
@@ -57,13 +57,16 @@
[0 1 2 3 4 5 6] (completing reduce-plus) 0 15))

(deftest stateful-transducer-test
(let [processed (atom [])]
(is (= [[1 2 3] [4 5] [6 7 8] [9]]
(ds/with-build-fdb
(fn [] (let [cursor (from-iterator (make-faulty-iterator [1 2 3 4 5 6 7 8 9] 5))]
@(store/run-async *db* (fn [_store] (apply-transforms
cursor
{::store/reducer (completing (fn [_acc items] (swap! processed conj items)))
::store/reduce-init []
::store/transducer (partition-all 3)})))
@processed)))))))
(is (= [[1 2 3] [4 5 6] [7 8 9] [10]]
(ds/with-build-fdb
(fn [] (let [cursor (from-iterator (make-faulty-iterator [1 2 3 4 5 6 7 8 9 10] 5))
transducer (partition-all 3)
reducer-fn (completing (fn [acc items] (conj acc items)))
reducer (transducer reducer-fn)
acc (atom [])]
@(store/run-async *db* (fn [_store] (apply-transduce-with-reducer
cursor
transducer
nil
reducer
acc)))))))))

0 comments on commit 5466b6c

Please sign in to comment.