From 7259cdeda79f97e50fcb2429e1e8240384050816 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?=
Date: Mon, 18 Sep 2023 23:52:17 +0200
Subject: [PATCH] Update slotmap immediately when a connection is lost

The purpose is to detect the scenario where a failover has happened, so
that losing the connection to the old master (now a replica or no
longer part of the cluster) doesn't necessarily mean that we have lost
the connection to a master. Thus, we postpone signaling
'cluster_not_ok' until the node down timeout has fired. This also means
that reconnect failures don't immediately trigger 'cluster_not_ok'.

A few test cases are affected by this change. Additionally, some test
code is refactored so that parts of it can be reused.
---
 src/ered_cluster.erl |  26 +++++++--
 test/ered_SUITE.erl  | 135 +++++++++++++++++++++++++++++++++----------
 2 files changed, 126 insertions(+), 35 deletions(-)

diff --git a/src/ered_cluster.erl b/src/ered_cluster.erl
index 70e77b8..1b3e43c 100644
--- a/src/ered_cluster.erl
+++ b/src/ered_cluster.erl
@@ -190,11 +190,25 @@ handle_info(Msg = {connection_status, {_Pid, Addr, _Id} , Status}, State) ->
     IsMaster = sets:is_element(Addr, State#st.masters),
     ered_info_msg:connection_status(Msg, IsMaster, State#st.info_pid),
     State1 = case Status of
-                 {connection_down, {socket_closed, _}} ->
+                 {connection_down, {Reason, _}} when Reason =:= socket_closed;
+                                                     Reason =:= connect_error ->
                      %% Avoid triggering the alarm for a socket closed by the
-                     %% peer. The cluster will be marked down on failed
-                     %% reconnect or node down event.
-                     State#st{reconnecting = sets:add_element(Addr, State#st.reconnecting)};
+                     %% peer. The cluster will be marked down on the node down
+                     %% timeout.
+                     Reconnecting = sets:add_element(Addr, State#st.reconnecting),
+                     NewState = State#st{reconnecting = Reconnecting},
+                     case (sets:is_element(Addr, State#st.masters) andalso
+                           sets:is_element(Addr, State#st.up) andalso
+                           not sets:is_element(Addr, State#st.reconnecting)) of
+                         true ->
+                             %% Update the slotmap now, just in case the node
+                             %% which is failing is no longer a master, so we
+                             %% don't need to signal 'cluster_not_ok' if we can
+                             %% avoid it.
+                             start_periodic_slot_info_request(NewState);
+                         false ->
+                             NewState
+                     end;
                  {connection_down,_} ->
                      State#st{up = sets:del_element(Addr, State#st.up),
                               pending = sets:del_element(Addr, State#st.pending),
@@ -440,7 +454,9 @@ node_is_available(Addr, State) ->
 
 -spec replicas_of_unavailable_masters(#st{}) -> [addr()].
 replicas_of_unavailable_masters(State) ->
-    DownMasters = sets:subtract(State#st.masters, State#st.up),
+    DownMasters = sets:subtract(State#st.masters,
+                                sets:subtract(State#st.up,
+                                              State#st.reconnecting)),
     case sets:is_empty(DownMasters) of
         true ->
             [];
diff --git a/test/ered_SUITE.erl b/test/ered_SUITE.erl
index 55f8491..ac5cf67 100644
--- a/test/ered_SUITE.erl
+++ b/test/ered_SUITE.erl
@@ -12,6 +12,7 @@ all() ->
      t_scan_delete_keys,
      t_hard_failover,
      t_manual_failover,
+     t_manual_failover_then_old_master_down,
      t_blackhole,
      t_init_timeout,
      t_empty_slotmap,
@@ -35,6 +36,13 @@ all() ->
 
 -define(MSG(Pattern), ?MSG(Pattern, 1000)).
 
+-define(OPTIONAL_MSG(Pattern),
+        receive
+            Pattern -> ok
+        after
+            0 -> ok
+        end).
+
 -define(PORTS, [30001, 30002, 30003, 30004, 30005, 30006]).
 
 -define(DEFAULT_REDIS_DOCKER_IMAGE, "redis:6.2.7").
@@ -60,6 +68,16 @@ init_per_suite(_Config) ->
     wait_for_consistent_cluster(),
     [].
 
+init_per_testcase(_Testcase, Config) ->
+    %% Quick check that cluster is OK; otherwise restart everything.
+    case catch check_cluster_consistent(?PORTS) of
+        ok ->
+            [];
+        _ ->
+            ct:pal("Re-initialize the cluster"),
+            init_per_suite(Config)
+    end.
+
 create_cluster() ->
     Image = os:getenv("REDIS_DOCKER_IMAGE", ?DEFAULT_REDIS_DOCKER_IMAGE),
     Hosts = [io_lib:format("127.0.0.1:~p ", [P]) || P <- ?PORTS],
@@ -81,30 +99,39 @@ reset_cluster() ->
 
 %% Wait until cluster is consistent, i.e all nodes have the same single view
 %% of the slot map and all cluster nodes are included in the slot map.
 wait_for_consistent_cluster() ->
+    wait_for_consistent_cluster(?PORTS).
+
+wait_for_consistent_cluster(Ports) ->
     fun Loop(N) ->
-            SlotMaps = [fun(Port) ->
-                                {ok, Pid} = ered_client:start_link("127.0.0.1", Port, []),
-                                {ok, SlotMap} = ered_client:command(Pid, [<<"CLUSTER">>, <<"SLOTS">>]),
-                                ered_client:stop(Pid),
-                                SlotMap
-                        end(P) || P <- ?PORTS],
-            Consistent = case lists:usort(SlotMaps) of
-                             [SlotMap] ->
-                                 ?PORTS =:= [Port || {_Ip, Port} <- ered_lib:slotmap_all_nodes(SlotMap)];
-                             _NotAllIdentical ->
-                                 false
-                         end,
-            case Consistent of
-                true ->
+            case check_cluster_consistent(Ports) of
+                ok ->
                     true;
-                false when N > 0 ->
+                {error, _} when N > 0 ->
                     timer:sleep(500),
                     Loop(N-1);
-                false ->
+                {error, SlotMaps} ->
                     error({timeout_consistent_cluster, SlotMaps})
             end
     end(20).
 
+check_cluster_consistent(Ports) ->
+    SlotMaps = [fun(Port) ->
+                        {ok, Pid} = ered_client:start_link("127.0.0.1", Port, []),
+                        {ok, SlotMap} = ered_client:command(Pid, [<<"CLUSTER">>, <<"SLOTS">>]),
+                        ered_client:stop(Pid),
+                        SlotMap
+                end(P) || P <- Ports],
+    Consistent = case lists:usort(SlotMaps) of
+                     [SlotMap] ->
+                         Ports =:= [Port || {_Ip, Port} <- ered_lib:slotmap_all_nodes(SlotMap)];
+                     _NotAllIdentical ->
+                         false
+                 end,
+    case Consistent of
+        true -> ok;
+        false -> {error, SlotMaps}
+    end.
+
 end_per_suite(_Config) ->
     stop_containers().
 
@@ -173,11 +200,10 @@ t_hard_failover(_) ->
     ct:pal(os:cmd("docker stop " ++ Pod)),
 
     ?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", Port}, reason := {recv_exit, closed}}),
-
-    ?MSG(#{msg_type := cluster_not_ok, reason := master_down}),
     ?MSG(#{msg_type := connect_error, addr := {"127.0.0.1", Port}, reason := econnrefused}),
     ?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}, 2500),
+    ?MSG(#{msg_type := cluster_not_ok, reason := master_down}),
     ?MSG(#{msg_type := slot_map_updated}, 5000),
     ?MSG(#{msg_type := node_deactivated, addr := {"127.0.0.1", Port}}),
 
@@ -195,22 +221,24 @@ t_hard_failover(_) ->
     %% a client is already connected to the node, so cluster is ok immediately
     ?MSG(#{msg_type := cluster_ok}),
 
+    %% Ignore any previous failed reconnect attempts
+    ?OPTIONAL_MSG(#{msg_type := connect_error, addr := {"127.0.0.1", Port}, reason := econnrefused}),
+    ?OPTIONAL_MSG(#{msg_type := connect_error, addr := {"127.0.0.1", Port}, reason := econnrefused}),
     no_more_msgs().
 
-t_manual_failover(_) ->
-    R = start_cluster(),
-    SlotMaps = ered:command_all(R, [<<"CLUSTER">>, <<"SLOTS">>]),
-    ct:pal("~p\n", [SlotMaps]),
+do_manual_failover(R) ->
     [Client|_] = ered:get_clients(R),
     {ok, SlotMap} = ered:command_client(Client, [<<"CLUSTER">>, <<"SLOTS">>]),
     ct:pal("~p\n", [SlotMap]),
     %% Get the port of a replica node
-    [[_SlotStart, _SlotEnd, _Master, [_Ip, Port |_] | _] | _] = SlotMap,
+    [[_SlotStart, _SlotEnd, [_, OldMasterPort |_], [_Ip, Port |_] | _] | _] = SlotMap,
 
     %% sometimes the manual failover is not successful so loop until it is
     fun Loop() ->
             "OK\n" = os:cmd("redis-cli -p " ++ integer_to_list(Port) ++ " CLUSTER FAILOVER"),
+            %% Wait for failover to start, otherwise the commands might be sent
+            %% too early to detect it.
             fun Wait(0) -> Loop();
                 Wait(N) ->
@@ -225,8 +253,11 @@
                     end
             end(10)
     end(),
+    {OldMasterPort, Port}.
 
-    %% Wait for failover to start, otherwise the commands might be sent to early to detect
+t_manual_failover(_) ->
+    R = start_cluster(),
+    {_OldMasterPort, Port} = do_manual_failover(R),
     lists:foreach(fun(N) ->
                           {ok, _} = ered:command(R, [<<"SET">>, N, N], N)
                   end,
@@ -236,6 +267,54 @@
     ?MSG(#{msg_type := slot_map_updated}),
     no_more_msgs().
 
+t_manual_failover_then_old_master_down(_) ->
+    %% Check that if a manual failover is triggered and the old master is taken
+    %% down before ered was triggered to update the slotmap, ered still doesn't
+    %% report 'cluster_not_ok'.
+    R = start_cluster([{min_replicas, 0}]),
+    {Port, _NewMasterPort} = do_manual_failover(R),
+
+    %% Failover done. Now shut down the old master and stop the container. The
+    %% connection to it is lost. Ered still believes it's a master because it
+    %% has not yet learnt about the failover.
+    cmd_log(io_lib:format("redis-cli -p ~p SHUTDOWN", [Port])),
+    cmd_log(io_lib:format("docker stop redis-~p", [Port])),
+    ?MSG(#{addr := {"127.0.0.1", Port},
+           master := true,
+           msg_type := socket_closed,
+           reason := {recv_exit, closed}}),
+
+    %% Wait for the cluster to become consistent without the stopped node.
+    wait_for_consistent_cluster(?PORTS -- [Port]),
+
+    %% Start container and wait for it to come up.
+    cmd_log(io_lib:format("docker start redis-~p", [Port])),
+    timer:sleep(2000),
+
+    %% Wait until the cluster is consistent again.
+    wait_for_consistent_cluster(),
+
+    %% Wait for ered to reconnect to the restarted node.
+    ?MSG(#{msg_type := connected,
+           addr := {"127.0.0.1", Port},
+           master := false}),
+
+    %% We allow a number of info messages during this procedure, but not the
+    %% status change events 'cluster_not_ok' and 'cluster_ok'.
+    fun Loop() ->
+            receive
+                #{addr := {"127.0.0.1", Port}, msg_type := Type}
+                  when Type =:= connect_error;
+                       Type =:= node_down_timeout;
+                       Type =:= node_deactivated ->
+                    Loop();
+                #{msg_type := slot_map_updated} ->
+                    Loop()
+            after 0 ->
+                    ok
+            end
+    end(),
+    no_more_msgs().
 
 t_blackhole(_) ->
     %% Simulate that a Redis node is unreachable, e.g. a network failure. We use
@@ -267,14 +346,10 @@ t_blackhole(_) ->
     ?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, master := true},
          ResponseTimeout + 1000),
-    ?MSG({ping_reply, {error, node_down}},
+    ?MSG({ping_reply, {error, _Reason}}, % node_down or node_deactivated
          NodeDownTimeout + 1000),
-    ?MSG(#{msg_type := node_down_timeout, master := true}),
-    ?MSG(#{msg_type := cluster_not_ok, reason := master_down}),
-    ?MSG(#{msg_type := slot_map_updated},
-         5000),
+    ?MSG(#{msg_type := slot_map_updated}),
     ?MSG(#{msg_type := node_deactivated, addr := {"127.0.0.1", Port}}),
-    ?MSG(#{msg_type := cluster_ok}),
     ?MSG(#{msg_type := client_stopped, reason := normal, master := false},
          CloseWait + 1000),