Skip to content

Commit

Permalink
Various bug fixes and XML comments (see README for 1.0.1)
Browse files Browse the repository at this point in the history
  • Loading branch information
haneytron committed Nov 13, 2013
1 parent aafe973 commit 77ff168
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 15 deletions.
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
SIMPLSOCKETS 1.0.0
SIMPLSOCKETS 1.0.1
===========


A spinoff library of Dache that provides highly efficient, scalable, simple socket communication.

http://www.getdache.net
Expand All @@ -12,6 +13,23 @@ VERSION HISTORY
============================================


1.0.1
------------------


-Fixed logic error in BlockingQueue constructor where _queue wasn't actually assigned

-Fixed logic error in Pool where resetItemMethod was not always called when Popping an item

-Fixed atomicity of Error event so that it is raised exactly once on disconnection regardless of multithreaded use

-On error, communication methods now exit gracefully after Error event is raised (no bubbled exceptions)

-Exposed CurrentlyConnectedClients property on ISimplSocketServer

-Added XML comments to a few classes


1.0.0
------------------

Expand Down
23 changes: 22 additions & 1 deletion SimplSockets/BlockingQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,38 @@

namespace SimplSockets
{
/// <summary>
/// A queue that wraps a regular generic queue but when empty will block Dequeue threads until an item is available.
/// </summary>
/// <typeparam name="T">The type of the object contained in the queue.</typeparam>
internal sealed class BlockingQueue<T>
{
// The underlying queue
Queue<T> _queue = null;
// The semaphore used for blocking
Semaphore _semaphore = new Semaphore(0, Int32.MaxValue);

/// <summary>
/// The constructor.
/// </summary>
public BlockingQueue()
{
new Queue<T>();
_queue = new Queue<T>();
}

/// <summary>
/// The constructor.
/// </summary>
/// <param name="capacity">Sets the initial queue capacity.</param>
public BlockingQueue(int capacity)
{
_queue = new Queue<T>(capacity);
}

/// <summary>
/// Enqueues an item.
/// </summary>
/// <param name="item">An item.</param>
public void Enqueue(T item)
{
lock (_queue)
Expand All @@ -29,6 +46,10 @@ public void Enqueue(T item)
_semaphore.Release();
}

/// <summary>
/// Dequeues an item. Will block if the queue is empty until an item becomes available.
/// </summary>
/// <returns>An item.</returns>
public T Dequeue()
{
_semaphore.WaitOne();
Expand Down
5 changes: 5 additions & 0 deletions SimplSockets/ISimplSocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ public interface ISimplSocketServer : IDisposable
/// </summary>
void Close();

/// <summary>
/// Gets the currently connected client count.
/// </summary>
int CurrentlyConnectedClients { get; }

/// <summary>
/// An event that is fired whenever a message is received. Hook into this to process messages and potentially call Reply to send a response.
/// </summary>
Expand Down
31 changes: 30 additions & 1 deletion SimplSockets/Pool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,27 @@

namespace SimplSockets
{
/// <summary>
/// A pool of objects that can be reused to manage memory efficiently.
/// </summary>
/// <typeparam name="T">The type of object that is pooled.</typeparam>
internal sealed class Pool<T> where T : class
{
// The queue that holds the items
private readonly Queue<T> _queue = null;
// The initial pool count
private readonly int _initialPoolCount = 0;
// The method that creates a new item
private readonly Func<T> _newItemMethod = null;
// The method that resets an item's state
private readonly Action<T> _resetItemMethod = null;

/// <summary>
/// The constructor.
/// </summary>
/// <param name="poolCount">The count of items in the pool.</param>
/// <param name="newItemMethod">The method that creates a new item.</param>
/// <param name="resetItemMethod">The method that resets an item's state.</param>
public Pool(int poolCount, Func<T> newItemMethod, Action<T> resetItemMethod)
{
_queue = new Queue<T>(poolCount);
Expand All @@ -18,6 +32,10 @@ public Pool(int poolCount, Func<T> newItemMethod, Action<T> resetItemMethod)
_resetItemMethod = resetItemMethod;
}

/// <summary>
/// Pushes an item into the pool for later re-use.
/// </summary>
/// <param name="item">The item.</param>
public void Push(T item)
{
// Limit queue size
Expand All @@ -32,10 +50,15 @@ public void Push(T item)
}
}

/// <summary>
/// Pops an item out of the pool for use. The item will have its state reset.
/// </summary>
/// <returns></returns>
public T Pop()
{
T result = null;

// Cheap check
if (_queue.Count == 0)
{
result = _newItemMethod();
Expand All @@ -48,9 +71,15 @@ public T Pop()

lock (_queue)
{
// Double lock check
if (_queue.Count == 0)
{
return _newItemMethod();
result = _newItemMethod();
if (_resetItemMethod != null)
{
_resetItemMethod(result);
}
return result;
}

result = _queue.Dequeue();
Expand Down
85 changes: 73 additions & 12 deletions SimplSockets/SimplSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ public void Connect(EndPoint endPoint)
{
HandleCommunicationError(_socket, ex);
}
catch (ObjectDisposedException)
{
// If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
return;
}

// Get a message state from the pool
var messageState = _messageStatePool.Pop();
Expand Down Expand Up @@ -222,7 +227,7 @@ public void Listen(EndPoint localEndpoint)

// Create socket
_socket = _socketFunc();

try
{
_socket.Bind(localEndpoint);
Expand All @@ -234,7 +239,10 @@ public void Listen(EndPoint localEndpoint)
catch (SocketException ex)
{
HandleCommunicationError(_socket, ex);
return;
}
catch (ObjectDisposedException)
{
// If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
}
}

Expand Down Expand Up @@ -288,6 +296,11 @@ public void Send(byte[] message)
{
HandleCommunicationError(_socket, ex);
}
catch (ObjectDisposedException)
{
// If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
return;
}
}

/// <summary>
Expand Down Expand Up @@ -324,6 +337,11 @@ public byte[] SendReceive(byte[] message)
HandleCommunicationError(_socket, ex);
return null;
}
catch (ObjectDisposedException)
{
// If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
return null;
}

// Wait for our message to go ahead from the receive callback
multiplexerData.ManualResetEvent.WaitOne();
Expand Down Expand Up @@ -370,6 +388,11 @@ public void Reply(byte[] message, ReceivedMessage receivedMessage)
HandleCommunicationError(receivedMessage.Socket, ex);
return;
}
catch (ObjectDisposedException)
{
// If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
return;
}

// Put received message back in the pool
_receiveMessagePool.Push(receivedMessage);
Expand Down Expand Up @@ -409,6 +432,12 @@ private void AcceptCallback(IAsyncResult asyncResult)
HandleCommunicationError(_socket, ex);
return;
}
catch (ObjectDisposedException)
{
// If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
return;
}

// Turn on or off Nagle algorithm
handler.NoDelay = !_useNagleAlgorithm;

Expand All @@ -422,6 +451,11 @@ private void AcceptCallback(IAsyncResult asyncResult)
HandleCommunicationError(_socket, ex);
return;
}
catch (ObjectDisposedException)
{
// If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
return;
}

// Do not proceed until we have room to do so
_maxConnectionsSemaphore.WaitOne();
Expand All @@ -442,6 +476,11 @@ private void AcceptCallback(IAsyncResult asyncResult)
HandleCommunicationError(handler, ex);
return;
}
catch (ObjectDisposedException)
{
// If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
return;
}

// Create receive queue for this client
_receiveBufferQueueLock.EnterWriteLock();
Expand Down Expand Up @@ -473,6 +512,11 @@ private void SendCallback(IAsyncResult asyncResult)
HandleCommunicationError(socket, ex);
return;
}
catch (ObjectDisposedException)
{
// If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
return;
}
}

