diff --git a/src/exoscale/vinyl/cursor.clj b/src/exoscale/vinyl/cursor.clj index 37d502d..ce151e7 100644 --- a/src/exoscale/vinyl/cursor.clj +++ b/src/exoscale/vinyl/cursor.clj @@ -15,6 +15,31 @@ (as-list [this] "Transform a cursor or cursor future to a list") (as-iterator [this] "Transform a cursor or cursor future to an iterator")) +(defn apply-transduce-with-reducer + "Apply reducer (transducer) over cursor. Set completion? to true if final + transducing call needs to be done (stateful transducer)." + [^RecordCursor cursor completion? cont-fn reducer acc] + (.thenApply + (AsyncUtil/whileTrue + (reify Supplier + (get [_] + (-> cursor + .onNext + (.thenApply + (fn/make-fun + (fn [^RecordCursorResult result] + (when (ifn? cont-fn) + (-> result .getContinuation .toBytes cont-fn)) + (let [next? (.hasNext result) + new-acc (when next? (swap! acc reducer (.get result)))] + (and (not (reduced? new-acc)) next?)))))))) + (.getExecutor cursor)) + (fn/make-fun (fn [_] + (unreduced + (cond-> @acc + completion? + reducer)))))) + (defn apply-transduce "A variant of `RecordCursor::reduce` that honors `reduced?` and supports transducers. @@ -23,36 +48,12 @@ When `cont-fn` is given, it will be called on the last seen continuation byte array for every new element." - ([^RecordCursor cursor xform f init cont-fn] + ([cursor xform f init cont-fn] (let [reducer (if (some? xform) (xform f) f) acc (if (instance? clojure.lang.Atom init) init (atom (or init (f))))] - (.thenApply - (AsyncUtil/whileTrue - (reify Supplier - (get [_] - (try - (-> cursor - .onNext - (.thenApply - (fn/make-fun - (fn [^RecordCursorResult result] - (when (ifn? cont-fn) - (-> result .getContinuation .toBytes cont-fn)) - (let [next? (.hasNext result) - new-acc (when next? (swap! acc reducer (.get result)))] - (and (not (reduced? new-acc)) next?)))))) - (catch Exception e - (when (some? xform) - (reducer @acc)) - (throw e))))) - (.getExecutor cursor)) - (fn/make-fun (fn [_] - (unreduced - (cond-> @acc - (some? xform) - reducer))))))) + (apply-transduce-with-reducer cursor (some? xform) cont-fn reducer acc))) ([cursor f init cont-fn] (apply-transduce cursor nil f init cont-fn)) ([cursor f init] diff --git a/test/exoscale/vinyl/cursor_test.clj b/test/exoscale/vinyl/cursor_test.clj index ed4b664..defbd33 100644 --- a/test/exoscale/vinyl/cursor_test.clj +++ b/test/exoscale/vinyl/cursor_test.clj @@ -1,6 +1,6 @@ (ns exoscale.vinyl.cursor-test (:require [clojure.test :refer [deftest are is]] - [exoscale.vinyl.cursor :refer [apply-transforms]] + [exoscale.vinyl.cursor :refer [apply-transforms apply-transduce-with-reducer]] [exoscale.vinyl.demostore :as ds :refer [*db*]] [exoscale.vinyl.store :as store]) (:import [com.apple.foundationdb.record RecordCursor] @@ -57,13 +57,16 @@ [0 1 2 3 4 5 6] (completing reduce-plus) 0 15)) (deftest stateful-transducer-test - (let [processed (atom [])] - (is (= [[1 2 3] [4 5] [6 7 8] [9]] - (ds/with-build-fdb - (fn [] (let [cursor (from-iterator (make-faulty-iterator [1 2 3 4 5 6 7 8 9] 5))] - @(store/run-async *db* (fn [_store] (apply-transforms - cursor - {::store/reducer (completing (fn [_acc items] (swap! processed conj items))) - ::store/reduce-init [] - ::store/transducer (partition-all 3)}))) - @processed))))))) + (is (= [[1 2 3] [4 5 6] [7 8 9] [10]] + (ds/with-build-fdb + (fn [] (let [cursor (from-iterator (make-faulty-iterator [1 2 3 4 5 6 7 8 9 10] 5)) + transducer (partition-all 3) + reducer-fn (completing (fn [acc items] (conj acc items))) + reducer (transducer reducer-fn) + acc (atom [])] + @(store/run-async *db* (fn [_store] (apply-transduce-with-reducer + cursor + transducer + nil + reducer + acc)))))))))