Skip to content

Commit

Permalink
router: make failover check health of the storages
Browse files Browse the repository at this point in the history
Before this patch router didn't take into account the state of
box.info.replication of the storage, when routing requests to it.
From now on router automatically lowers the priority of replica,
when router supposes, that connection from the master to a replica
is dead (status or idle > 30) or too slow (lag is > 30 sec).

We also change REPLICA_NOACTIVITY_TIMEOUT from 5 minutes to 30 seconds.
This is needed to speed up how quickly a replica notices the master's
change. Before the patch the non-master never knew, where the master
currently is. Now, since we try to check status of the master's upstream,
we need to find this master in service_info via conn_manager. Since after
that replica doesn't do any requests to master, the connection is collected
by conn_manager in collect_idle_conns after 30 seconds. Then router's
failover calls service_info one more time and non-master locates master,
which may have already changed.

This patch allows to increase the consistency of read requests and
decreases the probability of reading a stale data.

Closes tarantool#453
Closes tarantool#487

NO_DOC=bugfix
Serpentian committed Dec 3, 2024
1 parent bcef4f5 commit 1cd42d6
Showing 8 changed files with 495 additions and 8 deletions.
2 changes: 2 additions & 0 deletions test/luatest_helpers/server.lua
Original file line number Diff line number Diff line change
@@ -281,6 +281,8 @@ function Server:grep_log(what, bytes, opts)
-- if instance has crashed provide filename to use grep_log
local filename = opts.filename or self:eval('return box.cfg.log')
local file = fio.open(filename, {'O_RDONLY', 'O_NONBLOCK'})
-- This is needed to find UUID with string.match.
what = string.gsub(what, "%-", "%%-")

local function fail(msg)
local err = errno.strerror()
File renamed without changes.
360 changes: 360 additions & 0 deletions test/router-luatest/router_3_3_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,360 @@
local t = require('luatest')
local vtest = require('test.luatest_helpers.vtest')
local wait_timeout = vtest.wait_timeout

local g = t.group()

local cfg_template = {
sharding = {
{
replicas = {
replica_1_a = {
master = true,
},
replica_1_b = {},
replica_1_c = {},
},
},
{
replicas = {
replica_2_a = {
master = true,
},
replica_2_b = {},
replica_2_c = {},
},
},
},
bucket_count = 100,
test_user_grant_range = 'super',
}

local global_cfg

g.before_all(function(g)
global_cfg = vtest.config_new(cfg_template)
vtest.cluster_new(g, global_cfg)

t.assert_equals(g.replica_1_a:exec(function()
return #ivshard.storage.info().alerts
end), 0, 'no alerts after boot')

local router = vtest.router_new(g, 'router', global_cfg)
g.router = router
local res, err = router:exec(function()
return ivshard.router.bootstrap({timeout = iwait_timeout})
end)
t.assert(res and not err, 'bootstrap buckets')

vtest.cluster_exec_each_master(g, function()
local s = box.schema.space.create('test', {
format = {
{'id', 'unsigned'},
{'bucket_id', 'unsigned'},
},
})
s:create_index('id', {parts = {'id'}})
s:create_index('bucket_id', {
parts = {'bucket_id'}, unique = false
})
end)
end)

g.after_all(function(g)
g.cluster:stop()
end)

local function prepare_failover_health_check(g)
local new_cfg_template = table.deepcopy(cfg_template)
new_cfg_template.sharding[1].replicas.replica_1_a.zone = 4
new_cfg_template.sharding[1].replicas.replica_1_b.zone = 3
new_cfg_template.sharding[1].replicas.replica_1_c.zone = 2
new_cfg_template.zone = 1
new_cfg_template.weights = {
[1] = {
[1] = 0,
[2] = 1,
[3] = 2,
[4] = 3,
},
}

local new_cluster_cfg = vtest.config_new(new_cfg_template)
vtest.router_cfg(g.router, new_cluster_cfg)
g.router:exec(function()
-- Speed up switching replicas.
rawset(_G, 'old_failover_up_timeout', ivconst.FAILOVER_UP_TIMEOUT)
ivconst.FAILOVER_UP_TIMEOUT = 0.5
end)
end

local function router_assert_prioritized(g, replica)
local uuid = g.router:exec(function(rs_uuid)
local router = ivshard.router.internal.static_router
local replica = router.replicasets[rs_uuid].replica
t.assert_not_equals(replica, nil)
return replica.uuid
end, {replica:replicaset_uuid()})
t.assert_equals(uuid, replica:instance_uuid())
end

