diff --git a/examples/bunny/Dockerfile b/examples/bunny/Dockerfile new file mode 100644 index 0000000..39532d5 --- /dev/null +++ b/examples/bunny/Dockerfile @@ -0,0 +1,13 @@ +FROM python:slim + +ENV SHATTERED_APP=logger.py + +COPY requirements.txt /src/requirements.txt + +RUN pip install -r /src/requirements.txt + +COPY . /src + +WORKDIR /src + +CMD ["shattered", "run"] diff --git a/examples/bunny/bunny.html b/examples/bunny/bunny.html new file mode 100644 index 0000000..1a56151 --- /dev/null +++ b/examples/bunny/bunny.html @@ -0,0 +1,144 @@ + + + + + + + + RabbitMQ Web STOMP Examples: Bunny Drawing + + + +

+ RabbitMQ Web STOMP Examples > Bunny Drawing +

+ + + + diff --git a/examples/bunny/bunny.png b/examples/bunny/bunny.png new file mode 100644 index 0000000..6c2284b Binary files /dev/null and b/examples/bunny/bunny.png differ diff --git a/examples/bunny/docker-compose.yml b/examples/bunny/docker-compose.yml new file mode 100644 index 0000000..a89b8c9 --- /dev/null +++ b/examples/bunny/docker-compose.yml @@ -0,0 +1,30 @@ +version: "2" +services: + logger: + build: . + depends_on: + - rabbitmq + volumes: + - .:/src + command: + [ + "./wait-for-it.sh", + "-t", + "300", + "rabbitmq:61613", + "--", + "shattered", + "run", + ] + bunny: + image: python:slim + working_dir: /src + volumes: + - .:/src + ports: + - 8000:8000 + command: ["python", "-m", "http.server"] + rabbitmq: + build: ../../rabbitmq + ports: + - 15674:15674 diff --git a/examples/bunny/echo.html b/examples/bunny/echo.html new file mode 100644 index 0000000..cf81ffa --- /dev/null +++ b/examples/bunny/echo.html @@ -0,0 +1,111 @@ + + + + + + + RabbitMQ Web STOMP Examples : Echo Server + + + +

RabbitMQ Web STOMP Examples > Echo Server

+ +
+

Received

+
+
+
+ +
+

Logs

+
+
+ + + + diff --git a/examples/bunny/index.html b/examples/bunny/index.html new file mode 100644 index 0000000..76e1279 --- /dev/null +++ b/examples/bunny/index.html @@ -0,0 +1,16 @@ + + + + + RabbitMQ Web STOMP Examples + + + +

RabbitMQ Web STOMP Examples

