Skip to content

Commit

Permalink
Use next.jdbc/plan and test queries beyond :limit
Browse files Browse the repository at this point in the history
  • Loading branch information
kelvinqian00 committed Feb 14, 2025
1 parent 7ec555a commit 5fce0e3
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 67 deletions.
9 changes: 9 additions & 0 deletions src/db/postgres/lrsql/postgres/record.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns lrsql.postgres.record
(:require [com.stuartsierra.component :as cmp]
[hugsql.core :as hug]
[next.jdbc :as jdbc]
[lrsql.backend.data :as bd]
[lrsql.backend.protocol :as bp]
[lrsql.init :refer [init-hugsql-adapter!]]
Expand All @@ -18,6 +19,8 @@
(hug/def-db-fns "lrsql/postgres/sql/update.sql")
(hug/def-db-fns "lrsql/postgres/sql/delete.sql")

(hug/def-sqlvec-fns "lrsql/postgres/sql/query.sql")

;; Define record
#_{:clj-kondo/ignore [:unresolved-symbol]} ; Shut up VSCode warnings
(defrecord PostgresBackend [tuning]
Expand Down Expand Up @@ -102,6 +105,12 @@
(query-statement-exists tx input))
(-query-statement-descendants [_ tx input]
(query-statement-descendants tx input))
(-query-statements-lazy [_ tx input]
(let [sqlvec (query-statements-sqlvec input)]
(jdbc/plan tx sqlvec {:fetch-size 4000
:concurrency :read-only
:cursors :close
:result-type :forward-only})))

bp/ActorBackend
(-insert-actor! [_ tx input]
Expand Down
7 changes: 6 additions & 1 deletion src/db/sqlite/lrsql/sqlite/record.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(:require [clojure.tools.logging :as log]
[com.stuartsierra.component :as cmp]
[hugsql.core :as hug]
[next.jdbc :as jdbc]
[lrsql.backend.protocol :as bp]
[lrsql.backend.data :as bd]
[lrsql.init :refer [init-hugsql-adapter!]]
Expand All @@ -19,6 +20,8 @@
(hug/def-db-fns "lrsql/sqlite/sql/update.sql")
(hug/def-db-fns "lrsql/sqlite/sql/delete.sql")

(hug/def-sqlvec-fns "lrsql/sqlite/sql/query.sql")

;; Schema Update Helpers

#_{:clj-kondo/ignore [:unresolved-symbol]}
Expand Down Expand Up @@ -135,6 +138,9 @@
(query-statement-exists tx input))
(-query-statement-descendants [_ tx input]
(query-statement-descendants tx input))
(-query-statements-lazy [_ tx input]
(let [sqlvec (query-statements-sqlvec input)]
(jdbc/plan tx sqlvec)))

bp/ActorBackend
(-insert-actor! [_ tx input]
Expand All @@ -151,7 +157,6 @@
(delete-actor-agent-profile tx input)
(delete-actor-state-document tx input)
(delete-actor-actor tx input))

(-query-actor [_ tx input]
(query-actor tx input))

Expand Down
22 changes: 9 additions & 13 deletions src/main/lrsql/admin/interceptors/lrs_management.clj
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,6 @@
{"Content-Type" "text/csv"
"Content-Disposition" "attachment"})

(defn- stream-csv
[csv-data-seq]
(fn [^ServletOutputStream os]
(with-open [writer (io/writer os)]
(csv/write-csv writer csv-data-seq :newline :cr+lf))))

(def download-statement-csv
(interceptor
{:name ::download-statement-csv
Expand All @@ -92,11 +86,13 @@
request :request}
ctx
{:keys [property-paths query-params]}
request
csv-data-seq (adp/-get-statements-csv lrs
property-paths
query-params)]
request]
(assoc ctx
:response {:status 200
:headers csv-response-header
:body (stream-csv csv-data-seq)})))}))
:response
{:status 200
:headers csv-response-header
:body (fn [^ServletOutputStream os]
(adp/-get-statements-csv lrs
os
property-paths
query-params))})))}))
2 changes: 1 addition & 1 deletion src/main/lrsql/admin/protocol.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@

(defprotocol AdminLRSManager
(-delete-actor [this params])
(-get-statements-csv [this property-paths params]))
(-get-statements-csv [this writer property-paths params]))
1 change: 1 addition & 0 deletions src/main/lrsql/backend/protocol.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
;; Queries
(-query-statement [this tx input])
(-query-statements [this tx input])
(-query-statements-lazy [this tx input])
(-query-statement-exists [this tx input])
(-query-statement-descendants [this tx input]))