local function router_wakeup_failover(g)
g.router:exec(function()
ivshard.router.internal.static_router.failover_fiber:wakeup()
end)
end

local function router_wait_prioritized(g, replica)
t.helpers.retrying({timeout = wait_timeout}, function()
router_assert_prioritized(g, replica)
router_wakeup_failover(g)
end)
end

local function failover_health_check_missing_upstream(g)
router_wait_prioritized(g, g.replica_1_c)
-- Reconfigure box.cfg.replication.
local box_cfg = g.replica_1_c:get_box_cfg()
g.replica_1_c:update_box_cfg({replication = {
g.replica_1_c.net_box_uri,
g.replica_1_b.net_box_uri,
}})
-- Prioritized replica is changed to another one.
router_wait_prioritized(g, g.replica_1_b)
local msg = 'Replica %s is unhealthy: Missing upstream'
msg = string.format(msg, g.replica_1_c:instance_uuid())
t.assert(g.router:grep_log(msg), msg)
-- Restore replication. Replica returns.
g.replica_1_c:update_box_cfg({replication = box_cfg.replication})
router_wait_prioritized(g, g.replica_1_c)
msg = 'Replica %s is healthy'
msg = string.format(msg, g.replica_1_c:instance_uuid())
t.assert(g.router:grep_log(msg), msg)
end

local function failover_health_check_broken_upstream(g)
router_wait_prioritized(g, g.replica_1_c)
-- Break replication. Replica changes.
g.replica_1_c:exec(function()
rawset(_G, 'on_replace', function()
box.error(box.error.READONLY)
end)
box.space.test:on_replace(_G.on_replace)
end)
g.replica_1_a:exec(function()
local bid = _G.get_first_bucket()
box.space.test:replace{1, bid}
end)
g.replica_1_c:exec(function(id)
rawset(_G, 'old_idle', ivconst.REPLICA_MAX_IDLE)
ivconst.REPLICA_MAX_IDLE = 0.1
-- On 1.10 and 2.8 upstreaim goes into never ending
-- retry of connecting and applying the failing row.
-- The status of upstream is 'loading' in such case.
-- It should consider itself non-healthy according to
-- it's idle time.
ilt.helpers.retrying({timeout = iwait_timeout}, function()
local upstream = box.info.replication[id].upstream
ilt.assert_not_equals(upstream, nil)
ilt.assert(upstream.status == 'stopped' or
upstream.status == 'loading')
end)
end, {g.replica_1_a:instance_id()})
router_wait_prioritized(g, g.replica_1_b)
-- Either status or idle, depending on version.
local msg = 'Replica %s is unhealthy'
msg = string.format(msg, g.replica_1_c:instance_uuid())
t.assert(g.router:grep_log(msg), msg)

-- Drop on_replace trigger. Replica returns.
g.replica_1_c:exec(function()
ivconst.REPLICA_MAX_IDLE = _G.old_idle
box.space.test:on_replace(nil, _G.on_replace)
local replication = box.cfg.replication
box.cfg{replication = {}}
box.cfg{replication = replication}
end)
router_wait_prioritized(g, g.replica_1_c)
msg = 'Replica %s is healthy'
msg = string.format(msg, g.replica_1_c:instance_uuid())
t.assert(g.router:grep_log(msg), msg)
g.replica_1_a:exec(function()
box.space.test:truncate()
end)
end

local function failover_health_check_big_lag(g)
router_wait_prioritized(g, g.replica_1_c)
g.replica_1_c:exec(function()
-- There's no other simple way to increase lag without debug build.
rawset(_G, 'old_call', ivshard.storage._call)
ivshard.storage._call = function(service_name, ...)
if service_name == 'info' then
return {
health = {
master_upstream = {
status = 'follow',
idle = 0,
lag = 100500,
},
}
}
end
return _G.old_call(service_name, ...)
end
end)
router_wait_prioritized(g, g.replica_1_b)
local msg = 'Replica %s is unhealthy: Upstream to master has lag'
msg = string.format(msg, g.replica_1_c:instance_uuid())
t.assert(g.router:grep_log(msg), msg)

g.replica_1_c:exec(function()
ivshard.storage._call = _G.old_call
end)
router_wait_prioritized(g, g.replica_1_c)
msg = 'Replica %s is healthy'
msg = string.format(msg, g.replica_1_c:instance_uuid())
t.assert(g.router:grep_log(msg), msg)
end

