Skip to content

Commit

Permalink
Include node address in info msg about updated slotmap
Browse files Browse the repository at this point in the history
  • Loading branch information
zuiderkwast committed Sep 5, 2023
1 parent c3a12fb commit 7a02f7a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
22 changes: 9 additions & 13 deletions src/ered_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ handle_info(Msg = {connection_status, {_Pid, Addr, _Id} , Status}, State) ->
{noreply, update_cluster_state(ClusterStatus, State1)}
end;

handle_info({slot_info, Version, Response}, State) ->
handle_info({slot_info, Version, Response, FromAddr}, State) ->
case Response of
_ when Version < State#st.slot_map_version ->
%% got a response for a request triggered for an old version of the slot map, ignore
Expand All @@ -229,11 +229,11 @@ handle_info({slot_info, Version, Response}, State) ->
{noreply, State};
{ok, {error, Error}} ->
%% error sent from redis
ered_info_msg:cluster_slots_error_response(Error, State#st.info_pid),
ered_info_msg:cluster_slots_error_response(Error, FromAddr, State#st.info_pid),
{noreply, State};
{ok, []} ->
%% Empty slotmap. Maybe the node has been CLUSTER RESET.
ered_info_msg:cluster_slots_error_response(empty, State#st.info_pid),
ered_info_msg:cluster_slots_error_response(empty, FromAddr, State#st.info_pid),
{noreply, State};
{ok, ClusterSlotsReply} ->
NewMap = lists:sort(ClusterSlotsReply),
Expand All @@ -259,7 +259,8 @@ handle_info({slot_info, Version, Response}, State) ->
NewClosing = maps:merge(maps:from_list([{Addr, TimerRef} || Addr <- Remove]),
State#st.closing),

ered_info_msg:slot_map_updated(ClusterSlotsReply, Version + 1, State#st.info_pid),
ered_info_msg:slot_map_updated(ClusterSlotsReply, Version + 1,
FromAddr, State#st.info_pid),

%% open new clients
State1 = start_clients(Nodes, State),
Expand Down Expand Up @@ -398,9 +399,10 @@ stop_periodic_slot_info_request(State) ->
State#st{slot_timer_ref = none}
end.

send_slot_info_request(Node, State) ->
send_slot_info_request(Addr, State) ->
Node = maps:get(Addr, State#st.nodes),
Pid = self(),
Cb = fun(Answer) -> Pid ! {slot_info, State#st.slot_map_version, Answer} end,
Cb = fun(Answer) -> Pid ! {slot_info, State#st.slot_map_version, Answer, Addr} end,
ered_client:command_async(Node, [<<"CLUSTER">>, <<"SLOTS">>], Cb).

%% Pick a random available node, preferring the ones in PreferredNodes if any of
Expand All @@ -412,15 +414,9 @@ pick_node(PreferredNodes, State) ->
case pick_available_node(shuffle(PreferredNodes), State) of
none ->
%% No preferred node available. Pick one from the 'up' set.
Addr = pick_available_node(shuffle(sets:to_list(State#st.up)), State);
pick_available_node(shuffle(sets:to_list(State#st.up)), State);
Addr ->
Addr
end,
case Addr of
none ->
none;
_ ->
maps:get(Addr, State#st.nodes)
end.

shuffle(List) ->
Expand Down
16 changes: 10 additions & 6 deletions src/ered_info_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
%% Functions used to format and send info messages

-export([connection_status/3,
slot_map_updated/3,
cluster_slots_error_response/2,
slot_map_updated/4,
cluster_slots_error_response/3,
cluster_ok/1,
cluster_nok/2
]).
Expand Down Expand Up @@ -44,9 +44,11 @@

#{msg_type := slot_map_updated,
slot_map := ClusterSlotsReply :: any(),
addr := addr(),
map_version := non_neg_integer()} |

#{msg_type := cluster_slots_error_response,
addr := addr(),
response := RedisReply :: any()} |

#{msg_type := cluster_ok} |
Expand Down Expand Up @@ -86,24 +88,26 @@ connection_status(ClientInfo, IsMaster, Pids) ->
Pids).

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec slot_map_updated(ered_lib:slot_map(), non_neg_integer(), [pid()]) -> ok.
-spec slot_map_updated(ered_lib:slot_map(), non_neg_integer(), addr(), [pid()]) -> ok.
%%
%% A new slot map received from Redis, different from the current one.
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
slot_map_updated(ClusterSlotsReply, Version, Pids) ->
slot_map_updated(ClusterSlotsReply, Version, FromAddr, Pids) ->
send_info(#{msg_type => slot_map_updated,
slot_map => ClusterSlotsReply,
addr => FromAddr,
map_version => Version},
Pids).


%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec cluster_slots_error_response(binary() | empty, [pid()]) -> ok.
-spec cluster_slots_error_response(binary() | empty, addr(), [pid()]) -> ok.
%%
%% Redis returned an error message when trying to fetch the slot map.
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
cluster_slots_error_response(Response, Pids) ->
cluster_slots_error_response(Response, FromAddr, Pids) ->
send_info(#{msg_type => cluster_slots_error_response,
addr => FromAddr,
response => Response},
Pids).

Expand Down

0 comments on commit 7a02f7a

Please sign in to comment.