Expand Down
51 changes: 41 additions & 10 deletions src/main/lrsql/ops/query/statement.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
(ns lrsql.ops.query.statement
(:require [clojure.spec.alpha :as s]
[clojure.java.io :as io]
[clojure.data.csv :as csv]
[com.yetanalytics.lrs.protocol :as lrsp]
[lrsql.backend.protocol :as bp]
[lrsql.spec.common :refer [transaction?]]
Expand Down Expand Up @@ -74,8 +76,8 @@
query-results (bp/-query-statements bk tx input*)
?next-cursor (when (and limit
(= (inc limit) (count query-results)))
(-> query-results last :id u/uuid->str))
query-results* (if (not-empty ?next-cursor)
(-> query-results last :id))
query-results* (if (some? ?next-cursor)
(butlast query-results)
query-results)
stmt-results (map (partial query-res->statement format ltags)
Expand Down Expand Up @@ -127,25 +129,54 @@
(query-one-statement bk tx input ltags)
(query-many-statements bk tx input ltags prefix))))

(s/fdef query-all-statements
#_(s/fdef query-all-statements
:args (s/cat :bk ss/statement-backend?
:tx transaction?
:input ss/statement-query-many-spec
:ltags ss/lang-tags-spec))
:ltags ss/lang-tags-spec
:property-paths vector?))

(defn query-all-statements
"Query a lazy seq of all the statements in the database, filtered by `input`.
The `:limit` parameter will dictate the size of each query batch, but will
not limit the total number of statements streamed. Ignores attachments."
[bk tx input ltags]
(let [{:keys [statement-results ?next-cursor]}
[bk tx input ltags property-paths writeable]
(let [format (:format input)
input (-> input
(dissoc :from :query-params)
(assoc :limit 1000000))
json-paths (us/property-paths->json-paths property-paths)
csv-headers (us/property-paths->csv-headers property-paths)]
(with-open [writer (io/writer writeable)]
(csv/write-csv writer [csv-headers] :newline :cr+lf)
(transduce (comp (map (fn [res]
(query-res->statement format ltags res)))
(map (fn [stmt]
(us/statement->csv-row json-paths stmt))))
(fn write-csv-reducer
([writer]
writer)
([writer row]
(csv/write-csv writer [row] :newline :cr+lf)
writer))
writer
(bp/-query-statements-lazy bk tx input))
#_(->> (bp/-query-statements-lazy bk tx input)
#_(map format-fn result)
(into [])
#_(us/statements->csv-seq property-paths)
#_(reduce (fn [writer row]
(csv/write-csv writer [row] :newline :cr+lf))
writer))))
#_(let [{:keys [statement-results ?next-cursor]}
(query-many-statements* bk tx input ltags)]
(if ?next-cursor
(let [new-input (-> input
(let [next-str (u/uuid->str ?next-cursor)
new-input (-> input
(assoc :from ?next-cursor)
(assoc-in [:query-params :from] ?next-cursor))]
(lazy-cat statement-results
(query-all-statements bk tx new-input ltags)))
(assoc-in [:query-params :from] next-str))]
(concat statement-results
(lazy-seq (query-all-statements bk tx new-input ltags property-paths))))
statement-results)))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
Expand Down
23 changes: 14 additions & 9 deletions src/main/lrsql/system/lrs.clj
Original file line number Diff line number Diff line change
Expand Up @@ -398,12 +398,17 @@
input (agent-input/delete-actor-input actor-ifi)]
(jdbc/with-transaction [tx conn]
(stmt-cmd/delete-actor! backend tx input))))
(-get-statements-csv [lrs property-paths params]
(let [conn (lrs-conn lrs)]
(jdbc/with-transaction [tx conn]
(let [config (:config lrs)
input (-> params ; TODO: Higher limit for CSV stream?
(stmt-util/ensure-default-max-limit config)
(stmt-input/query-statement-input nil))
stmt-seq (stmt-q/query-all-statements backend tx input {})]
(stmt-util/statements->csv-seq property-paths stmt-seq))))))
(-get-statements-csv [lrs output-stream property-paths params]
(let [conn (lrs-conn lrs)
config (:config lrs)
input (-> params ; TODO: Higher limit for CSV stream?
(stmt-util/ensure-default-max-limit config)
(stmt-input/query-statement-input nil))]
(stmt-q/query-all-statements backend
conn
input
{}
property-paths
output-stream)
#_(jdbc/with-transaction [tx conn]
(stmt-q/query-all-statements backend tx input {} property-paths)))))
12 changes: 10 additions & 2 deletions src/main/lrsql/util/statement.clj
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@
"/statements?"
(form-encode
(cond-> query-params
true (assoc :from next-cursor)
true (assoc :from (u/uuid->str next-cursor))
?agent (assoc :agent (u/write-json-str ?agent)))))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
Expand All @@ -179,7 +179,15 @@
{:return-missing? true
:return-duplicates? false})