local function failover_health_check_small_failover_timeout(g)
local old_cfg = g.router:exec(function()
return ivshard.router.internal.static_router.current_cfg
end)
local new_global_cfg = table.deepcopy(old_cfg)
new_global_cfg.failover_ping_timeout = 0.0000001
vtest.router_cfg(g.router, new_global_cfg)
g.router:exec(function()
local r = ivshard.router.internal.static_router
ivtest.service_wait_for_new_ok(r.failover_service,
{on_yield = function() r.failover_f:wakeup() end})
end)
-- Since all nodes are broken, prioritized replica is not changed.
router_assert_prioritized(g, g.replica_1_c)
vtest.router_cfg(g.router, old_cfg)
end

local function failover_health_check_missing_master(g)
router_wait_prioritized(g, g.replica_1_c)
local old_cfg = g.replica_1_c:exec(function()
return ivshard.storage.internal.current_cfg
end)
local new_cfg = table.deepcopy(old_cfg)
for _, rs in pairs(new_cfg.sharding) do
for _, r in pairs(rs.replicas) do
r.master = false
end
end
g.replica_1_c:exec(function(cfg)
ivshard.storage.cfg(cfg, box.info.uuid)
end, {new_cfg})

local msg = 'The healthiness of the replica %s is unknown:'
msg = string.format(msg, g.replica_1_c:instance_uuid())
t.helpers.retrying({timeout = wait_timeout}, function()
t.assert(g.router:grep_log(msg))
end)

-- Not changed. Not enough info.
router_assert_prioritized(g, g.replica_1_c)
g.replica_1_c:exec(function(cfg)
ivshard.storage.cfg(cfg, box.info.uuid)
end, {old_cfg})
end

local function failover_health_check_master_switch(g)
router_wait_prioritized(g, g.replica_1_c)
local box_cfg = g.replica_1_c:get_box_cfg()
g.replica_1_c:update_box_cfg({replication = {
g.replica_1_a.net_box_uri,
g.replica_1_c.net_box_uri,
}})
-- Since the connection is not to master, it doesn't affect health.
router_assert_prioritized(g, g.replica_1_c)
-- In real life it will take 30 seconds to notice master change and
-- unhealthiness of itself. Before the patch the non-master never knew,
-- where the master currently is. Now, since we try to check status of
-- master's upstream, we need to find this master in service_info via
-- conn_manager. Since after that replica doesn't do any requests to
-- master, the connection is collected by conn_manager in
-- collect_idle_conns after 30 seconds. Then router's failover calls
-- service_info one more time and non-master locates master, which
-- may have already changed.
local function replica_drop_conn_to_master(replica)
replica:exec(function()
local old_noactivity = ivconst.REPLICA_NOACTIVITY_TIMEOUT
ivconst.REPLICA_NOACTIVITY_TIMEOUT = 0.1
local rs = ivshard.storage.internal.this_replicaset
ilt.helpers.retrying({timeout = iwait_timeout}, function()
ivshard.storage.internal.conn_manager_fiber:wakeup()
ilt.assert_equals(rs.master, nil)
end)
ivconst.REPLICA_NOACTIVITY_TIMEOUT = old_noactivity
end)
end
-- Change master and check that replica notices, that it's not healthy.
replica_drop_conn_to_master(g.replica_1_c)
g.replica_1_a:update_box_cfg{read_only = true}
g.replica_1_b:update_box_cfg{read_only = false}
router_wait_prioritized(g, g.replica_1_b)
replica_drop_conn_to_master(g.replica_1_c)
g.replica_1_a:update_box_cfg{read_only = false}
g.replica_1_b:update_box_cfg{read_only = true}
router_wait_prioritized(g, g.replica_1_c)

g.replica_1_c:update_box_cfg{replication = box_cfg.replication}
end

--
-- gh-453: router should not route requests to replicas, which
-- are not up to date. This requires properly working connection
-- with the master.
--
local function failover_health_check(g, auto_master)
prepare_failover_health_check(g)

if auto_master then
local current_cfg = g.router:exec(function()
return ivshard.router.internal.static_router.current_cfg
end)
for _, rs in pairs(current_cfg.sharding) do
rs.master = 'auto'
for _, r in pairs(rs.replicas) do
r.master = nil
end
end
vtest.router_cfg(g.router, current_cfg)

current_cfg.weights = nil
current_cfg.zone = nil
vtest.cluster_cfg(g, current_cfg)
g.replica_1_a:update_box_cfg{read_only = false}
g.replica_1_b:update_box_cfg{read_only = true}
g.replica_2_a:update_box_cfg{read_only = false}
g.replica_2_b:update_box_cfg{read_only = true}
end

failover_health_check_missing_upstream(g)
failover_health_check_broken_upstream(g)
failover_health_check_big_lag(g)
failover_health_check_small_failover_timeout(g)
if not auto_master then
failover_health_check_missing_master(g)
else
failover_health_check_master_switch(g)
vtest.cluster_cfg(g, global_cfg)
end

