diff --git a/public/js/graph.js b/public/js/graph.js index 5880330..0af68bc 100644 --- a/public/js/graph.js +++ b/public/js/graph.js @@ -107,7 +107,15 @@ var plasma = null; draw_node: function(ctx, node, pt) { // determine the box size and round off the coords if we'll be // drawing a text label (awful alignment jitter otherwise...) - var label = "" + (node.data.label || node.name.substring("5, 9")); + var label = ""; + if (node.label) { + label = node.data.label; + } else if (node.name && is_uuid(node.name)) { + label = node.name.substring(5, 9); + } else { + label = node.name + } + var width = ctx.measureText(label).width + 12; var height = 18; pt.x = Math.floor(pt.x) @@ -117,7 +125,7 @@ var plasma = null; var fill = "rgb(40, 40, 40)"; if(plasma.graph.renderer.selected_node == node.name) { fill = "red"; - } + } // Rectangle centered at point pt ctx.save(); @@ -212,18 +220,10 @@ var plasma = null; } var setup_graph = function() { - var model = arbor.ParticleSystem({repulsion: 800, stiffness: 600, friction: 0.7, fps: 2}); + var model = arbor.ParticleSystem({repulsion: 800, stiffness: 600, friction: 0.1, fps: 2}); model.parameters({gravity:true}); // center-gravity to make the graph settle model.renderer = graph_renderer("#graph-view"); - /* - model.addEdge('a','b'); - model.addEdge('a','c'); - model.addEdge('a','d'); - model.addEdge('a','e'); - model.addNode('f', {alone:true, mass:.25}); - */ - // or, equivalently: // // model.graft({ @@ -324,8 +324,9 @@ var plasma = null; node = {id: id, edges: []}; } + node.isExpanded = false; this.graph.addNode(id, node); - this.add_node_edges(node); + //this.add_node_edges(node); }, find_node: function(id, handler) { @@ -341,20 +342,10 @@ var plasma = null; } }, - expand_node: function(id, handler) { - console.log("expanding node: " + id); - this.find_node(id, function(node) { - //jQuery.each(node.edges, function(prop, val) { - // console.log("[expand_node] edge - " + prop + ": " + val); - //}); - if(node) { - plasma.add_node(node.id, node); - - if(handler) { - handler(node); - } - } - }); + expand_node: function(node) { + var edges = node.edges; + node.isExpanded = true; + add_node_edges(node); }, show_node: function(n) { @@ -381,13 +372,12 @@ var plasma = null; plasma.clear_graph(); plasma.add_node(result); plasma.show_node(result); - plasma.expand_node(result); } else if (result.type == "node") { var node = result; plasma.clear_graph(); - plasma.add_node(node.id, node); + var n = plasma.add_node(node.id, node); + n._fixed = true; plasma.show_node(result); - plasma.expand_node(result); } }); }, @@ -396,9 +386,9 @@ var plasma = null; graph_model.renderer.on_node_clicked( function(node) { console.log("clicked: " + node.name); plasma.show_object(node); - if(is_uuid(node.name)) { - plasma.expand_node(node.name, plasma.show_node); - } + if(is_uuid(node.name) && !node.isExpanded) { + client.find_node(node.name, client.expand_node); + }; }); return client; diff --git a/src/benchmark/flood.clj b/src/benchmark/flood.clj new file mode 100644 index 0000000..03799aa --- /dev/null +++ b/src/benchmark/flood.clj @@ -0,0 +1,56 @@ +(ns benchmark.flood + (:use [plasma util core connection peer viz] + [clojure test stacktrace] + test-utils) + (:require [logjam.core :as log] + [plasma.query :as q])) + +(defn- link-proxy + [local remote] + (let [net (first (query local (q/path [:net]))) + p-root (get-node remote ROOT-ID) + p-id (:id p-root) + p-port (:port remote)] + (make-edge net + (make-proxy-node p-id (plasma-url "localhost" p-port)) + :peer))) + +(defn flood-peers [] + (let [peers (make-peers (+ 1 5 25) (+ 2000 (rand-int 10000)) + (fn [i] + (clear-graph) + (make-edge ROOT-ID (make-node {:name :net}) :net) + (make-edge ROOT-ID (make-node {:value i}) :data))) + base (first peers) + neighbors (take 5 (drop 1 peers)) + nneighbors (partition 5 (drop 6 peers))] + (with-peer-graph base + (doseq [p neighbors] + (link-proxy base p))) + (doseq [[p p-peers] (map vector neighbors nneighbors)] + (with-peer-graph p + (doseq [pp p-peers] + (link-proxy p pp)))) + peers)) + +(defn flood-benchmark [] + (let [peers (flood-peers) + base (first peers)] + (try + (println "peers: " (query base (q/path [:net :peer]))) + (println "proxies: " (query base (-> (q/path [p [:net :peer]]) + (q/project 'p :proxy)))) + (println "peer values: " (query base (-> (q/path [p [:net :peer] + d [p :data]]) + (q/project 'd :value)))) + (println "tier two: " (query base (-> (q/path [d [:net :peer :net :peer :data]]) + (q/project 'd :value)))) + (finally + (close-peers peers))))) + +;(comment +(def peers (flood-peers)) +(def base (first peers)) +(close-peers peers) + +) diff --git a/test/benchmark/traversal.clj b/src/benchmark/traversal.clj similarity index 73% rename from test/benchmark/traversal.clj rename to src/benchmark/traversal.clj index 6d0e8ec..6ad1a4e 100644 --- a/test/benchmark/traversal.clj +++ b/src/benchmark/traversal.clj @@ -7,7 +7,7 @@ (defn criss-cross-graph [] (let [root (root-node) - [a b c d e f g h] (repeatedly 8 make-node)] + [a b c d e f g h] (repeatedly 8 #(make-node {:value (rand)}))] (make-edge root a :a) (make-edge root b :a) (make-edge a c :b) @@ -25,7 +25,10 @@ (with-peer-graph p (clear-graph) (criss-cross-graph)) - (query p (q/path [:a :b :c :d :e])) + (query p (-> (q/path [c [:a :b :c] + v [c :d :e]] + (where (> (:value v) 0.1))) + (q/project 'v :value))) (finally (close p))))) diff --git a/src/plasma/operator.clj b/src/plasma/operator.clj index dd36f36..00ed74b 100644 --- a/src/plasma/operator.clj +++ b/src/plasma/operator.clj @@ -82,7 +82,7 @@ (let [recv-op (first (filter #(= :receive (:type %)) (vals (:ops plan)))) new-root (first (:deps recv-op)) new-ops (sub-query-ops plan new-root end-id) - + ; connect a new param op that will start the query at the source of the proxy node param-op (plan-op :parameter :args [start-node]) p-id (:id param-op) @@ -111,7 +111,6 @@ (doseq [[id op] new-ops] (println (trim-id id) ":" (:type op) (:args op))))) (assoc plan - :id (uuid) :root new-root :params {start-node p-id} :ops new-ops))) @@ -125,7 +124,7 @@ [plan end-id start-id url] (let [sub-query (build-sub-query plan start-id end-id) sender (peer-sender url)] - (log/to :op "sending remote-sub-query: " (:id sub-query)) + (log/to :flow "[remote-sub-query]" (:id sub-query)) ;(log/to :op "[remote-sub-query] sub-query: " sub-query) (sender sub-query))) @@ -207,7 +206,7 @@ (throw (Exception. (str "Unsupported predicate type: " (type pred)))))) (defn- visit [s id] - (dosync + (dosync (ensure s) (if (@s id) false @@ -231,16 +230,16 @@ (cond (proxy-node? src-id) (let [proxy (:proxy src-node)] - (log/to :flow "[traverse] proxy:" proxy "[" src-id "]") + (log/format :flow "[traverse] [%s] proxy: %s " (trim-id src-id) proxy) ; Send the remote-sub-query channel to the recv operator (enqueue recv-chan (remote-sub-query plan id src-id proxy))) :default (let [tgts (keys (get-edges src-id edge-pred-fn))] - (log/to :flow "[traverse] " - src-id " - " - edge-predicate " -> " tgts) + (log/format :flow "[traverse] %s - %s -> [%s]" + src-id edge-predicate (apply str (interleave (map trim-id tgts) + (cycle " ")))) (siphon (apply channel (map #(assoc pt id %) tgts)) out))))))))) @@ -432,7 +431,7 @@ (let [left-out (:out left) out (map* (if (empty? props) (fn [pt] (get pt project-key)) - (fn [pt] + (fn [pt] (let [m (get pt (get pt project-key))] (select-keys m props)))) left-out)] diff --git a/src/plasma/query.clj b/src/plasma/query.clj index afc0246..7e0bde6 100644 --- a/src/plasma/query.clj +++ b/src/plasma/query.clj @@ -271,6 +271,7 @@ Missing either a binding or a path operator."))) paths (vec (map vec (partition 2 bindings))) plan {:type :query + :id (uuid) :root nil ; root operator id :params {} ; param-name -> parameter-op :ops {} ; id -> operator @@ -450,6 +451,7 @@ "Convert a query plan into a query execution tree." [plan] {:type :query-tree + :id (:id plan) :ops (build-query plan) :root (:root plan) :params (:params plan)}) @@ -487,8 +489,12 @@ (unless *graph* (throw (Exception. "Cannot run a query without binding a graph. \nFor example:\n\t(with-graph G (query q))\n"))) - (log/to :query "run-query params: " (:params tree) - "\nparam-map: " param-map) + (log/format :flow "[run-query] query: %s + query-params: %s + input-params: %s" + (trim-id (:id tree)) + (keys (:params tree)) + param-map) (doseq [[param-name param-id] (:params tree)] (let [param-val (if (contains? param-map param-name) (get param-map param-name) diff --git a/src/plasma/web.clj b/src/plasma/web.clj index cd15e05..6c5aeeb 100644 --- a/src/plasma/web.clj +++ b/src/plasma/web.clj @@ -1,18 +1,18 @@ (ns plasma.web (:use [ring.middleware file file-info] - [lamina.core :exclude (restart)] [aleph formats http tcp] [plasma core config util connection peer] [clojure.contrib json] [clojure stacktrace]) - (:require [logjam.core :as log])) + (:require [logjam.core :as log] + [lamina.core :as lamina])) ; Remember: must be set in javascript client also. (def WEB-PORT 4242) -(defn- rpc-handler +(defn- web-rpc-handler [p req] - (log/to :web "rpc-handler: " req) + (log/to :web "web-rpc-handler: " req) (let [res (case (:method req) "query" @@ -33,9 +33,9 @@ (try (let [request (read-json msg true) _ (log/to :web "request: " request) - res (rpc-handler p request)] + res (web-rpc-handler p request)] (log/to :web "Result: " res) - (enqueue ch (json-str res))) + (lamina/enqueue ch (json-str res))) (catch Exception e (log/to :web "Request Exception: " e) (log/to :web "Trace: " (with-out-str (print-stack-trace e))))))) @@ -55,10 +55,10 @@ (defn- server [p ch request] (log/to :web "client connect: " (str request)) (if (:websocket request) - (receive-all ch (partial request-handler p ch)) + (lamina/receive-all ch (partial request-handler p ch)) (if-let [sync-response (sync-app request)] - (enqueue ch sync-response) - (enqueue ch {:status 404 :body "Page Not Found"})))) + (lamina/enqueue ch sync-response) + (lamina/enqueue ch {:status 404 :body "Page Not Found"})))) (defn web-interface "Start a web interface for the give peer.