+ + + diff --git a/examples/bunny/logger.py b/examples/bunny/logger.py new file mode 100644 index 0000000..def5462 --- /dev/null +++ b/examples/bunny/logger.py @@ -0,0 +1,15 @@ +import logging + +from shattered import Shattered + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +shattered_app = Shattered(host="rabbitmq") + + +@shattered_app.subscribe("/queue/test") +@shattered_app.subscribe("/topic/test") +@shattered_app.subscribe("/topic/bunny") +def echo(headers, body, conn): + logger.info(body) diff --git a/examples/bunny/main.css b/examples/bunny/main.css new file mode 100644 index 0000000..f2d1bb5 --- /dev/null +++ b/examples/bunny/main.css @@ -0,0 +1,40 @@ +body { + font-family: "Arial"; + color: #444; +} + +h1, +h2 { + color: #f60; + font-weight: normal; +} + +h1 { + font-size: 1.5em; +} + +h2 { + font-size: 1.2em; + margin: 0; +} + +a { + color: #f60; + border: 1px solid #fda; + background: #fff0e0; + border-radius: 3px; + -moz-border-radius: 3px; + padding: 2px; + text-decoration: none; + /* font-weight: bold; */ +} + +ul.menu { + list-style-type: none; + padding: 0; + margin: 0; +} + +ul.menu li { + padding: 5px 0; +} diff --git a/examples/bunny/pencil.cur b/examples/bunny/pencil.cur new file mode 100755 index 0000000..a3e3598 Binary files /dev/null and b/examples/bunny/pencil.cur differ diff --git a/examples/bunny/requirements.txt b/examples/bunny/requirements.txt new file mode 100644 index 0000000..a29b3da --- /dev/null +++ b/examples/bunny/requirements.txt @@ -0,0 +1 @@ +shattered diff --git a/examples/bunny/stomp.js b/examples/bunny/stomp.js new file mode 100644 index 0000000..6e35f11 --- /dev/null +++ b/examples/bunny/stomp.js @@ -0,0 +1,607 @@ +// Generated by CoffeeScript 1.7.1 + +/* + Stomp Over WebSocket http://jmesnil.net/stomp-websocket/doc/ | Apache License V2.0 + + Copyright (C) 2010-2013 [Jeff Mesnil](https://jmesnil.net/) + Copyright (C) 2012 [FuseSource, Inc.](https://fusesource.com) + */ + +(function() { + var Byte, + Client, + Frame, + Stomp, + __hasProp = {}.hasOwnProperty, + __slice = [].slice; + + Byte = { + LF: "\x0A", + NULL: "\x00" + }; + + Frame = (function() { + var unmarshallSingle; + + function Frame(command, headers, body) { + this.command = command; + this.headers = headers != null ? headers : {}; + this.body = body != null ? body : ""; + } + + Frame.prototype.toString = function() { + var lines, name, skipContentLength, value, _ref; + lines = [this.command]; + skipContentLength = + this.headers["content-length"] === false ? true : false; + if (skipContentLength) { + delete this.headers["content-length"]; + } + _ref = this.headers; + for (name in _ref) { + if (!__hasProp.call(_ref, name)) continue; + value = _ref[name]; + lines.push("" + name + ":" + value); + } + if (this.body && !skipContentLength) { + lines.push("content-length:" + Frame.sizeOfUTF8(this.body)); + } + lines.push(Byte.LF + this.body); + return lines.join(Byte.LF); + }; + + Frame.sizeOfUTF8 = function(s) { + if (s) { + return encodeURI(s).match(/%..|./g).length; + } else { + return 0; + } + }; + + unmarshallSingle = function(data) { + var body, + chr, + command, + divider, + headerLines, + headers, + i, + idx, + len, + line, + start, + trim, + _i, + _j, + _len, + _ref, + _ref1; + divider = data.search(RegExp("" + Byte.LF + Byte.LF)); + headerLines = data.substring(0, divider).split(Byte.LF); + command = headerLines.shift(); + headers = {}; + trim = function(str) { + return str.replace(/^\s+|\s+$/g, ""); + }; + _ref = headerLines.reverse(); + for (_i = 0, _len = _ref.length; _i < _len; _i++) { + line = _ref[_i]; + idx = line.indexOf(":"); + headers[trim(line.substring(0, idx))] = trim(line.substring(idx + 1)); + } + body = ""; + start = divider + 2; + if (headers["content-length"]) { + len = parseInt(headers["content-length"]); + body = ("" + data).substring(start, start + len); + } else { + chr = null; + for ( + i = _j = start, _ref1 = data.length; + start <= _ref1 ? _j < _ref1 : _j > _ref1; + i = start <= _ref1 ? ++_j : --_j + ) { + chr = data.charAt(i); + if (chr === Byte.NULL) { + break; + } + body += chr; + } + } + return new Frame(command, headers, body); + }; + + Frame.unmarshall = function(datas) { + var frame, frames, last_frame, r; + frames = datas.split(RegExp("" + Byte.NULL + Byte.LF + "*")); + r = { + frames: [], + partial: "" + }; + r.frames = (function() { + var _i, _len, _ref, _results; + _ref = frames.slice(0, -1); + _results = []; + for (_i = 0, _len = _ref.length; _i < _len; _i++) { + frame = _ref[_i]; + _results.push(unmarshallSingle(frame)); + } + return _results; + })(); + last_frame = frames.slice(-1)[0]; + if ( + last_frame === Byte.LF || + last_frame.search(RegExp("" + Byte.NULL + Byte.LF + "*$")) !== -1 + ) { + r.frames.push(unmarshallSingle(last_frame)); + } else { + r.partial = last_frame; + } + return r; + }; + + Frame.marshall = function(command, headers, body) { + var frame; + frame = new Frame(command, headers, body); + return frame.toString() + Byte.NULL; + }; + + return Frame; + })(); + + Client = (function() { + var now; + + function Client(ws) { + this.ws = ws; + this.ws.binaryType = "arraybuffer"; + this.counter = 0; + this.connected = false; + this.heartbeat = { + outgoing: 10000, + incoming: 10000 + }; + this.maxWebSocketFrameSize = 16 * 1024; + this.subscriptions = {}; + this.partialData = ""; + } + + Client.prototype.debug = function(message) { + var _ref; + return typeof window !== "undefined" && window !== null + ? (_ref = window.console) != null + ? _ref.log(message) + : void 0 + : void 0; + }; + + now = function() { + if (Date.now) { + return Date.now(); + } else { + return new Date().valueOf; + } + }; + + Client.prototype._transmit = function(command, headers, body) { + var out; + out = Frame.marshall(command, headers, body); + if (typeof this.debug === "function") { + this.debug(">>> " + out); + } + while (true) { + if (out.length > this.maxWebSocketFrameSize) { + this.ws.send(out.substring(0, this.maxWebSocketFrameSize)); + out = out.substring(this.maxWebSocketFrameSize); + if (typeof this.debug === "function") { + this.debug("remaining = " + out.length); + } + } else { + return this.ws.send(out); + } + } + }; + + Client.prototype._setupHeartbeat = function(headers) { + var serverIncoming, serverOutgoing, ttl, v, _ref, _ref1; + if ( + (_ref = headers.version) !== Stomp.VERSIONS.V1_1 && + _ref !== Stomp.VERSIONS.V1_2 + ) { + return; + } + (_ref1 = (function() { + var _i, _len, _ref1, _results; + _ref1 = headers["heart-beat"].split(","); + _results = []; + for (_i = 0, _len = _ref1.length; _i < _len; _i++) { + v = _ref1[_i]; + _results.push(parseInt(v)); + } + return _results; + })()), + (serverOutgoing = _ref1[0]), + (serverIncoming = _ref1[1]); + if (!(this.heartbeat.outgoing === 0 || serverIncoming === 0)) { + ttl = Math.max(this.heartbeat.outgoing, serverIncoming); + if (typeof this.debug === "function") { + this.debug("send PING every " + ttl + "ms"); + } + this.pinger = Stomp.setInterval( + ttl, + (function(_this) { + return function() { + _this.ws.send(Byte.LF); + return typeof _this.debug === "function" + ? _this.debug(">>> PING") + : void 0; + }; + })(this) + ); + } + if (!(this.heartbeat.incoming === 0 || serverOutgoing === 0)) { + ttl = Math.max(this.heartbeat.incoming, serverOutgoing); + if (typeof this.debug === "function") { + this.debug("check PONG every " + ttl + "ms"); + } + return (this.ponger = Stomp.setInterval( + ttl, + (function(_this) { + return function() { + var delta; + delta = now() - _this.serverActivity; + if (delta > ttl * 2) { + if (typeof _this.debug === "function") { + _this.debug( + "did not receive server activity for the last " + + delta + + "ms" + ); + } + return _this.ws.close(); + } + }; + })(this) + )); + } + }; + + Client.prototype._parseConnect = function() { + var args, connectCallback, errorCallback, headers; + args = 1 <= arguments.length ? __slice.call(arguments, 0) : []; + headers = {}; + switch (args.length) { + case 2: + (headers = args[0]), (connectCallback = args[1]); + break; + case 3: + if (args[1] instanceof Function) { + (headers = args[0]), + (connectCallback = args[1]), + (errorCallback = args[2]); + } else { + (headers.login = args[0]), + (headers.passcode = args[1]), + (connectCallback = args[2]); + } + break; + case 4: + (headers.login = args[0]), + (headers.passcode = args[1]), + (connectCallback = args[2]), + (errorCallback = args[3]); + break; + default: + (headers.login = args[0]), + (headers.passcode = args[1]), + (connectCallback = args[2]), + (errorCallback = args[3]), + (headers.host = args[4]); + } + return [headers, connectCallback, errorCallback]; + }; + + Client.prototype.connect = function() { + var args, errorCallback, headers, out; + args = 1 <= arguments.length ? __slice.call(arguments, 0) : []; + out = this._parseConnect.apply(this, args); + (headers = out[0]), + (this.connectCallback = out[1]), + (errorCallback = out[2]); + if (typeof this.debug === "function") { + this.debug("Opening Web Socket..."); + } + this.ws.onmessage = (function(_this) { + return function(evt) { + var arr, + c, + client, + data, + frame, + messageID, + onreceive, + subscription, + unmarshalledData, + _i, + _len, + _ref, + _results; + data = + typeof ArrayBuffer !== "undefined" && + evt.data instanceof ArrayBuffer + ? ((arr = new Uint8Array(evt.data)), + typeof _this.debug === "function" + ? _this.debug("--- got data length: " + arr.length) + : void 0, + (function() { + var _i, _len, _results; + _results = []; + for (_i = 0, _len = arr.length; _i < _len; _i++) { + c = arr[_i]; + _results.push(String.fromCharCode(c)); + } + return _results; + })().join("")) + : evt.data; + _this.serverActivity = now(); + if (data === Byte.LF) { + if (typeof _this.debug === "function") { + _this.debug("<<< PONG"); + } + return; + } + if (typeof _this.debug === "function") { + _this.debug("<<< " + data); + } + unmarshalledData = Frame.unmarshall(_this.partialData + data); + _this.partialData = unmarshalledData.partial; + _ref = unmarshalledData.frames; + _results = []; + for (_i = 0, _len = _ref.length; _i < _len; _i++) { + frame = _ref[_i]; + switch (frame.command) { + case "CONNECTED": + if (typeof _this.debug === "function") { + _this.debug("connected to server " + frame.headers.server); + } + _this.connected = true; + _this._setupHeartbeat(frame.headers); + _results.push( + typeof _this.connectCallback === "function" + ? _this.connectCallback(frame) + : void 0 + ); + break; + case "MESSAGE": + subscription = frame.headers.subscription; + onreceive = + _this.subscriptions[subscription] || _this.onreceive; + if (onreceive) { + client = _this; + messageID = frame.headers["message-id"]; + frame.ack = function(headers) { + if (headers == null) { + headers = {}; + } + return client.ack(messageID, subscription, headers); + }; + frame.nack = function(headers) { + if (headers == null) { + headers = {}; + } + return client.nack(messageID, subscription, headers); + }; + _results.push(onreceive(frame)); + } else { + _results.push( + typeof _this.debug === "function" + ? _this.debug("Unhandled received MESSAGE: " + frame) + : void 0 + ); + } + break; + case "RECEIPT": + _results.push( + typeof _this.onreceipt === "function" + ? _this.onreceipt(frame) + : void 0 + ); + break; + case "ERROR": + _results.push( + typeof errorCallback === "function" + ? errorCallback(frame) + : void 0 + ); + break; + default: + _results.push( + typeof _this.debug === "function" + ? _this.debug("Unhandled frame: " + frame) + : void 0 + ); + } + } + return _results; + }; + })(this); + this.ws.onclose = (function(_this) { + return function() { + var msg; + msg = "Whoops! Lost connection to " + _this.ws.url; + if (typeof _this.debug === "function") { + _this.debug(msg); + } + _this._cleanUp(); + return typeof errorCallback === "function" + ? errorCallback(msg) + : void 0; + }; + })(this); + return (this.ws.onopen = (function(_this) { + return function() { + if (typeof _this.debug === "function") { + _this.debug("Web Socket Opened..."); + } + headers["accept-version"] = Stomp.VERSIONS.supportedVersions(); + headers["heart-beat"] = [ + _this.heartbeat.outgoing, + _this.heartbeat.incoming + ].join(","); + return _this._transmit("CONNECT", headers); + }; + })(this)); + }; + + Client.prototype.disconnect = function(disconnectCallback, headers) { + if (headers == null) { + headers = {}; + } + this._transmit("DISCONNECT", headers); + this.ws.onclose = null; + this.ws.close(); + this._cleanUp(); + return typeof disconnectCallback === "function" + ? disconnectCallback() + : void 0; + }; + + Client.prototype._cleanUp = function() { + this.connected = false; + if (this.pinger) { + Stomp.clearInterval(this.pinger); + } + if (this.ponger) { + return Stomp.clearInterval(this.ponger); + } + }; + + Client.prototype.send = function(destination, headers, body) { + if (headers == null) { + headers = {}; + } + if (body == null) { + body = ""; + } + headers.destination = destination; + return this._transmit("SEND", headers, body); + }; + + Client.prototype.subscribe = function(destination, callback, headers) { + var client; + if (headers == null) { + headers = {}; + } + if (!headers.id) { + headers.id = "sub-" + this.counter++; + } + headers.destination = destination; + this.subscriptions[headers.id] = callback; + this._transmit("SUBSCRIBE", headers); + client = this; + return { + id: headers.id, + unsubscribe: function() { + return client.unsubscribe(headers.id); + } + }; + }; + + Client.prototype.unsubscribe = function(id) { + delete this.subscriptions[id]; + return this._transmit("UNSUBSCRIBE", { + id: id + }); + }; + + Client.prototype.begin = function(transaction) { + var client, txid; + txid = transaction || "tx-" + this.counter++; + this._transmit("BEGIN", { + transaction: txid + }); + client = this; + return { + id: txid, + commit: function() { + return client.commit(txid); + }, + abort: function() { + return client.abort(txid); + } + }; + }; + + Client.prototype.commit = function(transaction) { + return this._transmit("COMMIT", { + transaction: transaction + }); + }; + + Client.prototype.abort = function(transaction) { + return this._transmit("ABORT", { + transaction: transaction + }); + }; + + Client.prototype.ack = function(messageID, subscription, headers) { + if (headers == null) { + headers = {}; + } + headers["message-id"] = messageID; + headers.subscription = subscription; + return this._transmit("ACK", headers); + }; + + Client.prototype.nack = function(messageID, subscription, headers) { + if (headers == null) { + headers = {}; + } + headers["message-id"] = messageID; + headers.subscription = subscription; + return this._transmit("NACK", headers); + }; + + return Client; + })(); + + Stomp = { + VERSIONS: { + V1_0: "1.0", + V1_1: "1.1", + V1_2: "1.2", + supportedVersions: function() { + return "1.1,1.0"; + } + }, + client: function(url, protocols) { + var klass, ws; + if (protocols == null) { + protocols = ["v10.stomp", "v11.stomp"]; + } + klass = Stomp.WebSocketClass || WebSocket; + ws = new klass(url, protocols); + return new Client(ws); + }, + over: function(ws) { + return new Client(ws); + }, + Frame: Frame + }; + + if (typeof exports !== "undefined" && exports !== null) { + exports.Stomp = Stomp; + } + + if (typeof window !== "undefined" && window !== null) { + Stomp.setInterval = function(interval, f) { + return window.setInterval(f, interval); + }; + Stomp.clearInterval = function(id) { + return window.clearInterval(id); + }; + window.Stomp = Stomp; + } else if (!exports) { + self.Stomp = Stomp; + } +}.call(this)); diff --git a/examples/bunny/temp-queue.html b/examples/bunny/temp-queue.html new file mode 100644 index 0000000..244eb15 --- /dev/null +++ b/examples/bunny/temp-queue.html @@ -0,0 +1,122 @@ + + + + + + + RabbitMQ Web STOMP Examples : Temporary Queue + + + +

+ RabbitMQ Web STOMP Examples > Temporary Queue +

+ +

+ When you type text in the form's input, the application will send a + message to the /queue/test destination with the + reply-to header set to /temp-queue/foo. +

+

+ The STOMP client sets a default onreceive callback to receive + messages from this temporary queue and display the message's text. +

+

+ Finally, the client subscribes to the + /queue/test destination. When it receives message from this + destination, it reverses the message's text and reply by sending the + reversed text to the destination defined by the message's + reply-to header. +

+ +
+

Received

+
+
+
+ +
+

Logs

+
+
+ + + + diff --git a/examples/bunny/wait-for-it.sh b/examples/bunny/wait-for-it.sh new file mode 100755 index 0000000..071c2be --- /dev/null +++ b/examples/bunny/wait-for-it.sh @@ -0,0 +1,178 @@ +#!/usr/bin/env bash +# Use this script to test if a given TCP host/port are available + +WAITFORIT_cmdname=${0##*/} + +echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } + +usage() +{ + cat << USAGE >&2 +Usage: + $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args] + -h HOST | --host=HOST Host or IP under test + -p PORT | --port=PORT TCP port under test + Alternatively, you specify the host and port as host:port + -s | --strict Only execute subcommand if the test succeeds + -q | --quiet Don't output any status messages + -t TIMEOUT | --timeout=TIMEOUT + Timeout in seconds, zero for no timeout + -- COMMAND ARGS Execute command with args after the test finishes +USAGE + exit 1 +} + +wait_for() +{ + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then + echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" + else + echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout" + fi + WAITFORIT_start_ts=$(date +%s) + while : + do + if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then + nc -z $WAITFORIT_HOST $WAITFORIT_PORT + WAITFORIT_result=$? + else + (echo > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1 + WAITFORIT_result=$? + fi + if [[ $WAITFORIT_result -eq 0 ]]; then + WAITFORIT_end_ts=$(date +%s) + echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds" + break + fi + sleep 1 + done + return $WAITFORIT_result +} + +wait_for_wrapper() +{ + # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 + if [[ $WAITFORIT_QUIET -eq 1 ]]; then + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & + else + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & + fi + WAITFORIT_PID=$! + trap "kill -INT -$WAITFORIT_PID" INT + wait $WAITFORIT_PID + WAITFORIT_RESULT=$? + if [[ $WAITFORIT_RESULT -ne 0 ]]; then + echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" + fi + return $WAITFORIT_RESULT +} + +# process arguments +while [[ $# -gt 0 ]] +do + case "$1" in + *:* ) + WAITFORIT_hostport=(${1//:/ }) + WAITFORIT_HOST=${WAITFORIT_hostport[0]} + WAITFORIT_PORT=${WAITFORIT_hostport[1]} + shift 1 + ;; + --child) + WAITFORIT_CHILD=1 + shift 1 + ;; + -q | --quiet) + WAITFORIT_QUIET=1 + shift 1 + ;; + -s | --strict) + WAITFORIT_STRICT=1 + shift 1 + ;; + -h) + WAITFORIT_HOST="$2" + if [[ $WAITFORIT_HOST == "" ]]; then break; fi + shift 2 + ;; + --host=*) + WAITFORIT_HOST="${1#*=}" + shift 1 + ;; + -p) + WAITFORIT_PORT="$2" + if [[ $WAITFORIT_PORT == "" ]]; then break; fi + shift 2 + ;; + --port=*) + WAITFORIT_PORT="${1#*=}" + shift 1 + ;; + -t) + WAITFORIT_TIMEOUT="$2" + if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi + shift 2 + ;; + --timeout=*) + WAITFORIT_TIMEOUT="${1#*=}" + shift 1 + ;; + --) + shift + WAITFORIT_CLI=("$@") + break + ;; + --help) + usage + ;; + *) + echoerr "Unknown argument: $1" + usage + ;; + esac +done + +if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then + echoerr "Error: you need to provide a host and port to test." + usage +fi + +WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15} +WAITFORIT_STRICT=${WAITFORIT_STRICT:-0} +WAITFORIT_CHILD=${WAITFORIT_CHILD:-0} +WAITFORIT_QUIET=${WAITFORIT_QUIET:-0} + +# check to see if timeout is from busybox? +WAITFORIT_TIMEOUT_PATH=$(type -p timeout) +WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH) +if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then + WAITFORIT_ISBUSY=1 + WAITFORIT_BUSYTIMEFLAG="-t" + +else + WAITFORIT_ISBUSY=0 + WAITFORIT_BUSYTIMEFLAG="" +fi + +if [[ $WAITFORIT_CHILD -gt 0 ]]; then + wait_for + WAITFORIT_RESULT=$? + exit $WAITFORIT_RESULT +else + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then + wait_for_wrapper + WAITFORIT_RESULT=$? + else + wait_for + WAITFORIT_RESULT=$? + fi +fi + +if [[ $WAITFORIT_CLI != "" ]]; then + if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then + echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess" + exit $WAITFORIT_RESULT + fi + exec "${WAITFORIT_CLI[@]}" +else + exit $WAITFORIT_RESULT +fi diff --git a/pyproject.toml b/pyproject.toml index 6b705a5..2a49a4b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "shattered" -version = "0.3.0" +version = "0.4.0" description = "STOMP meets bottle.py" authors = ["Jimmy Bradshaw "] repository = "https://github.com/bradshjg/shattered" diff --git a/src/shattered/__init__.py b/src/shattered/__init__.py index 831105d..e7b791f 100644 --- a/src/shattered/__init__.py +++ b/src/shattered/__init__.py @@ -1,3 +1,3 @@ -__version__ = "0.3.0" +__version__ = "0.4.0" from shattered.shattered import Shattered diff --git a/src/shattered/shattered.py b/src/shattered/shattered.py index c5884b3..c4cc9e6 100644 --- a/src/shattered/shattered.py +++ b/src/shattered/shattered.py @@ -9,13 +9,16 @@ class ShatteredListener(stomp.ConnectionListener): + sub_id = 1 + def __init__(self, app): self.app = app def on_connected(self, headers, body): logger.info("STOMP connection established") for destination in self.app.subscriptions: - self.app.conn.subscribe(destination, 1) + self.app.conn.subscribe(destination, self.sub_id) + self.sub_id += 1 def on_message(self, headers, body): logger.info("STOMP message received") diff --git a/tests/test_shattered.py b/tests/test_shattered.py index 100303d..79181c0 100644 --- a/tests/test_shattered.py +++ b/tests/test_shattered.py @@ -68,7 +68,7 @@ def test_listener_on_connected_subscribes(app, mocker): app._run() assert app.conn.subscribe.mock_calls == [] app.listener.on_connected({}, "") - assert app.conn.subscribe.mock_calls == [call("foo", 1), call("bar", 1)] + assert app.conn.subscribe.mock_calls == [call("foo", 1), call("bar", 2)] def test_listener_on_message(app, mocker):