Skip to content

Commit

Permalink
Handle reap and erase in batches (#1862)
Browse files Browse the repository at this point in the history
Avoid overloading the eraser/reaper process mailbox by sending the requests in batches (as already happened with range_repl), and waiting for a response.

When a job is used rather than the local reaper/eraser, the batching is done from the clusteraae_fsm. This mechanism existed prior to this commit and has not been changed, but it has been extended to support the last-batch overflow from local requests.
  • Loading branch information
martinsumner committed Jun 9, 2023
1 parent d7034f1 commit 44bfdc4
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 39 deletions.
26 changes: 19 additions & 7 deletions src/riak_kv_clusteraae_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -488,14 +488,25 @@ process_results(Results, State) ->
{siblings, merge_countinlists(A_SbL, R_SbL)}];
QT when QT == erase_keys; QT == reap_tombs ->
case Results of
{[], Count, local} ->
{[], element(2, Acc) + Count, local};
{BKDHL, Count, local} ->
UpdCount = element(2, Acc) + Count,
Mod =
case QT of
erase_keys ->
riak_kv_eraser;
reap_tombs ->
riak_kv_reaper
end,
handle_in_batches(
QT, lists:reverse(BKDHL), 0, Mod),
{[], UpdCount, local};
{[], Count, count} ->
{[], element(2, Acc) + Count, count};
{BKDHL, 0, Pid} ->
{BKDHL, Count, Pid} ->
{[], AccCount, Pid} = Acc,
UpdCount = length(BKDHL) + AccCount,
handle_in_batches(QT, lists:reverse(BKDHL), 0, Pid),
UpdCount = Count + AccCount,
handle_in_batches(
QT, lists:reverse(BKDHL), 0, Pid),
{[], UpdCount, Pid}
end
end
Expand Down Expand Up @@ -768,12 +779,11 @@ hash_function({rehash, InitialisationVector}) ->
-spec handle_in_batches(reap_tombs|erase_keys,
list(riak_kv_reaper:reap_reference())|
list(riak_kv_eraser:delete_reference()),
non_neg_integer(), pid()) -> ok.
non_neg_integer(), pid()|module()) -> ok.
handle_in_batches(_Type, [], _BatchCount, _Worker) ->
ok;
handle_in_batches(Type, RefList, BatchCount, Worker)
when BatchCount >= ?DELETE_BATCH_SIZE ->

case Type of
reap_tombs ->
_ = riak_kv_reaper:reap_stats(Worker);
Expand All @@ -790,6 +800,8 @@ handle_in_batches(Type, [Ref|RestRefs], BatchCount, Worker) ->
end,
handle_in_batches(Type, RestRefs, BatchCount + 1, Worker).



%% ===================================================================
%% Internal functions
%% ===================================================================
Expand Down
10 changes: 10 additions & 0 deletions src/riak_kv_eraser.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
start_job/1,
request_delete/1,
request_delete/2,
bulk_request_delete/1,
bulk_request_delete/2,
delete_stats/0,
delete_stats/1,
override_redo/1,
Expand Down Expand Up @@ -86,6 +88,14 @@ request_delete(DeleteReference) ->
%% Pass a single delete reference to the queue manager on behalf of the
%% eraser process identified by Pid.
request_delete(Pid, DeleteReference) ->
riak_kv_queue_manager:request(Pid, DeleteReference).

%% @doc Submit a list of delete references in one request, addressing the
%% eraser by this module's name (?MODULE).
-spec bulk_request_delete(list(delete_reference())) -> ok.
bulk_request_delete(DeleteRefs) ->
    bulk_request_delete(?MODULE, DeleteRefs).

%% @doc Submit a list of delete references in one request to the given
%% eraser (a pid or a module name), via the queue manager.
-spec bulk_request_delete(pid()|module(), list(delete_reference())) -> ok.
bulk_request_delete(Eraser, DeleteRefs) ->
    riak_kv_queue_manager:bulk_request(Eraser, DeleteRefs).

%% @doc Fetch the delete stats of the eraser addressed by this module's
%% name (?MODULE).
-spec delete_stats() ->
    list({atom(), non_neg_integer()|riak_kv_overflow_queue:queue_stats()}).
delete_stats() ->
    delete_stats(?MODULE).
