diff --git a/.gitignore b/.gitignore index 6eae6a5..73aceec 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,9 @@ pom.xml +*.log +db/* *jar lib classes +.cake +ext +test/db diff --git a/src/plasma/operator.clj b/src/plasma/operator.clj index 504348c..fabe893 100644 --- a/src/plasma/operator.clj +++ b/src/plasma/operator.clj @@ -12,121 +12,112 @@ ; a graph node. (log/channel :op :debug) -(log/channel :flow :op) ; to log values flowing through the operator graph - -(def op-branch-map - {:project true - :property true - :aggregate true - :join true - :traverse false - :parameter false - :receive true - :send true - :select true}) - -(defn operator-deps-zip [plan end-op-id] +(log/channel :flow :op) ; log values flowing through the operator graph +(log/channel :close :flow) ; log operators closing their output channels + +(defn- close-log + [op chan] + (on-closed chan #(log/format :close "[%s] closed" op))) + +(defn- flow-log + [op chan] + (receive-all (fork chan) + (fn [pt] + (log/format :flow "[%s] %s" op pt))) + (close-log op chan)) + +(defn plan-op + "Creates a query plan operator node. Takes an operator type, dependent + operator ids, and the operator parameters." + [op & {:keys [deps args]}] + {:type op + :id (uuid) + :deps (vec deps) + :args (vec args)}) + +(defn operator-deps-zip [plan start-id end-id] (let [ops (:ops plan)] (zip/zipper (fn branch? [op-id] - (if (= end-op-id op-id) - false - (get op-branch-map (:type (get ops op-id))))) + (and (not= end-id op-id) + (not (empty? (get-in ops [op-id :deps]))))) (fn children [op-id] - (if (= end-op-id op-id) - [] - (let [op (get ops op-id) - {:keys [type args]} op] - (case type - :property [(first args)] - :project [(first args)] - :aggregate [(first args)] - :join [(second args) (first args)] - :send [(first args)] - :receive [(first args)] - :select [(first args)])))) - - (fn make-op [op args] - (assoc op :args args)) - - (:root plan)))) + (get-in ops [op-id :deps])) + + (fn make-op [op-id deps] + (assoc (get ops op-id) :deps deps)) + + start-id))) (defn sub-query-ops "Returns the operator tree from the root out to the end-id, and no further." - [plan end-node-id] + [plan start-id end-id] (log/to :op "[sub-query-ops] start-plan: " plan) (let [ops (:ops plan)] - (loop [loc (operator-deps-zip plan end-node-id) + (loop [loc (operator-deps-zip plan start-id end-id) sub-query-ops {}] (let [op-id (zip/node loc) op-node (get ops op-id)] - (log/to :op "[sub-query-ops] op-node: " op-node) - (if (or (= end-node-id op-id) - (zip/end? loc)) + (log/to :sub-query "[sub-query-ops] op-node: " op-node) + (if (zip/end? loc) (assoc sub-query-ops op-id op-node) (recur (zip/next loc) (assoc sub-query-ops op-id op-node))))))) -(defn build-sub-query - [plan end-op-id src-node-id] +(defn build-sub-query + "Generates a sub-query plan from the " + [plan start-node end-id] (let [recv-op (first (filter #(= :receive (:type %)) (vals (:ops plan)))) - new-root (first (:args recv-op)) - new-plan (assoc plan :root new-root) - new-ops (sub-query-ops new-plan end-op-id) + new-root (first (:deps recv-op)) + new-ops (sub-query-ops plan new-root end-id) ; connect a send op at the root - s-id (uuid) - send-op {:type :send :id s-id :args [new-root]} + send-op (plan-op :send :deps [new-root]) + s-id (:id send-op) ; connect a new param op that will start the query at the source of the proxy node - p-id (uuid) - param-op {:type :parameter :id p-id :args [src-node-id]} + param-op (plan-op :parameter :args [start-node]) + p-id (:id param-op) ; hook the new param node up to the join that feeds the traversal we need to start at - _ (log/to :op "[build-sub-query] end-op-id: " end-op-id "\nnew-ops: " new-ops) + _ (log/to :sub-query "[build-sub-query] end-id: " end-id "\nnew-ops: " new-ops) end-join-op (first (filter #(and (= :join (:type %)) - (= end-op-id (second (:args %)))) + (= end-id (second (:deps %)))) (vals new-ops))) - _ (log/to :op "[build-sub-query] end-join-op: " end-join-op) + _ (log/to :sub-query "[build-sub-query] end-join-op: " end-join-op) new-join-op (assoc end-join-op - :args [p-id (second (:args end-join-op))]) - _ (log/to :op "[build-sub-query] new-join-op: " new-join-op) + :deps [p-id (second (:deps end-join-op))]) + _ (log/to :sub-query "[build-sub-query] new-join-op: " new-join-op) ; and modify the traverse-op's src-key so it uses the new param node's value - trav-op (get new-ops end-op-id) + trav-op (get new-ops end-id) trav-op (assoc trav-op :args [p-id (second (:args trav-op))]) - _ (log/to :op "[build-sub-query] new-join-op: " new-join-op) + _ (log/to :sub-query "[build-sub-query] new-join-op: " new-join-op) - new-ops (assoc new-ops + new-ops (assoc new-ops (:id new-join-op) new-join-op (:id trav-op) trav-op s-id send-op p-id param-op)] - (assoc new-plan + (assoc plan :root (:id send-op) - :params {src-node-id p-id} + :params {start-node p-id} :type :sub-query :ops new-ops))) -(defn remote-sub-query +(defn remote-sub-query "Generates a sub-query for the given plan, starting at the receive operator and ending at the end-id. The sub-query will begin traversal at the src-node-id. It sends the sub-query to the remote peer, and returns a channel that will receive the stream of path-tuple results from the execution of the sub-query." - [plan end-id src-node-id url] - (let [sub-query (build-sub-query plan end-id src-node-id) + [plan end-id start-id url] + (let [sub-query (build-sub-query plan start-id end-id) sender (peer-sender url)] (log/to :op "[remote-sub-query] sub-query: " sub-query) (sender sub-query))) -(defn- flow-log - [op chan] - (receive-all (fork chan) - (fn [pt] - (log/format :flow "[%s] out: %s" op pt)))) - (defn parameter-op "An operator designed to accept a query parameter. Forwards the parameter value to its output channel and then closes it to signify @@ -134,39 +125,44 @@ [id & [param-name]] (let [in (channel) out (map* (fn [val] {id val}) in)] + (on-closed in #(close out)) (flow-log "parameter" out) - (on-closed in #(do - (log/to :flow "[parameter] closed") - (close out))) + (close-log "parameter" out) {:type :parameter :id id :in in :out out :name param-name})) -; TODO: Need to register remote queries so we can close only after all -; results have been received or a timeout has occurred. (defn receive-op "A receive operator to merge values from local query processing and - remote query results." - [id left in] + remote query results. + + Network receive channels are sent to the remotes channel so we can wire them into + the running query. + " + [id left remotes] (let [out (channel) - left-out (:out left)] + left-out (:out left) + sub-chans (atom []) + all-closed (fn [] + (when (and (closed? left-out) + (every? closed? @sub-chans)) + (close out)))] + ; Wire remote results of sub-queries into the graph + (receive-all remotes + (fn [chan] + (swap! sub-chans conj chan) + (siphon chan out) + (on-closed chan all-closed))) + (siphon left-out out) - (siphon in out) + (on-closed left-out all-closed) (flow-log "receive" out) - ;(on-closed left-out #(do - ; (log/to :op "[receive] closed...") - ; (close out))) - - (on-closed in #(do - (log/to :flow "[receive] closed...") - (close out))) - {:type :receive :id id - :in in + :in remotes :out out})) (defn send-op @@ -189,13 +185,6 @@ :id id :dest dest}) -(defn recv-from - "Forward results from a sub-query result channel to a receive channel. - Could just use siphon, but it's nice to have logging..." - [res-chan recv-chan] - (siphon res-chan recv-chan) - (flow-log "recv-from" recv-chan)) - (defn traverse-op "Uses the src-key to lookup a node ID from each PT in the in queue. For each source node traverse the edges passing the edge-predicate, and put @@ -209,22 +198,24 @@ (receive-all in (fn [pt] (when pt - (let [src-id (get pt src-key)] - (log/to :flow "[traverse] in:" pt) - (log/to :flow "[traverse] src: " src-id " - " edge-predicate " -> " - (count (get-edges src-id edge-pred-fn)) " edges") - (if (proxy-node? src-id) - (do - (log/to :flow "[traverse] proxy-node:" (:proxy (find-node src-id))) - (recv-from - (remote-sub-query plan id src-id (:proxy (find-node src-id))) - recv-chan)) - (let [edges (get-edges src-id edge-pred-fn) - tgts (keys edges) - path-tuples (map #(assoc pt id %) tgts)] - (doseq [path-tuple path-tuples] - (log/to :flow "[traverse] out: " (get path-tuple id)) - (enqueue out path-tuple)))))))) + (let [src-id (get pt src-key) + src-node (find-node src-id)] + (cond + (proxy-node? src-id) + (let [proxy (:proxy src-node)] + (log/to :flow "[traverse] proxy:" proxy) + ; Send the remote-sub-query channel to the recv operator + (enqueue recv-chan + (remote-sub-query plan id src-id proxy))) + + :default + (let [tgts (keys (get-edges src-id edge-pred-fn))] + (log/to :flow "[traverse] " + src-id " - " + edge-predicate " -> " + "<" (count tgts) " matches>") + (siphon (apply channel (map #(assoc pt id %) tgts)) + out))))))) (on-closed in #(do (log/to :op "[traverse] closed") @@ -244,14 +235,10 @@ right-in (:in right) right-out (:out right) out (channel)] - (log/to :op-b "[join] left: " left " right: " right) (siphon left-out right-in) - (on-closed left-out #(close right-in)) - (siphon right-out out) - (on-closed right-out #(do - (log/to :op "[join] closed") - (close out))) + (on-closed left-out #(close right-in)) + (on-closed right-out #(close out)) (flow-log "join" out) {:type :join :id id @@ -272,13 +259,13 @@ out (channel) agg-fn (or agg-fn identity)] (siphon left-out buf) + (flow-log "aggregate in" left-out) (on-closed left-out (fn [] (let [aggregated (agg-fn (channel-seq buf))] (log/to :op "[aggregate] count: " (count aggregated)) (doseq [item aggregated] (enqueue out item))) - (log/to :op "[aggregate] closed") (close out))) (flow-log "aggregate" out) {:type :aggregate @@ -341,12 +328,14 @@ [(apply max-key key-fn arg-seq)])] (aggregate-op left max-fn))) -(defn- do-predicate [props predicate] - (let [prop (get props (:property predicate)) - op (ns-resolve *ns* (:operator predicate)) - result (op prop (:value predicate))] - (log/to :op "[do-predicate] prop: " prop " op: " op "result: " result) - result)) +(def PREDICATE-OPS + {'= = + '== == + 'not= not= + '< < + '> > + '<= <= + '>= >=}) ; Expects a predicate in the form of: ; {:type :predicate @@ -363,15 +352,20 @@ (siphon (filter* (fn [pt] (let [node-id (get pt select-key) - props (get pt node-id)] - (log/format :flow - "[select-op] skey: %s\n\tnode-id: %s\n\tpt: %s\n\tprops: %s\n\tpredicate: %s" - select-key node-id pt props predicate) - (do-predicate props predicate))) + node (get pt node-id) + {:keys [property operator value]} predicate + pval (get node property) + op (get PREDICATE-OPS operator) + result (op pval value)] + (log/format :flow "[select] (%s (%s node) %s) => (%s %s %s) => %s" + operator property value + operator pval value + result) + result)) left-out) out) (on-closed left-out #(close out)) -; (flow-log "select" out) + (flow-log "select" out) {:type :select :id id :select-key select-key @@ -384,23 +378,18 @@ properties for operations like select and sort that rely on property values already being in the PT map." [id left pt-key props] - (log/to :op "[property-op] loader for props: " props) (let [left-out (:out left) out (map* (fn [pt] - (log/to :flow "[property] pt: " pt) - (let [node-id (get pt pt-key) - node (find-node node-id) - _ (log/to :flow "[property] node: " node - " props: " props) - vals (select-keys node props) + (let [node-id (get pt pt-key) + node (find-node node-id) + vals (select-keys node props) existing (get pt node-id) - pt (assoc pt node-id (merge existing vals))] - (log/to :flow "[property] pt out: " pt) + pt (assoc pt + node-id (merge existing vals))] pt)) left-out)] - (on-closed left-out #(do - (log/to :flow "[property] closed...") - (close out))) + (on-closed left-out #(close out)) + (flow-log "property" out) {:type :property :id id :left left @@ -413,24 +402,13 @@ [id left project-key props?] (let [left-out (:out left) out (map* (fn [pt] - (log/to :flow "[project] proj-key: " project-key "\npt: " pt) (let [node-id (get pt project-key)] (if props? (get pt node-id) - node-id))) + node-id))) left-out)] - (on-closed left-out #(do - (log/to :flow "[project] closed...") - (close out))) - - (comment receive-all left-out - (fn [pt] - (when pt - (log/to :op "[project] out: " (get pt project-key)) - (enqueue out (get pt project-key))) - (when (nil? pt) - (log/to :op "[project] got nil!!!!!!!!!!!!!") - (close out)))) + (on-closed left-out #(close out)) + (flow-log "project" out) {:type :project :id id :project-key project-key diff --git a/src/plasma/query.clj b/src/plasma/query.clj index 2715368..7185720 100644 --- a/src/plasma/query.clj +++ b/src/plasma/query.clj @@ -3,10 +3,12 @@ [jiraph graph] [lamina core]) (:require [clojure (zip :as zip)] + [clojure (set :as set)] [logjam.core :as log])) (log/channel :query :debug) (log/channel :optimize :query) +(log/channel :build :query) (defn where-form [body] (let [where? #(and (list? %) (= 'where (first %))) @@ -54,12 +56,6 @@ pred-list)] (assoc plan :filters predicates))) -(defn plan-op - [op & args] - {:type op - :id (uuid) - :args (vec args)}) - (defn- path-start-src "Determines the starting operator for a single path component, returning the start-op and rest of the path." @@ -70,7 +66,8 @@ (cond ; path starting with a keyword means start at the root (keyword? start) - (let [root-op (plan-op :parameter ROOT-ID)] + (let [root-op (plan-op :parameter + :args [ROOT-ID])] [root-op (:id root-op)]) ; starting with a symbol, refers to a previous bind point in the query @@ -83,7 +80,8 @@ ; starting with the UUID of a node starts at that node" (node-exists? :graph start) - (let [root-op (plan-op :parameter start)] + (let [root-op (plan-op :parameter + :args [start])] [root-op (:id root-op)])))) (defn- path-plan @@ -96,9 +94,11 @@ ops {root-id root-op}] (log/to :query "root: " root-id " src: " src-id " path: " path) (if path - (let [trav (plan-op :traverse src-id (first path)) + (let [trav (plan-op :traverse + :args [src-id (first path)]) t-id (:id trav) - join (plan-op :join root-id t-id) + join (plan-op :join + :deps [root-id t-id]) j-id (:id join)] (recur j-id t-id (next path) (assoc ops @@ -121,7 +121,7 @@ ops (merge (:ops plan) path-ops) _ (log/to :query "root-op: " root-op) bind-op (if (= :join (:type root-op)) - (second (:args root-op)) + (second (:deps root-op)) (:id root-op)) plan (assoc-in plan [:pbind bind-name] bind-op) jbind (assoc jbind bind-name (:id root-op))] @@ -154,8 +154,12 @@ (fn [plan [binding pred]] (log/to :query "pred: " pred) (let [select-key (get-in plan [:pbind binding]) - p-op (plan-op :property (:root plan) select-key [(:property pred)]) - s-op (plan-op :select (:id p-op) select-key pred) + p-op (plan-op :property + :deps [(:root plan)] + :args [select-key [(:property pred)]]) + s-op (plan-op :select + :deps [(:id p-op)] + :args [select-key pred]) ops (assoc (:ops plan) (:id p-op) p-op (:id s-op) s-op)] @@ -165,30 +169,13 @@ plan flat-preds))) -(comment defn- projection - "Add the projection operator node to produce the final result nodes for a query." - [plan body] - (log/to :query "projection (body): " body "\n" - (get (:ops plan) (get-in plan [:pbind (last body)]))) - (let [params (:pbind plan) - res-sym (last body) - res-id (if (and (symbol? res-sym) - (contains? params res-sym)) - (get params res-sym) - (second (first params))) - result-op (get (:ops plan) res-id) - _ (log/to :query "res-op: " result-op) - proj-op (plan-op :project (:root plan) res-id) - ops (assoc (:ops plan) (:id proj-op) proj-op)] - (assoc plan - :root (:id proj-op) - :ops ops))) - (defn- load-props [plan bind-sym properties] (let [bind-op (get (:pbind plan) bind-sym) root-op (:root plan) - prop-op (plan-op :property root-op bind-op properties)] + prop-op (plan-op :property + :deps [root-op] + :args [bind-op properties])] (assoc plan :root (:id prop-op) :ops (assoc (:ops plan) (:id prop-op) prop-op)))) @@ -198,23 +185,25 @@ (let [q (path [people [:app :social :friends]])] ... - + ; get the UUIDs of people nodes - (project q 'people) - + (project q 'people) + ; get the given set of props for each person node (project 'people :name :email :age)) - + " [plan bind-sym & properties] (log/to :query "[project] bind-sym: " bind-sym) - (let [props? (not (empty? properties)) + (let [props? (not (empty? properties)) plan (if props? (load-props plan bind-sym properties) plan) bind-op (get (:pbind plan) bind-sym) root-op (:root plan) - proj-op (plan-op :project root-op bind-op props?)] + proj-op (plan-op :project + :deps [root-op] + :args [bind-op props?])] (assoc plan :root (:id proj-op) :ops (assoc (:ops plan) (:id proj-op) proj-op)))) @@ -233,7 +222,8 @@ (defn- plan-receiver [plan] - (let [op (plan-op :receive (:root plan)) + (let [op (plan-op :receive + :deps [(:root plan)]) ops (assoc (:ops plan) (:id op) op)] (assoc plan :ops ops @@ -291,8 +281,12 @@ (let [order (or order :asc) {:keys [root ops]} plan sort-key (get (:pbind plan) sort-var) - p-op (plan-op :property (:root plan) sort-key [sort-prop]) - s-op (plan-op :sort (:id p-op) sort-key sort-prop order) + p-op (plan-op :property + :deps [(:root plan)] + :args [sort-key [sort-prop]]) + s-op (plan-op :sort + :deps [(:id p-op)] + :args [sort-key sort-prop order]) ops (assoc ops (:id p-op) p-op (:id s-op) s-op)] @@ -300,23 +294,17 @@ :root (:id s-op) :ops ops))) -; TODO: Keeping the operator params mixed with the input deps is a mess, change operator -; plan nodes to be more structured (defn downstream-op-node "Find the downstream operator node for the given operator node in the plan." [plan op-id] - (first (filter (fn [op] - (or - (and (= :join (:type op)) - (= op-id (second (:args op)))) - (and (not= :traverse (:type op)) - (= op-id (first (:args op)))))) + (first (filter #(some #{op-id} (:deps %)) (vals (:ops plan))))) (defn replace-input-op "Returns a new operator node with the left input replaced with new-left." - [op old-in new-in] - (assoc-in op [:args (.indexOf (:args op) old-in)] new-in)) + [op old new] + (let [index (.indexOf (:deps op) old)] + (assoc-in op [:deps index] new))) (defn reparent-op "Move an operator node in the plan to be the direct downstream operator from a target op." @@ -324,12 +312,13 @@ (let [ops (:ops plan) ; create new version of moving op with tgt-id as left input op (get ops op-id) - parent-op-id (first (:args op)) + parent-op-id (first (:deps op)) new-op (replace-input-op op parent-op-id tgt-id) _ (log/to :optimize "[reparent-op] new-op: " new-op) ; create new version of op downstream from tgt with moving op-id as replaced input tgt-down (downstream-op-node plan tgt-id) + _ (log/to :optimize "[reparent-op] ***** tgt-down: " tgt-down) tgt-down (replace-input-op tgt-down tgt-id op-id) _ (log/to :optimize "[reparent-op] tgt-down: " tgt-down) @@ -345,7 +334,7 @@ _ (log/to :optimize "[reparent-op] new-op-down: " new-op-down) (assoc new-ops (:id op-down) new-op-down))) plan (if (= (:root plan) op-id) - (assoc plan :ops new-ops :root parent-op-id) + (assoc plan :ops new-ops :root parent-op-id) (assoc plan :ops new-ops))] plan)) @@ -360,20 +349,21 @@ ; Pull property and select ops to just below the join after their associated traversals (let [select-ops (ops-of-type plan :select) selects-up (reduce (fn [mem op] - (let [select-key (second (:args op)) + (let [select-key (first (:args op)) tgt-op (downstream-op-node mem select-key)] (log/to :optimize "[optimize-plan] select-tgt: " tgt-op) (reparent-op mem (:id op) (:id tgt-op)))) plan select-ops) prop-ops (ops-of-type plan :property) -; _ (print-query selects-up) props-up (reduce (fn [mem op] - (let [prop-key (second (:args op)) + (let [prop-key (first (:args op)) tgt-op (downstream-op-node mem prop-key)] (log/to :optimize "[optimize-plan] prop-key: " prop-key " prop-tgt: " tgt-op) - (reparent-op mem (:id op) (:id tgt-op)))) + (if (= (first (:deps op)) (:id tgt-op)) + mem + (reparent-op mem (:id op) (:id tgt-op))))) selects-up prop-ops)] props-up)) @@ -381,8 +371,9 @@ (defn- op-node "Instantiate an operator node based on a query node." [plan ops recv-chan op-node] - (let [{:keys [type id args]} op-node - _ (log/format :op-node "[op-node] type: %s\nid: %s\nargs: %s " type id args) + (let [{:keys [type id deps args]} op-node + deps-ops (map ops deps) + _ (log/format :op-node "[op-node] type: %s\nid: %s\ndeps: %s\nargs: %s " type id deps args) op-name (symbol (str (name type) "-op")) op-fn (ns-resolve 'plasma.operator op-name)] (case type @@ -390,68 +381,71 @@ (apply op-fn id plan recv-chan args) :join - (apply op-fn id (map ops args)) + (apply op-fn id deps-ops) :parameter (apply op-fn id args) :project - (apply op-fn id (get ops (first args)) (rest args)) + (apply op-fn id (first deps-ops) args) :aggregate - (apply op-fn id (map #(get ops %) args)) + (apply op-fn id (first deps-ops) args) :receive - (op-fn id (get ops (first args)) recv-chan) + (op-fn id (first deps-ops) recv-chan) :send - (do - (log/to :query "[send-op] op-node: " (str op-node)) - (op-fn id (get ops (first args)) (second args))) + (apply op-fn id (first deps-ops) args) :select - (let [[left skey pred] args - left (get ops left)] - (op-fn id left skey pred)) + (apply op-fn id (first deps-ops) args) :property - (apply op-fn id (get ops (first args)) (rest args)) + (apply op-fn id (first deps-ops) args) ))) -(defn- ready-op? - "Check whether an operator's child operators have been instantiated if it has children, - to see whether it is ready to be instantiated." - [tree plan op] - (let [root-id ROOT-ID - ops (:ops plan) - child-ids (doall (filter #(and (uuid? %) (contains? ops %)) (:args op)))] - (log/to :op-b "[ready-op?] op: " op " child-ids: " (seq child-ids) "\ntree: " tree) - (if (empty? child-ids) - true - (every? #(contains? tree %) child-ids)))) - -; NOTE: This could be made more efficient by starting at the outer parameter -; nodes and working down to the root, but for now who cares... +(defn- op-dep-list + "Returns a sorted operator dependency list. (Starting at the leaves of the tree + where the operators have no dependencies and iterating in towards the root.)" + [plan] + (let [ops (vals (:ops plan)) + leaves (filter #(empty? (:deps %)) ops)] + (loop [sorted (vec leaves) + others (set/difference (set ops) (set leaves))] + (if (empty? others) + sorted + (let [sort-set (set (map :id sorted)) + next-level (filter #(set/subset? (set (:deps %)) + sort-set) + others) + sorted (concat sorted next-level)] + (recur + sorted + (set/difference others (set sorted)))))))) + (defn- build-query "Iterate over the query operators, instantiating each one after its dependency operators have been instantiated." [plan] - (let [recv-chan (channel)] - (loop [tree {} - ops (set (keys (:ops plan)))] - (if (empty? ops) - tree - (let [_ (log/to :op-b "ops-remaining: " ops) - ; _ (log/to :op-b "tree: " tree) - op-id (first (filter #(ready-op? tree plan (get (:ops plan) %)) ops)) - _ (log/to :op-b "build-query op-id: " op-id) - op (get (:ops plan) op-id) - _ (log/to :op-b "build-query op: " op) - tree (assoc tree (:id op) - (op-node plan tree recv-chan op))] - (if (nil? op) - nil - (recur tree (disj ops op-id)))))))) + (let [recv-chan (channel) + sorted (op-dep-list plan)] +; (log/to :build "\n" (with-out-str (print-query plan))) + (log/to :build "[build-query] sorted: " (seq sorted)) + (reduce + (fn [ops op] + (assoc ops (:id op) + (op-node plan ops recv-chan op))) + {} + sorted))) + +(defn query-tree + "Convert a query plan into a query execution tree." + [plan] + {:type :query-tree + :ops (build-query plan) + :root (:root plan) + :params (:params plan)}) (defn has-projection? "Check whether a query plan contains a project operator." @@ -471,15 +465,6 @@ (first (last (:paths plan))))] (project plan bind-sym)))) -(defn query-tree - "Convert a query plan into a query execution tree." - [plan] - (let [tree (build-query plan)] - {:type :query-tree - :ops tree - :root (:root plan) - :params (:params plan)})) - (defn run-query "Execute a query by feeding parameters into a query operator tree." [tree param-map] @@ -515,7 +500,7 @@ [plan ch] (let [ops (map (fn [op] (if (= :send (:type op)) - (assoc op :args (vec (concat (:args op) [ch]))) + (assoc op :args [ch]) op)) (vals (:ops plan))) ops (reduce (fn [mem op] @@ -534,11 +519,7 @@ [ch plan & [param-map]] (assert (sub-query? plan)) (let [plan (with-send-channel plan ch) -; z (with-out-str (print-query plan)) -; _ (log/to :sub-query "[sub-query]:\n" z) -; plan (optimize-plan plan) -; z (with-out-str (print-query plan)) -; _ (log/to :sub-query "[sub-query]:\n" z) + plan (optimize-plan plan) tree (query-tree plan)] (run-query tree {}))) diff --git a/src/plasma/viz.clj b/src/plasma/viz.clj index 269db2b..3cff254 100644 --- a/src/plasma/viz.clj +++ b/src/plasma/viz.clj @@ -8,27 +8,13 @@ (log/channel :viz :debug) (defn- tree-vecs* [q root] - (let [node (get (:ops q) root) - branch? (get op-branch-map (:type node)) - _ (log/to :viz "type: " (:type node) - "\nbranch: " branch?) - children (if branch? - (case (:type node) - :join (:args node) - (:send - :select - :project - :property - :aggregate - :receive) [(first (:args node))]) - nil) - _ (log/to :viz "children: " children) + (let [{:keys [id type deps args]} (get (:ops q) root) label (case (:type node) - :traverse (str "tr " (second (:args node))) - :parameter (str "pr \"" (first (:args node)) "\"") - (name (:type node))) - label (str label " [" (apply str (take 4 (drop 5 (:id node)))) "]")] - (apply vector label (map (partial tree-vecs* q) children)))) + :traverse (str "tr " (second args)) + :parameter (str "pr \"" (first args) "\"") + (name type)) + label (str label " [" (apply str (take 4 (drop 5 id))) "]")] + (apply vector label (map (partial tree-vecs* q) deps)))) (defn- tree-vecs [q] [(tree-vecs* q (:root q))]) diff --git a/test/plasma/operator_test.clj b/test/plasma/operator_test.clj index a8d0408..f210b2d 100644 --- a/test/plasma/operator_test.clj +++ b/test/plasma/operator_test.clj @@ -12,9 +12,9 @@ p2 (parameter-op id)] (enqueue-and-close (:in p1) 42) (enqueue-and-close (:in p2) ROOT-ID) - (is (= {id 42} + (is (= {id 42} (first (lazy-channel-seq (:out p1))))) - (is (= {id ROOT-ID} + (is (= {id ROOT-ID} (first (lazy-channel-seq (:out p2))))) (is (and (closed? (:out p1)) @@ -204,7 +204,7 @@ (choose-op-test) (send-receive-op-test)) -(defn test-ns-hook +(defn test-ns-hook [] (test-fixture ops-test)) diff --git a/test/plasma/peer_test.clj b/test/plasma/peer_test.clj index 238a6b3..63938e2 100644 --- a/test/plasma/peer_test.clj +++ b/test/plasma/peer_test.clj @@ -78,7 +78,7 @@ port (+ 1000 (rand-int 10000)) local (local-peer "db/local" port) local-p (peer "localhost" port) - peers (doall + peers (doall (map (fn [n] (let [p (local-peer (str "db/peer-" n) (+ port n 1))] @@ -105,7 +105,7 @@ net (node :label :net)] (edge root-id net :label :net) (doseq [[p peer-root n] peers] - (edge net + (edge net (proxy-node peer-root (str "plasma://localhost:" (+ port n 1))) :label :peer)) ; (println "peers: " (query (path [:net :peer]))) @@ -115,11 +115,11 @@ (project 'doc :label :score)) res (peer-query local-p q 4000)] (println "res: " res) - (is (= (* 3 n-peers) (count res)))) + (is (= n-peers (count res)))) (finally ; Ghetto stuff... ; Until Clojure supports recur in a try/finally form - (peer-close local) + (peer-close local) (peer-close (first (nth peers 0))) (peer-close (first (nth peers 1))) (peer-close (first (nth peers 2))) diff --git a/test/plasma/query_test.clj b/test/plasma/query_test.clj index 9aab0e9..8b87422 100644 --- a/test/plasma/query_test.clj +++ b/test/plasma/query_test.clj @@ -1,5 +1,5 @@ (ns plasma.query-test - (:use [plasma core query viz] + (:use [plasma core operator query viz] [clojure test stacktrace] [jiraph graph] lamina.core @@ -53,7 +53,8 @@ (defn append-send-node [plan] (let [{:keys [ops root]} plan - op (plan-op :send root) + op (plan-op :send + :deps [root]) ops (assoc ops (:id op) op) ; add to ops plan (assoc plan :type :sub-query @@ -79,8 +80,8 @@ tree (query-tree plan) optimized (optimize-plan plan) opt-tree (query-tree optimized)] - (print-query plan) - (print-query optimized) + ;(print-query plan) + ;(print-query optimized) (run-query tree {}) (run-query opt-tree {}) (is (= (doall (query-results tree))