private void ReceiveCallback(IAsyncResult asyncResult)
Expand All @@ -493,6 +537,11 @@ private void ReceiveCallback(IAsyncResult asyncResult)
HandleCommunicationError(messageState.Handler, ex);
return;
}
catch (ObjectDisposedException)
{
// If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
return;
}

if (bytesRead > 0)
{
Expand Down Expand Up @@ -522,7 +571,10 @@ private void ReceiveCallback(IAsyncResult asyncResult)
catch (SocketException ex)
{
HandleCommunicationError(messageState.Handler, ex);
return;
}
catch (ObjectDisposedException)
{
// If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
}
}
}
Expand Down Expand Up @@ -664,17 +716,26 @@ private void ProcessReceivedMessage(MessageState messageState)
/// <param name="ex">The exception that the socket raised.</param>
private void HandleCommunicationError(Socket socket, Exception ex)
{
// Close the socket
try
lock (socket)
{
socket.Shutdown(SocketShutdown.Both);
}
catch
{
// Ignore
}
// Close the socket
try
{
socket.Shutdown(SocketShutdown.Both);
}
catch (ObjectDisposedException)
{
// Socket was already closed/disposed, so return out to prevent raising the Error event multiple times
// This is most likely to happen when an error occurs during heavily multithreaded use
return;
}
catch
{
// Ignore
}

socket.Close();
socket.Close();
}

// Release all multiplexer clients by signalling them
_clientMultiplexerLock.EnterReadLock();
Expand Down

0 comments on commit 77ff168

Please sign in to comment.