(defn- statement->csv-row
(defn property-paths->json-paths
[property-paths]
(mapv up/path->jsonpath-vec property-paths))

(defn property-paths->csv-headers
[property-paths]
(mapv up/path->csv-header property-paths))

(defn statement->csv-row
[json-paths statement]
(pa/get-values* statement json-paths json-path-opts))

Expand Down
78 changes: 47 additions & 31 deletions src/test/lrsql/admin/protocol_test.clj
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
(ns lrsql.admin.protocol-test
"Test the protocol fns of `AdminAccountManager`, `APIKeyManager`, `AdminStatusProvider` directly."
(:require [clojure.test :refer [deftest testing is use-fixtures]]
[com.stuartsierra.component :as component]
[clojure.data.csv :as csv]
[next.jdbc :as jdbc]
[com.stuartsierra.component :as component]
[com.yetanalytics.squuid :as squuid]
[com.yetanalytics.lrs.protocol :as lrsp]
[xapi-schema.spec.regex :refer [Base64RegEx]]
[lrsql.admin.protocol :as adp]
[lrsql.lrs-test :as lrst]
[lrsql.lrs-test :as lrst]
[lrsql.test-support :as support]
[lrsql.util :as u]
[lrsql.test-constants :as tc]
[next.jdbc :as jdbc]
[lrsql.util.actor :as ua]))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
Expand Down Expand Up @@ -271,35 +273,49 @@
(try
(lrsp/-store-statements lrs auth-ident [stmt-0 stmt-1] [])
(testing "CSV Seq - no params"
(let [stmt-seq (adp/-get-statements-csv lrs hdrs {})]
(is (not (realized? stmt-seq)))
(is (= ["id" "actor_mbox" "verb_id" "object_id"]
(first stmt-seq)))
(is (= [(get stmt-1 "id")
(get-in stmt-1 ["actor" "mbox"])
(get-in stmt-1 ["verb" "id"])
(get-in stmt-1 ["object" "id"])]
(first (rest stmt-seq))))
(is (= [(get stmt-0 "id")
(get-in stmt-0 ["actor" "mbox"])
(get-in stmt-0 ["verb" "id"])
(get-in stmt-0 ["object" "id"])]
(first (rest (rest stmt-seq)))))))
(with-open [writer (java.io.StringWriter.)]
(adp/-get-statements-csv lrs writer hdrs {})
(let [stmt-str (str writer)
stmt-seq (csv/read-csv stmt-str)]
(is (= ["id" "actor_mbox" "verb_id" "object_id"]
(first stmt-seq)))
(is (= [(get stmt-1 "id")
(get-in stmt-1 ["actor" "mbox"])
(get-in stmt-1 ["verb" "id"])
(get-in stmt-1 ["object" "id"])]
(first (rest stmt-seq))))
(is (= [(get stmt-0 "id")
(get-in stmt-0 ["actor" "mbox"])
(get-in stmt-0 ["verb" "id"])
(get-in stmt-0 ["object" "id"])]
(first (rest (rest stmt-seq))))))))
(testing "CSV Seq - ascending set to true"
(let [stmt-seq (adp/-get-statements-csv lrs hdrs {:ascending true})]
(is (not (realized? stmt-seq)))
(is (= ["id" "actor_mbox" "verb_id" "object_id"]
(first stmt-seq)))
(is (= [(get stmt-0 "id")
(get-in stmt-0 ["actor" "mbox"])
(get-in stmt-0 ["verb" "id"])
(get-in stmt-0 ["object" "id"])]
(first (rest stmt-seq))))
(is (= [(get stmt-1 "id")
(get-in stmt-1 ["actor" "mbox"])
(get-in stmt-1 ["verb" "id"])
(get-in stmt-1 ["object" "id"])]
(first (rest (rest stmt-seq)))))))
(with-open [writer (java.io.StringWriter.)]
(adp/-get-statements-csv lrs writer hdrs {:ascending true})
(let [stmt-str (str writer)
stmt-seq (csv/read-csv stmt-str)]
(is (not (realized? stmt-seq)))
(is (= ["id" "actor_mbox" "verb_id" "object_id"]
(first stmt-seq)))
(is (= [(get stmt-0 "id")
(get-in stmt-0 ["actor" "mbox"])
(get-in stmt-0 ["verb" "id"])
(get-in stmt-0 ["object" "id"])]
(first (rest stmt-seq))))
(is (= [(get stmt-1 "id")
(get-in stmt-1 ["actor" "mbox"])
(get-in stmt-1 ["verb" "id"])
(get-in stmt-1 ["object" "id"])]
(first (rest (rest stmt-seq))))))))
(testing "CSV Seq - Entire database gets returned beyond `:limit`"
(let [statements (->> #(assoc stmt-0 "id" (str (squuid/generate-squuid)))
(repeatedly 100))]
(lrsp/-store-statements lrs auth-ident statements []))
(with-open [writer (java.io.StringWriter.)]
(adp/-get-statements-csv lrs writer hdrs {})
(let [stmt-str (str writer)
stmt-seq (csv/read-csv stmt-str)]
(is (= 103 (count stmt-seq))))))
(finally (component/stop sys')))))

;; TODO: Add tests for creds with no explicit scopes, once
Expand Down

0 comments on commit 5fce0e3

Please sign in to comment.