Skip to content

Commit

Permalink
working on flooding and moving benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
rosejn committed Apr 18, 2011
1 parent b418d25 commit b1a852c
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 54 deletions.
54 changes: 22 additions & 32 deletions public/js/graph.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,15 @@ var plasma = null;
draw_node: function(ctx, node, pt) {
// determine the box size and round off the coords if we'll be
// drawing a text label (awful alignment jitter otherwise...)
var label = "" + (node.data.label || node.name.substring("5, 9"));
var label = "";
if (node.label) {
label = node.data.label;
} else if (node.name && is_uuid(node.name)) {
label = node.name.substring(5, 9);
} else {
label = node.name
}

var width = ctx.measureText(label).width + 12;
var height = 18;
pt.x = Math.floor(pt.x)
Expand All @@ -117,7 +125,7 @@ var plasma = null;
var fill = "rgb(40, 40, 40)";
if(plasma.graph.renderer.selected_node == node.name) {
fill = "red";
}
}

// Rectangle centered at point pt
ctx.save();
Expand Down Expand Up @@ -212,18 +220,10 @@ var plasma = null;
}

var setup_graph = function() {
var model = arbor.ParticleSystem({repulsion: 800, stiffness: 600, friction: 0.7, fps: 2});
var model = arbor.ParticleSystem({repulsion: 800, stiffness: 600, friction: 0.1, fps: 2});
model.parameters({gravity:true}); // center-gravity to make the graph settle
model.renderer = graph_renderer("#graph-view");

/*
model.addEdge('a','b');
model.addEdge('a','c');
model.addEdge('a','d');
model.addEdge('a','e');
model.addNode('f', {alone:true, mass:.25});
*/

// or, equivalently:
//
// model.graft({
Expand Down Expand Up @@ -324,8 +324,9 @@ var plasma = null;
node = {id: id, edges: []};
}

node.isExpanded = false;
this.graph.addNode(id, node);
this.add_node_edges(node);
//this.add_node_edges(node);
},

find_node: function(id, handler) {
Expand All @@ -341,20 +342,10 @@ var plasma = null;
}
},

expand_node: function(id, handler) {
console.log("expanding node: " + id);
this.find_node(id, function(node) {
//jQuery.each(node.edges, function(prop, val) {
// console.log("[expand_node] edge - " + prop + ": " + val);
//});
if(node) {
plasma.add_node(node.id, node);

if(handler) {
handler(node);
}
}
});
expand_node: function(node) {
var edges = node.edges;
node.isExpanded = true;
add_node_edges(node);
},

show_node: function(n) {
Expand All @@ -381,13 +372,12 @@ var plasma = null;
plasma.clear_graph();
plasma.add_node(result);
plasma.show_node(result);
plasma.expand_node(result);
} else if (result.type == "node") {
var node = result;
plasma.clear_graph();
plasma.add_node(node.id, node);
var n = plasma.add_node(node.id, node);
n._fixed = true;
plasma.show_node(result);
plasma.expand_node(result);
}
});
},
Expand All @@ -396,9 +386,9 @@ var plasma = null;
graph_model.renderer.on_node_clicked( function(node) {
console.log("clicked: " + node.name);
plasma.show_object(node);
if(is_uuid(node.name)) {
plasma.expand_node(node.name, plasma.show_node);
}
if(is_uuid(node.name) && !node.isExpanded) {
client.find_node(node.name, client.expand_node);
};
});

return client;
Expand Down
56 changes: 56 additions & 0 deletions src/benchmark/flood.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
(ns benchmark.flood
(:use [plasma util core connection peer viz]
[clojure test stacktrace]
test-utils)
(:require [logjam.core :as log]
[plasma.query :as q]))

(defn- link-proxy
[local remote]
(let [net (first (query local (q/path [:net])))
p-root (get-node remote ROOT-ID)
p-id (:id p-root)
p-port (:port remote)]
(make-edge net
(make-proxy-node p-id (plasma-url "localhost" p-port))
:peer)))

(defn flood-peers []
(let [peers (make-peers (+ 1 5 25) (+ 2000 (rand-int 10000))
(fn [i]
(clear-graph)
(make-edge ROOT-ID (make-node {:name :net}) :net)
(make-edge ROOT-ID (make-node {:value i}) :data)))
base (first peers)
neighbors (take 5 (drop 1 peers))
nneighbors (partition 5 (drop 6 peers))]
(with-peer-graph base
(doseq [p neighbors]
(link-proxy base p)))
(doseq [[p p-peers] (map vector neighbors nneighbors)]
(with-peer-graph p
(doseq [pp p-peers]
(link-proxy p pp))))
peers))

