diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5c6c71e --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.gradle +.idea +build +gen +*.iml +local.properties diff --git a/build.gradle b/build.gradle new file mode 100644 index 0000000..9405f3f --- /dev/null +++ b/build.gradle @@ -0,0 +1,19 @@ +// Top-level build file where you can add configuration options common to all sub-projects/modules. + +buildscript { + repositories { + jcenter() + } + dependencies { + classpath 'com.android.tools.build:gradle:1.2.3' + + // NOTE: Do not place your application dependencies here; they belong + // in the individual module build.gradle files + } +} + +allprojects { + repositories { + jcenter() + } +} diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..8c0fb64 Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..0c71e76 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Wed Apr 10 15:27:10 PDT 2013 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-2.2.1-all.zip diff --git a/gradlew b/gradlew new file mode 100755 index 0000000..91a7e26 --- /dev/null +++ b/gradlew @@ -0,0 +1,164 @@ +#!/usr/bin/env bash + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn ( ) { + echo "$*" +} + +die ( ) { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; +esac + +# For Cygwin, ensure paths are in UNIX format before anything is touched. +if $cygwin ; then + [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"` +fi + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >&- +APP_HOME="`pwd -P`" +cd "$SAVED" >&- + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules +function splitJvmOpts() { + JVM_OPTS=("$@") +} +eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS +JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" + +exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000..aec9973 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,90 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windowz variants + +if not "%OS%" == "Windows_NT" goto win9xME_args +if "%@eval[2+2]" == "4" goto 4NT_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* +goto execute + +:4NT_args +@rem Get arguments from the 4NT Shell from JP Software +set CMD_LINE_ARGS=%$ + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/lib/.gitignore b/lib/.gitignore new file mode 100644 index 0000000..748cfc0 --- /dev/null +++ b/lib/.gitignore @@ -0,0 +1,2 @@ +build +*.iml diff --git a/lib/build.gradle b/lib/build.gradle new file mode 100644 index 0000000..ad2baf1 --- /dev/null +++ b/lib/build.gradle @@ -0,0 +1,23 @@ +apply plugin: 'com.android.library' + +android { + compileSdkVersion 22 + buildToolsVersion "22.0.1" + + defaultConfig { + minSdkVersion 10 + targetSdkVersion 22 + } + + buildTypes { + release { + minifyEnabled false + proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.txt' + } + } +} + +dependencies { + compile 'org.codehaus.jackson:jackson-core-asl:1.9.13' + compile 'org.codehaus.jackson:jackson-mapper-asl:1.9.13' +} diff --git a/lib/proguard-project.txt b/lib/proguard-project.txt new file mode 100644 index 0000000..f2fe155 --- /dev/null +++ b/lib/proguard-project.txt @@ -0,0 +1,20 @@ +# To enable ProGuard in your project, edit project.properties +# to define the proguard.config property as described in that file. +# +# Add project specific ProGuard rules here. +# By default, the flags in this file are appended to flags specified +# in ${sdk.dir}/tools/proguard/proguard-android.txt +# You can edit the include path and order by changing the ProGuard +# include property in project.properties. +# +# For more details, see +# http://developer.android.com/guide/developing/tools/proguard.html + +# Add any project specific keep options here: + +# If your project uses WebView with JS, uncomment the following +# and specify the fully qualified class name to the JavaScript interface +# class: +#-keepclassmembers class fqcn.of.javascript.interface.for.webview { +# public *; +#} diff --git a/lib/src/main/AndroidManifest.xml b/lib/src/main/AndroidManifest.xml new file mode 100644 index 0000000..1b6a5c4 --- /dev/null +++ b/lib/src/main/AndroidManifest.xml @@ -0,0 +1 @@ + diff --git a/lib/src/main/java/de/tavendo/autobahn/ByteBufferInputStream.java b/lib/src/main/java/de/tavendo/autobahn/ByteBufferInputStream.java new file mode 100644 index 0000000..780293d --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/ByteBufferInputStream.java @@ -0,0 +1,90 @@ +/****************************************************************************** + * + * Copyright 2011-2012 Tavendo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package de.tavendo.autobahn; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * InputStream wrapping a ByteBuffer. This class can be used i.e. to wrap + * ByteBuffers allocated direct in NIO for socket I/O. The class does not + * allocate ByteBuffers itself, but assumes the user has already one that + * just needs to be wrapped to use with InputStream based processing. + */ +public class ByteBufferInputStream extends InputStream { + + /// ByteBuffer backing this input stream. + private final ByteBuffer mBuffer; + + /** + * Create input stream over ByteBuffer. + * + * @param buffer ByteBuffer to wrap as input stream. + */ + public ByteBufferInputStream(ByteBuffer buffer) { + mBuffer = buffer; + } + + /** + * Read one byte from input stream and advance. + * + * @return Byte read or -1 when stream end reached. + */ + @Override + public synchronized int read() throws IOException { + + if (!mBuffer.hasRemaining()) { + return -1; + } else { + return mBuffer.get() & 0xFF; + } + } + + /** + * Read chunk of bytes from input stream and advance. Read either as many + * bytes specified or input stream end reached. + * + * @param bytes Read bytes into byte array. + * @param off Read bytes into byte array beginning at this offset. + * @param len Read at most this many bytes. + * @return Actual number of bytes read. + */ + @Override + public synchronized int read(byte[] bytes, int off, int len) + throws IOException { + + if (bytes == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > bytes.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + int length = Math.min(mBuffer.remaining(), len); + if (length == 0) { + return -1; + } + + mBuffer.get(bytes, off, length); + return length; + } + +} diff --git a/lib/src/main/java/de/tavendo/autobahn/ByteBufferOutputStream.java b/lib/src/main/java/de/tavendo/autobahn/ByteBufferOutputStream.java new file mode 100644 index 0000000..98a27bc --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/ByteBufferOutputStream.java @@ -0,0 +1,176 @@ +/****************************************************************************** + * + * Copyright 2011-2012 Tavendo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package de.tavendo.autobahn; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.Buffer; +import java.nio.ByteBuffer; + +/** + * OutputStream wrapping a ByteBuffer. This class internally allocates a + * direct ByteBuffer for use i.e. with NIO for socket I/O. The ByteBuffer + * is automatically enlarged if needed (preserving contents when enlarged). + */ +public class ByteBufferOutputStream extends OutputStream { + + /// Initial size of allocated ByteBuffer. + private final int mInitialSize; + + /// Amount to grow when ByteBuffer needs to be enlarged. + private final int mGrowSize; + + /// Internal ByteBuffer wrapped. + private ByteBuffer mBuffer; + + /** + * Create a direct allocated ByteBuffer wrapped as OutputStream. + */ + public ByteBufferOutputStream() { + this(2 * 65536, 65536); + } + + /** + * Create a direct allocated ByteBuffer wrapped as OutputStream. + * + * @param initialSize Initial size of ByteBuffer. + * @param growSize When buffer needs to grow, enlarge by this amount. + */ + public ByteBufferOutputStream(int initialSize, int growSize) { + mInitialSize = initialSize; + mGrowSize = growSize; + mBuffer = ByteBuffer.allocateDirect(mInitialSize); + mBuffer.clear(); + } + + /** + * Get the underlying ByteBuffer. + * + * @return ByteBuffer underlying this OutputStream. + */ + public ByteBuffer getBuffer() { + return mBuffer; + } + + /** + * Calls flip on the underyling ByteBuffer. + */ + public Buffer flip() { + return mBuffer.flip(); + } + + /** + * Calls clear on the underlying ByteBuffer. + */ + public Buffer clear() { + return mBuffer.clear(); + } + + /** + * Calls remaining() on underlying ByteBuffer. + */ + public int remaining() { + return mBuffer.remaining(); + } + + /** + * Expand the underlying ByteBuffer and preserve content. + * + * @param requestSize Requested new size. + */ + public synchronized void expand(int requestSize) { + + if (requestSize > mBuffer.capacity()) { + + ByteBuffer oldBuffer = mBuffer; + int oldPosition = mBuffer.position(); + int newCapacity = ((requestSize / mGrowSize) + 1) * mGrowSize; + mBuffer = ByteBuffer.allocateDirect(newCapacity); + oldBuffer.clear(); + mBuffer.clear(); + mBuffer.put(oldBuffer); + mBuffer.position(oldPosition); + } + } + + /** + * Write one byte to the underlying ByteBuffer via this OutputStream. + * + * @param b Byte to be written. + */ + @Override + public synchronized void write(int b) throws IOException { + + if (mBuffer.position() + 1 > mBuffer.capacity()) { + expand(mBuffer.capacity() + 1); + } + mBuffer.put((byte) b); + } + + /** + * Write a chunk of bytes to the underyling ByteBuffer via this + * OutputStream. + * + * @param bytes Write bytes from this byte array. + * @param off Start reading at this offset within byte array. + * @param len Write this many bytes, and enlarge underyling + * ByteBuffer when necessary, preserving the contents. + */ + @Override + public synchronized void write(byte[] bytes, int off, int len) + throws IOException { + + if (mBuffer.position() + len > mBuffer.capacity()) { + expand(mBuffer.capacity() + len); + } + mBuffer.put(bytes, off, len); + } + + /** + * Write a complete byte array to the underlying ByteBuffer via this + * OutputStream. + * + * @param bytes Byte array to be written. + */ + public synchronized void write(byte[] bytes) throws IOException { + write(bytes, 0, bytes.length); + } + + /** + * Write the UTF-8 encoding of a String to the underlying ByteBuffer + * via this OutputStream. + * + * @param str String to be written. + * @throws IOException + */ + public synchronized void write(String str) throws IOException { + write(str.getBytes("UTF-8")); + } + + /** + * Write CR-LF. + * + * @throws IOException + */ + public synchronized void crlf() throws IOException { + write(0x0d); + write(0x0a); + } + +} diff --git a/lib/src/main/java/de/tavendo/autobahn/CertificateHelper.java b/lib/src/main/java/de/tavendo/autobahn/CertificateHelper.java new file mode 100644 index 0000000..2e6d4d6 --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/CertificateHelper.java @@ -0,0 +1,58 @@ +package de.tavendo.autobahn; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; + +import javax.net.SocketFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; + +public class CertificateHelper { + public static SocketFactory getSocketFactoryWithCustomCA(InputStream stream) + throws CertificateException, KeyStoreException, IOException, + NoSuchAlgorithmException, KeyManagementException { + + // Load CAs from an InputStream + // (could be from a resource or ByteArrayInputStream or ...) + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + + InputStream caInput = new BufferedInputStream(stream); + Certificate ca; + try { + ca = cf.generateCertificate(caInput); + System.out.println("ca=" + ((X509Certificate) ca).getSubjectDN()); + } finally { + try { + caInput.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + // Create a KeyStore containing our trusted CAs + String keyStoreType = KeyStore.getDefaultType(); + KeyStore keyStore = KeyStore.getInstance(keyStoreType); + keyStore.load(null, null); + keyStore.setCertificateEntry("ca", ca); + + // Create a TrustManager that trusts the CAs in our KeyStore + String tmfAlgorithm = TrustManagerFactory.getDefaultAlgorithm(); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm); + tmf.init(keyStore); + + // Create an SSLContext that uses our TrustManager + SSLContext context = SSLContext.getInstance("TLS"); + context.init(null, tmf.getTrustManagers(), null); + + return context.getSocketFactory(); + } +} diff --git a/lib/src/main/java/de/tavendo/autobahn/Doxygen.java b/lib/src/main/java/de/tavendo/autobahn/Doxygen.java new file mode 100644 index 0000000..910bac8 --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/Doxygen.java @@ -0,0 +1,85 @@ +package de.tavendo.autobahn; + +/*! +\mainpage +\section intro_sec AutobahnAndroid API Reference + +AutobahnAndroid provides a Java client library implementing +The WebSocket Protocol and +The WebSocket Application Messaging Protocol for use +in native Android apps. + + +\section websocket_features WebSocket Support + +AutobahnAndroid implements the WebSocket protocol +with a couple of distinct features: + +\li full RFC6455 and Draft Hybi-10 to -17 support +\li very good standards conformance +\li performant +\li easy to use API +\li designed to work with Android UI applications +\li Open-Source, licensed under the Apache 2.0 license + +The implementation passes all (nearly 300) tests from the +AutobahnTestSuite. + +The basic API is modeled after the WebSocket JavaScript API for +ease of use and familarity. + +The API enables the use of common Android idioms for event handling (using +anonymous inner classes) and integrates with Android UI applications (by +communicating via messages and message loops between the UI thread and back- +ground reader/writer threads and by avoiding _any_ network activity on the +UI thread). + +The implementation uses Java NIO to reduce network processing overhead and +is on-par or faster performance-wise compared to Firefox 8 Mobile, a C++ +implementation of WebSockets. + +\section rpc_pubsub WAMP (RPC/PubSub) Support + +AutobahnAndroid also +includes an implementation of The WebSocket Application Messaging Protocol (WAMP) +which can be used to build applications around Remote Procedure Call and +Publish & Subscribe messaging patterns. + +It features: + +\li RPC and PubSub, fully asynchronous design +\li built on JSON and WebSockets +\li simple, efficient and open protocol +\li automatic mapping to user-defined POJOs +\li seamless integration in Android UI apps +\li Open-Source, licensed under the Apache 2.0 license + +Call results and events which travel the wire as JSON payload are automatically +converted and mapped to Java primitive types or user-defined POJOs (Plain-old Java Objects). + +The latter is a very convenient and powerful feature made possible by the use of +Jackson, a high-performance JSON processor. +This works even for container types, such as lists or maps over POJOs. + +For example, it is possible to issue a RPC and get a List as a result, where Person is +a user-defined class. + +\section usage Usage + +The only dependency of +AutobahnAndroid +is Jackson. +To use, all one needs to do is to include the built JARs into an Android +project. + +\section more More Information + +For more information, please visit the project page, +the forum or the +code repository. +*/ + +/// Empty class file to hold Doxygen documentation. +abstract class Doxygen { + +} diff --git a/lib/src/main/java/de/tavendo/autobahn/NoCopyByteArrayOutputStream.java b/lib/src/main/java/de/tavendo/autobahn/NoCopyByteArrayOutputStream.java new file mode 100644 index 0000000..0007dd3 --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/NoCopyByteArrayOutputStream.java @@ -0,0 +1,66 @@ +/****************************************************************************** + * + * Copyright 2011-2012 Tavendo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package de.tavendo.autobahn; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; + +/** + * OutputStream backed by a byte array. This class provides copyless access + * to byte array backing the ByteArrayOutputStream + */ +public class NoCopyByteArrayOutputStream extends ByteArrayOutputStream { + + /** + * Create new OutputStream backed by byte array. + */ + public NoCopyByteArrayOutputStream() { + super(); + } + + /** + * Create new OutputStream backed by byte array. + * + * @param size Initial size of underlying byte array. + */ + public NoCopyByteArrayOutputStream(int size) { + super(size); + } + + /** + * Wraps the underyling byte array into an InputStream. + * + * @return New InputStream wrapping byte buffer underlying this stream. + */ + public InputStream getInputStream() { + return new ByteArrayInputStream(buf, 0, count); + } + + /** + * Get byte array underlying this OutputStream. This + * does not copy any data, but return reference to the + * underlying byte array. + * + * @return Underlying byte array by reference. + */ + public byte[] getByteArray() { + return buf; + } +} diff --git a/lib/src/main/java/de/tavendo/autobahn/PrefixMap.java b/lib/src/main/java/de/tavendo/autobahn/PrefixMap.java new file mode 100644 index 0000000..1fb5df6 --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/PrefixMap.java @@ -0,0 +1,139 @@ +/****************************************************************************** + * + * Copyright 2011-2012 Tavendo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package de.tavendo.autobahn; + +import java.util.HashMap; + +/** + * Mapping between CURIEs and URIs. + * Provides a two-way mapping between CURIEs (Compact URI Expressions) and + * full URIs. + * + * \see http://www.w3.org/TR/curie/ + * + * \todo Prefixes MUST be NCNames (http://www.w3.org/TR/1999/REC-xml-names-19990114/#NT-NCName) + * + * \todo Work in the details of http://www.w3.org/TR/curie/ (default prefixes, ..) + */ +public class PrefixMap { + + private final HashMap mPrefixes = new HashMap(); + private final HashMap mUris = new HashMap(); + + /** + * Set mapping of prefix to URI. + * + * @param prefix Prefix to be mapped. + * @param uri URI the prefix is to be mapped to. + */ + public void set(String prefix, String uri) { + mPrefixes.put(prefix, uri); + mUris.put(uri, prefix); + } + + /** + * Returns the URI for the prefix or None if prefix has no mapped URI. + * + * @param prefix Prefix to look up. + * @return Mapped URI for prefix or None. + */ + public String get(String prefix) { + return mPrefixes.get(prefix); + } + + /** + * Remove mapping of prefix to URI. + * + * @param prefix Prefix for which mapping should be removed. + * @return The URI the prefix was mapped to (when removed), + * or null when prefix is unmapped (so there wasn't + * anything to remove). + */ + public String remove(String prefix) { + if (mPrefixes.containsKey(prefix)) { + String uri = mPrefixes.get(prefix); + mPrefixes.remove(prefix); + mUris.remove(uri); + return uri; + } else { + return null; + } + } + + /** + * Remove all prefix mappings. + */ + public void clear() { + mPrefixes.clear(); + mUris.clear(); + } + + /** + * Resolve given CURIE to full URI. + * + * @param curie CURIE (i.e. "rdf:label"). + * @return Full URI for CURIE or None. + */ + public String resolve(String curie) { + int i = curie.indexOf(':'); + if (i > 0) { + String prefix = curie.substring(0, i); + if (mPrefixes.containsKey(prefix)) { + return mPrefixes.get(prefix) + curie.substring(i + 1); + } + } + return null; + } + + /** + * Resolve given CURIE/URI and return string verbatim if cannot be resolved. + * + * @param curieOrUri CURIE or URI. + * @return Full URI for CURIE or original string. + */ + public String resolveOrPass(String curieOrUri) { + + String u = resolve(curieOrUri); + if (u != null) { + return u; + } else { + return curieOrUri; + } + } + + /** + * Shrink given URI to CURIE. If no appropriate prefix mapping is available, + * return original URI. + * + * @param uri URI to shrink. + * @return CURIE or original URI. + */ + public String shrink(String uri) { + + for (int i = uri.length(); i > 0; --i) { + String u = uri.substring(0, i); + String p = mUris.get(u); + if (p != null) { + return p + ':' + uri.substring(i); + } + } + return uri; + } + +} diff --git a/lib/src/main/java/de/tavendo/autobahn/Utf8Validator.java b/lib/src/main/java/de/tavendo/autobahn/Utf8Validator.java new file mode 100644 index 0000000..68a2fa9 --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/Utf8Validator.java @@ -0,0 +1,129 @@ +/****************************************************************************** + * + * Copyright 2011-2012 Tavendo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Implements the algorithm "Flexible and Economical UTF-8 Decoder" by + * Bjoern Hoehrmann (http://bjoern.hoehrmann.de/utf-8/decoder/dfa/). + * + ******************************************************************************/ + +package de.tavendo.autobahn; + + +/** + * Incremental UTF-8 validator. The validator runs with constant memory + * consumption (minimal state). Purpose is to validate UTF-8, not to + * decode (which could be done easily also, but we rely on Java built in + * facilities for that). + * + * Implements the algorithm "Flexible and Economical UTF-8 Decoder" by + * Bjoern Hoehrmann (http://bjoern.hoehrmann.de/utf-8/decoder/dfa/). + */ +public class Utf8Validator { + + /// DFA state transitions (14 x 32 = 448). + private static final int[] DFA = { + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, // 00..1f + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, // 20..3f + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, // 40..5f + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, // 60..7f + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9, // 80..9f + 7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7, // a0..bf + 8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, // c0..df + 0xa,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x4,0x3,0x3, // e0..ef + 0xb,0x6,0x6,0x6,0x5,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8, // f0..ff + 0x0,0x1,0x2,0x3,0x5,0x8,0x7,0x1,0x1,0x1,0x4,0x6,0x1,0x1,0x1,0x1, // s0..s0 + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,0,1,0,1,1,1,1,1,1, // s1..s2 + 1,2,1,1,1,1,1,2,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1, // s3..s4 + 1,2,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,3,1,3,1,1,1,1,1,1, // s5..s6 + 1,3,1,1,1,1,1,3,1,3,1,1,1,1,1,1,1,3,1,1,1,1,1,1,1,1,1,1,1,1,1,1 // s7..s8 + }; + + private static final int ACCEPT = 0; + private static final int REJECT = 1; + + private int mState; + private int mPos; + + /** + * Create new incremental UTF-8 validator. The validator is already + * resetted and thus immediately usable. + */ + public Utf8Validator() { + reset(); + } + + /** + * Reset validator state to begin validation of new + * UTF-8 stream. + */ + public void reset() { + mState = ACCEPT; + mPos = 0; + } + + /** + * Get end of validated position within stream. When validate() + * returns false, indicating an UTF-8 error, this function can + * be used to get the exact position within the stream upon + * which the violation was encountered. + * + * @return Current position with stream validated. + */ + public int position() { + return mPos; + } + + /** + * Check if incremental validation (currently) has ended on + * a complete encoded Unicode codepoint. + * + * @return True, iff currently ended on codepoint. + */ + public boolean isValid() { + return mState == ACCEPT; + } + + /** + * Validate a chunk of octets for UTF-8. + * + * @param data Buffer which contains chunk to validate. + * @param off Offset within buffer where to continue with validation. + * @param len Length in octets to validate within buffer. + * @return False as soon as UTF-8 violation occurs, true otherwise. + */ + public boolean validate(byte[] data, int off, int len) { + for (int i = off; i < off + len; ++i) { + mState = DFA[256 + (mState << 4) + DFA[(int) (0xff & data[i])]]; + if (mState == REJECT) { + mPos += i; + return false; + } + } + mPos += len; + return true; + } + + /** + * Validate a chunk of octets for UTF-8. + * + * @param data Buffer which contains chunk to validate. + * @return False as soon as UTF-8 violation occurs, true otherwise. + */ + public boolean validate(byte[] data) { + return validate(data, 0, data.length); + } + +} diff --git a/lib/src/main/java/de/tavendo/autobahn/WebSocket.java b/lib/src/main/java/de/tavendo/autobahn/WebSocket.java new file mode 100644 index 0000000..0b04f60 --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/WebSocket.java @@ -0,0 +1,68 @@ +package de.tavendo.autobahn; + +import java.net.URI; + +public interface WebSocket { + public static final String UTF8_ENCODING = "UTF-8"; + + /** + * Session handler for WebSocket sessions. + */ + public interface WebSocketConnectionObserver { + public static enum WebSocketCloseNotification { + NORMAL, + CANNOT_CONNECT, + CONNECTION_LOST, + PROTOCOL_ERROR, + INTERNAL_ERROR, + SERVER_ERROR, + RECONNECT + } + + /** + * Fired when the WebSockets connection has been established. + * After this happened, messages may be sent. + */ + public void onOpen(); + + /** + * Fired when the WebSockets connection has deceased (or could + * not established in the first place). + * + * @param code Close code. + * @param reason Close reason (human-readable). + */ + public void onClose(WebSocketCloseNotification code, String reason); + + /** + * Fired when a text message has been received (and text + * messages are not set to be received raw). + * + * @param payload Text message payload or null (empty payload). + */ + public void onTextMessage(String payload); + + /** + * Fired when a text message has been received (and text + * messages are set to be received raw). + * + * @param payload Text message payload as raw UTF-8 or null (empty payload). + */ + public void onRawTextMessage(byte[] payload); + + /** + * Fired when a binary message has been received. + * + * @param payload Binar message payload or null (empty payload). + */ + public void onBinaryMessage(byte[] payload); + } + + public void connect(URI uri, WebSocketConnectionObserver observer) throws WebSocketException; + public void connect(URI uri, WebSocketConnectionObserver observer, WebSocketOptions options) throws WebSocketException; + public void disconnect(); + public boolean isConnected(); + public void sendBinaryMessage(byte[] payload); + public void sendRawTextMessage(byte[] payload); + public void sendTextMessage(String payload); +} diff --git a/lib/src/main/java/de/tavendo/autobahn/WebSocketConnection.java b/lib/src/main/java/de/tavendo/autobahn/WebSocketConnection.java new file mode 100644 index 0000000..6bf5460 --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/WebSocketConnection.java @@ -0,0 +1,548 @@ +/****************************************************************************** + * + * Copyright 2011-2012 Tavendo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package de.tavendo.autobahn; + +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.net.Socket; +import java.net.URI; + +import javax.net.SocketFactory; + +import android.net.SSLCertificateSocketFactory; +import android.os.Handler; +import android.os.Looper; +import android.os.Message; +import android.util.Log; +import de.tavendo.autobahn.WebSocket.WebSocketConnectionObserver.WebSocketCloseNotification; +import de.tavendo.autobahn.WebSocketMessage.WebSocketCloseCode; + +public class WebSocketConnection implements WebSocket { + private static final String TAG = WebSocketConnection.class.getName(); + private static final String WS_URI_SCHEME = "ws"; + private static final String WSS_URI_SCHEME = "wss"; + private static final String WS_WRITER = "WebSocketWriter"; + private static final String WS_READER = "WebSocketReader"; + + private final Handler mHandler; + + private WebSocketReader mWebSocketReader; + private WebSocketWriter mWebSocketWriter; + + private Socket mSocket; + private SocketThread mSocketThread; + + private URI mWebSocketURI; + private String[] mWebSocketSubprotocols; + + private WeakReference mWebSocketConnectionObserver; + + private WebSocketOptions mWebSocketOptions; + private boolean mPreviousConnection = false; + + + + public WebSocketConnection() { + Log.d(TAG, "WebSocket connection created."); + + this.mHandler = new ThreadHandler(this); + } + + + + // + // Forward to the writer thread + public void sendTextMessage(String payload) { + mWebSocketWriter.forward(new WebSocketMessage.TextMessage(payload)); + } + + + public void sendRawTextMessage(byte[] payload) { + mWebSocketWriter.forward(new WebSocketMessage.RawTextMessage(payload)); + } + + + public void sendBinaryMessage(byte[] payload) { + mWebSocketWriter.forward(new WebSocketMessage.BinaryMessage(payload)); + } + + + + public boolean isConnected() { + return mSocket != null && mSocket.isConnected() && !mSocket.isClosed(); + } + + + + private void failConnection(WebSocketCloseNotification code, String reason) { + Log.d(TAG, "fail connection [code = " + code + ", reason = " + reason); + + if (mWebSocketReader != null) { + mWebSocketReader.quit(); + + try { + mWebSocketReader.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + Log.d(TAG, "mReader already NULL"); + } + + if (mWebSocketWriter != null) { + mWebSocketWriter.forward(new WebSocketMessage.Quit()); + + try { + mWebSocketWriter.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + Log.d(TAG, "mWriter already NULL"); + } + + if (mSocket != null) { + mSocketThread.getHandler().post(new Runnable() { + + @Override + public void run() { + mSocketThread.stopConnection(); + } + }); + } else { + Log.d(TAG, "mTransportChannel already NULL"); + } + + mSocketThread.getHandler().post(new Runnable() { + + @Override + public void run() { + Looper.myLooper().quit(); + } + }); + + onClose(code, reason); + + Log.d(TAG, "worker threads stopped"); + } + + + + public void connect(URI webSocketURI, WebSocket.WebSocketConnectionObserver connectionObserver) throws WebSocketException { + connect(webSocketURI, connectionObserver, new WebSocketOptions()); + } + + public void connect(URI webSocketURI, WebSocket.WebSocketConnectionObserver connectionObserver, WebSocketOptions options) throws WebSocketException { + connect(webSocketURI, null, connectionObserver, options); + } + + public void connect(URI webSocketURI, String[] subprotocols, WebSocket.WebSocketConnectionObserver connectionObserver, WebSocketOptions options) throws WebSocketException { + if (isConnected()) { + throw new WebSocketException("already connected"); + } + + if (webSocketURI == null) { + throw new WebSocketException("WebSockets URI null."); + } else { + this.mWebSocketURI = webSocketURI; + if (!mWebSocketURI.getScheme().equals(WS_URI_SCHEME) && !mWebSocketURI.getScheme().equals(WSS_URI_SCHEME)) { + throw new WebSocketException("unsupported scheme for WebSockets URI"); + } + + this.mWebSocketSubprotocols = subprotocols; + this.mWebSocketConnectionObserver = new WeakReference(connectionObserver); + this.mWebSocketOptions = new WebSocketOptions(options); + + connect(); + } + } + + public void disconnect() { + if (mWebSocketWriter != null && mWebSocketWriter.isAlive()) { + mWebSocketWriter.forward(new WebSocketMessage.Close()); + } else { + Log.d(TAG, "Could not send WebSocket Close .. writer already null"); + } + + this.mPreviousConnection = false; + } + + /** + * Reconnect to the server with the latest options + * @return true if reconnection performed + */ + public boolean reconnect() { + if (!isConnected() && (mWebSocketURI != null)) { + connect(); + return true; + } + return false; + } + + private void connect() { + mSocketThread = new SocketThread(mWebSocketURI, mWebSocketOptions); + + mSocketThread.start(); + synchronized (mSocketThread) { + try { + mSocketThread.wait(); + } catch (InterruptedException e) { + } + } + mSocketThread.getHandler().post(new Runnable() { + + @Override + public void run() { + mSocketThread.startConnection(); + } + }); + + synchronized (mSocketThread) { + try { + mSocketThread.wait(); + } catch (InterruptedException e) { + } + } + + this.mSocket = mSocketThread.getSocket(); + + if (mSocket == null) { + onClose(WebSocketCloseNotification.CANNOT_CONNECT, mSocketThread.getFailureMessage()); + } else if (mSocket.isConnected()) { + try { + createReader(); + createWriter(); + + WebSocketMessage.ClientHandshake clientHandshake = new WebSocketMessage.ClientHandshake(mWebSocketURI, null, mWebSocketSubprotocols); + mWebSocketWriter.forward(clientHandshake); + } catch (Exception e) { + onClose(WebSocketCloseNotification.INTERNAL_ERROR, e.getLocalizedMessage()); + } + } else { + onClose(WebSocketCloseNotification.CANNOT_CONNECT, "could not connect to WebSockets server"); + } + } + + /** + * Perform reconnection + * + * @return true if reconnection was scheduled + */ + protected boolean scheduleReconnect() { + /** + * Reconnect only if: + * - connection active (connected but not disconnected) + * - has previous success connections + * - reconnect interval is set + */ + int interval = mWebSocketOptions.getReconnectInterval(); + boolean shouldReconnect = mSocket != null + && mSocket.isConnected() + && mPreviousConnection + && (interval > 0); + if (shouldReconnect) { + Log.d(TAG, "WebSocket reconnection scheduled"); + mHandler.postDelayed(new Runnable() { + + public void run() { + Log.d(TAG, "WebSocket reconnecting..."); + reconnect(); + } + }, interval); + } + return shouldReconnect; + } + + /** + * Common close handler + * + * @param code Close code. + * @param reason Close reason (human-readable). + */ + private void onClose(WebSocketCloseNotification code, String reason) { + boolean reconnecting = false; + + if ((code == WebSocketCloseNotification.CANNOT_CONNECT) || (code == WebSocketCloseNotification.CONNECTION_LOST)) { + reconnecting = scheduleReconnect(); + } + + WebSocket.WebSocketConnectionObserver webSocketObserver = mWebSocketConnectionObserver.get(); + if (webSocketObserver != null) { + try { + if (reconnecting) { + webSocketObserver.onClose(WebSocketConnectionObserver.WebSocketCloseNotification.RECONNECT, reason); + } else { + webSocketObserver.onClose(code, reason); + } + } catch (Exception e) { + e.printStackTrace(); + } + } else { + Log.d(TAG, "WebSocketObserver null"); + } + } + + + + + protected void processAppMessage(Object message) { + } + + + /** + * Create WebSockets background writer. + */ + protected void createWriter() { + mWebSocketWriter = new WebSocketWriter(mHandler, mSocket, mWebSocketOptions, WS_WRITER); + mWebSocketWriter.start(); + + synchronized (mWebSocketWriter) { + try { + mWebSocketWriter.wait(); + } catch (InterruptedException e) { + } + } + + Log.d(TAG, "WebSocket writer created and started."); + } + + + /** + * Create WebSockets background reader. + */ + protected void createReader() { + + mWebSocketReader = new WebSocketReader(mHandler, mSocket, mWebSocketOptions, WS_READER); + mWebSocketReader.start(); + + synchronized (mWebSocketReader) { + try { + mWebSocketReader.wait(); + } catch (InterruptedException e) { + } + } + + Log.d(TAG, "WebSocket reader created and started."); + } + + private void handleMessage(Message message) { + WebSocket.WebSocketConnectionObserver webSocketObserver = mWebSocketConnectionObserver.get(); + + if (message.obj instanceof WebSocketMessage.TextMessage) { + WebSocketMessage.TextMessage textMessage = (WebSocketMessage.TextMessage) message.obj; + + if (webSocketObserver != null) { + webSocketObserver.onTextMessage(textMessage.mPayload); + } else { + Log.d(TAG, "could not call onTextMessage() .. handler already NULL"); + } + + } else if (message.obj instanceof WebSocketMessage.RawTextMessage) { + WebSocketMessage.RawTextMessage rawTextMessage = (WebSocketMessage.RawTextMessage) message.obj; + + if (webSocketObserver != null) { + webSocketObserver.onRawTextMessage(rawTextMessage.mPayload); + } else { + Log.d(TAG, "could not call onRawTextMessage() .. handler already NULL"); + } + + } else if (message.obj instanceof WebSocketMessage.BinaryMessage) { + WebSocketMessage.BinaryMessage binaryMessage = (WebSocketMessage.BinaryMessage) message.obj; + + if (webSocketObserver != null) { + webSocketObserver.onBinaryMessage(binaryMessage.mPayload); + } else { + Log.d(TAG, "could not call onBinaryMessage() .. handler already NULL"); + } + + } else if (message.obj instanceof WebSocketMessage.Ping) { + WebSocketMessage.Ping ping = (WebSocketMessage.Ping) message.obj; + Log.d(TAG, "WebSockets Ping received"); + + WebSocketMessage.Pong pong = new WebSocketMessage.Pong(); + pong.mPayload = ping.mPayload; + mWebSocketWriter.forward(pong); + + } else if (message.obj instanceof WebSocketMessage.Pong) { + WebSocketMessage.Pong pong = (WebSocketMessage.Pong) message.obj; + + Log.d(TAG, "WebSockets Pong received" + pong.mPayload); + + } else if (message.obj instanceof WebSocketMessage.Close) { + WebSocketMessage.Close close = (WebSocketMessage.Close) message.obj; + + Log.d(TAG, "WebSockets Close received (" + close.getCode() + " - " + close.getReason() + ")"); + + mWebSocketWriter.forward(new WebSocketMessage.Close(WebSocketCloseCode.NORMAL)); + + } else if (message.obj instanceof WebSocketMessage.ServerHandshake) { + WebSocketMessage.ServerHandshake serverHandshake = (WebSocketMessage.ServerHandshake) message.obj; + + Log.d(TAG, "opening handshake received"); + + if (serverHandshake.mSuccess) { + if (webSocketObserver != null) { + webSocketObserver.onOpen(); + } else { + Log.d(TAG, "could not call onOpen() .. handler already NULL"); + } + mPreviousConnection = true; + } + + } else if (message.obj instanceof WebSocketMessage.ConnectionLost) { + // WebSocketMessage.ConnectionLost connectionLost = (WebSocketMessage.ConnectionLost) message.obj; + failConnection(WebSocketCloseNotification.CONNECTION_LOST, "WebSockets connection lost"); + + } else if (message.obj instanceof WebSocketMessage.ProtocolViolation) { + // WebSocketMessage.ProtocolViolation protocolViolation = (WebSocketMessage.ProtocolViolation) message.obj; + failConnection(WebSocketCloseNotification.PROTOCOL_ERROR, "WebSockets protocol violation"); + + } else if (message.obj instanceof WebSocketMessage.Error) { + WebSocketMessage.Error error = (WebSocketMessage.Error) message.obj; + failConnection(WebSocketCloseNotification.INTERNAL_ERROR, "WebSockets internal error (" + error.mException.toString() + ")"); + + } else if (message.obj instanceof WebSocketMessage.ServerError) { + WebSocketMessage.ServerError error = (WebSocketMessage.ServerError) message.obj; + failConnection(WebSocketCloseNotification.SERVER_ERROR, "Server error " + error.mStatusCode + " (" + error.mStatusMessage + ")"); + + } else { + processAppMessage(message.obj); + + } + } + + + + public static class SocketThread extends Thread { + private static final String WS_CONNECTOR = "WebSocketConnector"; + + private final URI mWebSocketURI; + private final WebSocketOptions mWebSocketOptions; + + private Socket mSocket = null; + private String mFailureMessage = null; + + private Handler mHandler; + + + + public SocketThread(URI uri, WebSocketOptions options) { + this.setName(WS_CONNECTOR); + + this.mWebSocketURI = uri; + this.mWebSocketOptions = options; + } + + + + @Override + public void run() { + Looper.prepare(); + this.mHandler = new Handler(); + synchronized (this) { + notifyAll(); + } + + Looper.loop(); + Log.d(TAG, "SocketThread exited."); + } + + + + public void startConnection() { + try { + String host = mWebSocketURI.getHost(); + int port = mWebSocketURI.getPort(); + + if (port == -1) { + if (mWebSocketURI.getScheme().equals(WSS_URI_SCHEME)) { + port = 443; + } else { + port = 80; + } + } + + SocketFactory factory = null; + if (mWebSocketURI.getScheme().equalsIgnoreCase(WSS_URI_SCHEME)) { + if (mWebSocketOptions != null) { + factory = mWebSocketOptions.getSSLCertificateSocketFactory(); + } + + if (factory == null) { + factory = SSLCertificateSocketFactory.getDefault(); + } + } else { + factory = SocketFactory.getDefault(); + } + + // Do not replace host string with InetAddress or you lose automatic host name verification + this.mSocket = factory.createSocket(host, port); + } catch (IOException e) { + this.mFailureMessage = e.getLocalizedMessage(); + } + + synchronized (this) { + notifyAll(); + } + } + + public void stopConnection() { + try { + mSocket.close(); + this.mSocket = null; + } catch (IOException e) { + this.mFailureMessage = e.getLocalizedMessage(); + } + } + + public Handler getHandler() { + return mHandler; + } + public Socket getSocket() { + return mSocket; + } + public String getFailureMessage() { + return mFailureMessage; + } + } + + + + private static class ThreadHandler extends Handler { + private final WeakReference mWebSocketConnection; + + + + public ThreadHandler(WebSocketConnection webSocketConnection) { + super(); + + this.mWebSocketConnection = new WeakReference(webSocketConnection); + } + + + + @Override + public void handleMessage(Message message) { + WebSocketConnection webSocketConnection = mWebSocketConnection.get(); + if (webSocketConnection != null) { + webSocketConnection.handleMessage(message); + } + } + } +} diff --git a/lib/src/main/java/de/tavendo/autobahn/WebSocketConnectionHandler.java b/lib/src/main/java/de/tavendo/autobahn/WebSocketConnectionHandler.java new file mode 100644 index 0000000..42181a1 --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/WebSocketConnectionHandler.java @@ -0,0 +1,70 @@ +/****************************************************************************** + * + * Copyright 2011-2012 Tavendo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package de.tavendo.autobahn; + +/** + * WebSockets event handler. Users will usually provide an instance of a class + * derived from this to handle WebSockets received messages and open/close events + */ +public class WebSocketConnectionHandler implements WebSocket.WebSocketConnectionObserver { + + /** + * Fired when the WebSockets connection has been established. + * After this happened, messages may be sent. + */ + public void onOpen() { + } + + /** + * Fired when the WebSockets connection has deceased (or could + * not established in the first place). + * + * @param code Close code. + * @param reason Close reason (human-readable). + */ + public void onClose(WebSocketCloseNotification code, String reason) { + } + + /** + * Fired when a text message has been received (and text + * messages are not set to be received raw). + * + * @param payload Text message payload or null (empty payload). + */ + public void onTextMessage(String payload) { + } + + /** + * Fired when a text message has been received (and text + * messages are set to be received raw). + * + * @param payload Text message payload as raw UTF-8 or null (empty payload). + */ + public void onRawTextMessage(byte[] payload) { + } + + /** + * Fired when a binary message has been received. + * + * @param payload Binar message payload or null (empty payload). + */ + public void onBinaryMessage(byte[] payload) { + } + +} diff --git a/lib/src/main/java/de/tavendo/autobahn/WebSocketException.java b/lib/src/main/java/de/tavendo/autobahn/WebSocketException.java new file mode 100644 index 0000000..3387925 --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/WebSocketException.java @@ -0,0 +1,32 @@ +/****************************************************************************** + * + * Copyright 2011-2012 Tavendo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package de.tavendo.autobahn; + +public class WebSocketException extends Exception { + + private static final long serialVersionUID = 1L; + + public WebSocketException(String message) { + super(message); + } + + public WebSocketException(String message, Throwable t) { + super(message, t); + } +} diff --git a/lib/src/main/java/de/tavendo/autobahn/WebSocketFrameHeader.java b/lib/src/main/java/de/tavendo/autobahn/WebSocketFrameHeader.java new file mode 100644 index 0000000..d9a068f --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/WebSocketFrameHeader.java @@ -0,0 +1,54 @@ +package de.tavendo.autobahn; + +public class WebSocketFrameHeader { + private int mOpcode; + private boolean mFin; + private int mReserved; + private int mHeaderLen; + private int mPayloadLen; + private int mTotalLen; + private byte[] mMask; + + public int getOpcode() { + return mOpcode; + } + public void setOpcode(int opcode) { + this.mOpcode = opcode; + } + public boolean isFin() { + return mFin; + } + public void setFin(boolean fin) { + this.mFin = fin; + } + public int getReserved() { + return mReserved; + } + public void setReserved(int reserved) { + this.mReserved = reserved; + } + public int getHeaderLength() { + return mHeaderLen; + } + public void setHeaderLength(int headerLength) { + this.mHeaderLen = headerLength; + } + public int getPayloadLength() { + return mPayloadLen; + } + public void setPayloadLength(int payloadLength) { + this.mPayloadLen = payloadLength; + } + public int getTotalLength() { + return mTotalLen; + } + public void setTotalLen(int totalLength) { + this.mTotalLen = totalLength; + } + public byte[] getMask() { + return mMask; + } + public void setMask(byte[] mask) { + this.mMask = mask; + } +} \ No newline at end of file diff --git a/lib/src/main/java/de/tavendo/autobahn/WebSocketMessage.java b/lib/src/main/java/de/tavendo/autobahn/WebSocketMessage.java new file mode 100644 index 0000000..677c60c --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/WebSocketMessage.java @@ -0,0 +1,219 @@ +/****************************************************************************** + * + * Copyright 2011-2012 Tavendo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package de.tavendo.autobahn; + +import java.net.URI; + +/** + * WebSockets message classes. + * The master thread and the background reader/writer threads communicate using these messages + * for WebSockets connections. + */ +public class WebSocketMessage { + public static class WebSocketCloseCode { + public static final int NORMAL = 1000; + public static final int ENDPOINT_GOING_AWAY = 1001; + public static final int ENDPOINT_PROTOCOL_ERROR = 1002; + public static final int ENDPOINT_UNSUPPORTED_DATA_TYPE = 1003; + public static final int RESERVED = 1004; + public static final int RESERVED_NO_STATUS = 1005; + public static final int RESERVED_NO_CLOSING_HANDSHAKE = 1006; + public static final int ENDPOINT_BAD_DATA = 1007; + public static final int POLICY_VIOLATION = 1008; + public static final int MESSAGE_TOO_BIG = 1009; + public static final int ENDPOINT_NEEDS_EXTENSION = 1010; + public static final int UNEXPECTED_CONDITION = 1011; + public static final int RESERVED_TLS_REQUIRED = 1015; + } + + + /// Base message class. + public static class Message { + } + + /// Quite background thread. + public static class Quit extends Message { + } + + /// Initial WebSockets handshake (client request). + public static class ClientHandshake extends Message { + private final URI mURI; + private final URI mOrigin; + private final String[] mSubprotocols; + + + + ClientHandshake(URI uri) { + this.mURI = uri; + this.mOrigin = null; + this.mSubprotocols = null; + } + + ClientHandshake(URI uri, URI origin, String[] subprotocols) { + this.mURI = uri; + this.mOrigin = origin; + this.mSubprotocols = subprotocols; + } + + + + public URI getURI() { + return mURI; + } + public URI getOrigin() { + return mOrigin; + } + public String[] getSubprotocols() { + return mSubprotocols; + } + } + + /// Initial WebSockets handshake (server response). + public static class ServerHandshake extends Message { + public boolean mSuccess; + + public ServerHandshake(boolean success) { + mSuccess = success; + } + } + + /// WebSockets connection lost + public static class ConnectionLost extends Message { + } + + public static class ServerError extends Message { + public int mStatusCode; + public String mStatusMessage; + + public ServerError(int statusCode, String statusMessage) { + mStatusCode = statusCode; + mStatusMessage = statusMessage; + } + + } + + /// WebSockets reader detected WS protocol violation. + public static class ProtocolViolation extends Message { + + public WebSocketException mException; + + public ProtocolViolation(WebSocketException e) { + mException = e; + } + } + + /// An exception occured in the WS reader or WS writer. + public static class Error extends Message { + + public Exception mException; + + public Error(Exception e) { + mException = e; + } + } + + /// WebSockets text message to send or received. + public static class TextMessage extends Message { + + public String mPayload; + + TextMessage(String payload) { + mPayload = payload; + } + } + + /// WebSockets raw (UTF-8) text message to send or received. + public static class RawTextMessage extends Message { + + public byte[] mPayload; + + RawTextMessage(byte[] payload) { + mPayload = payload; + } + } + + /// WebSockets binary message to send or received. + public static class BinaryMessage extends Message { + + public byte[] mPayload; + + BinaryMessage(byte[] payload) { + mPayload = payload; + } + } + + /// WebSockets close to send or received. + public static class Close extends Message { + private int mCode; + private String mReason; + + + Close() { + mCode = WebSocketCloseCode.UNEXPECTED_CONDITION; + mReason = null; + } + + Close(int code) { + mCode = code; + mReason = null; + } + + Close(int code, String reason) { + mCode = code; + mReason = reason; + } + + + public int getCode() { + return mCode; + } + public String getReason() { + return mReason; + } + } + + /// WebSockets ping to send or received. + public static class Ping extends Message { + + public byte[] mPayload; + + Ping() { + mPayload = null; + } + + Ping(byte[] payload) { + mPayload = payload; + } + } + + /// WebSockets pong to send or received. + public static class Pong extends Message { + + public byte[] mPayload; + + Pong() { + mPayload = null; + } + + Pong(byte[] payload) { + mPayload = payload; + } + } + +} diff --git a/lib/src/main/java/de/tavendo/autobahn/WebSocketOptions.java b/lib/src/main/java/de/tavendo/autobahn/WebSocketOptions.java new file mode 100644 index 0000000..de0b649 --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/WebSocketOptions.java @@ -0,0 +1,294 @@ +/****************************************************************************** + * + * Copyright 2011-2012 Tavendo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package de.tavendo.autobahn; + + +import javax.net.SocketFactory; + +/** + * WebSockets connection options. This can be supplied to WebSocketConnection in connect(). + * Note that the latter copies the options provided to connect(), so any change after + * connect will have no effect. + */ +public class WebSocketOptions { + + private int mMaxFramePayloadSize; + private int mMaxMessagePayloadSize; + private boolean mReceiveTextMessagesRaw; + private boolean mTcpNoDelay; + private int mSocketReceiveTimeout; + private int mSocketConnectTimeout; + private boolean mValidateIncomingUtf8; + private boolean mMaskClientFrames; + private int mReconnectInterval; + private SocketFactory mSSLCertificateSocketFactory; + + + /** + * Construct default options. + */ + public WebSocketOptions() { + + mMaxFramePayloadSize = 128 * 1024; + mMaxMessagePayloadSize = 128 * 1024; + mReceiveTextMessagesRaw = false; + mTcpNoDelay = true; + mSocketReceiveTimeout = 200; + mSocketConnectTimeout = 6000; + mValidateIncomingUtf8 = true; + mMaskClientFrames = true; + mReconnectInterval = 0; // no reconnection by default + } + + /** + * Construct options as copy from other options object. + * + * @param other Options to copy. + */ + public WebSocketOptions(WebSocketOptions other) { + + mMaxFramePayloadSize = other.mMaxFramePayloadSize; + mMaxMessagePayloadSize = other.mMaxMessagePayloadSize; + mReceiveTextMessagesRaw = other.mReceiveTextMessagesRaw; + mTcpNoDelay = other.mTcpNoDelay; + mSocketReceiveTimeout = other.mSocketReceiveTimeout; + mSocketConnectTimeout = other.mSocketConnectTimeout; + mValidateIncomingUtf8 = other.mValidateIncomingUtf8; + mMaskClientFrames = other.mMaskClientFrames; + mReconnectInterval = other.mReconnectInterval; + mSSLCertificateSocketFactory = other.mSSLCertificateSocketFactory; + } + + /** + * Receive text message as raw byte array with verified, + * but non-decoded UTF-8. + * + * DEFAULT: false + * + * @param enabled True to enable. + */ + public void setReceiveTextMessagesRaw(boolean enabled) { + mReceiveTextMessagesRaw = enabled; + } + + /** + * When true, WebSockets text messages are provided as + * verified, but non-decoded UTF-8 in byte arrays. + * + * @return True, iff option is enabled. + */ + public boolean getReceiveTextMessagesRaw() { + return mReceiveTextMessagesRaw; + } + + /** + * Set maximum frame payload size that will be accepted + * when receiving. + * + * DEFAULT: 4MB + * + * @param size Maximum size in octets for frame payload. + */ + public void setMaxFramePayloadSize(int size) { + if (size > 0) { + mMaxFramePayloadSize = size; + if (mMaxMessagePayloadSize < mMaxFramePayloadSize) { + mMaxMessagePayloadSize = mMaxFramePayloadSize; + } + } + } + + /** + * Get maxium frame payload size that will be accepted + * when receiving. + * + * @return Maximum size in octets for frame payload. + */ + public int getMaxFramePayloadSize() { + return mMaxFramePayloadSize; + } + + /** + * Set maximum message payload size (after reassembly of fragmented + * messages) that will be accepted when receiving. + * + * DEFAULT: 4MB + * + * @param size Maximum size in octets for message payload. + */ + public void setMaxMessagePayloadSize(int size) { + if (size > 0) { + mMaxMessagePayloadSize = size; + if (mMaxMessagePayloadSize < mMaxFramePayloadSize) { + mMaxFramePayloadSize = mMaxMessagePayloadSize; + } + } + } + + /** + * Get maximum message payload size (after reassembly of fragmented + * messages) that will be accepted when receiving. + * + * @return Maximum size in octets for message payload. + */ + public int getMaxMessagePayloadSize() { + return mMaxMessagePayloadSize; + } + + /** + * Set TCP No-Delay ("Nagle") for TCP connection. + * + * DEFAULT: true + * + * @param enabled True to enable TCP No-Delay. + */ + public void setTcpNoDelay(boolean enabled) { + mTcpNoDelay = enabled; + } + + /** + * Get TCP No-Delay ("Nagle") for TCP connection. + * + * @return True, iff TCP No-Delay is enabled. + */ + public boolean getTcpNoDelay() { + return mTcpNoDelay; + } + + /** + * Set receive timeout on socket. When the TCP connection disappears, + * that will only be recognized by the reader after this timeout. + * + * DEFAULT: 200 + * + * @param timeoutMs Socket receive timeout in ms. + */ + public void setSocketReceiveTimeout(int timeoutMs) { + if (timeoutMs >= 0) { + mSocketReceiveTimeout = timeoutMs; + } + } + + /** + * Get socket receive timeout. + * + * @return Socket receive timeout in ms. + */ + public int getSocketReceiveTimeout() { + return mSocketReceiveTimeout; + } + + /** + * Set connect timeout on socket. When a WebSocket connection is + * about to be established, the TCP socket connect will timeout + * after this period. + * + * DEFAULT: 3000 + * + * @param timeoutMs Socket connect timeout in ms. + */ + public void setSocketConnectTimeout(int timeoutMs) { + if (timeoutMs >= 0) { + mSocketConnectTimeout = timeoutMs; + } + } + + /** + * Get socket connect timeout. + * + * @return Socket receive timeout in ms. + */ + public int getSocketConnectTimeout() { + return mSocketConnectTimeout; + } + + /** + * Controls whether incoming text message payload is verified + * to be valid UTF-8. + * + * DEFAULT: true + * + * @param enabled True to verify incoming UTF-8. + */ + public void setValidateIncomingUtf8(boolean enabled) { + mValidateIncomingUtf8 = enabled; + } + + /** + * Get UTF-8 validation option. + * + * @return True, iff incoming UTF-8 is validated. + */ + public boolean getValidateIncomingUtf8() { + return mValidateIncomingUtf8; + } + + /** + * Controls whether to mask client-to-server WebSocket frames. + * Beware, normally, WebSockets servers will deny non-masked c2s + * frames and fail the connection. + * + * DEFAULT: true + * + * @param enabled Set true to mask client-to-server frames. + */ + public void setMaskClientFrames(boolean enabled) { + mMaskClientFrames = enabled; + } + + /** + * Get mask client frames option. + * + * @return True, iff client-to-server frames are masked. + */ + public boolean getMaskClientFrames() { + return mMaskClientFrames; + } + + /** + * Set reconnect interval + * + * @param reconnectInterval Interval in ms, 0 - no reconnection + */ + public void setReconnectInterval(int reconnectInterval) { + mReconnectInterval = reconnectInterval; + } + + public int getReconnectInterval() { + return mReconnectInterval; + } + + /** + * Get custom ssl certificate socket factory + * + * @return socket factory or null if not set + */ + public SocketFactory getSSLCertificateSocketFactory() { + return mSSLCertificateSocketFactory; + } + + /** + * Set custom ssl certificate socket factory + * + * @param sslCertificateSocketFactory custom ssl certificate socket factory + */ + public void setSSLCertificateSocketFactory(SocketFactory sslCertificateSocketFactory) { + mSSLCertificateSocketFactory = sslCertificateSocketFactory; + } +} diff --git a/lib/src/main/java/de/tavendo/autobahn/WebSocketReader.java b/lib/src/main/java/de/tavendo/autobahn/WebSocketReader.java new file mode 100644 index 0000000..2214d9e --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/WebSocketReader.java @@ -0,0 +1,674 @@ +/****************************************************************************** + * + * Copyright 2011-2012 Tavendo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package de.tavendo.autobahn; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; + +import android.os.Handler; +import android.os.Message; +import android.util.Log; +import android.util.Pair; +import de.tavendo.autobahn.WebSocketMessage.WebSocketCloseCode; + +/** + * WebSocket reader, the receiving leg of a WebSockets connection. + * This runs on it's own background thread and posts messages to master + * thread's message queue for there to be consumed by the application. + * The only method that needs to be called (from foreground thread) is quit(), + * which gracefully shuts down the background receiver thread. + */ +public class WebSocketReader extends Thread { + private static final String TAG = WebSocketReader.class.getCanonicalName(); + + private static enum ReaderState { + STATE_CLOSED, + STATE_CONNECTING, + STATE_CLOSING, + STATE_OPEN + } + + private final Handler mWebSocketConnectionHandler; + private final Socket mSocket; + private InputStream mInputStream; + private final WebSocketOptions mWebSocketOptions; + + private volatile boolean mStopped = false; + + + private final byte[] mNetworkBuffer; + private final ByteBuffer mApplicationBuffer; + private NoCopyByteArrayOutputStream mMessagePayload; + + private ReaderState mState; + + private boolean mInsideMessage = false; + private int mMessageOpcode; + + private WebSocketFrameHeader mFrameHeader; + private Utf8Validator mUTF8Validator = new Utf8Validator(); + + + + + /** + * Create new WebSockets background reader. + * + * @param master The message handler of master (foreground thread). + * @param socket The socket channel created on foreground thread. + */ + public WebSocketReader(Handler master, Socket socket, WebSocketOptions options, String threadName) { + super(threadName); + + this.mWebSocketConnectionHandler = master; + + this.mSocket = socket; + this.mWebSocketOptions = options; + + this.mNetworkBuffer = new byte[4096]; + this.mApplicationBuffer = ByteBuffer.allocateDirect(options.getMaxFramePayloadSize() + 14); + this.mMessagePayload = new NoCopyByteArrayOutputStream(options.getMaxMessagePayloadSize()); + + this.mFrameHeader = null; + this.mState = ReaderState.STATE_CONNECTING; + + Log.d(TAG, "WebSocket reader created."); + } + + + /** + * Graceful shutdown of background reader thread (called from master). + */ + public void quit() { + + mStopped = true; + + Log.d(TAG, "quit"); + } + + + /** + * Notify the master (foreground thread) of WebSockets message received + * and unwrapped. + * + * @param message Message to send to master. + */ + protected void notify(Object message) { + + Message msg = mWebSocketConnectionHandler.obtainMessage(); + msg.obj = message; + mWebSocketConnectionHandler.sendMessage(msg); + } + + + /** + * Process incoming WebSockets data (after handshake). + */ + private boolean processData() throws Exception { + + // outside frame? + if (mFrameHeader == null) { + + // need at least 2 bytes from WS frame header to start processing + if (mApplicationBuffer.position() >= 2) { + + byte b0 = mApplicationBuffer.get(0); + boolean fin = (b0 & 0x80) != 0; + int rsv = (b0 & 0x70) >> 4; + int opcode = b0 & 0x0f; + + byte b1 = mApplicationBuffer.get(1); + boolean masked = (b1 & 0x80) != 0; + int payload_len1 = b1 & 0x7f; + + // now check protocol compliance + + if (rsv != 0) { + throw new WebSocketException("RSV != 0 and no extension negotiated"); + } + + if (masked) { + // currently, we don't allow this. need to see whats the final spec. + throw new WebSocketException("masked server frame"); + } + + if (opcode > 7) { + // control frame + if (!fin) { + throw new WebSocketException("fragmented control frame"); + } + if (payload_len1 > 125) { + throw new WebSocketException("control frame with payload length > 125 octets"); + } + if (opcode != 8 && opcode != 9 && opcode != 10) { + throw new WebSocketException("control frame using reserved opcode " + opcode); + } + if (opcode == 8 && payload_len1 == 1) { + throw new WebSocketException("received close control frame with payload len 1"); + } + } else { + // message frame + if (opcode != 0 && opcode != 1 && opcode != 2) { + throw new WebSocketException("data frame using reserved opcode " + opcode); + } + if (!mInsideMessage && opcode == 0) { + throw new WebSocketException("received continuation data frame outside fragmented message"); + } + if (mInsideMessage && opcode != 0) { + throw new WebSocketException("received non-continuation data frame while inside fragmented message"); + } + } + + int mask_len = masked ? 4 : 0; + int header_len = 0; + + if (payload_len1 < 126) { + header_len = 2 + mask_len; + } else if (payload_len1 == 126) { + header_len = 2 + 2 + mask_len; + } else if (payload_len1 == 127) { + header_len = 2 + 8 + mask_len; + } else { + // should not arrive here + throw new Exception("logic error"); + } + + // continue when complete frame header is available + if (mApplicationBuffer.position() >= header_len) { + + // determine frame payload length + int i = 2; + long payload_len = 0; + if (payload_len1 == 126) { + payload_len = ((0xff & mApplicationBuffer.get(i)) << 8) | (0xff & mApplicationBuffer.get(i+1)); + if (payload_len < 126) { + throw new WebSocketException("invalid data frame length (not using minimal length encoding)"); + } + i += 2; + } else if (payload_len1 == 127) { + if ((0x80 & mApplicationBuffer.get(i+0)) != 0) { + throw new WebSocketException("invalid data frame length (> 2^63)"); + } + payload_len = ((0xff & mApplicationBuffer.get(i+0)) << 56) | + ((0xff & mApplicationBuffer.get(i+1)) << 48) | + ((0xff & mApplicationBuffer.get(i+2)) << 40) | + ((0xff & mApplicationBuffer.get(i+3)) << 32) | + ((0xff & mApplicationBuffer.get(i+4)) << 24) | + ((0xff & mApplicationBuffer.get(i+5)) << 16) | + ((0xff & mApplicationBuffer.get(i+6)) << 8) | + ((0xff & mApplicationBuffer.get(i+7)) ); + if (payload_len < 65536) { + throw new WebSocketException("invalid data frame length (not using minimal length encoding)"); + } + i += 8; + } else { + payload_len = payload_len1; + } + + // immediately bail out on frame too large + if (payload_len > mWebSocketOptions.getMaxFramePayloadSize()) { + throw new WebSocketException("frame payload too large"); + } + + // save frame header metadata + mFrameHeader = new WebSocketFrameHeader(); + mFrameHeader.setOpcode(opcode); + mFrameHeader.setFin(fin); + mFrameHeader.setReserved(rsv); + mFrameHeader.setPayloadLength((int) payload_len); + mFrameHeader.setHeaderLength(header_len); + mFrameHeader.setTotalLen(mFrameHeader.getHeaderLength() + mFrameHeader.getPayloadLength()); + + if (masked) { + byte[] mask = new byte[4]; + for (int j = 0; j < 4; ++j) { + mask[i] = (byte) (0xff & mApplicationBuffer.get(i + j)); + } + mFrameHeader.setMask(mask); + + i += 4; + } else { + mFrameHeader.setMask(null); + } + + // continue processing when payload empty or completely buffered + return mFrameHeader.getPayloadLength() == 0 || mApplicationBuffer.position() >= mFrameHeader.getTotalLength(); + + } else { + + // need more data + return false; + } + } else { + + // need more data + return false; + } + + } else { + + /// \todo refactor this for streaming processing, incl. fail fast on invalid UTF-8 within frame already + + // within frame + + // see if we buffered complete frame + if (mApplicationBuffer.position() >= mFrameHeader.getTotalLength()) { + + // cut out frame payload + byte[] framePayload = null; + int oldPosition = mApplicationBuffer.position(); + if (mFrameHeader.getPayloadLength() > 0) { + framePayload = new byte[mFrameHeader.getPayloadLength()]; + mApplicationBuffer.position(mFrameHeader.getHeaderLength()); + mApplicationBuffer.get(framePayload, 0, (int) mFrameHeader.getPayloadLength()); + } + mApplicationBuffer.position(mFrameHeader.getTotalLength()); + mApplicationBuffer.limit(oldPosition); + mApplicationBuffer.compact(); + + if (mFrameHeader.getOpcode() > 7) { + // control frame + + if (mFrameHeader.getOpcode() == 8) { + + int code = WebSocketCloseCode.RESERVED_NO_STATUS; + String reason = null; + + if (mFrameHeader.getPayloadLength() >= 2) { + + // parse and check close code + code = (framePayload[0] & 0xff) * 256 + (framePayload[1] & 0xff); + if (code < 1000 + || (code >= 1000 && code <= 2999 && + code != 1000 && code != 1001 && code != 1002 && code != 1003 && code != 1007 && code != 1008 && code != 1009 && code != 1010 && code != 1011) + || code >= 5000) { + + throw new WebSocketException("invalid close code " + code); + } + + // parse and check close reason + if (mFrameHeader.getPayloadLength() > 2) { + + byte[] ra = new byte[mFrameHeader.getPayloadLength() - 2]; + System.arraycopy(framePayload, 2, ra, 0, mFrameHeader.getPayloadLength() - 2); + + Utf8Validator val = new Utf8Validator(); + val.validate(ra); + if (!val.isValid()) { + throw new WebSocketException("invalid close reasons (not UTF-8)"); + } else { + reason = new String(ra, WebSocket.UTF8_ENCODING); + } + } + } + onClose(code, reason); + + } else if (mFrameHeader.getOpcode() == 9) { + // dispatch WS ping + onPing(framePayload); + + } else if (mFrameHeader.getOpcode() == 10) { + // dispatch WS pong + onPong(framePayload); + + } else { + + // should not arrive here (handled before) + throw new Exception("logic error"); + } + + } else { + // message frame + + if (!mInsideMessage) { + // new message started + mInsideMessage = true; + mMessageOpcode = mFrameHeader.getOpcode(); + if (mMessageOpcode == 1 && mWebSocketOptions.getValidateIncomingUtf8()) { + mUTF8Validator.reset(); + } + } + + if (framePayload != null) { + + // immediately bail out on message too large + if (mMessagePayload.size() + framePayload.length > mWebSocketOptions.getMaxMessagePayloadSize()) { + throw new WebSocketException("message payload too large"); + } + + // validate incoming UTF-8 + if (mMessageOpcode == 1 && mWebSocketOptions.getValidateIncomingUtf8() && !mUTF8Validator.validate(framePayload)) { + throw new WebSocketException("invalid UTF-8 in text message payload"); + } + + // buffer frame payload for message + mMessagePayload.write(framePayload); + } + + // on final frame .. + if (mFrameHeader.isFin()) { + + if (mMessageOpcode == 1) { + + // verify that UTF-8 ends on codepoint + if (mWebSocketOptions.getValidateIncomingUtf8() && !mUTF8Validator.isValid()) { + throw new WebSocketException("UTF-8 text message payload ended within Unicode code point"); + } + + // deliver text message + if (mWebSocketOptions.getReceiveTextMessagesRaw()) { + + // dispatch WS text message as raw (but validated) UTF-8 + onRawTextMessage(mMessagePayload.toByteArray()); + + } else { + + // dispatch WS text message as Java String (previously already validated) + String s = new String(mMessagePayload.toByteArray(), WebSocket.UTF8_ENCODING); + onTextMessage(s); + } + + } else if (mMessageOpcode == 2) { + + // dispatch WS binary message + onBinaryMessage(mMessagePayload.toByteArray()); + + } else { + + // should not arrive here (handled before) + throw new Exception("logic error"); + } + + // ok, message completed - reset all + mInsideMessage = false; + mMessagePayload.reset(); + } + } + + // reset frame + mFrameHeader = null; + + // reprocess if more data left + return mApplicationBuffer.position() > 0; + + } else { + + // need more data + return false; + } + } + } + + + /** + * WebSockets handshake reply from server received, default notifies master. + * + * @param success Success handshake flag + */ + protected void onHandshake(boolean success) { + + notify(new WebSocketMessage.ServerHandshake(success)); + } + + + /** + * WebSockets close received, default notifies master. + */ + protected void onClose(int code, String reason) { + + notify(new WebSocketMessage.Close(code, reason)); + } + + + /** + * WebSockets ping received, default notifies master. + * + * @param payload Ping payload or null. + */ + protected void onPing(byte[] payload) { + + notify(new WebSocketMessage.Ping(payload)); + } + + + /** + * WebSockets pong received, default notifies master. + * + * @param payload Pong payload or null. + */ + protected void onPong(byte[] payload) { + + notify(new WebSocketMessage.Pong(payload)); + } + + + /** + * WebSockets text message received, default notifies master. + * This will only be called when the option receiveTextMessagesRaw + * HAS NOT been set. + * + * @param payload Text message payload as Java String decoded + * from raw UTF-8 payload or null (empty payload). + */ + protected void onTextMessage(String payload) { + + notify(new WebSocketMessage.TextMessage(payload)); + } + + + /** + * WebSockets text message received, default notifies master. + * This will only be called when the option receiveTextMessagesRaw + * HAS been set. + * + * @param payload Text message payload as raw UTF-8 octets or + * null (empty payload). + */ + protected void onRawTextMessage(byte[] payload) { + + notify(new WebSocketMessage.RawTextMessage(payload)); + } + + + /** + * WebSockets binary message received, default notifies master. + * + * @param payload Binary message payload or null (empty payload). + */ + protected void onBinaryMessage(byte[] payload) { + + notify(new WebSocketMessage.BinaryMessage(payload)); + } + + + /** + * Process WebSockets handshake received from server. + */ + private boolean processHandshake() throws UnsupportedEncodingException { + + boolean res = false; + for (int pos = mApplicationBuffer.position() - 4; pos >= 0; --pos) { + if (mApplicationBuffer.get(pos+0) == 0x0d && + mApplicationBuffer.get(pos+1) == 0x0a && + mApplicationBuffer.get(pos+2) == 0x0d && + mApplicationBuffer.get(pos+3) == 0x0a) { + + /// \todo process & verify handshake from server + /// \todo forward subprotocol, if any + + int oldPosition = mApplicationBuffer.position(); + + // Check HTTP status code + boolean serverError = false; + if (mApplicationBuffer.get(0) == 'H' && + mApplicationBuffer.get(1) == 'T' && + mApplicationBuffer.get(2) == 'T' && + mApplicationBuffer.get(3) == 'P') { + + Pair status = parseHTTPStatus(); + if (status.first >= 300) { + // Invalid status code for success connection + notify(new WebSocketMessage.ServerError(status.first, status.second)); + serverError = true; + } + } + + mApplicationBuffer.position(pos + 4); + mApplicationBuffer.limit(oldPosition); + mApplicationBuffer.compact(); + + if (!serverError) { + // process further when data after HTTP headers left in buffer + res = mApplicationBuffer.position() > 0; + + mState = ReaderState.STATE_OPEN; + } else { + res = true; + mState = ReaderState.STATE_CLOSED; + mStopped = true; + } + + onHandshake(!serverError); + break; + } + } + return res; + } + + private Pair parseHTTPStatus() throws UnsupportedEncodingException { + int beg, end; + // Find first space + for (beg = 4; beg < mApplicationBuffer.position(); ++beg) { + if (mApplicationBuffer.get(beg) == ' ') break; + } + // Find second space + for (end = beg + 1; end < mApplicationBuffer.position(); ++end) { + if (mApplicationBuffer.get(end) == ' ') break; + } + // Parse status code between them + ++beg; + int statusCode = 0; + for (int i = 0; beg + i < end; ++i) { + int digit = (mApplicationBuffer.get(beg + i) - 0x30); + statusCode *= 10; + statusCode += digit; + } + // Find end of line to extract error message + ++end; + int eol; + for (eol = end; eol < mApplicationBuffer.position(); ++eol) { + if (mApplicationBuffer.get(eol) == 0x0d) break; + } + int statusMessageLength = eol - end; + byte[] statusBuf = new byte[statusMessageLength]; + mApplicationBuffer.position(end); + mApplicationBuffer.get(statusBuf, 0, statusMessageLength); + String statusMessage = new String(statusBuf, WebSocket.UTF8_ENCODING); + Log.w(TAG, String.format("Status: %d (%s)", statusCode, statusMessage)); + return new Pair(statusCode, statusMessage); + } + + + /** + * Consume data buffered in mFrameBuffer. + */ + private boolean consumeData() throws Exception { + switch (mState) { + case STATE_OPEN: + case STATE_CLOSING: + return processData(); + case STATE_CLOSED: + return false; + case STATE_CONNECTING: + return processHandshake(); + default: + return false; + } + } + + + /** + * Run the background reader thread loop. + */ + @Override + public void run() { + synchronized (this) { + notifyAll(); + } + + InputStream inputStream = null; + try { + inputStream = mSocket.getInputStream(); + } catch (IOException e) { + Log.e(TAG, e.getLocalizedMessage()); + return; + } + + this.mInputStream = inputStream; + + Log.d(TAG, "WebSocker reader running."); + mApplicationBuffer.clear(); + + while (!mStopped) { + try { + + int bytesRead = mInputStream.read(mNetworkBuffer); + if (bytesRead > 0) { + mApplicationBuffer.put(mNetworkBuffer, 0, bytesRead); + while (consumeData()) { + } + } else if (bytesRead == -1) { + Log.d(TAG, "run() : ConnectionLost"); + + notify(new WebSocketMessage.ConnectionLost()); + this.mStopped = true; + } else { + Log.e(TAG, "WebSocketReader read() failed."); + } + + } catch (WebSocketException e) { + Log.d(TAG, "run() : WebSocketException (" + e.toString() + ")"); + + // wrap the exception and notify master + notify(new WebSocketMessage.ProtocolViolation(e)); + } catch (SocketException e) { + Log.d(TAG, "run() : SocketException (" + e.toString() + ")"); + + // wrap the exception and notify master + notify(new WebSocketMessage.ConnectionLost()); + } catch (IOException e) { + Log.d(TAG, "run() : IOException (" + e.toString() + ")"); + + notify(new WebSocketMessage.ConnectionLost()); + } catch (Exception e) { + Log.d(TAG, "run() : Exception (" + e.toString() + ")"); + + // wrap the exception and notify master + notify(new WebSocketMessage.Error(e)); + } + } + + + Log.d(TAG, "WebSocket reader ended."); + } +} diff --git a/lib/src/main/java/de/tavendo/autobahn/WebSocketWriter.java b/lib/src/main/java/de/tavendo/autobahn/WebSocketWriter.java new file mode 100644 index 0000000..bdd6707 --- /dev/null +++ b/lib/src/main/java/de/tavendo/autobahn/WebSocketWriter.java @@ -0,0 +1,462 @@ +/****************************************************************************** + * + * Copyright 2011-2012 Tavendo GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package de.tavendo.autobahn; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.ref.WeakReference; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.util.Random; + +import android.os.Handler; +import android.os.Looper; +import android.os.Message; +import android.util.Base64; +import android.util.Log; + +/** + * WebSocket writer, the sending leg of a WebSockets connection. + * This is run on it's background thread with it's own message loop. + * The only method that needs to be called (from foreground thread) is forward(), + * which is used to forward a WebSockets message to this object (running on + * background thread) so that it can be formatted and sent out on the + * underlying TCP socket. + */ +public class WebSocketWriter extends Thread { + private static final String TAG = WebSocketWriter.class.getCanonicalName(); + + private static final int WEB_SOCKETS_VERSION = 13; + private static final String CRLF = "\r\n"; + + private final Random mRandom = new Random(); + private final Handler mWebSocketConnectionHandler; + private final WebSocketOptions mWebSocketOptions; + private final ByteBuffer mApplicationBuffer; + private final Socket mSocket; + + private OutputStream mOutputStream; + + private Handler mHandler; + + + /** + * Create new WebSockets background writer. + * + * @param looper The message looper of the background thread on which + * this object is running. + * @param master The message handler of master (foreground thread). + * @param socket The socket channel created on foreground thread. + * @param options WebSockets connection options. + */ + public WebSocketWriter(Handler master, Socket socket, WebSocketOptions options, String threadName) { + super(threadName); + + this.mWebSocketConnectionHandler = master; + this.mWebSocketOptions = options; + this.mSocket = socket; + + this.mApplicationBuffer = ByteBuffer.allocate(options.getMaxFramePayloadSize() + 14); + + Log.d(TAG, "WebSocket writer created."); + } + + + /** + * Call this from the foreground (UI) thread to make the writer + * (running on background thread) send a WebSocket message on the + * underlying TCP. + * + * @param message Message to send to WebSockets writer. An instance of the message + * classes inside WebSocketMessage or another type which then needs + * to be handled within processAppMessage() (in a class derived from + * this class). + */ + public void forward(Object message) { + Message msg = mHandler.obtainMessage(); + msg.obj = message; + mHandler.sendMessage(msg); + } + + + /** + * Notify the master (foreground thread). + * + * @param message Message to send to master. + */ + private void notify(Object message) { + Message msg = mWebSocketConnectionHandler.obtainMessage(); + msg.obj = message; + mWebSocketConnectionHandler.sendMessage(msg); + } + + + /** + * Create new key for WebSockets handshake. + * + * @return WebSockets handshake key (Base64 encoded). + */ + private String newHandshakeKey() { + final byte[] ba = new byte[16]; + mRandom.nextBytes(ba); + return Base64.encodeToString(ba, Base64.NO_WRAP); + } + + + /** + * Create new (random) frame mask. + * + * @return Frame mask (4 octets). + */ + private byte[] newFrameMask() { + final byte[] ba = new byte[4]; + mRandom.nextBytes(ba); + return ba; + } + + + /** + * Send WebSocket client handshake. + */ + private void sendClientHandshake(WebSocketMessage.ClientHandshake message) throws IOException { + String path = message.getURI().getPath(); + if (path == null || path.length() == 0) { + path = "/"; + } + + String query = message.getURI().getQuery(); + if (query != null && query.length() > 0) { + path = path + "?" + query; + } + + mApplicationBuffer.put(("GET " + path + " HTTP/1.1" + CRLF).getBytes()); + mApplicationBuffer.put(("Host: " + message.getURI().getHost() + CRLF).getBytes()); + mApplicationBuffer.put(("Upgrade: WebSocket" + CRLF).getBytes()); + mApplicationBuffer.put(("Connection: Upgrade" + CRLF).getBytes()); + mApplicationBuffer.put(("Sec-WebSocket-Key: " + newHandshakeKey() + CRLF).getBytes()); + + if (message.getOrigin() != null) { + mApplicationBuffer.put(("Origin: " + message.getOrigin().toString() + CRLF).getBytes()); + } + + if (message.getSubprotocols() != null && message.getSubprotocols().length > 0) { + mApplicationBuffer.put(("Sec-WebSocket-Protocol: ").getBytes()); + for (int i = 0; i < message.getSubprotocols().length; ++i) { + mApplicationBuffer.put((message.getSubprotocols()[i]).getBytes()); + mApplicationBuffer.put((", ").getBytes()); + } + mApplicationBuffer.put((CRLF).getBytes()); + } + + mApplicationBuffer.put(("Sec-WebSocket-Version: " + WEB_SOCKETS_VERSION + CRLF).getBytes()); + mApplicationBuffer.put((CRLF).getBytes()); + } + + + /** + * Send WebSockets close. + */ + private void sendClose(WebSocketMessage.Close message) throws IOException, WebSocketException { + if (message.getCode() > 0) { + byte[] payload = null; + + if (message.getReason() != null && !(message.getReason().length() > 0)) { + byte[] pReason = message.getReason().getBytes(WebSocket.UTF8_ENCODING); + payload = new byte[2 + pReason.length]; + for (int i = 0; i < pReason.length; ++i) { + payload[i + 2] = pReason[i]; + } + } else { + payload = new byte[2]; + } + + if (payload != null && payload.length > 125) { + throw new WebSocketException("close payload exceeds 125 octets"); + } + + payload[0] = (byte)((message.getCode() >> 8) & 0xff); + payload[1] = (byte)(message.getCode() & 0xff); + + sendFrame(8, true, payload); + } else { + sendFrame(8, true, null); + } + } + + + /** + * Send WebSockets ping. + */ + private void sendPing(WebSocketMessage.Ping message) throws IOException, WebSocketException { + if (message.mPayload != null && message.mPayload.length > 125) { + throw new WebSocketException("ping payload exceeds 125 octets"); + } + sendFrame(9, true, message.mPayload); + } + + + /** + * Send WebSockets pong. Normally, unsolicited Pongs are not used, + * but Pongs are only send in response to a Ping from the peer. + */ + private void sendPong(WebSocketMessage.Pong message) throws IOException, WebSocketException { + if (message.mPayload != null && message.mPayload.length > 125) { + throw new WebSocketException("pong payload exceeds 125 octets"); + } + sendFrame(10, true, message.mPayload); + } + + + /** + * Send WebSockets binary message. + */ + private void sendBinaryMessage(WebSocketMessage.BinaryMessage message) throws IOException, WebSocketException { + if (message.mPayload.length > mWebSocketOptions.getMaxMessagePayloadSize()) { + throw new WebSocketException("message payload exceeds payload limit"); + } + sendFrame(2, true, message.mPayload); + } + + + /** + * Send WebSockets text message. + */ + private void sendTextMessage(WebSocketMessage.TextMessage message) throws IOException, WebSocketException { + byte[] payload = message.mPayload.getBytes(WebSocket.UTF8_ENCODING); + if (payload.length > mWebSocketOptions.getMaxMessagePayloadSize()) { + throw new WebSocketException("message payload exceeds payload limit"); + } + sendFrame(1, true, payload); + } + + + /** + * Send WebSockets binary message. + */ + private void sendRawTextMessage(WebSocketMessage.RawTextMessage message) throws IOException, WebSocketException { + if (message.mPayload.length > mWebSocketOptions.getMaxMessagePayloadSize()) { + throw new WebSocketException("message payload exceeds payload limit"); + } + sendFrame(1, true, message.mPayload); + } + + + /** + * Sends a WebSockets frame. Only need to use this method in derived classes which implement + * more message types in processAppMessage(). You need to know what you are doing! + * + * @param opcode The WebSocket frame opcode. + * @param fin FIN flag for WebSocket frame. + * @param payload Frame payload or null. + */ + protected void sendFrame(int opcode, boolean fin, byte[] payload) throws IOException { + if (payload != null) { + sendFrame(opcode, fin, payload, 0, payload.length); + } else { + sendFrame(opcode, fin, null, 0, 0); + } + } + + + /** + * Sends a WebSockets frame. Only need to use this method in derived classes which implement + * more message types in processAppMessage(). You need to know what you are doing! + * + * @param opcode The WebSocket frame opcode. + * @param fin FIN flag for WebSocket frame. + * @param payload Frame payload or null. + * @param offset Offset within payload of the chunk to send. + * @param length Length of the chunk within payload to send. + */ + protected void sendFrame(int opcode, boolean fin, byte[] payload, int offset, int length) throws IOException { + // first octet + byte b0 = 0; + if (fin) { + b0 |= (byte) (1 << 7); + } + b0 |= (byte) opcode; + mApplicationBuffer.put(b0); + + // second octet + byte b1 = 0; + if (mWebSocketOptions.getMaskClientFrames()) { + b1 = (byte) (1 << 7); + } + + long len = length; + + // extended payload length + if (len <= 125) { + b1 |= (byte) len; + mApplicationBuffer.put(b1); + } else if (len <= 0xffff) { + b1 |= (byte) (126 & 0xff); + mApplicationBuffer.put(b1); + mApplicationBuffer.put(new byte[] {(byte)((len >> 8) & 0xff), (byte)(len & 0xff)}); + } else { + b1 |= (byte) (127 & 0xff); + mApplicationBuffer.put(b1); + mApplicationBuffer.put(new byte[] {(byte)((len >> 56) & 0xff), + (byte)((len >> 48) & 0xff), + (byte)((len >> 40) & 0xff), + (byte)((len >> 32) & 0xff), + (byte)((len >> 24) & 0xff), + (byte)((len >> 16) & 0xff), + (byte)((len >> 8) & 0xff), + (byte)(len & 0xff)}); + } + + byte mask[] = null; + if (mWebSocketOptions.getMaskClientFrames()) { + // a mask is always needed, even without payload + mask = newFrameMask(); + mApplicationBuffer.put(mask[0]); + mApplicationBuffer.put(mask[1]); + mApplicationBuffer.put(mask[2]); + mApplicationBuffer.put(mask[3]); + } + + if (len > 0) { + if (mWebSocketOptions.getMaskClientFrames()) { + /// \todo optimize masking + /// \todo masking within buffer of output stream + for (int i = 0; i < len; ++i) { + payload[i + offset] ^= mask[i % 4]; + } + } + mApplicationBuffer.put(payload, offset, length); + } + } + + /** + * Process WebSockets or control message from master. Normally, + * there should be no reason to override this. If you do, you + * need to know what you are doing. + * + * @param msg An instance of the message types within WebSocketMessage + * or a message that is handled in processAppMessage(). + */ + protected void processMessage(Object msg) throws IOException, WebSocketException { + + if (msg instanceof WebSocketMessage.TextMessage) { + sendTextMessage((WebSocketMessage.TextMessage) msg); + } else if (msg instanceof WebSocketMessage.RawTextMessage) { + sendRawTextMessage((WebSocketMessage.RawTextMessage) msg); + } else if (msg instanceof WebSocketMessage.BinaryMessage) { + sendBinaryMessage((WebSocketMessage.BinaryMessage) msg); + } else if (msg instanceof WebSocketMessage.Ping) { + sendPing((WebSocketMessage.Ping) msg); + } else if (msg instanceof WebSocketMessage.Pong) { + sendPong((WebSocketMessage.Pong) msg); + } else if (msg instanceof WebSocketMessage.Close) { + sendClose((WebSocketMessage.Close) msg); + } else if (msg instanceof WebSocketMessage.ClientHandshake) { + sendClientHandshake((WebSocketMessage.ClientHandshake) msg); + } else if (msg instanceof WebSocketMessage.Quit) { + Looper.myLooper().quit(); + + Log.d(TAG, "WebSocket writer ended."); + } else { + processAppMessage(msg); + } + } + + public void writeMessageToBuffer(Message message) { + try { + mApplicationBuffer.clear(); + processMessage(message.obj); + mApplicationBuffer.flip(); + + mOutputStream.write(mApplicationBuffer.array(), mApplicationBuffer.position(), mApplicationBuffer.limit()); + } catch (SocketException e) { + Log.e(TAG, "run() : SocketException (" + e.toString() + ")"); + + notify(new WebSocketMessage.ConnectionLost()); + } catch (IOException e) { + Log.e(TAG, "run() : IOException (" + e.toString() + ")"); + + } catch (Exception e) { + notify(new WebSocketMessage.Error(e)); + } + } + + /** + * Process message other than plain WebSockets or control message. + * This is intended to be overridden in derived classes. + * + * @param msg Message from foreground thread to process. + */ + protected void processAppMessage(Object msg) throws WebSocketException, IOException { + throw new WebSocketException("unknown message received by WebSocketWriter"); + } + + + + // Thread method overrides + @Override + public void run() { + OutputStream outputStream = null; + try { + outputStream = mSocket.getOutputStream(); + } catch (IOException e) { + Log.e(TAG, e.getLocalizedMessage()); + } + + this.mOutputStream = outputStream; + + Looper.prepare(); + + this.mHandler = new ThreadHandler(this); + + synchronized (this) { + Log.d(TAG, "WebSocker writer running."); + + notifyAll(); + } + + Looper.loop(); + } + + + + // + // Private handler class + private static class ThreadHandler extends Handler { + private final WeakReference mWebSocketWriterReference; + + + + public ThreadHandler(WebSocketWriter webSocketWriter) { + super(); + + this.mWebSocketWriterReference = new WeakReference(webSocketWriter); + } + + + + @Override + public void handleMessage(Message message) { + WebSocketWriter webSocketWriter = mWebSocketWriterReference.get(); + if (webSocketWriter != null) { + webSocketWriter.writeMessageToBuffer(message); + } + } + } +} diff --git a/lib/src/main/java/im/delight/android/ddp/Listener.java b/lib/src/main/java/im/delight/android/ddp/Listener.java new file mode 100644 index 0000000..d50f117 --- /dev/null +++ b/lib/src/main/java/im/delight/android/ddp/Listener.java @@ -0,0 +1,19 @@ +package im.delight.android.ddp; + +/** + * Copyright 2014 www.delight.im + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public interface Listener { } diff --git a/lib/src/main/java/im/delight/android/ddp/Meteor.java b/lib/src/main/java/im/delight/android/ddp/Meteor.java new file mode 100644 index 0000000..35c38b9 --- /dev/null +++ b/lib/src/main/java/im/delight/android/ddp/Meteor.java @@ -0,0 +1,1125 @@ +package im.delight.android.ddp; + +/** + * Copyright 2014 www.delight.im + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import android.content.Context; +import android.content.SharedPreferences; + +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.JsonNode; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; + +import de.tavendo.autobahn.WebSocket; +import de.tavendo.autobahn.WebSocketConnection; +import de.tavendo.autobahn.WebSocketException; +import de.tavendo.autobahn.WebSocketOptions; + +/** Client that connects to Meteor servers implementing the DDP protocol */ +public class Meteor { + + /** Supported versions of the DDP protocol in order of preference */ + private static final String[] SUPPORTED_DDP_VERSIONS = { "1", "pre2", "pre1" }; + /** Whether logging should be enabled or not (behaviour can be adjusted in log() method */ + private static final boolean LOGGING_ENABLED = true; + /** The maximum number of attempts to re-connect to the server over WebSocket */ + private static final int RECONNECT_ATTEMPTS_MAX = 5; + /** Instance of Jackson library's ObjectMapper that converts between JSON and Java objects (POJOs) */ + private static final ObjectMapper mObjectMapper = new ObjectMapper(); + /** The WebSocket connection that will be used for the data transfer */ + private final WebSocketConnection mConnection; + /** The callback that handles messages and events received from the WebSocket connection */ + private final WebSocket.WebSocketConnectionObserver mWebSocketObserver; + /** The web socket options */ + private WebSocketOptions mWebSocketOptions; + /** Map that tracks all pending Listener instances */ + private final Map mListeners; + /** Messages that couldn't be dispatched yet and thus had to be queued */ + private final Queue mQueuedMessages; + private final Context mContext; + private String mServerUri; + private String mDdpVersion; + /** The number of unsuccessful attempts to re-connect in sequence */ + private int mReconnectAttempts; + /** The callback that will handle events and receive messages from this client */ + private MeteorCallback mCallback; + private String mSessionID; + private boolean mConnected; + private String mLoggedInUserId; + + /** + * Returns a new instance for a client connecting to a server via DDP over websocket + * + * The server URI should usually be in the form of `ws://example.meteor.com/websocket` + * or `wss://example.meteor.com/websocket` + * + * @param context a `Context` reference (e.g. an `Activity` or `Service` instance) + * @param serverUri the server URI to connect to + */ + public Meteor(final Context context, final String serverUri) { + this(context, serverUri, null); + } + + /** + * Returns a new instance for a client connecting to a server via DDP over websocket + * + * The server URI should usually be in the form of `ws://example.meteor.com/websocket` + * or `wss://example.meteor.com/websocket` + * + * @param context a `Context` reference (e.g. an `Activity` or `Service` instance) + * @param serverUri the server URI to connect to + * @param protocolVersion the desired DDP protocol version, default version if null given + */ + public Meteor(final Context context, final String serverUri, final String protocolVersion) { + this(context, serverUri, protocolVersion, null); + } + + /** + * Returns a new instance for a client connecting to a server via DDP over websocket + * + * The server URI should usually be in the form of `ws://example.meteor.com/websocket` + * or `wss://example.meteor.com/websocket` + * + * @param context a `Context` reference (e.g. an `Activity` or `Service` instance) + * @param serverUri the server URI to connect to + * @param protocolVersion the desired DDP protocol version, default version if null given + * @param webSocketOptions web socket options + */ + public Meteor(final Context context, final String serverUri, String protocolVersion, + WebSocketOptions webSocketOptions) { + + if (protocolVersion == null) { + protocolVersion = SUPPORTED_DDP_VERSIONS[0]; + } else if (!isVersionSupported(protocolVersion)) { + throw new RuntimeException("DDP protocol version not supported: "+protocolVersion); + } + + if (context == null) { + throw new RuntimeException("The context reference may not be null"); + } + + // save the context reference + mContext = context.getApplicationContext(); + + // create a new WebSocket connection for the data transfer + mConnection = new WebSocketConnection(); + + mWebSocketOptions = webSocketOptions; + + // create a new handler that processes the messages and events received from the WebSocket connection + mWebSocketObserver = new WebSocket.WebSocketConnectionObserver() { + + @Override + public void onOpen() { + log("onOpen()"); + mConnected = true; + mReconnectAttempts = 0; + connect(mSessionID); + } + + @Override + public void onClose(WebSocketCloseNotification code, String reason) { + log("onClose()"); + final boolean lostConnection = mConnected; + mConnected = false; + if (lostConnection) { + mReconnectAttempts++; + if (mReconnectAttempts <= RECONNECT_ATTEMPTS_MAX) { + // try to re-connect automatically + openConnection(false); + } + else { + disconnect(); + } + } + + if (mCallback != null) { + mCallback.onDisconnect(code, reason); + } + } + + @Override + public void onTextMessage(String payload) { + handleMessage(payload); + } + + @Override + public void onRawTextMessage(byte[] payload) { + + } + + @Override + public void onBinaryMessage(byte[] payload) { + + } + + }; + + // create a map that holds the pending Listener instances + mListeners = new HashMap(); + + // create a queue that holds undispatched messages waiting to be sent + mQueuedMessages = new ConcurrentLinkedQueue(); + + // save the server URI + mServerUri = serverUri; + // try with the preferred DDP protocol version first + mDdpVersion = protocolVersion; + // count the number of failed attempts to re-connect + mReconnectAttempts = 0; + + openConnection(false); + } + + /** + * Returns whether this client is connected or not + * + * @return whether this client is connected + */ + public boolean isConnected() { + return mConnected; + } + + /** Manually attempt to re-connect if necessary */ + public void reconnect() { + openConnection(true); + } + + /** + * Opens a connection to the server over websocket + * + * @param isReconnect whether this is a re-connect attempt or not + */ + public void openConnection(final boolean isReconnect) { + if (isReconnect) { + if (mConnection.isConnected()) { + connect(mSessionID); + return; + } + } + + try { + mConnection.connect(new URI(mServerUri), mWebSocketObserver, mWebSocketOptions); + } + catch (WebSocketException | URISyntaxException e) { + if (mCallback != null) { + mCallback.onException(e); + } + } + } + + /** + * Establish the connection to the server as requested by the DDP protocol (after the websocket has been opened) + * + * @param existingSessionID an existing session ID or `null` + */ + private void connect(final String existingSessionID) { + final Map data = new HashMap<>(); + data.put(Protocol.Field.MESSAGE, Protocol.Message.CONNECT); + data.put(Protocol.Field.VERSION, mDdpVersion); + data.put(Protocol.Field.SUPPORT, SUPPORTED_DDP_VERSIONS); + if (existingSessionID != null) { + data.put(Protocol.Field.SESSION, existingSessionID); + } + send(data); + } + + /** Disconnect the client from the server */ + public void disconnect() { + mConnected = false; + mListeners.clear(); + mSessionID = null; + mCallback = null; + try { + mConnection.disconnect(); + } + catch (Exception e) { } + } + + /** + * Sends a Java object (POJO) over the websocket after serializing it with the Jackson library + * + * @param obj the Java object to send + */ + private void send(final Object obj) { + // serialize the object to JSON + final String jsonStr = toJson(obj); + + if (jsonStr == null) { + throw new RuntimeException("Object would be serialized to `null`"); + } + + // send the JSON string + send(jsonStr); + } + + /** + * Sends a string over the websocket + * + * @param message the string to send + */ + private void send(final String message) { + if (message == null) { + throw new RuntimeException("You cannot send `null` messages"); + } + + if (mConnected) { + log("SEND: "+message); + mConnection.sendTextMessage(message); + } + else { + log("QUEUE: "+message); + mQueuedMessages.add(message); + } + } + + /** + * Sets the callback that will handle events and receive messages from this client + * + * @param callback the callback instance + */ + public void setCallback(MeteorCallback callback) { + mCallback = callback; + } + + /** + * Serializes the given Java object (POJO) with the Jackson library + * + * @param obj the object to serialize + * @return the serialized object in JSON format + */ + private String toJson(Object obj) { + try { + return mObjectMapper.writeValueAsString(obj); + } + catch (Exception e) { + if (mCallback != null) { + mCallback.onException(e); + } + return null; + } + } + + /** + * Called whenever a JSON payload has been received from the websocket + * + * @param payload the JSON payload to process + */ + private void handleMessage(final String payload) { + log("RECEIVE: "+payload); + + JsonNode data; + try { + data = mObjectMapper.readTree(payload); + } + catch (JsonProcessingException e) { + if (mCallback != null) { + mCallback.onException(e); + } + return; + } + catch (IOException e) { + if (mCallback != null) { + mCallback.onException(e); + } + return; + } + + if (data != null) { + if (data.has(Protocol.Field.MESSAGE)) { + final String message = data.get(Protocol.Field.MESSAGE).getTextValue(); + + if (message.equals(Protocol.Message.CONNECTED)) { + if (data.has(Protocol.Field.SESSION)) { + mSessionID = data.get(Protocol.Field.SESSION).getTextValue(); + } + + // initialize the new session + initSession(); + } + else if (message.equals(Protocol.Message.FAILED)) { + if (data.has(Protocol.Field.VERSION)) { + final String desiredVersion = data.get(Protocol.Field.VERSION).getTextValue(); + + if (isVersionSupported(desiredVersion)) { + mDdpVersion = desiredVersion; + + openConnection(true); + } + else { + throw new RuntimeException("Protocol version not supported: "+desiredVersion); + } + } + } + else if (message.equals(Protocol.Message.PING)) { + final String id; + if (data.has(Protocol.Field.ID)) { + id = data.get(Protocol.Field.ID).getTextValue(); + } + else { + id = null; + } + + sendPong(id); + } + else if (message.equals(Protocol.Message.ADDED) || message.equals(Protocol.Message.ADDED_BEFORE)) { + final String documentID; + if (data.has(Protocol.Field.ID)) { + documentID = data.get(Protocol.Field.ID).getTextValue(); + } + else { + documentID = null; + } + + final String collectionName; + if (data.has(Protocol.Field.COLLECTION)) { + collectionName = data.get(Protocol.Field.COLLECTION).getTextValue(); + } + else { + collectionName = null; + } + + final String newValuesJson; + if (data.has(Protocol.Field.FIELDS)) { + newValuesJson = data.get(Protocol.Field.FIELDS).toString(); + } + else { + newValuesJson = null; + } + + if (mCallback != null) { + mCallback.onDataAdded(collectionName, documentID, newValuesJson); + } + } + else if (message.equals(Protocol.Message.CHANGED)) { + final String documentID; + if (data.has(Protocol.Field.ID)) { + documentID = data.get(Protocol.Field.ID).getTextValue(); + } + else { + documentID = null; + } + + final String collectionName; + if (data.has(Protocol.Field.COLLECTION)) { + collectionName = data.get(Protocol.Field.COLLECTION).getTextValue(); + } + else { + collectionName = null; + } + + final String updatedValuesJson; + if (data.has(Protocol.Field.FIELDS)) { + updatedValuesJson = data.get(Protocol.Field.FIELDS).toString(); + } + else { + updatedValuesJson = null; + } + + final String removedValuesJson; + if (data.has(Protocol.Field.CLEARED)) { + removedValuesJson = data.get(Protocol.Field.CLEARED).toString(); + } + else { + removedValuesJson = null; + } + + if (mCallback != null) { + mCallback.onDataChanged(collectionName, documentID, updatedValuesJson, removedValuesJson); + } + } + else if (message.equals(Protocol.Message.REMOVED)) { + final String documentID; + if (data.has(Protocol.Field.ID)) { + documentID = data.get(Protocol.Field.ID).getTextValue(); + } + else { + documentID = null; + } + + final String collectionName; + if (data.has(Protocol.Field.COLLECTION)) { + collectionName = data.get(Protocol.Field.COLLECTION).getTextValue(); + } + else { + collectionName = null; + } + + if (mCallback != null) { + mCallback.onDataRemoved(collectionName, documentID); + } + } + else if (message.equals(Protocol.Message.RESULT)) { + // check if we have to process any result data internally + if (data.has(Protocol.Field.RESULT)) { + final JsonNode resultData = data.get(Protocol.Field.RESULT); + + // if the result is from a previous login attempt + if (isLoginResult(resultData)) { + // extract the login token for subsequent automatic re-login + final String loginToken = resultData.get(Protocol.Field.TOKEN).getTextValue(); + saveLoginToken(loginToken); + + // extract the user's ID + mLoggedInUserId = resultData.get(Protocol.Field.ID).getTextValue(); + } + } + + final String id; + if (data.has(Protocol.Field.ID)) { + id = data.get(Protocol.Field.ID).getTextValue(); + } + else { + id = null; + } + + final Listener listener = mListeners.get(id); + + if (listener instanceof ResultListener) { + mListeners.remove(id); + + final String result; + if (data.has(Protocol.Field.RESULT)) { + result = data.get(Protocol.Field.RESULT).toString(); + } + else { + result = null; + } + + if (data.has(Protocol.Field.ERROR)) { + final Protocol.Error error = Protocol.Error.fromJson(data.get(Protocol.Field.ERROR)); + ((ResultListener) listener).onError(error.getError(), error.getReason(), error.getDetails()); + } + else { + ((ResultListener) listener).onSuccess(result); + } + } + } + else if (message.equals(Protocol.Message.READY)) { + if (data.has(Protocol.Field.SUBS)) { + final Iterator elements = data.get(Protocol.Field.SUBS).getElements(); + String subscriptionId; + while (elements.hasNext()) { + subscriptionId = elements.next().getTextValue(); + + final Listener listener = mListeners.get(subscriptionId); + + if (listener instanceof SubscribeListener) { + mListeners.remove(subscriptionId); + + ((SubscribeListener) listener).onSuccess(); + } + } + } + } + else if (message.equals(Protocol.Message.NOSUB)) { + final String subscriptionId; + if (data.has(Protocol.Field.ID)) { + subscriptionId = data.get(Protocol.Field.ID).getTextValue(); + } + else { + subscriptionId = null; + } + + final Listener listener = mListeners.get(subscriptionId); + + if (listener instanceof SubscribeListener) { + mListeners.remove(subscriptionId); + + if (data.has(Protocol.Field.ERROR)) { + final Protocol.Error error = Protocol.Error.fromJson(data.get(Protocol.Field.ERROR)); + ((SubscribeListener) listener).onError(error.getError(), error.getReason(), error.getDetails()); + } + else { + ((SubscribeListener) listener).onError(null, null, null); + } + } + else if (listener instanceof UnsubscribeListener) { + mListeners.remove(subscriptionId); + + ((UnsubscribeListener) listener).onSuccess(); + } + } + } + } + } + + /** + * Returns whether the given JSON result is from a previous login attempt + * + * @param result the JSON result + * @return whether the result is from a login attempt (`true`) or not (`false`) + */ + private static boolean isLoginResult(final JsonNode result) { + return result.has(Protocol.Field.TOKEN) && result.has(Protocol.Field.ID); + } + + /** + * Returns whether the client is currently logged in as some user + * + * @return whether the client is logged in (`true`) or not (`false`) + */ + public boolean isLoggedIn() { + return mLoggedInUserId != null; + } + + /** + * Returns the ID of the user who is currently logged in + * + * @return the ID or `null` + */ + public String getUserId() { + return mLoggedInUserId; + } + + /** + * Returns whether the specified version of the DDP protocol is supported or not + * + * @param protocolVersion the DDP protocol version + * @return whether the version is supported or not + */ + public static boolean isVersionSupported(final String protocolVersion) { + return Arrays.asList(SUPPORTED_DDP_VERSIONS).contains(protocolVersion); + } + + /** + * Sends a `pong` over the websocket as a reply to an incoming `ping` + * + * @param id the ID extracted from the `ping` or `null` + */ + private void sendPong(final String id) { + final Map data = new HashMap<>(); + data.put(Protocol.Field.MESSAGE, Protocol.Message.PONG); + if (id != null) { + data.put(Protocol.Field.ID, id); + } + send(data); + } + + /** + * Logs a message if logging has been enabled + * + * @param message the message to log + */ + public static void log(final String message) { + if (LOGGING_ENABLED) { + System.out.println(message); + } + } + + /** + * Creates and returns a new unique ID + * + * @return the new unique ID + */ + public static String uniqueID() { + return UUID.randomUUID().toString(); + } + + /** + * Insert given data into the specified collection + * + * @param collectionName the collection to insert the data into + * @param data the data to insert + */ + public void insert(final String collectionName, final Map data) { + insert(collectionName, data, null); + } + + /** + * Insert given data into the specified collection + * + * @param collectionName the collection to insert the data into + * @param data the data to insert + * @param listener the listener to call on success/error + */ + public void insert(final String collectionName, final Map data, final ResultListener listener) { + call("/"+collectionName+"/insert", new Object[] { data }, listener); + } + + /** + * Insert given data into the specified collection + * + * @param collectionName the collection to insert the data into + * @param query the query to select the document to update with + * @param data the list of keys and values that should be set + */ + public void update(final String collectionName, final Map query, final Map data) { + update(collectionName, query, data, emptyMap()); + } + + /** + * Insert given data into the specified collection + * + * @param collectionName the collection to insert the data into + * @param query the query to select the document to update with + * @param data the list of keys and values that should be set + * @param options the list of option parameters + */ + public void update(final String collectionName, final Map query, final Map data, final Map options) { + update(collectionName, query, data, options, null); + } + + /** + * Insert given data into the specified collection + * + * @param collectionName the collection to insert the data into + * @param query the query to select the document to update with + * @param data the list of keys and values that should be set + * @param options the list of option parameters + * @param listener the listener to call on success/error + */ + public void update(final String collectionName, final Map query, final Map data, final Map options, final ResultListener listener) { + call("/"+collectionName+"/update", new Object[] { query, data, options }, listener); + } + + /** + * Insert given data into the specified collection + * + * @param collectionName the collection to insert the data into + * @param documentID the ID of the document to remove + */ + public void remove(final String collectionName, final String documentID) { + remove(collectionName, documentID, null); + } + + /** + * Insert given data into the specified collection + * + * @param collectionName the collection to insert the data into + * @param documentId the ID of the document to remove + * @param listener the listener to call on success/error + */ + public void remove(final String collectionName, final String documentId, final ResultListener listener) { + Map query = new HashMap(); + query.put(MongoDb.Field.ID, documentId); + call("/"+collectionName+"/remove", new Object[] { query }, listener); + } + + /** + * Sign in the user with the given username and password + * + * Please note that this requires the `accounts-password` package + * + * @param username the username to sign in with + * @param password the password to sign in with + * @param listener the listener to call on success/error + */ + public void loginWithUsername(final String username, final String password, final ResultListener listener) { + login(username, null, password, listener); + } + + /** + * Sign in the user with the given email address and password + * + * Please note that this requires the `accounts-password` package + * + * @param email the email address to sign in with + * @param password the password to sign in with + * @param listener the listener to call on success/error + */ + public void loginWithEmail(final String email, final String password, final ResultListener listener) { + login(null, email, password, listener); + } + + /** + * Sign in the user with the given username or email address and the specified password + * + * Please note that this requires the `accounts-password` package + * + * @param username the username to sign in with (either this or `email` is required) + * @param email the email address to sign in with (either this or `username` is required) + * @param password the password to sign in with + * @param listener the listener to call on success/error + */ + private void login(final String username, final String email, final String password, final ResultListener listener) { + final Map userData = new HashMap(); + if (username != null) { + userData.put("username", username); + } + else if (email != null) { + userData.put("email", email); + } + else { + throw new RuntimeException("You must provide either a username or an email address"); + } + + final Map authData = new HashMap(); + authData.put("user", userData); + authData.put("password", password); + + call("login", new Object[] { authData }, listener); + } + + /** + * Attempts to sign in with the given login token + * + * @param token the login token + * @param listener the listener to call on success/error + */ + private void loginWithToken(final String token, final ResultListener listener) { + final Map authData = new HashMap(); + authData.put("resume", token); + + call("login", new Object[] { authData }, listener); + } + + public void logout() { + logout(null); + } + + public void logout(final ResultListener listener) { + call("logout", new Object[] { }, new ResultListener() { + + @Override + public void onSuccess(final String result) { + // remember that we're not logged in anymore + mLoggedInUserId = null; + + // delete the last login token which is now invalid + saveLoginToken(null); + + if (listener != null) { + listener.onSuccess(result); + } + } + + @Override + public void onError(final String error, final String reason, final String details) { + if (listener != null) { + listener.onError(error, reason, details); + } + } + + }); + } + + /** + * Registers a new user with the specified username, email address and password + * + * This method will automatically login as the new user on success + * + * Please note that this requires the `accounts-password` package + * + * @param username the username to register with (either this or `email` is required) + * @param email the email address to register with (either this or `username` is required) + * @param password the password to register with + * @param listener the listener to call on success/error + */ + public void registerAndLogin(final String username, final String email, final String password, final ResultListener listener) { + registerAndLogin(username, email, password, null, listener); + } + + /** + * Registers a new user with the specified username, email address and password + * + * This method will automatically login as the new user on success + * + * Please note that this requires the `accounts-password` package + * + * @param username the username to register with (either this or `email` is required) + * @param email the email address to register with (either this or `username` is required) + * @param password the password to register with + * @param profile the user's profile data, typically including a `name` field + * @param listener the listener to call on success/error + */ + public void registerAndLogin(final String username, final String email, final String password, final HashMap profile, final ResultListener listener) { + if (username == null && email == null) { + throw new RuntimeException("You must provide either a username or an email address"); + } + + final Map accountData = new HashMap(); + if (username != null) { + accountData.put("username", username); + } + if (email != null) { + accountData.put("email", email); + } + accountData.put("password", password); + if (profile != null) { + accountData.put("profile", profile); + } + + call("createUser", new Object[] { accountData }, listener); + } + + /** + * Executes a remote procedure call (any Java objects (POJOs) will be serialized to JSON by the Jackson library) + * + * @param methodName the name of the method to call, e.g. `/someCollection.insert` + */ + public void call(final String methodName) { + call(methodName, null, null); + } + + /** + * Executes a remote procedure call (any Java objects (POJOs) will be serialized to JSON by the Jackson library) + * + * @param methodName the name of the method to call, e.g. `/someCollection.insert` + * @param params the objects that should be passed to the method as parameters + */ + public void call(final String methodName, final Object[] params) { + call(methodName, params, null); + } + + /** + * Executes a remote procedure call (any Java objects (POJOs) will be serialized to JSON by the Jackson library) + * + * @param methodName the name of the method to call, e.g. `/someCollection.insert` + * @param listener the listener to trigger when the result has been received or `null` + */ + public void call(final String methodName, final ResultListener listener) { + call(methodName, null, listener); + } + + /** + * Executes a remote procedure call (any Java objects (POJOs) will be serialized to JSON by the Jackson library) + * + * @param methodName the name of the method to call, e.g. `/someCollection.insert` + * @param params the objects that should be passed to the method as parameters + * @param listener the listener to trigger when the result has been received or `null` + */ + public void call(final String methodName, final Object[] params, final ResultListener listener) { + callWithSeed(methodName, null, params, listener); + } + + /** + * Executes a remote procedure call (any Java objects (POJOs) will be serialized to JSON by the Jackson library) + * + * @param methodName the name of the method to call, e.g. `/someCollection.insert` + * @param randomSeed an arbitrary seed for pseudo-random generators or `null` + */ + public void callWithSeed(final String methodName, final String randomSeed) { + callWithSeed(methodName, randomSeed, null, null); + } + + /** + * Executes a remote procedure call (any Java objects (POJOs) will be serialized to JSON by the Jackson library) + * + * @param methodName the name of the method to call, e.g. `/someCollection.insert` + * @param randomSeed an arbitrary seed for pseudo-random generators or `null` + * @param params the objects that should be passed to the method as parameters + */ + public void callWithSeed(final String methodName, final String randomSeed, final Object[] params) { + callWithSeed(methodName, randomSeed, params, null); + } + + /** + * Executes a remote procedure call (any Java objects (POJOs) will be serialized to JSON by the Jackson library) + * + * @param methodName the name of the method to call, e.g. `/someCollection.insert` + * @param randomSeed an arbitrary seed for pseudo-random generators or `null` + * @param params the objects that should be passed to the method as parameters + * @param listener the listener to trigger when the result has been received or `null` + */ + public void callWithSeed(final String methodName, final String randomSeed, final Object[] params, final ResultListener listener) { + // create a new unique ID for this request + final String callId = uniqueID(); + + // save a reference to the listener to be executed later + if (listener != null) { + mListeners.put(callId, listener); + } + + // send the request + final Map data = new HashMap<>(); + data.put(Protocol.Field.MESSAGE, Protocol.Message.METHOD); + data.put(Protocol.Field.METHOD, methodName); + data.put(Protocol.Field.ID, callId); + if (params != null) { + data.put(Protocol.Field.PARAMS, params); + } + if (randomSeed != null) { + data.put(Protocol.Field.RANDOM_SEED, randomSeed); + } + send(data); + } + + /** + * Subscribes to a specific subscription from the server + * + * @param subscriptionName the name of the subscription + * @return the generated subscription ID (must be used when unsubscribing) + */ + public String subscribe(final String subscriptionName) { + return subscribe(subscriptionName, null); + } + + /** + * Subscribes to a specific subscription from the server + * + * @param subscriptionName the name of the subscription + * @param params the subscription parameters + * @return the generated subscription ID (must be used when unsubscribing) + */ + public String subscribe(final String subscriptionName, final Object[] params) { + return subscribe(subscriptionName, params, null); + } + + /** + * Subscribes to a specific subscription from the server + * + * @param subscriptionName the name of the subscription + * @param params the subscription parameters + * @param listener the listener to call on success/error + * @return the generated subscription ID (must be used when unsubscribing) + */ + public String subscribe(final String subscriptionName, final Object[] params, final SubscribeListener listener) { + // create a new unique ID for this request + final String subscriptionId = uniqueID(); + + // save a reference to the listener to be executed later + if (listener != null) { + mListeners.put(subscriptionId, listener); + } + + // send the request + final Map data = new HashMap<>(); + data.put(Protocol.Field.MESSAGE, Protocol.Message.SUBSCRIBE); + data.put(Protocol.Field.NAME, subscriptionName); + data.put(Protocol.Field.ID, subscriptionId); + if (params != null) { + data.put(Protocol.Field.PARAMS, params); + } + send(data); + + // return the generated subscription ID + return subscriptionId; + } + + /** + * Unsubscribes from the subscription with the specified name + * + * @param subscriptionId the ID of the subscription + */ + public void unsubscribe(final String subscriptionId) { + unsubscribe(subscriptionId, null); + } + + /** + * Unsubscribes from the subscription with the specified name + * + * @param subscriptionId the ID of the subscription + * @param listener the listener to call on success/error + */ + public void unsubscribe(final String subscriptionId, final UnsubscribeListener listener) { + // save a reference to the listener to be executed later + if (listener != null) { + mListeners.put(subscriptionId, listener); + } + + // send the request + final Map data = new HashMap<>(); + data.put(Protocol.Field.MESSAGE, Protocol.Message.UNSUBSCRIBE); + data.put(Protocol.Field.ID, subscriptionId); + send(data); + } + + /** + * Creates an empty map for use as default parameter + * + * @return an empty map + */ + private static Map emptyMap() { + return new HashMap(); + } + + /** + * Saves the given login token to the preferences + * + * @param token the login token to save + */ + private void saveLoginToken(final String token) { + final SharedPreferences prefs = getSharedPreferences(); + final SharedPreferences.Editor editor = prefs.edit(); + editor.putString(Preferences.Keys.LOGIN_TOKEN, token); + editor.apply(); + } + + /** + * Retrieves the last login token from the preferences + * + * @return the last login token or `null` + */ + private String getLoginToken() { + return getSharedPreferences().getString(Preferences.Keys.LOGIN_TOKEN, null); + } + + /** + * Returns a reference to the preferences for internal use + * + * @return the `SharedPreferences` instance + */ + private SharedPreferences getSharedPreferences() { + return mContext.getSharedPreferences(Preferences.FILE_NAME, Context.MODE_PRIVATE); + } + + private void initSession() { + // get the last login token + final String loginToken = getLoginToken(); + + // if we found a login token that might work + if (loginToken != null) { + // try to sign in with that token + loginWithToken(loginToken, new ResultListener() { + + @Override + public void onSuccess(final String result) { + announceSessionReady(true); + } + + @Override + public void onError(final String error, final String reason, final String details) { + announceSessionReady(false); + } + + }); + } + // if we didn't find any login token + else { + announceSessionReady(false); + } + } + + /** + * Announces that the new session is now ready to use + * + * @param signedInAutomatically whether we have already signed in automatically (`true`) or not (`false)` + */ + private void announceSessionReady(final boolean signedInAutomatically) { + // run the callback that waits for the connection to open + if (mCallback != null) { + mCallback.onConnect(signedInAutomatically); + } + + // try to dispatch queued messages now + for (String queuedMessage : mQueuedMessages) { + send(queuedMessage); + } + } + +} diff --git a/lib/src/main/java/im/delight/android/ddp/MeteorCallback.java b/lib/src/main/java/im/delight/android/ddp/MeteorCallback.java new file mode 100644 index 0000000..9a4e447 --- /dev/null +++ b/lib/src/main/java/im/delight/android/ddp/MeteorCallback.java @@ -0,0 +1,31 @@ +package im.delight.android.ddp; + +/** + * Copyright 2014 www.delight.im + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import de.tavendo.autobahn.WebSocket; + +/** Callback for asynchronous events caused by the WebSocket connection or received from the DDP server */ +public interface MeteorCallback { + + public void onConnect(boolean signedInAutomatically); + public void onDisconnect(WebSocket.WebSocketConnectionObserver.WebSocketCloseNotification code, String reason); + public void onDataAdded(String collectionName, String documentID, String newValuesJson); + public void onDataChanged(String collectionName, String documentID, String updatedValuesJson, String removedValuesJson); + public void onDataRemoved(String collectionName, String documentID); + public void onException(Exception e); + +} diff --git a/lib/src/main/java/im/delight/android/ddp/MongoDb.java b/lib/src/main/java/im/delight/android/ddp/MongoDb.java new file mode 100644 index 0000000..fdba881 --- /dev/null +++ b/lib/src/main/java/im/delight/android/ddp/MongoDb.java @@ -0,0 +1,45 @@ +package im.delight.android.ddp; + +/** + * Copyright 2014 www.delight.im + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Constants used in MongoDB (the database behind Meteor) */ +public class MongoDb { + + /** Constants defining field names in documents */ + public static class Field { + + public static final String ID = "_id"; + public static final String VALUE = "_value"; + public static final String PRIORITY = "_priority"; + + } + + /** Constants defining modifiers that can be used in requests */ + public static class Modifier { + + public static final String SET = "$set"; + + } + + /** Constants definining options that may be sent along with requests */ + public static class Option { + + public static final String UPSERT = "upsert"; + + } + +} diff --git a/lib/src/main/java/im/delight/android/ddp/Preferences.java b/lib/src/main/java/im/delight/android/ddp/Preferences.java new file mode 100644 index 0000000..f35573b --- /dev/null +++ b/lib/src/main/java/im/delight/android/ddp/Preferences.java @@ -0,0 +1,35 @@ +package im.delight.android.ddp; + +/** + * Copyright 2015 www.delight.im + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Constants and utilities used for access to the preferences/settings */ +public class Preferences { + + /** Name of the file where the preferences for this library will be stored */ + public static final String FILE_NAME = "android_ddp"; + + private Preferences() { } + + public static class Keys { + + private Keys() { } + + /** Name of the preference where the current login token will be stored */ + public static final String LOGIN_TOKEN = "login_token"; + } + +} diff --git a/lib/src/main/java/im/delight/android/ddp/Protocol.java b/lib/src/main/java/im/delight/android/ddp/Protocol.java new file mode 100644 index 0000000..d2ee097 --- /dev/null +++ b/lib/src/main/java/im/delight/android/ddp/Protocol.java @@ -0,0 +1,133 @@ +package im.delight.android.ddp; + +/** + * Copyright 2014 www.delight.im + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.codehaus.jackson.JsonNode; + +/** Constants used in the Distributed Data Protocol (DDP) */ +public class Protocol { + + /** Constants defining message types in message objects' values */ + public static class Message { + + public static final String ADDED = "added"; + public static final String ADDED_BEFORE = "addedBefore"; + public static final String CHANGED = "changed"; + public static final String CONNECT = "connect"; + public static final String CONNECTED = "connected"; + public static final String FAILED = "failed"; + public static final String METHOD = "method"; + public static final String NOSUB = "nosub"; + public static final String PING = "ping"; + public static final String PONG = "pong"; + public static final String READY = "ready"; + public static final String REMOVED = "removed"; + public static final String RESULT = "result"; + public static final String SUBSCRIBE = "sub"; + public static final String UNSUBSCRIBE = "unsub"; + + } + + /** Constants defining field names in message objects' keys */ + public static class Field { + + public static final String CLEARED = "cleared"; + public static final String COLLECTION = "collection"; + public static final String DETAILS = "details"; + public static final String ERROR = "error"; + public static final String FIELDS = "fields"; + public static final String ID = "id"; + public static final String MESSAGE = "msg"; + public static final String METHOD = "method"; + public static final String NAME = "name"; + public static final String PARAMS = "params"; + public static final String RANDOM_SEED = "randomSeed"; + public static final String REASON = "reason"; + public static final String RESULT = "result"; + public static final String SESSION = "session"; + public static final String SUBS = "subs"; + public static final String SUPPORT = "support"; + public static final String VERSION = "version"; + public static final String TOKEN = "token"; + + } + + /** Wrapper and utility class to store errors from the DDP protocol */ + public static class Error { + + private final String mError; + private final String mReason; + private final String mDetails; + + private Error(final String error, final String reason, final String details) { + mError = error; + mReason = reason; + mDetails = details; + } + + public static Error fromJson(final JsonNode json) { + final String error; + if (json.has(Protocol.Field.REASON)) { + final JsonNode errorJson = json.get(Protocol.Field.ERROR); + if (errorJson.isTextual()) { + error = errorJson.getTextValue(); + } + else if (errorJson.isNumber()) { + error = errorJson.getNumberValue().toString(); + } + else { + throw new RuntimeException("Unexpected data type of error.error"); + } + } + else { + error = null; + } + + final String reason; + if (json.has(Protocol.Field.REASON)) { + reason = json.get(Protocol.Field.REASON).getTextValue(); + } + else { + reason = null; + } + + final String details; + if (json.has(Protocol.Field.DETAILS)) { + details = json.get(Protocol.Field.DETAILS).getTextValue(); + } + else { + details = null; + } + + return new Error(error, reason, details); + } + + public String getError() { + return mError; + } + + public String getReason() { + return mReason; + } + + public String getDetails() { + return mDetails; + } + + } + +} diff --git a/lib/src/main/java/im/delight/android/ddp/ResultListener.java b/lib/src/main/java/im/delight/android/ddp/ResultListener.java new file mode 100644 index 0000000..65a0571 --- /dev/null +++ b/lib/src/main/java/im/delight/android/ddp/ResultListener.java @@ -0,0 +1,24 @@ +package im.delight.android.ddp; + +/** + * Copyright 2014 www.delight.im + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public interface ResultListener extends Listener { + + public void onSuccess(String result); + public void onError(String error, String reason, String details); + +} diff --git a/lib/src/main/java/im/delight/android/ddp/SubscribeListener.java b/lib/src/main/java/im/delight/android/ddp/SubscribeListener.java new file mode 100644 index 0000000..bc1567b --- /dev/null +++ b/lib/src/main/java/im/delight/android/ddp/SubscribeListener.java @@ -0,0 +1,24 @@ +package im.delight.android.ddp; + +/** + * Copyright 2014 www.delight.im + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public interface SubscribeListener extends Listener { + + public void onSuccess(); + public void onError(String error, String reason, String details); + +} diff --git a/lib/src/main/java/im/delight/android/ddp/UnsubscribeListener.java b/lib/src/main/java/im/delight/android/ddp/UnsubscribeListener.java new file mode 100644 index 0000000..f7f107a --- /dev/null +++ b/lib/src/main/java/im/delight/android/ddp/UnsubscribeListener.java @@ -0,0 +1,23 @@ +package im.delight.android.ddp; + +/** + * Copyright 2014 www.delight.im + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public interface UnsubscribeListener extends Listener { + + public void onSuccess(); + +} diff --git a/settings.gradle b/settings.gradle new file mode 100644 index 0000000..8c2a2e0 --- /dev/null +++ b/settings.gradle @@ -0,0 +1 @@ +include ':lib'