vtest.router_cfg(g.router, global_cfg)
g.router:exec(function()
ivconst.FAILOVER_UP_TIMEOUT = _G.old_failover_up_timeout
end)
end

g.test_failover_health_check = function(g)
failover_health_check(g, false)
end

g.test_failover_health_check_auto_master = function(g)
failover_health_check(g, true)
end
1 change: 1 addition & 0 deletions test/router/router.result
Original file line number Diff line number Diff line change
@@ -1533,6 +1533,7 @@ error_messages
- Use replica:detach_conn(...) instead of replica.detach_conn(...)
- Use replica:is_connected(...) instead of replica.is_connected(...)
- Use replica:safe_uri(...) instead of replica.safe_uri(...)
- Use replica:update_health_status(...) instead of replica.update_health_status(...)
...
--
-- gh-117: Preserve route_map on router.cfg.
4 changes: 3 additions & 1 deletion vshard/consts.lua
Original file line number Diff line number Diff line change
@@ -60,9 +60,11 @@ return {
RECOVERY_BACKOFF_INTERVAL = 5,
RECOVERY_GET_STAT_TIMEOUT = 5,
REPLICA_BACKOFF_INTERVAL = 5,
REPLICA_NOACTIVITY_TIMEOUT = 60 * 5,
REPLICA_NOACTIVITY_TIMEOUT = 30,
DEFAULT_BUCKET_SEND_TIMEOUT = 10,
DEFAULT_BUCKET_RECV_TIMEOUT = 10,
REPLICA_MAX_LAG = 30,
REPLICA_MAX_IDLE = 30,

DEFAULT_SCHED_REF_QUOTA = 300,
DEFAULT_SCHED_MOVE_QUOTA = 1,
30 changes: 27 additions & 3 deletions vshard/replicaset.lua
Original file line number Diff line number Diff line change
@@ -26,6 +26,9 @@
-- net_sequential_fail = <count of sequential failed
-- requests to the replica>,
-- is_outdated = nil/true,
-- health_status = <STATUS.GREEN - replica is healthy,
-- STATUS.YELLOW - health of replica is unknown,
-- STATUS.RED - replica is unhealthy>,
-- }
-- },
-- master = <master server from the array above>,
@@ -486,7 +489,8 @@ local function replicaset_down_replica_priority(replicaset)
assert(old_replica and ((old_replica.down_ts and
not old_replica:is_connected()) or
old_replica.net_sequential_fail >=
consts.FAILOVER_DOWN_SEQUENTIAL_FAIL))
consts.FAILOVER_DOWN_SEQUENTIAL_FAIL or
old_replica.health_status == consts.STATUS.RED))
local new_replica = old_replica.next_by_priority
if new_replica then
assert(new_replica ~= old_replica)
@@ -513,7 +517,8 @@ local function replicaset_up_replica_priority(replicaset)
-- Failed to up priority.
return
end
local is_healthy = replica.net_sequential_ok > 0
local is_healthy = replica.health_status == consts.STATUS.GREEN
is_healthy = is_healthy and replica.net_sequential_ok > 0
if replica:is_connected() and (is_healthy or not old_replica) then
assert(replica.net_sequential_fail == 0)
replicaset.replica = replica
@@ -1213,6 +1218,24 @@ local function locate_masters(replicasets)
return is_all_done, is_all_nop, last_err
end

local function replica_update_health_status(replica, status, reason)
if replica.health_status == status then
return
end

replica.health_status = status
if status == consts.STATUS.GREEN then
log.info("Replica %s is healthy", replica.id)
elseif status == consts.STATUS.RED then
assert(reason ~= nil)
log.warn("Replica %s is unhealthy: %s", replica.id, reason)
else
assert(status == consts.STATUS.YELLOW and reason ~= nil)
log.warn("The healthiness of the replica %s is unknown: %s",
replica.id, reason)
end
end