(defn flood-benchmark []
(let [peers (flood-peers)
base (first peers)]
(try
(println "peers: " (query base (q/path [:net :peer])))
(println "proxies: " (query base (-> (q/path [p [:net :peer]])
(q/project 'p :proxy))))
(println "peer values: " (query base (-> (q/path [p [:net :peer]
d [p :data]])
(q/project 'd :value))))
(println "tier two: " (query base (-> (q/path [d [:net :peer :net :peer :data]])
(q/project 'd :value))))
(finally
(close-peers peers)))))

;(comment
(def peers (flood-peers))
(def base (first peers))
(close-peers peers)

)
7 changes: 5 additions & 2 deletions test/benchmark/traversal.clj → src/benchmark/traversal.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

(defn criss-cross-graph []
(let [root (root-node)
[a b c d e f g h] (repeatedly 8 make-node)]
[a b c d e f g h] (repeatedly 8 #(make-node {:value (rand)}))]
(make-edge root a :a)
(make-edge root b :a)
(make-edge a c :b)
Expand All @@ -25,7 +25,10 @@
(with-peer-graph p
(clear-graph)
(criss-cross-graph))
(query p (q/path [:a :b :c :d :e]))
(query p (-> (q/path [c [:a :b :c]
v [c :d :e]]
(where (> (:value v) 0.1)))
(q/project 'v :value)))
(finally
(close p)))))

17 changes: 8 additions & 9 deletions src/plasma/operator.clj
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
(let [recv-op (first (filter #(= :receive (:type %)) (vals (:ops plan))))
new-root (first (:deps recv-op))
new-ops (sub-query-ops plan new-root end-id)

; connect a new param op that will start the query at the source of the proxy node
param-op (plan-op :parameter :args [start-node])
p-id (:id param-op)
Expand Down Expand Up @@ -111,7 +111,6 @@
(doseq [[id op] new-ops]
(println (trim-id id) ":" (:type op) (:args op)))))
(assoc plan
:id (uuid)
:root new-root
:params {start-node p-id}
:ops new-ops)))
Expand All @@ -125,7 +124,7 @@
[plan end-id start-id url]
(let [sub-query (build-sub-query plan start-id end-id)
sender (peer-sender url)]
(log/to :op "sending remote-sub-query: " (:id sub-query))
(log/to :flow "[remote-sub-query]" (:id sub-query))
;(log/to :op "[remote-sub-query] sub-query: " sub-query)
(sender sub-query)))

Expand Down Expand Up @@ -207,7 +206,7 @@
(throw (Exception. (str "Unsupported predicate type: " (type pred))))))

(defn- visit [s id]
(dosync
(dosync
(ensure s)
(if (@s id)
false
Expand All @@ -231,16 +230,16 @@
(cond
(proxy-node? src-id)
(let [proxy (:proxy src-node)]
(log/to :flow "[traverse] proxy:" proxy "[" src-id "]")
(log/format :flow "[traverse] [%s] proxy: %s " (trim-id src-id) proxy)
; Send the remote-sub-query channel to the recv operator
(enqueue recv-chan
(remote-sub-query plan id src-id proxy)))

:default
(let [tgts (keys (get-edges src-id edge-pred-fn))]
(log/to :flow "[traverse] "
src-id " - "
edge-predicate " -> " tgts)
(log/format :flow "[traverse] %s - %s -> [%s]"
src-id edge-predicate (apply str (interleave (map trim-id tgts)
(cycle " "))))
(siphon (apply channel (map #(assoc pt id %) tgts))
out)))))))))

Expand Down Expand Up @@ -432,7 +431,7 @@
(let [left-out (:out left)
out (map* (if (empty? props)
(fn [pt] (get pt project-key))
(fn [pt]
(fn [pt]
(let [m (get pt (get pt project-key))]
(select-keys m props))))
left-out)]
Expand Down
10 changes: 8 additions & 2 deletions src/plasma/query.clj
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@
Missing either a binding or a path operator.")))
paths (vec (map vec (partition 2 bindings)))
plan {:type :query
:id (uuid)
:root nil ; root operator id
:params {} ; param-name -> parameter-op
:ops {} ; id -> operator
Expand Down Expand Up @@ -450,6 +451,7 @@
"Convert a query plan into a query execution tree."
[plan]
{:type :query-tree
:id (:id plan)
:ops (build-query plan)
:root (:root plan)
:params (:params plan)})
Expand Down Expand Up @@ -487,8 +489,12 @@
(unless *graph*
(throw (Exception. "Cannot run a query without binding a graph.
\nFor example:\n\t(with-graph G (query q))\n")))
(log/to :query "run-query params: " (:params tree)
"\nparam-map: " param-map)
(log/format :flow "[run-query] query: %s
query-params: %s
input-params: %s"
(trim-id (:id tree))
(keys (:params tree))
param-map)
(doseq [[param-name param-id] (:params tree)]
(let [param-val (if (contains? param-map param-name)
(get param-map param-name)
Expand Down
18 changes: 9 additions & 9 deletions src/plasma/web.clj
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
(ns plasma.web
(:use [ring.middleware file file-info]
[lamina.core :exclude (restart)]
[aleph formats http tcp]
[plasma core config util connection peer]
[clojure.contrib json]
[clojure stacktrace])
(:require [logjam.core :as log]))
(:require [logjam.core :as log]
[lamina.core :as lamina]))

; Remember: must be set in javascript client also.
(def WEB-PORT 4242)

(defn- rpc-handler
(defn- web-rpc-handler
[p req]
(log/to :web "rpc-handler: " req)
(log/to :web "web-rpc-handler: " req)
(let [res
(case (:method req)
"query"
Expand All @@ -33,9 +33,9 @@
(try
(let [request (read-json msg true)
_ (log/to :web "request: " request)
res (rpc-handler p request)]
res (web-rpc-handler p request)]
(log/to :web "Result: " res)
(enqueue ch (json-str res)))
(lamina/enqueue ch (json-str res)))
(catch Exception e
(log/to :web "Request Exception: " e)
(log/to :web "Trace: " (with-out-str (print-stack-trace e)))))))
Expand All @@ -55,10 +55,10 @@
(defn- server [p ch request]
(log/to :web "client connect: " (str request))
(if (:websocket request)
(receive-all ch (partial request-handler p ch))
(lamina/receive-all ch (partial request-handler p ch))
(if-let [sync-response (sync-app request)]
(enqueue ch sync-response)
(enqueue ch {:status 404 :body "Page Not Found"}))))
(lamina/enqueue ch sync-response)
(lamina/enqueue ch {:status 404 :body "Page Not Found"}))))

(defn web-interface
"Start a web interface for the give peer.
Expand Down

0 comments on commit b1a852c

Please sign in to comment.