Skip to content

Commit

Permalink
socket_manager: add feature to share sockets with another server
Browse files Browse the repository at this point in the history
Another process can take over UDP/TCP sockets without downtime.

    server = ServerEngine::SocketManager::Server.share_sockets_with_another_server(path)

This starts a new server that shares all UDP/TCP sockets with
the existing server.
The old process should stop without removing the file for the
socket after the new process starts.

This allows us to replace both the server and the
workers with new processes without socket downtime.
(The existing live restart feature does not support network
servers. We can restart workers without downtime, but there is
no way to restart the network server without downtime.)

ref: fluent/fluentd#4622

Limitation: This feature would not work well if the process
opens new TCP ports frequently.

Signed-off-by: Daijiro Fukuda <[email protected]>
Co-authored-by: Shizuo Fujita <[email protected]>
  • Loading branch information
daipom and Watson1978 committed Oct 22, 2024
1 parent 5e9d11e commit 9a42d6b
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 26 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,15 @@ se = ServerEngine.create(MyServer, MyWorker, {
se.run
```

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).
Other features:

- `socket_manager_server = SocketManager::Server.share_sockets_with_another_server(path)`
- It starts a new manager server that shares all UDP/TCP sockets with the existing manager.
- We can use this for live restart for network servers.
- The old process should stop without removing the file for the socket after the new process starts.
- Limitation: This feature would not work well if the process opens new TCP ports frequently.

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).

## Module API

Expand Down
21 changes: 17 additions & 4 deletions lib/serverengine/socket_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,32 @@ def self.open(path = nil)
end
end

def initialize(path)
def self.share_sockets_with_another_server(path)
raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows?
server = new(path, start: false)
server.share_sockets_with_another_server
server
end

def initialize(path, start: true)
@tcp_sockets = {}
@udp_sockets = {}
@mutex = Mutex.new
@path = start_server(path)
@path = start ? start_server(path) : path
end

attr_reader :path
attr_reader :tcp_sockets, :udp_sockets # for tests

def new_client
Client.new(@path)
end

def start
start_server(path)
nil
end

def close
stop_server
nil
Expand Down Expand Up @@ -159,9 +172,9 @@ def process_peer(peer)
res = SocketManager.recv_peer(peer)
return if res.nil?

pid, method, bind, port = *res
pid, method, *opts = res
begin
send_socket(peer, pid, method, bind, port)
send_socket(peer, pid, method, *opts)
rescue => e
SocketManager.send_peer(peer, e)
end
Expand Down
91 changes: 70 additions & 21 deletions lib/serverengine/socket_manager_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,38 @@ def recv_udp(family, peer, sent)
end

module ServerModule
def share_sockets_with_another_server
another_server = UNIXSocket.new(@path)
begin
idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :share_udp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@udp_sockets[key] = another_server.recv_io UDPSocket
idx += 1
end

idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :share_tcp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@tcp_sockets[key] = another_server.recv_io TCPServer
idx += 1
end

SocketManager.send_peer(another_server, [Process.pid, :share_unix])
res = SocketManager.recv_peer(another_server)
raise res if res.is_a?(Exception)
@server = another_server.recv_io UNIXServer

start_server(@path)
ensure
another_server.close
end
end

private

def listen_tcp_new(bind_ip, port)
Expand Down Expand Up @@ -77,15 +109,17 @@ def listen_udp_new(bind_ip, port)
end

def start_server(path)
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)
unless @server
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)

begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
end
end

@thread = Thread.new do
Expand All @@ -111,19 +145,34 @@ def stop_server
@thread.join if RUBY_VERSION >= "2.2"
end

def send_socket(peer, pid, method, bind, port)
sock = case method
when :listen_tcp
listen_tcp(bind, port)
when :listen_udp
listen_udp(bind, port)
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end

SocketManager.send_peer(peer, nil)

peer.send_io sock
def send_socket(peer, pid, method, *opts)
case method
when :listen_tcp
bind, port = opts
sock = listen_tcp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :listen_udp
bind, port = opts
sock = listen_udp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :share_tcp
idx, = opts
key = @tcp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@tcp_sockets.values[idx]) if key
when :share_udp
idx, = opts
key = @udp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@udp_sockets.values[idx]) if key
when :share_unix
SocketManager.send_peer(peer, nil)
peer.send_io @server
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end
end
end

Expand Down
168 changes: 168 additions & 0 deletions spec/socket_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@
expect(server.path).to be_between(49152, 65535)
end
end

context 'Server.share_sockets_with_another_server' do
it 'not supported' do
server = SocketManager::Server.open(server_path)
expect { SocketManager::Server.share_sockets_with_another_server(server_path) }.to raise_error(NotImplementedError)
ensure
server.close
end
end
else
context 'Server.generate_path' do
it 'returns socket path under /tmp' do
Expand All @@ -76,6 +85,165 @@
expect(server.path).to include('/tmp/SERVERENGINE_SOCKETMANAGER_')
end
end

context 'Server.share_sockets_with_another_server' do
it 'shares listen sockets to another server' do
server = SocketManager::Server.open(server_path)

client = SocketManager::Client.new(server_path)
tcp1 = client.listen_tcp('127.0.0.1', 55551)
udp1 = client.listen_udp('127.0.0.1', 55561)
udp2 = client.listen_udp('127.0.0.1', 55562)

another_server = SocketManager::Server.share_sockets_with_another_server(server_path)

expect([
another_server.tcp_sockets.keys,
another_server.tcp_sockets.values.map(&:addr),
another_server.udp_sockets.keys,
another_server.udp_sockets.values.map(&:addr),
]).to eq([
server.tcp_sockets.keys,
server.tcp_sockets.values.map(&:addr),
server.udp_sockets.keys,
server.udp_sockets.values.map(&:addr),
])
ensure
tcp1&.close
udp1&.close
udp2&.close
server&.close
another_server&.close
end

it 'takes over TCP sockets without downtime' do
manager_server = SocketManager::Server.open(server_path)
manager_client = SocketManager::Client.new(server_path)

has_server_started = false
# The old server starts listening
thread_server = Thread.new do
server = manager_client.listen_tcp('127.0.0.1', test_port)
has_server_started = true
while socket = server.accept
incr_test_state(:count)
socket.close
end
ensure
server&.close
end

sleep 0.1 until has_server_started

# The client starts sending data
thread_client = Thread.new do
100.times do |i|
socket = TCPSocket.new('127.0.0.1', test_port)
begin
socket.write("Hello #{i}\n")
ensure
socket.close
end
sleep 0.01
end
end

sleep 0.5

# The new server shares the sockets and starts listening in parallel with the old one
thread_new_server = Thread.new do
new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path)
server = manager_client.listen_tcp('127.0.0.1', test_port)
while socket = server.accept
incr_test_state(:count)
socket.close
end
ensure
new_manager_server&.close
server&.close
end

# Stop the old server
sleep 0.1
thread_server.kill
thread_server.join

thread_client.join
wait_for_stop

# Confirm that server switching was completed without data loss
expect(test_state(:count)).to eq(100)
ensure
manager_server&.close
thread_server&.kill
thread_new_server&.kill
thread_server&.join
thread_new_server&.join
end

it 'takes over UDP sockets without downtime' do
manager_server = SocketManager::Server.open(server_path)
manager_client = SocketManager::Client.new(server_path)

has_server_started = false
# The old server starts listening
thread_server = Thread.new do
server = manager_client.listen_udp('127.0.0.1', test_port)
has_server_started = true
while server.recv(10)
incr_test_state(:count)
end
ensure
server&.close
end

sleep 0.1 until has_server_started

# The client starts sending data
thread_client = Thread.new do
100.times do |i|
socket = UDPSocket.new
begin
socket.send("Hello #{i}\n", 0, "127.0.0.1", test_port)
ensure
socket.close
end
sleep 0.01
end
end

sleep 0.5

# The new server shares the sockets and starts listening in parallel with the old one
thread_new_server = Thread.new do
new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path)
server = manager_client.listen_udp('127.0.0.1', test_port)
while server.recv(10)
incr_test_state(:count)
end
ensure
new_manager_server&.close
server&.close
end

# Stop the old server
sleep 0.1
thread_server.kill
thread_server.join

thread_client.join
wait_for_stop

# Confirm that server switching was completed without data loss
expect(test_state(:count)).to eq(100)
ensure
manager_server&.close
thread_server&.kill
thread_new_server&.kill
thread_server&.join
thread_new_server&.join
end
end
end

context 'with thread' do
Expand Down

0 comments on commit 9a42d6b

Please sign in to comment.