diff --git a/src/main/lrsql/admin/protocol.clj b/src/main/lrsql/admin/protocol.clj index ed7dbec0..1877d000 100644 --- a/src/main/lrsql/admin/protocol.clj +++ b/src/main/lrsql/admin/protocol.clj @@ -43,5 +43,11 @@ "Soft-delete a reaction.")) (defprotocol AdminLRSManager - (-delete-actor [this params]) - (-get-statements-csv [this writer property-paths params])) + (-delete-actor [this params] + "Delete actor by `:actor-id`") + (-get-statements-csv [this writer property-paths params] + "Retrieve statements as CSV. Instead of returning a sequence of + statements, streams them to `writer` as a side effect, in order to + avoid storing them in memory. `property-paths` are defined in the + Reactions API, while `params` are the same query params for + `-get-statements`.")) diff --git a/src/main/lrsql/ops/query/statement.clj b/src/main/lrsql/ops/query/statement.clj index b8b151b4..033814a5 100644 --- a/src/main/lrsql/ops/query/statement.clj +++ b/src/main/lrsql/ops/query/statement.clj @@ -1,6 +1,5 @@ (ns lrsql.ops.query.statement (:require [clojure.spec.alpha :as s] - [clojure.java.io :as io] [clojure.data.csv :as csv] [com.yetanalytics.lrs.protocol :as lrsp] [lrsql.backend.protocol :as bp] @@ -129,55 +128,38 @@ (query-one-statement bk tx input ltags) (query-many-statements bk tx input ltags prefix)))) -#_(s/fdef query-all-statements +(s/fdef query-statements-stream :args (s/cat :bk ss/statement-backend? :tx transaction? :input ss/statement-query-many-spec :ltags ss/lang-tags-spec - :property-paths vector?)) + :property-paths vector? + :writer #(instance? java.io.Writer %))) -(defn query-all-statements - "Query a lazy seq of all the statements in the database, filtered by `input`. - The `:limit` parameter will dictate the size of each query batch, but will - not limit the total number of statements streamed. Ignores attachments." 
- [bk tx input ltags property-paths writeable] - (let [format (:format input) - input (-> input - (dissoc :from :query-params) - (assoc :limit 1000000)) - json-paths (us/property-paths->json-paths property-paths) +(defn query-statements-stream + "Stream all the statements in the database, filtered by `input`, to `writer`. + The `:limit` parameter will be ignored. Attachments are not included." + [bk tx input ltags property-paths writer] + (let [format (:format input) + input (-> input ; TODO: Remove `:limit` from query entirely. + (dissoc :from :query-params) + (assoc :limit 1000000)) + json-paths (us/property-paths->json-paths property-paths) csv-headers (us/property-paths->csv-headers property-paths)] - (with-open [writer (io/writer writeable)] - (csv/write-csv writer [csv-headers] :newline :cr+lf) - (transduce (comp (map (fn [res] - (query-res->statement format ltags res))) - (map (fn [stmt] - (us/statement->csv-row json-paths stmt)))) - (fn write-csv-reducer - ([writer] - writer) - ([writer row] - (csv/write-csv writer [row] :newline :cr+lf) - writer)) - writer - (bp/-query-statements-lazy bk tx input)) - #_(->> (bp/-query-statements-lazy bk tx input) - #_(map format-fn result) - (into []) - #_(us/statements->csv-seq property-paths) - #_(reduce (fn [writer row] - (csv/write-csv writer [row] :newline :cr+lf)) - writer)))) - #_(let [{:keys [statement-results ?next-cursor]} - (query-many-statements* bk tx input ltags)] - (if ?next-cursor - (let [next-str (u/uuid->str ?next-cursor) - new-input (-> input - (assoc :from ?next-cursor) - (assoc-in [:query-params :from] next-str))] - (concat statement-results - (lazy-seq (query-all-statements bk tx new-input ltags property-paths)))) - statement-results))) + (csv/write-csv writer [csv-headers] :newline :cr+lf) + (transduce (comp (map (fn [res] + (query-res->statement format ltags res))) + (map (fn [stmt] + (us/statement->csv-row json-paths stmt)))) + (fn write-csv-reducer + ([writer] + writer) + ([writer row] + 
(csv/write-csv writer [row] :newline :cr+lf) + writer)) + writer + (bp/-query-statements-lazy bk tx input)) + writer)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Statement Descendant Querying diff --git a/src/main/lrsql/system/lrs.clj b/src/main/lrsql/system/lrs.clj index ff1526d3..f14fa0ce 100644 --- a/src/main/lrsql/system/lrs.clj +++ b/src/main/lrsql/system/lrs.clj @@ -404,7 +404,7 @@ input (-> params ; TODO: Higher limit for CSV stream? (stmt-util/ensure-default-max-limit config) (stmt-input/query-statement-input nil))] - (stmt-q/query-all-statements backend + (stmt-q/query-statements-stream backend conn input {}