Update slotmap after longer interruptions
Includes a new testcase simulating that all nodes are unreachable.

Signed-off-by: Björn Svensson <bjorn.a.svensson@est.tech>
bjosv committed Oct 31, 2024
1 parent 49983bf commit 1a691fd
Showing 2 changed files with 71 additions and 5 deletions.
18 changes: 13 additions & 5 deletions src/ered_cluster.erl
@@ -67,7 +67,6 @@
convergence_check = nok :: convergence_check(),

info_pid = [] :: [pid()],
update_delay = 1000, % 1s delay between slot map update requests
client_opts = [],
update_slot_wait = 500,
min_replicas = 0,
@@ -256,9 +255,18 @@ handle_info(Msg = {connection_status, {Pid, Addr, _Id}, Status}, State0) ->
pending = sets:del_element(Addr, State#st.pending),
reconnecting = sets:del_element(Addr, State#st.reconnecting)};
connection_up ->
State#st{up = sets:add_element(Addr, State#st.up),
pending = sets:del_element(Addr, State#st.pending),
reconnecting = sets:del_element(Addr, State#st.reconnecting)};
NewState = State#st{up = sets:add_element(Addr, State#st.up),
pending = sets:del_element(Addr, State#st.pending)},
%% If we already have known masters, but this node is the
%% first to be up, then we have had a longer interruption
%% that requires a slotmap update.
case (sets:is_empty(State#st.up) andalso
not sets:is_empty(State#st.masters)) of
true ->
start_periodic_slot_info_request(NewState);
false ->
NewState
end;
queue_full ->
State#st{queue_full = sets:add_element(Addr, State#st.queue_full)};
queue_ok ->
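
The condition added in the connection_up clause above is the core of the change: a slot map refresh is forced when a node comes up, no node was connected just before this event, and masters are already known, meaning the whole cluster has been unreachable for a while rather than being started for the first time. A minimal, self-contained sketch of that predicate, using plain sets instead of the real #st{} record (illustration only, not code from the module):

-module(recovery_check_sketch).
-export([needs_slotmap_update/2]).

%% Up is the set of addresses that were connected *before* this
%% connection_up event; Masters is the set of known master addresses.
%% Returns true when the node is the first to come back after all nodes
%% were down, which is when a slot map update should be triggered.
-spec needs_slotmap_update(sets:set(), sets:set()) -> boolean().
needs_slotmap_update(Up, Masters) ->
    sets:is_empty(Up) andalso not sets:is_empty(Masters).

%% needs_slotmap_update(sets:new(), sets:from_list([{"127.0.0.1", 30001}]))
%% evaluates to true, while it is false on a fresh start where no masters
%% are known yet.
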
@@ -664,7 +672,7 @@ start_clients(Addrs, State) ->
{State#st.nodes, State#st.closing},
Addrs),

State#st{nodes = maps:merge(State#st.nodes, NewNodes),
State#st{nodes = NewNodes,
pending = sets:union(State#st.pending,
sets:subtract(new_set(maps:keys(NewNodes)),
State#st.up)),
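
For context on the start_clients change just above: maps:merge/2 keeps any key that is present only in its first argument, so merging the old nodes map back in can resurrect an entry that was dropped while building NewNodes, whereas assigning NewNodes directly does not. A toy illustration with hypothetical addresses, not taken from the module:

%% In an Erlang shell:
Old = #{{"127.0.0.1", 30001} => client_a, {"127.0.0.1", 30009} => old_entry},
New = #{{"127.0.0.1", 30001} => client_a, {"127.0.0.1", 30002} => client_b},
maps:merge(Old, New).
%% => #{{"127.0.0.1",30001} => client_a,
%%      {"127.0.0.1",30002} => client_b,
%%      {"127.0.0.1",30009} => old_entry}   % key only in Old reappears
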
58 changes: 58 additions & 0 deletions test/ered_SUITE.erl
@@ -16,6 +16,7 @@ all() ->
t_manual_failover,
t_manual_failover_then_old_master_down,
t_blackhole,
t_blackhole_all_nodes,
t_init_timeout,
t_empty_slotmap,
t_empty_initial_slotmap,
@@ -467,6 +468,63 @@ t_blackhole(_) ->

no_more_msgs().

t_blackhole_all_nodes(_) ->
%% Simulate that all nodes are unreachable, e.g. a network failure. We use
%% 'docker pause', similar to sending SIGSTOP to a process, to make the
%% nodes unresponsive. This makes TCP recv() and connect() time out.
CloseWait = 2000, % default is 10000
NodeDownTimeout = 2000, % default is 2000
ResponseTimeout = 10000, % default is 10000
R = start_cluster([{close_wait, CloseWait},
%% Require replicas for 'cluster OK'.
{min_replicas, 1},
{client_opts,
[{node_down_timeout, NodeDownTimeout},
{connection_opts,
[{response_timeout, ResponseTimeout}]}]}
]),

%% Pause all nodes
lists:foreach(fun(Port) ->
Pod = get_pod_name_from_port(Port),
ct:pal("Pausing container: " ++ os:cmd("docker pause " ++ Pod))
end, ?PORTS),

%% Send PING to all nodes and expect closed sockets, error replies for sent requests,
%% and a report that the cluster is not ok.
TestPid = self(),
AddrToPid = ered:get_addr_to_client_map(R),
maps:foreach(fun(_ClientAddr, ClientPid) ->
ered:command_client_async(ClientPid, [<<"PING">>],
fun(Reply) -> TestPid ! {ping_reply, Reply} end)
end, AddrToPid),

[?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, addr := {"127.0.0.1", Port}},
ResponseTimeout + 1000) || Port <- ?PORTS],
?MSG({ping_reply, {error, _Reason1}}, NodeDownTimeout + 1000),
?MSG({ping_reply, {error, _Reason2}}, NodeDownTimeout + 1000),
?MSG({ping_reply, {error, _Reason3}}, NodeDownTimeout + 1000),
?MSG({ping_reply, {error, _Reason4}}, NodeDownTimeout + 1000),
?MSG({ping_reply, {error, _Reason5}}, NodeDownTimeout + 1000),
?MSG({ping_reply, {error, _Reason6}}, NodeDownTimeout + 1000),
[?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}) || Port <- ?PORTS],
?MSG(#{msg_type := cluster_not_ok, reason := master_down}),

%% Unpause all nodes
lists:foreach(fun(Port) ->
Pod = get_pod_name_from_port(Port),
ct:pal("Unpausing container: " ++ os:cmd("docker unpause " ++ Pod))
end, ?PORTS),
timer:sleep(500),

wait_for_consistent_cluster(),

%% Expect connects and a cluster ok.
[?MSG(#{msg_type := connected, addr := {"127.0.0.1", Port}}, 10000) || Port <- ?PORTS],
?MSG(#{msg_type := cluster_ok}, 10000),

no_more_msgs().
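
The pause and unpause loops in t_blackhole_all_nodes are mirror images, so if more tests need the same fault injection they could share one helper. A possible sketch, not part of the suite, assuming the suite's existing get_pod_name_from_port/1 helper and the ?PORTS macro:

%% Pause all containers, run Fun(), and always unpause again, even if
%% Fun() throws, so a failing test does not leave the cluster frozen.
with_paused_nodes(Ports, Fun) ->
    ForEachPod = fun(Cmd) ->
                         lists:foreach(
                           fun(Port) ->
                                   Pod = get_pod_name_from_port(Port),
                                   ct:pal("docker " ++ Cmd ++ ": " ++
                                              os:cmd("docker " ++ Cmd ++ " " ++ Pod))
                           end, Ports)
                 end,
    ForEachPod("pause"),
    try
        Fun()
    after
        ForEachPod("unpause")
    end.

%% Usage: with_paused_nodes(?PORTS, fun() -> ... assertions ... end).
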


t_init_timeout(_) ->
Opts = [
