Update slotmap immediately when a connection is lost
The purpose is to detect the scenario where a failover has happened,
so that losing the connection to the old master (now a replica or no
longer part of the cluster) doesn't necessarily mean that we have
lost the connection to a master.

Thus we postpone signaling 'cluster_not_ok' until the node down
timeout has fired. This also means that reconnect failures don't
immediately trigger 'cluster_not_ok'.

A few test cases are affected by this change. Additionally, some test
code is refactored so that parts of it can be reused.
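
In short, the new handling of a lost connection looks roughly like this
(a simplified sketch; handle_connection_down/2 is a hypothetical helper
name, the real change is in handle_info/2 of src/ered_cluster.erl):

    handle_connection_down(Addr, State) ->
        State1 = State#st{reconnecting =
                              sets:add_element(Addr, State#st.reconnecting)},
        case sets:is_element(Addr, State#st.masters) andalso
             sets:is_element(Addr, State#st.up) andalso
             not sets:is_element(Addr, State#st.reconnecting) of
            true ->
                %% The node was an up master; it may just have failed over,
                %% so refresh the slotmap right away instead of alarming.
                start_periodic_slot_info_request(State1);
            false ->
                State1
        end.
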
zuiderkwast committed Sep 18, 2023
1 parent 326952c commit 7259cde
Showing 2 changed files with 126 additions and 35 deletions.
26 changes: 21 additions & 5 deletions src/ered_cluster.erl
@@ -190,11 +190,25 @@ handle_info(Msg = {connection_status, {_Pid, Addr, _Id} , Status}, State) ->
IsMaster = sets:is_element(Addr, State#st.masters),
ered_info_msg:connection_status(Msg, IsMaster, State#st.info_pid),
State1 = case Status of
{connection_down, {socket_closed, _}} ->
{connection_down, {Reason, _}} when Reason =:= socket_closed;
Reason =:= connect_error ->
%% Avoid triggering the alarm for a socket closed by the
%% peer. The cluster will be marked down on failed
%% reconnect or node down event.
State#st{reconnecting = sets:add_element(Addr, State#st.reconnecting)};
%% peer. The cluster will be marked down on the node down
%% timeout.
Reconnecting = sets:add_element(Addr, State#st.reconnecting),
NewState = State#st{reconnecting = Reconnecting},
case (sets:is_element(Addr, State#st.masters) andalso
sets:is_element(Addr, State#st.up) andalso
not sets:is_element(Addr, State#st.reconnecting)) of
true ->
%% Update the slotmap now, just in case the node
%% which is failing is no longer a master, so we
%% don't need to signal 'cluster_not_ok' if we can
%% avoid it.
start_periodic_slot_info_request(NewState);
false ->
NewState
end;
{connection_down,_} ->
State#st{up = sets:del_element(Addr, State#st.up),
pending = sets:del_element(Addr, State#st.pending),
@@ -440,7 +454,9 @@ node_is_available(Addr, State) ->

-spec replicas_of_unavailable_masters(#st{}) -> [addr()].
replicas_of_unavailable_masters(State) ->
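%% A master that is currently reconnecting is treated as unavailable here,
%% even though it has not yet been marked as down.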
DownMasters = sets:subtract(State#st.masters, State#st.up),
DownMasters = sets:subtract(State#st.masters,
sets:subtract(State#st.up,
State#st.reconnecting)),
case sets:is_empty(DownMasters) of
true ->
[];
135 changes: 105 additions & 30 deletions test/ered_SUITE.erl
@@ -12,6 +12,7 @@ all() ->
t_scan_delete_keys,
t_hard_failover,
t_manual_failover,
t_manual_failover_then_old_master_down,
t_blackhole,
t_init_timeout,
t_empty_slotmap,
@@ -35,6 +36,13 @@ all() ->

-define(MSG(Pattern), ?MSG(Pattern, 1000)).

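%% Consume a message matching Pattern if one has already arrived;
%% succeed either way.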
-define(OPTIONAL_MSG(Pattern),
receive
Pattern -> ok
after
0 -> ok
end).

-define(PORTS, [30001, 30002, 30003, 30004, 30005, 30006]).

-define(DEFAULT_REDIS_DOCKER_IMAGE, "redis:6.2.7").
@@ -60,6 +68,16 @@ init_per_suite(_Config) ->
wait_for_consistent_cluster(),
[].

init_per_testcase(_Testcase, Config) ->
%% Quick check that cluster is OK; otherwise restart everything.
case catch check_cluster_consistent(?PORTS) of
ok ->
[];
_ ->
ct:pal("Re-initialize the cluster"),
init_per_suite(Config)
end.

create_cluster() ->
Image = os:getenv("REDIS_DOCKER_IMAGE", ?DEFAULT_REDIS_DOCKER_IMAGE),
Hosts = [io_lib:format("127.0.0.1:~p ", [P]) || P <- ?PORTS],
@@ -81,30 +99,39 @@ reset_cluster() ->
%% Wait until the cluster is consistent, i.e. all nodes have the same single view
%% of the slot map and all cluster nodes are included in the slot map.
wait_for_consistent_cluster() ->
wait_for_consistent_cluster(?PORTS).

wait_for_consistent_cluster(Ports) ->
fun Loop(N) ->
SlotMaps = [fun(Port) ->
{ok, Pid} = ered_client:start_link("127.0.0.1", Port, []),
{ok, SlotMap} = ered_client:command(Pid, [<<"CLUSTER">>, <<"SLOTS">>]),
ered_client:stop(Pid),
SlotMap
end(P) || P <- ?PORTS],
Consistent = case lists:usort(SlotMaps) of
[SlotMap] ->
?PORTS =:= [Port || {_Ip, Port} <- ered_lib:slotmap_all_nodes(SlotMap)];
_NotAllIdentical ->
false
end,
case Consistent of
true ->
case check_cluster_consistent(Ports) of
ok ->
true;
false when N > 0 ->
{error, _} when N > 0 ->
timer:sleep(500),
Loop(N-1);
false ->
{error, SlotMaps} ->
error({timeout_consistent_cluster, SlotMaps})
end
end(20).

check_cluster_consistent(Ports) ->
SlotMaps = [fun(Port) ->
{ok, Pid} = ered_client:start_link("127.0.0.1", Port, []),
{ok, SlotMap} = ered_client:command(Pid, [<<"CLUSTER">>, <<"SLOTS">>]),
ered_client:stop(Pid),
SlotMap
end(P) || P <- Ports],
Consistent = case lists:usort(SlotMaps) of
[SlotMap] ->
Ports =:= [Port || {_Ip, Port} <- ered_lib:slotmap_all_nodes(SlotMap)];
_NotAllIdentical ->
false
end,
case Consistent of
true -> ok;
false -> {error, SlotMaps}
end.

end_per_suite(_Config) ->
stop_containers().

@@ -173,11 +200,10 @@ t_hard_failover(_) ->
ct:pal(os:cmd("docker stop " ++ Pod)),

?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", Port}, reason := {recv_exit, closed}}),

?MSG(#{msg_type := cluster_not_ok, reason := master_down}),
?MSG(#{msg_type := connect_error, addr := {"127.0.0.1", Port}, reason := econnrefused}),

?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}, 2500),
?MSG(#{msg_type := cluster_not_ok, reason := master_down}),

?MSG(#{msg_type := slot_map_updated}, 5000),
?MSG(#{msg_type := node_deactivated, addr := {"127.0.0.1", Port}}),
Expand All @@ -195,22 +221,24 @@ t_hard_failover(_) ->
%% a client is already connected to the node, so cluster is ok immediately
?MSG(#{msg_type := cluster_ok}),

%% Ignore any previous failed reconnect attempts
?OPTIONAL_MSG(#{msg_type := connect_error, addr := {"127.0.0.1", Port}, reason := econnrefused}),
?OPTIONAL_MSG(#{msg_type := connect_error, addr := {"127.0.0.1", Port}, reason := econnrefused}),
no_more_msgs().

t_manual_failover(_) ->
R = start_cluster(),
SlotMaps = ered:command_all(R, [<<"CLUSTER">>, <<"SLOTS">>]),
ct:pal("~p\n", [SlotMaps]),
do_manual_failover(R) ->
[Client|_] = ered:get_clients(R),
{ok, SlotMap} = ered:command_client(Client, [<<"CLUSTER">>, <<"SLOTS">>]),

ct:pal("~p\n", [SlotMap]),
%% Get the port of a replica node
[[_SlotStart, _SlotEnd, _Master, [_Ip, Port |_] | _] | _] = SlotMap,
[[_SlotStart, _SlotEnd, [_, OldMasterPort |_], [_Ip, Port |_] | _] | _] = SlotMap,

%% sometimes the manual failover is not successful so loop until it is
fun Loop() ->
"OK\n" = os:cmd("redis-cli -p " ++ integer_to_list(Port) ++ " CLUSTER FAILOVER"),
%% Wait for failover to start, otherwise the commands might be sent
%% too early to detect it.
fun Wait(0) ->
Loop();
Wait(N) ->
@@ -225,8 +253,11 @@ t_manual_failover(_) ->
end
end(10)
end(),
{OldMasterPort, Port}.

%% Wait for failover to start, otherwise the commands might be sent too early to detect it
t_manual_failover(_) ->
R = start_cluster(),
{_OldMasterPort, Port} = do_manual_failover(R),
lists:foreach(fun(N) ->
{ok, _} = ered:command(R, [<<"SET">>, N, N], N)
end,
@@ -236,6 +267,54 @@ t_manual_failover(_) ->
?MSG(#{msg_type := slot_map_updated}),
no_more_msgs().

t_manual_failover_then_old_master_down(_) ->
%% Check that if a manual failover is triggered and the old master is taken
%% down before ered has been triggered to update the slotmap, ered still
%% doesn't report 'cluster_not_ok'.
R = start_cluster([{min_replicas, 0}]),
{Port, _NewMasterPort} = do_manual_failover(R),

%% Failover done. Now shut down the old master and stop the container. The
%% connection to it is lost. Ered still believes it's a master because it
%% has not yet learnt about the failover.
cmd_log(io_lib:format("redis-cli -p ~p SHUTDOWN", [Port])),
cmd_log(io_lib:format("docker stop redis-~p", [Port])),
?MSG(#{addr := {"127.0.0.1", Port},
master := true,
msg_type := socket_closed,
reason := {recv_exit, closed}}),

%% Wait for the cluster to become consistent without the stopped node.
wait_for_consistent_cluster(?PORTS -- [Port]),

%% Start container and wait for it to come up.
cmd_log(io_lib:format("docker start redis-~p", [Port])),
timer:sleep(2000),

%% Wait until the cluster is consistent again.
wait_for_consistent_cluster(),

%% Wait for ered to reconnect to the restarted node.
?MSG(#{msg_type := connected,
addr := {"127.0.0.1", Port},
master := false}),

%% We allow a number of info messages during this procedure, but not the
%% status change events 'cluster_not_ok' and 'cluster_ok'.
fun Loop() ->
receive
#{addr := {"127.0.0.1", Port}, msg_type := Type}
when Type =:= connect_error;
Type =:= node_down_timeout;
Type =:= node_deactivated ->
Loop();
#{msg_type := slot_map_updated} ->
Loop()
after 0 ->
ok
end
end(),
no_more_msgs().

t_blackhole(_) ->
%% Simulate that a Redis node is unreachable, e.g. a network failure. We use
@@ -267,14 +346,10 @@ t_blackhole(_) ->

?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, master := true},
ResponseTimeout + 1000),
?MSG({ping_reply, {error, node_down}},
?MSG({ping_reply, {error, _Reason}}, % node_down or node_deactivated
NodeDownTimeout + 1000),
?MSG(#{msg_type := node_down_timeout, master := true}),
?MSG(#{msg_type := cluster_not_ok, reason := master_down}),
?MSG(#{msg_type := slot_map_updated},
5000),
?MSG(#{msg_type := slot_map_updated}),
?MSG(#{msg_type := node_deactivated, addr := {"127.0.0.1", Port}}),
?MSG(#{msg_type := cluster_ok}),
?MSG(#{msg_type := client_stopped, reason := normal, master := false},
CloseWait + 1000),