Expand Down
22 changes: 18 additions & 4 deletions src/riak_kv_queue_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
-export([start_link/2,
start_job/3,
request/2,
bulk_request/2,
stats/1,
immediate_action/2,
override_redo/2,
Expand Down Expand Up @@ -121,6 +122,10 @@ start_job(JobID, Module, RootPath) ->
%% Hand a single reference to the queue process using gen_server:cast, at
%% ?REQUEST_PRIORITY.  Being a cast, the caller does not wait for the queue
%% process to handle the message.
request(Pid, Reference) ->
gen_server:cast(Pid, {request, Reference, ?REQUEST_PRIORITY}).

%% @doc Hand a list of references to the queue process in one synchronous
%% call at ?REQUEST_PRIORITY.  The call has no timeout (infinity), so the
%% caller blocks until the queue process replies.
-spec bulk_request(pid()|module(), list()) -> ok.
bulk_request(Worker, References) ->
    BulkRequest = {bulk_request, References, ?REQUEST_PRIORITY},
    gen_server:call(Worker, BulkRequest, infinity).

-spec stats(pid()|module()) ->
list({atom(), non_neg_integer()|riak_kv_overflow_queue:queue_stats()}).
stats(Pid) ->
Expand Down Expand Up @@ -180,14 +185,23 @@ handle_call(stop_job, _From, State) ->
{reply, ok, State#state{pending_close = true}, 0};
handle_call({immediate_action, Reference}, _From, State) ->
Mod = State#state.callback_mod,
{reply, Mod:action(Reference, false), State, 0}.
{reply, Mod:action(Reference, false), State, 0};
%% Bulk request: acknowledge the caller, then add every reference in RefList
%% to the overflow queue at the given Priority.
handle_call({bulk_request, RefList, Priority}, From, State) ->
%% The reply is sent before the queue is updated, so the caller is released
%% once this message has been reached rather than after the additions below.
gen_server:reply(From, ok),
UpdOverflowQueue =
lists:foldr(
%% foldr visits RefList from its last element to its first
fun(Ref, AccQ) ->
riak_kv_overflow_queue:addto_queue(Priority, Ref, AccQ)
end,
State#state.queue,
RefList),
%% noreply because the reply was already sent above; timeout of 0 as used
%% by the other clauses of this callback
{noreply, State#state{queue = UpdOverflowQueue}, 0}.


handle_cast({request, Reference, Priority}, State) ->
UpdOverflowQueue =
riak_kv_overflow_queue:addto_queue(Priority,
Reference,
State#state.queue),
riak_kv_overflow_queue:addto_queue(
Priority, Reference, State#state.queue),
{noreply, State#state{queue = UpdOverflowQueue}, 0}.


Expand Down
10 changes: 10 additions & 0 deletions src/riak_kv_reaper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
start_job/1,
request_reap/1,
request_reap/2,
bulk_request_reap/1,
bulk_request_reap/2,
direct_reap/1,
reap_stats/0,
reap_stats/1,
Expand Down Expand Up @@ -94,6 +96,14 @@ request_reap(ReapReference) ->
%% Pass a single reap reference to the queue manager on behalf of the
%% reaper process identified by Pid.
request_reap(Pid, ReapReference) ->
riak_kv_queue_manager:request(Pid, ReapReference).

%% @doc Submit a list of reap references in one request, addressing the
%% reaper by this module's name (?MODULE).
-spec bulk_request_reap(list(reap_reference())) -> ok.
bulk_request_reap(ReapRefs) ->
    bulk_request_reap(?MODULE, ReapRefs).

%% @doc Submit a list of reap references in one request to the given
%% reaper (a pid or a module name), via the queue manager.
-spec bulk_request_reap(pid()|module(), list(reap_reference())) -> ok.
bulk_request_reap(Reaper, ReapRefs) ->
    riak_kv_queue_manager:bulk_request(Reaper, ReapRefs).

%% @doc Fetch the reap stats of the reaper addressed by this module's
%% name (?MODULE).
-spec reap_stats() ->
    list({atom(), non_neg_integer()|riak_kv_overflow_queue:queue_stats()}).
reap_stats() ->
    reap_stats(?MODULE).
Expand Down
71 changes: 43 additions & 28 deletions src/riak_kv_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@
%% Best efforts (aka scavenger) pool.
%% Parallel AAE store rebuilds


%% Batch sizes consulted by the reap_tombs and erase_keys folds when deciding
%% whether locally-accumulated references should be passed to the
%% reaper/eraser as a bulk request.
-define(REAPER_BATCH_SIZE, 1024).
-define(ERASER_BATCH_SIZE, 1024).

%% Erlang's if Bool -> thing; true -> thang end. syntax hurts my
%% brain. It scans as if true -> thing; true -> thang end. So, here is
%% a macro, ?ELSE to use in if statements. You're welcome.
Expand Down Expand Up @@ -1985,7 +1989,7 @@ handle_aaefold({find_tombs,
handle_aaefold({reap_tombs,
Bucket, KeyRange,
SegmentFilter, ModifiedRange,
ReapMethod},
_ReapMethod},
InitAcc, _Nval,
IndexNs, Filtered, ReturnFun, Cntrl, Sender,
State) ->
Expand All @@ -1996,18 +2000,24 @@ handle_aaefold({reap_tombs,
{true, undefined} ->
{clock, VV} = lists:keyfind(clock, 1, EFs),
DH = riak_object:delete_hash(VV),
case ReapMethod of
local ->
riak_kv_reaper:request_reap({{BF, KF}, DH}),
NewCount = element(2, TombHashAcc) + 1,
setelement(2, TombHashAcc, NewCount);
count ->
NewCount = element(2, TombHashAcc) + 1,
setelement(2, TombHashAcc, NewCount);
{job, _JobID} ->
{[{{BF, KF}, DH}|element(1, TombHashAcc)],
element(2, TombHashAcc),
element(3, TombHashAcc)}
%% The accumulator is {BatchList, Count, Method}.
%%
%% local - references are held in BatchList and passed to the reaper
%% as one bulk request each time Count reaches a multiple of
%% ?REAPER_BATCH_SIZE.  The test must use rem: with div the result is
%% 0 only while Count is below the batch size, so the first keys were
%% each sent as a one-element "batch" and nothing was flushed after
%% that.  A partial batch left when the fold ends stays in the
%% accumulator and is returned with the fold result.
case TombHashAcc of
    {BatchList, Count, local} ->
        NewCount = Count + 1,
        case NewCount rem ?REAPER_BATCH_SIZE of
            0 ->
                riak_kv_reaper:bulk_request_reap(
                    [{{BF, KF}, DH}|BatchList]
                ),
                {[], NewCount, local};
            _ ->
                {[{{BF, KF}, DH}|BatchList], NewCount, local}
        end;
    {BatchList, Count, count} ->
        %% count - only the counter moves, nothing is collected
        {BatchList, Count + 1, count};
    {BatchList, Count, Job} ->
        %% any other method - collect every reference and count it
        {[{{BF, KF}, DH}|BatchList], Count + 1, Job}
end;
{false, undefined} ->
TombHashAcc
Expand All @@ -2029,7 +2039,7 @@ handle_aaefold({reap_tombs,
handle_aaefold({erase_keys,
Bucket, KeyRange,
SegmentFilter, ModifiedRange,
DeleteMethod},
_DeleteMethod},
InitAcc, _Nval,
IndexNs, Filtered, ReturnFun, Cntrl, Sender,
State) ->
Expand All @@ -2046,18 +2056,23 @@ handle_aaefold({erase_keys,
EraseKeyAcc;
{false, undefined} ->
{clock, VV} = lists:keyfind(clock, 1, EFs),
case DeleteMethod of
local ->
riak_kv_eraser:request_delete({{BF, KF}, VV}),
NewCount = element(2, EraseKeyAcc) + 1,
setelement(2, EraseKeyAcc, NewCount);
count ->
NewCount = element(2, EraseKeyAcc) + 1,
setelement(2, EraseKeyAcc, NewCount);
{job, _JobID} ->
{[{{BF, KF}, VV}|element(1, EraseKeyAcc)],
element(2, EraseKeyAcc),
element(3, EraseKeyAcc)}
%% The accumulator is {BatchList, Count, Method}.
%%
%% local - references are held in BatchList and passed to the eraser
%% as one bulk request each time Count reaches a multiple of
%% ?ERASER_BATCH_SIZE.  The test must use rem: with div the result is
%% 0 only while Count is below the batch size, so the first keys were
%% each sent as a one-element "batch" and nothing was flushed after
%% that.  A partial batch left when the fold ends stays in the
%% accumulator and is returned with the fold result.
case EraseKeyAcc of
    {BatchList, Count, local} ->
        NewCount = Count + 1,
        case NewCount rem ?ERASER_BATCH_SIZE of
            0 ->
                riak_kv_eraser:bulk_request_delete(
                    [{{BF, KF}, VV}|BatchList]),
                {[], NewCount, local};
            _ ->
                {[{{BF, KF}, VV}|BatchList], NewCount, local}
        end;
    {BatchList, Count, count} ->
        %% count - only the counter moves, nothing is collected
        {BatchList, Count + 1, count};
    {BatchList, Count, Job} ->
        %% any other method - collect every reference and count it
        {[{{BF, KF}, VV}|BatchList], Count + 1, Job}
end
end
end,
Expand Down Expand Up @@ -2756,7 +2771,7 @@ final_delete(BKey, DeleteHash, State = #state{mod=Mod, modstate=ModState}) ->
[BKey, IsDeleted, DeleteHash, OtherHash]),
State#state{modstate=ModState1}
end;
{{error, _}, ModState1} ->
{{error, _R}, ModState1} ->
State#state{modstate=ModState1}
end.

Expand Down Expand Up @@ -3491,7 +3506,7 @@ do_delete(BKey, State) ->
ModState = State#state.modstate,
Idx = State#state.idx,
DeleteMode = State#state.delete_mode,

%% Get the existing object.
case do_get_term(BKey, Mod, ModState) of
{{ok, RObj}, UpdModState} ->
Expand Down

0 comments on commit 44bfdc4

Please sign in to comment.