Skip to content

Commit

Permalink
Pass CloseOk from upstream down to client
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed May 11, 2024
1 parent 1da1a2c commit a451c5e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
14 changes: 8 additions & 6 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,14 @@ module AMQProxy
while frame = @outgoing_frames.receive?
socket.write_bytes frame, IO::ByteFormat::NetworkEndian
socket.flush unless expect_more_publish_frames?(frame)

break if frame.is_a? AMQ::Protocol::Frame::Connection::CloseOk
case frame
when AMQ::Protocol::Frame::Channel::Close
@channel_map[frame.channel] = nil
when AMQ::Protocol::Frame::Channel::CloseOk
@channel_map.delete(frame.channel)
when AMQ::Protocol::Frame::Connection::CloseOk
break
end
end
rescue ex : IO::Error
# Client closed connection, suppress error
Expand All @@ -146,10 +152,6 @@ module AMQProxy

# Send frame to client, channel id should already be remapped by the caller
def write(frame : AMQ::Protocol::Frame)
case frame
when AMQ::Protocol::Frame::Channel::Close
@channel_map[frame.channel] = nil
end
@outgoing_frames.send frame
rescue Channel::ClosedError
# do nothing
Expand Down
4 changes: 3 additions & 1 deletion src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ module AMQProxy
downstream_channel.write frame
end
when AMQ::Protocol::Frame::Channel::CloseOk # when client requested channel close
@channels_lock.synchronize { @channels.delete(frame.channel) }
if downstream_channel = @channels_lock.synchronize { @channels.delete(frame.channel) }
downstream_channel.write(frame)
end
else
if downstream_channel = @channels_lock.synchronize { @channels[frame.channel]? }
downstream_channel.write(frame)
Expand Down

0 comments on commit a451c5e

Please sign in to comment.