--
-- Meta-methods
--
@@ -1271,6 +1294,7 @@ local replica_mt = {
end,
detach_conn = replica_detach_conn,
call = replica_call,
update_health_status = replica_update_health_status,
},
__tostring = function(replica)
if replica.name then
@@ -1469,7 +1493,7 @@ local function buildall(sharding_cfg)
zone = replica.zone, net_timeout = consts.CALL_TIMEOUT_MIN,
net_sequential_ok = 0, net_sequential_fail = 0,
down_ts = curr_ts, backoff_ts = nil, backoff_err = nil,
id = replica_id,
id = replica_id, health_status = consts.STATUS.YELLOW,
}, replica_mt)
new_replicaset.replicas[replica_id] = new_replica
if replica.master then
49 changes: 47 additions & 2 deletions vshard/router/init.lua
Original file line number Diff line number Diff line change
@@ -1190,7 +1190,11 @@ local function failover_need_down_priority(replicaset, curr_ts)
-- is wrong with it. Temporary lower its priority.
local is_sequential_fails =
r.net_sequential_fail >= consts.FAILOVER_DOWN_SEQUENTIAL_FAIL
return is_down_ts or is_sequential_fails
-- If replica doesn't have proper connection with its master: connection
-- is dead, lag or idle time is too big - then replica has outdated data
-- and shouldn't be used as prioritized.
local is_unhealthy = r.health_status == consts.STATUS.RED
return is_down_ts or is_sequential_fails or is_unhealthy
end

--
@@ -1218,8 +1222,49 @@ local function failover_collect_to_update(router)
return id_to_update
end

--
-- Check, whether the instance has properly working connection to the master.
-- Returns the status of the replica and the reason, if it's STATUS.RED.
--
local function failover_check_upstream(upstream)
if not upstream then
return consts.STATUS.RED, 'Missing upstream from the master'
end
local status = upstream.status
if not status or status == 'stopped' or status == 'disconnected' then
-- All other states mean either that everything is ok ('follow')
-- or that replica is connecting. In all these cases replica
-- is considered healthy.
local msg = string.format('Upstream to master has status "%s"', status)
return consts.STATUS.RED, msg
end
if upstream.idle and upstream.idle > consts.REPLICA_MAX_IDLE then
local msg = string.format('Upstream to master has idle %.15f > %d',
upstream.idle, consts.REPLICA_MAX_IDLE)
return consts.STATUS.RED, msg
end
if upstream.lag and upstream.lag > consts.REPLICA_MAX_LAG then
local msg = string.format('Upstream to master has lag %.15f > %d',
upstream.lag, consts.REPLICA_MAX_LAG)
return consts.STATUS.RED, msg
end
return consts.STATUS.GREEN
end

local function failover_ping(replica, opts)
return replica:call('vshard.storage._call', {'info'}, opts)
local info_opts = {timeout = opts.timeout, with_health = true}
local net_status, info, err =
replica:call('vshard.storage._call', {'info', info_opts}, opts)
if not info then
replica:update_health_status(consts.STATUS.YELLOW, err)
return net_status, info, err
end

assert(type(info.health) == 'table')
local health_status, reason =
failover_check_upstream(info.health.master_upstream)
replica:update_health_status(health_status, reason)
return net_status, info, err
end

local function failover_ping_round(router, curr_ts)
57 changes: 55 additions & 2 deletions vshard/storage/init.lua
Original file line number Diff line number Diff line change
@@ -3297,11 +3297,64 @@ local function storage_map(rid, name, args)
return true
end

local function storage_service_info()
return {
--
-- Returns info about the state of the replica. Used in router failover.
-- Currently returns only the state of replication from master.
--
local function storage_health_state(timeout)
if this_is_master() then
-- Master is always up to date with itself.
return {
master_upstream = {
status = 'follow',
idle = 0,
lag = 0,
},
}
end
-- Find master. In case of 'auto' master it may be unknown.
local master = M.this_replicaset:wait_master(timeout)
if not master then
return nil, lerror.timeout()
end
-- Find master in box.info.replication.
for _, replica in ipairs(box.info.replication) do
if ((master.name ~= nil and replica.name == master.name) or
(master.uuid ~= nil and replica.uuid == master.uuid)) then
-- The instance was started without master in box.cfg.replication.
-- Should not happen on vshard storage at all.
if not replica.upstream then
return {}
end
return {
master_upstream = {
status = replica.upstream.status,
idle = replica.upstream.idle,
lag = replica.upstream.lag,
}
}
end
end
-- We may not find the master at all, e.g. when name identification is used
-- names have not been set yet and uuid have not been passed to the config.
return nil, lerror.vshard(lerror.code.MISSING_MASTER, M.this_replicaset.id)
end

local function storage_service_info(opts)
local info = {
is_master = this_is_master(),
name = box.info.name,
}
if opts and opts.with_health then
local timeout = opts.timeout or consts.MASTER_SEARCH_TIMEOUT
local health, err = storage_health_state(timeout)
if not health then
-- Info is atomic, partial result is not returned.
return nil, err
end
info.health = health
end
return info
end

local service_call_api

0 comments on commit 1cd42d6

Please sign in to comment.