From 17b19812d9b27e3a5af5a6d823af94c943d7d3f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sat, 11 May 2024 23:25:25 +0200 Subject: [PATCH] send Connection.Close-ok before closing socket Do that by dropping the write_loop fiber --- src/amqproxy/client.cr | 43 ++++++++++++++++-------------------------- 1 file changed, 16 insertions(+), 27 deletions(-) diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index 7c3d032..655b2c9 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -9,7 +9,7 @@ module AMQProxy Log = ::Log.for(self) getter credentials : Credentials @channel_map = Hash(UInt16, UpstreamChannel?).new - @outgoing_frames = Channel(AMQ::Protocol::Frame).new(128) + @lock = Mutex.new @frame_max : UInt32 @channel_max : UInt16 @heartbeat : UInt16 @@ -21,7 +21,6 @@ module AMQProxy @frame_max = tune_ok.frame_max @channel_max = tune_ok.channel_max @heartbeat = tune_ok.heartbeat - spawn write_loop end # Keep a buffer of publish frames @@ -125,36 +124,27 @@ module AMQProxy else Log.debug { "Disconnected" } ensure - @outgoing_frames.close socket.close rescue nil close_all_upstream_channels end - private def write_loop(socket = @socket) - while frame = @outgoing_frames.receive? - socket.write_bytes frame, IO::ByteFormat::NetworkEndian - socket.flush unless expect_more_publish_frames?(frame) - case frame - when AMQ::Protocol::Frame::Channel::Close - @channel_map[frame.channel] = nil - when AMQ::Protocol::Frame::Channel::CloseOk - @channel_map.delete(frame.channel) - when AMQ::Protocol::Frame::Connection::CloseOk - break - end + # Send frame to client, channel id should already be remapped by the caller + def write(frame : AMQ::Protocol::Frame) + @lock.synchronize do + @socket.write_bytes frame, IO::ByteFormat::NetworkEndian + @socket.flush unless expect_more_publish_frames?(frame) + end + case frame + when AMQ::Protocol::Frame::Channel::Close + @channel_map[frame.channel] = nil + when AMQ::Protocol::Frame::Channel::CloseOk + @channel_map.delete(frame.channel) + when AMQ::Protocol::Frame::Connection::CloseOk + @socket.close rescue nil end rescue ex : IO::Error # Client closed connection, suppress error - ensure - @outgoing_frames.close - socket.close rescue nil - end - - # Send frame to client, channel id should already be remapped by the caller - def write(frame : AMQ::Protocol::Frame) - @outgoing_frames.send frame - rescue Channel::ClosedError - # do nothing + @socket.close rescue nil end def close_connection(code, text, frame = nil) @@ -196,9 +186,8 @@ module AMQProxy # @socket.read_timeout = 1.seconds end - # Close the outgoing frames channel which will let write_loop close the socket def close_socket - @outgoing_frames.close + @socket.close rescue nil end private def set_socket_options(socket = @socket)