From 8922222593f71b6e1e4891ad817f00011156fc16 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 13 Feb 2024 10:56:44 +0000 Subject: [PATCH] Nhse develop d30upd (#15) * Merge pull request #1 from nhs-riak/nhse-contrib-kv1871 KV i1871 - Handle timeout on remote connection * Trigger batch correctly at each size (#4) * Force timeout to trigger (#3) Previously, the inactivity timeout on handle_continue could be cancelled by a call to riak_kv_replrtq_snk (e.g. from riak_kv_replrtq_peer). This might lead to the log_stats loop never being triggered. * Configurable $key query on leveled (#8) Can be configured to ignore tombstone keys by default. * Allow nextgenrepl to real-time replicate reaps (#6) * Allow nextgenrepl to real-time replicate reaps This is to address the issue of reaping across sync'd clusters. Without this feature it is necessary to disable full-sync whilst independently replicating on each cluster. Now if reaping via riak_kv_reaper the reap will be replicated assuming the `riak_kv.repl_reap` flag has been enabled. At the receiving cluster the reap will not be replicated any further. There are some API changes to support this. The `find_tombs` aae_fold will now return Keys/Clocks and not Keys/DeleteHash. The ReapReference for riak_kv_reaper will now expect a clock (version vector) not a DeleteHash, and will also now expect an additional boolean to indicate if this repl is a replication candidate (it will be false for all pushed reaps). The object encoding for nextgenrepl now has a flag to indicate a reap, with a special encoding for reap references. 
* Update riak_object.erl Clarify specs * Take timestamp at correct point (after push) * Updates following review * Update rebar.config * Make current_peers empty when disabled (#10) * Make current_peers empty when disabled * Peer discovery to recognise suspend and disable of sink * Update src/riak_kv_replrtq_peer.erl Co-authored-by: Thomas Arts * Update src/riak_kv_replrtq_peer.erl Co-authored-by: Thomas Arts --------- Co-authored-by: Thomas Arts * De-lager * Add support for v0 object in parallel-mode AAE (#11) * Add support for v0 object in parallel-mode AAE Cannot assume that v0 objects will not happen - capability negotiation down to v0 on 3.0 Riak during failure scenarios * Update following review As ?MAGIC is a distinctive constant, it should be the one on the pattern match - with everything else assumed to be convertible by term_to_binary. * Update src/riak_object.erl Co-authored-by: Thomas Arts --------- Co-authored-by: Thomas Arts * Update riak_kv_ttaaefs_manager.erl (#13) For bucket-based full-sync `{tree_compare, 0}` is the return on success. 
* Correct log macro typo --------- Co-authored-by: Thomas Arts --- priv/riak_kv.schema | 20 ++++- src/riak_client.erl | 47 ++++++++--- src/riak_kv_clusteraae_fsm.erl | 12 +-- src/riak_kv_get_fsm.erl | 7 ++ src/riak_kv_leveled_backend.erl | 108 ++++++++++++++++-------- src/riak_kv_pb_object.erl | 70 ++++++++++------ src/riak_kv_reaper.erl | 28 ++++++- src/riak_kv_replrtq_peer.erl | 63 +++++++------- src/riak_kv_replrtq_snk.erl | 144 +++++++++++++++++++------------- src/riak_kv_replrtq_src.erl | 130 +++++++++++++++------------- src/riak_kv_ttaaefs_manager.erl | 3 +- src/riak_kv_vnode.erl | 17 ++-- src/riak_kv_wm_queue.erl | 16 ++++ src/riak_object.erl | 98 ++++++++++++++-------- 14 files changed, 496 insertions(+), 267 deletions(-) diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index 9e5ac234d..ebb1d57e9 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -1337,13 +1337,20 @@ {default, "q1_ttaaefs:block_rtq"} ]}. -%% @doc Enable this node zlib compress objects over the wire +%% @doc Enable this node to zlib compress objects over the wire {mapping, "replrtq_compressonwire", "riak_kv.replrtq_compressonwire", [ {datatype, {flag, enabled, disabled}}, {default, disabled}, {commented, enabled} ]}. +%% @doc Enable this node to replicate reap requests to other clusters +{mapping, "repl_reap", "riak_kv.repl_reap", [ + {datatype, {flag, enabled, disabled}}, + {default, disabled}, + {commented, enabled} +]}. + %% @doc Enable this node to act as a sink and consume from a src cluster {mapping, "replrtq_enablesink", "riak_kv.replrtq_enablesink", [ {datatype, {flag, enabled, disabled}}, @@ -1478,7 +1485,7 @@ %% @doc Choose to read repair to primary vnodes only %% When fallback vnodes are elected, then read repair will by default repair %% any missing data from the vnode - i.e. 
every GET while the fallback is in -%% play will lead to a PUT to add the rewuested object to the fallback vnode, +%% play will lead to a PUT to add the requested object to the fallback vnode, %% as the fallback by default starts empty. %% If the expectation is that failed vnodes are replaced quickly, as would be %% possible in a Cloud scenario, this may not be desirable. Read repair to @@ -1508,4 +1515,13 @@ {mapping, "handoff_deletes", "riak_kv.handoff_deletes", [ {datatype, {flag, enabled, disabled}}, {default, disabled} +]}. + +%% @doc For $key index queries, should keys which are tombstones be returned. +%% This config will only make a difference with the leveled backend, it is +%% ignored on other backends. Disable to change default behaviour and stop +%% returning keys of tombstones in $key queries +{mapping, "dollarkey_readtombs", "riak_kv.dollarkey_readtombs", [ + {datatype, {flag, enabled, disabled}}, + {default, enabled} ]}. \ No newline at end of file diff --git a/src/riak_client.erl b/src/riak_client.erl index 4a524808e..e0f414a20 100644 --- a/src/riak_client.erl +++ b/src/riak_client.erl @@ -196,6 +196,7 @@ replrtq_reset_all_workercounts(WorkerC, PerPeerL) -> {ok, riak_object:riak_object()} | {ok, queue_empty} | {ok, {deleted, vclock:vclock(), riak_object:riak_object()}} | + {ok, {reap, {riak_object:bucket(), riak_object:key(), vclock:vclock(), erlang:timestamp()}}}| {error, timeout} | {error, not_yet_implemented} | {error, Err :: term()}. @@ -223,10 +224,11 @@ fetch(QueueName, {?MODULE, [Node, _ClientId]}) -> -spec push(riak_object:riak_object()|binary(), boolean(), list(), riak_client()) -> {ok, erlang:timestamp()} | + {ok, reap} | {error, too_many_fails} | {error, timeout} | {error, {n_val_violation, N::integer()}}. 
-push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) -> +push(RObjMaybeBin, IsDeleted, Opts, RiakClient) -> RObj = case riak_object:is_robject(RObjMaybeBin) of % May get pushed a riak object, or a riak object as a binary, but @@ -236,6 +238,25 @@ push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) -> false -> riak_object:nextgenrepl_decode(RObjMaybeBin) end, + case RObj of + {reap, {B, K, TC, LMD}} -> + {repl_reap(B, K, TC), LMD}; + RObj -> + repl_push(RObj, IsDeleted, Opts, RiakClient) + end. + +-spec repl_reap( + riak_object:bucket(), riak_object:key(), vclock:vclock()) -> ok. +repl_reap(B, K, TC) -> + riak_kv_reaper:request_reap({{B, K}, TC, false}). + +-spec repl_push(riak_object:riak_object()|binary(), + boolean(), list(), riak_client()) -> + {ok, erlang:timestamp()} | + {error, too_many_fails} | + {error, timeout} | + {error, {n_val_violation, N::integer()}}. +repl_push(RObj, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) -> Bucket = riak_object:bucket(RObj), Key = riak_object:key(RObj), Me = self(), @@ -579,26 +600,30 @@ consistent_delete(Bucket, Key, Options, _Timeout, {?MODULE, [Node, _ClientId]}) end. --spec reap(riak_object:bucket(), riak_object:key(), riak_client()) - -> boolean(). +-spec reap( + riak_object:bucket(), riak_object:key(), riak_client()) -> boolean(). reap(Bucket, Key, Client) -> case normal_get(Bucket, Key, [deletedvclock], Client) of {error, {deleted, TombstoneVClock}} -> - DeleteHash = riak_object:delete_hash(TombstoneVClock), - reap(Bucket, Key, DeleteHash, Client); + reap(Bucket, Key, TombstoneVClock, Client); _Unexpected -> false end. --spec reap(riak_object:bucket(), riak_object:key(), pos_integer(), - riak_client()) -> boolean(). -reap(Bucket, Key, DeleteHash, {?MODULE, [Node, _ClientId]}) -> +-spec reap( + riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client()) + -> boolean(). 
+reap(Bucket, Key, TombClock, {?MODULE, [Node, _ClientId]}) -> case node() of Node -> - riak_kv_reaper:direct_reap({{Bucket, Key}, DeleteHash}); + riak_kv_reaper:direct_reap({{Bucket, Key}, TombClock, true}); _ -> - riak_core_util:safe_rpc(Node, riak_kv_reaper, direct_reap, - [{{Bucket, Key}, DeleteHash}]) + riak_core_util:safe_rpc( + Node, + riak_kv_reaper, + direct_reap, + [{{Bucket, Key}, TombClock, true}] + ) end. %% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client()) -> diff --git a/src/riak_kv_clusteraae_fsm.erl b/src/riak_kv_clusteraae_fsm.erl index fd2e1349d..2cbc230c4 100644 --- a/src/riak_kv_clusteraae_fsm.erl +++ b/src/riak_kv_clusteraae_fsm.erl @@ -589,8 +589,8 @@ json_encode_results(find_keys, Result) -> Keys = {struct, [{<<"results">>, [{struct, encode_find_key(Key, Int)} || {_Bucket, Key, Int} <- Result]} ]}, mochijson2:encode(Keys); -json_encode_results(find_tombs, Result) -> - json_encode_results(find_keys, Result); +json_encode_results(find_tombs, KeysNClocks) -> + encode_keys_and_clocks(KeysNClocks); json_encode_results(reap_tombs, Count) -> mochijson2:encode({struct, [{<<"dispatched_count">>, Count}]}); json_encode_results(erase_keys, Count) -> @@ -620,7 +620,7 @@ pb_encode_results(merge_branch_nval, _QD, Branches) -> level_two = L2 }; pb_encode_results(fetch_clocks_nval, _QD, KeysNClocks) -> - #rpbaaefoldkeyvalueresp{ + #rpbaaefoldkeyvalueresp{ response_type = atom_to_binary(clock, unicode), keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)}; pb_encode_results(merge_tree_range, QD, Tree) -> @@ -666,8 +666,10 @@ pb_encode_results(find_keys, _QD, Results) -> end, #rpbaaefoldkeycountresp{response_type = <<"find_keys">>, keys_count = lists:map(KeyCountMap, Results)}; -pb_encode_results(find_tombs, QD, Results) -> - pb_encode_results(find_keys, QD, Results); +pb_encode_results(find_tombs, _QD, KeysNClocks) -> + #rpbaaefoldkeyvalueresp{ + response_type = atom_to_binary(clock, unicode), + 
keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)}; pb_encode_results(reap_tombs, _QD, Count) -> #rpbaaefoldkeycountresp{response_type = <<"reap_tombs">>, keys_count = diff --git a/src/riak_kv_get_fsm.erl b/src/riak_kv_get_fsm.erl index 33429c200..da53cbb91 100644 --- a/src/riak_kv_get_fsm.erl +++ b/src/riak_kv_get_fsm.erl @@ -238,6 +238,13 @@ queue_fetch(timeout, StateData) -> Msg = {ReqID, {ok, {deleted, ExpectedClock, Obj}}}, Pid ! Msg, ok = riak_kv_stat:update(ngrfetch_prefetch), + {stop, normal, StateData}; + {Bucket, Key, TombClock, {reap, LMD}} -> + % A reap request was queued - so there is no need to fetch + % A tombstone was queued - so there is no need to fetch + Msg = {ReqID, {ok, {reap, {Bucket, Key, TombClock, LMD}}}}, + Pid ! Msg, + ok = riak_kv_stat:update(ngrfetch_prefetch), {stop, normal, StateData} end. diff --git a/src/riak_kv_leveled_backend.erl b/src/riak_kv_leveled_backend.erl index fb5a58a28..88992f293 100644 --- a/src/riak_kv_leveled_backend.erl +++ b/src/riak_kv_leveled_backend.erl @@ -321,13 +321,14 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{bookie=Bookie}) -> if Index /= false -> {index, QBucket, Q} = Index, - ?KV_INDEX_Q{filter_field=Field, - start_key=StartKey0, - start_term=StartTerm, - end_term=EndTerm, - return_terms=ReturnTerms, - start_inclusive=StartInc, - term_regex=TermRegex} = riak_index:upgrade_query(Q), + ?KV_INDEX_Q{ + filter_field=Field, + start_key=StartKey0, + start_term=StartTerm, + end_term=EndTerm, + return_terms=ReturnTerms, + start_inclusive=StartInc, + term_regex=TermRegex} = riak_index:upgrade_query(Q), StartKey = case StartInc of @@ -339,44 +340,50 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{bookie=Bookie}) -> % If this is a $key index query, the start key is assumed % to mean the start of the range, and so we want to use % this start key inclusively (and so don't advance it to - % the next_key. + % the next_key). 
case Field of <<"$bucket">> -> - leveled_bookie:book_keylist(Bookie, - ?RIAK_TAG, - QBucket, - {StartKey, null}, - {FoldKeysFun, Acc}, - TermRegex); + leveled_bookie:book_keylist( + Bookie, + ?RIAK_TAG, + QBucket, + {StartKey, null}, + {FoldKeysFun, Acc}, + TermRegex); <<"$key">> -> - leveled_bookie:book_keylist(Bookie, - ?RIAK_TAG, - QBucket, - {StartKey, EndTerm}, - {FoldKeysFun, Acc}, - TermRegex); + ReadTombs = + application:get_env( + riak_kv, dollarkey_readtombs, true), + FoldHeadsFun = + dollarkey_foldfun( + FoldKeysFun, ReadTombs, TermRegex), + leveled_bookie:book_headfold( + Bookie, + ?RIAK_TAG, + {range, QBucket, {StartKey, EndTerm}}, + {FoldHeadsFun, Acc}, + false, + SnapPreFold, + false + ); _ -> - leveled_bookie:book_indexfold(Bookie, - {QBucket, StartKey}, - {FoldKeysFun, Acc}, - {Field, - StartTerm, - EndTerm}, - {ReturnTerms, - TermRegex}) + leveled_bookie:book_indexfold( + Bookie, + {QBucket, StartKey}, + {FoldKeysFun, Acc}, + {Field, StartTerm, EndTerm}, + {ReturnTerms, TermRegex}) end; Bucket /= false -> % Equivalent to $bucket query, but without the StartKey {bucket, B} = Bucket, - leveled_bookie:book_keylist(Bookie, - ?RIAK_TAG, B, - {FoldKeysFun, Acc}); + leveled_bookie:book_keylist( + Bookie, ?RIAK_TAG, B, {FoldKeysFun, Acc}); true -> % All key query - don't constrain by bucket - leveled_bookie:book_keylist(Bookie, - ?RIAK_TAG, - {FoldKeysFun, Acc}) + leveled_bookie:book_keylist( + Bookie, ?RIAK_TAG, {FoldKeysFun, Acc}) end, case {lists:member(async_fold, Opts), SnapPreFold} of @@ -640,6 +647,39 @@ callback(Ref, UnexpectedCallback, State) -> %% =================================================================== +-spec dollarkey_foldfun( + riak_kv_backend:fold_keys_fun(), boolean(), re:mp()|undefined) + -> riak_kv_backend:fold_objects_fun(). 
+dollarkey_foldfun(FoldKeysFun, ReadTombs, TermRegex) -> + FilteredFoldKeysFun = + fun(B, K, Acc) -> + case TermRegex of + undefined -> + FoldKeysFun(B, K, Acc); + TermRegex -> + case re:run(K, TermRegex) of + nomatch -> + Acc; + _ -> + FoldKeysFun(B, K, Acc) + end + end + end, + fun(B, K, HeadObj, KeyAcc) -> + case ReadTombs of + true -> + FilteredFoldKeysFun(B, K, KeyAcc); + false -> + MetaBin = element(5, riak_object:summary_from_binary(HeadObj)), + case riak_object:is_aae_object_deleted(MetaBin, false) of + {true, undefined} -> + KeyAcc; + _ -> + FilteredFoldKeysFun(B, K, KeyAcc) + end + end + end. + -spec log_fragmentation(eheap_alloc|binary_alloc) -> ok. log_fragmentation(Allocator) -> {MB_BS, MB_CS, SB_BS, SB_CS} = diff --git a/src/riak_kv_pb_object.erl b/src/riak_kv_pb_object.erl index a9602914b..48ea82aaa 100644 --- a/src/riak_kv_pb_object.erl +++ b/src/riak_kv_pb_object.erl @@ -202,6 +202,19 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin}, case Result of {ok, queue_empty} -> {reply, #rpbfetchresp{queue_empty = true}, State}; + {ok, {reap, {B, K, TC, LMD}}} -> + EncObj = + riak_object:nextgenrepl_encode( + repl_v1, {reap, {B, K, TC, LMD}}, false), + CRC32 = erlang:crc32(EncObj), + Resp = + #rpbfetchresp{ + queue_empty = false, + replencoded_object = EncObj, + crc_check = CRC32}, + {reply, + encode_nextgenrepl_response(Encoding, Resp, {B, K, TC}), + State}; {ok, {deleted, TombClock, RObj}} -> % Never bother compressing tombstones, they're practically empty EncObj = riak_object:nextgenrepl_encode(repl_v1, RObj, false), @@ -212,18 +225,7 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin}, replencoded_object = EncObj, crc_check = CRC32, deleted_vclock = pbify_rpbvc(TombClock)}, - case Encoding of - internal -> - {reply, Resp, State}; - internal_aaehash -> - BK = make_binarykey(RObj), - {SegID, SegHash} = - leveled_tictac:tictac_hash(BK, lists:sort(TombClock)), - {reply, - Resp#rpbfetchresp{segment_id = SegID, - 
segment_hash = SegHash}, - State} - end; + {reply, encode_nextgenrepl_response(Encoding, Resp, RObj), State}; {ok, RObj} -> EncObj = riak_object:nextgenrepl_encode(repl_v1, RObj, ToCompress), CRC32 = erlang:crc32(EncObj), @@ -232,19 +234,7 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin}, deleted = false, replencoded_object = EncObj, crc_check = CRC32}, - case Encoding of - internal -> - {reply, Resp, State}; - internal_aaehash -> - BK = make_binarykey(RObj), - Clock = lists:sort(riak_object:vclock(RObj)), - {SegID, SegHash} = - leveled_tictac:tictac_hash(BK, Clock), - {reply, - Resp#rpbfetchresp{segment_id = SegID, - segment_hash = SegHash}, - State} - end; + {reply, encode_nextgenrepl_response(Encoding, Resp, RObj), State}; {error, Reason} -> {error, {format, Reason}, State} end; @@ -443,7 +433,35 @@ process_stream(_,_,State) -> %% Internal functions %% =================================================================== --spec make_binarykey(riak_object:riak_object()) -> binary(). +-spec encode_nextgenrepl_response( + intenal|internal_aaehash, + #rpbfetchresp{}, + riak_object:riak_object()| + {riak_object:bucket(), riak_object:key(), vclock:vclock()}) + -> #rpbfetchresp{}. +encode_nextgenrepl_response(Encoding, Resp, RObj) -> + case Encoding of + internal -> + Resp; + internal_aaehash -> + {SegID, SegHash} = + case RObj of + {B, K, TC} -> + BK = make_binarykey({B, K}), + leveled_tictac:tictac_hash(BK, lists:sort(TC)); + RObj -> + BK = make_binarykey(RObj), + leveled_tictac:tictac_hash( + BK, lists:sort(riak_object:vclock(RObj))) + end, + Resp#rpbfetchresp{segment_id = SegID, segment_hash = SegHash} + end. + +-spec make_binarykey( + riak_object:riak_object()|{riak_object:bucket(), riak_object:key()}) + -> binary(). +make_binarykey({B, K}) -> + make_binarykey(B, K); make_binarykey(RObj) -> make_binarykey(riak_object:bucket(RObj), riak_object:key(RObj)). 
diff --git a/src/riak_kv_reaper.erl b/src/riak_kv_reaper.erl index ef04ce582..08cae2b32 100644 --- a/src/riak_kv_reaper.erl +++ b/src/riak_kv_reaper.erl @@ -63,7 +63,12 @@ redo/0]). -type reap_reference() :: - {{riak_object:bucket(), riak_object:key()}, non_neg_integer()}. + {{riak_object:bucket(), riak_object:key()}, vclock:vclock(), boolean()}. + %% the reap reference is {Bucket, Key, Clock (of tombstone), Forward}. The + %% Forward boolean() indicates if this reap should be replicated if + %% riak_kv.repl_reap is true. When a reap is received via replication + %% Forward should be set to false, to prevent reaps from perpetually + %% circulating -type job_id() :: pos_integer(). -export_type([reap_reference/0, job_id/0]). @@ -149,7 +154,7 @@ get_limits() -> %% we will not redo - redo is only to handle the failure related to unavailable %% primaries -spec action(reap_reference(), boolean()) -> boolean(). -action({{Bucket, Key}, DeleteHash}, Redo) -> +action({{Bucket, Key}, TombClock, ToRepl}, Redo) -> BucketProps = riak_core_bucket:get_bucket(Bucket), DocIdx = riak_core_util:chash_key({Bucket, Key}, BucketProps), {n_val, N} = lists:keyfind(n_val, 1, BucketProps), @@ -160,7 +165,11 @@ action({{Bucket, Key}, DeleteHash}, Redo) -> PL0 = lists:map(fun({Target, primary}) -> Target end, PrefList), case check_all_mailboxes(PL0) of ok -> - riak_kv_vnode:reap(PL0, {Bucket, Key}, DeleteHash), + riak_kv_vnode:reap( + PL0, + {Bucket, Key}, + riak_object:delete_hash(TombClock)), + maybe_repl_reap(Bucket, Key, TombClock, ToRepl), timer:sleep(TombPause), true; soft_loaded -> @@ -171,6 +180,7 @@ action({{Bucket, Key}, DeleteHash}, Redo) -> if Redo -> false; true -> true end end. + -spec redo() -> boolean(). redo() -> true. @@ -180,6 +190,18 @@ redo() -> true. -type preflist_entry() :: {non_neg_integer(), node()}. +-spec maybe_repl_reap( + riak_object:bucket(), riak_object:key(), vclock:vclock(), boolean()) -> ok. 
+maybe_repl_reap(Bucket, Key, TombClock, ToReap) -> + case application:get_env(riak_kv, repl_reap, false) and ToReap of + true -> + riak_kv_replrtq_src:replrtq_reap( + Bucket, Key, TombClock, os:timestamp()); + false -> + ok + end. + + %% Protect against overloading the system when not reaping should any %% mailbox be in soft overload state -spec check_all_mailboxes(list(preflist_entry())) -> ok|soft_loaded. diff --git a/src/riak_kv_replrtq_peer.erl b/src/riak_kv_replrtq_peer.erl index e6871d85b..ce3ef4b05 100644 --- a/src/riak_kv_replrtq_peer.erl +++ b/src/riak_kv_replrtq_peer.erl @@ -43,8 +43,8 @@ -type discovery_peer() :: {riak_kv_replrtq_snk:queue_name(), [riak_kv_replrtq_snk:peer_info()]}. --define(DISCOVERY_TIMEOUT_SECONDS, 60). --define(UPDATE_TIMEOUT_SECONDS, 60). +-define(DISCOVERY_TIMEOUT_SECONDS, 300). +-define(UPDATE_TIMEOUT_SECONDS, 300). -define(AUTO_DISCOVERY_MAXIMUM_SECONDS, 900). -define(AUTO_DISCOVERY_MINIMUM_SECONDS, 60). @@ -68,7 +68,7 @@ update_discovery(QueueName) -> ?DISCOVERY_TIMEOUT_SECONDS * 1000). -spec update_workers(pos_integer(), pos_integer()) -> boolean(). -update_workers(WorkerCount, PerPeerLimit) -> +update_workers(WorkerCount, PerPeerLimit) when PerPeerLimit =< WorkerCount -> gen_server:call( ?MODULE, {update_workers, WorkerCount, PerPeerLimit}, @@ -145,6 +145,11 @@ handle_info({scheduled_discovery, QueueName}, State) -> ?AUTO_DISCOVERY_MAXIMUM_SECONDS), Delay = rand:uniform(max(1, MaxDelay - MinDelay)) + MinDelay, _ = schedule_discovery(QueueName, self(), Delay), + {noreply, State}; +handle_info({Ref, {error, HTTPClientError}}, State) when is_reference(Ref) -> + ?LOG_INFO( + "Client error caught - error ~p returned after timeout", + [HTTPClientError]), {noreply, State}. 
terminate(_Reason, _State) -> @@ -181,46 +186,48 @@ do_discovery(QueueName, PeerInfo, Type) -> {SnkWorkerCount, PerPeerLimit} = riak_kv_replrtq_snk:get_worker_counts(), StartDelayMS = riak_kv_replrtq_snk:starting_delay(), CurrentPeers = - case Type of - count_change -> + case {Type, riak_kv_replrtq_snk:current_peers(QueueName)} of + {count_change, _} -> %% Ignore current peers, to update worker counts, so all %% discovered peers will have their worker counts updated as %% the list returned from discover_peers/2 will never match %% the atom count_change. count_change; - _ -> + {_, PeerList} when is_list(PeerList) -> lists:usort( lists:map( fun({ID, _D, H, P, Prot}) -> {ID, StartDelayMS, H, P, Prot} end, - riak_kv_replrtq_snk:current_peers(QueueName))) + PeerList)); + {_, SnkResponse} -> + ?LOG_INFO( + "Peer Discovery disabled as snk status ~w", [SnkResponse]), + SnkResponse end, - case discover_peers(PeerInfo, StartDelayMS) of - CurrentPeers -> + case {discover_peers(PeerInfo, StartDelayMS), CurrentPeers} of + {CurrentPeers, CurrentPeers} -> ?LOG_INFO("Type=~w discovery led to no change", [Type]), false; - [] -> + {[], CurrentPeers} when is_list(CurrentPeers) -> ?LOG_INFO("Type=~w discovery led to reset of peers", [Type]), - riak_kv_replrtq_snk:add_snkqueue(QueueName, - PeerInfo, - SnkWorkerCount, - PerPeerLimit), + riak_kv_replrtq_snk:add_snkqueue( + QueueName, PeerInfo, SnkWorkerCount, PerPeerLimit), false; - DiscoveredPeers -> - case CurrentPeers of - count_change -> - ok; - CurrentPeers when is_list(CurrentPeers) -> - ?LOG_INFO( - "Type=~w discovery old_peers=~w new_peers=~w", - [Type, length(CurrentPeers), length(DiscoveredPeers)]) - end, - riak_kv_replrtq_snk:add_snkqueue(QueueName, - DiscoveredPeers, - SnkWorkerCount, - PerPeerLimit), - true + {DiscoveredPeers, count_change} -> + riak_kv_replrtq_snk:add_snkqueue( + QueueName, DiscoveredPeers, SnkWorkerCount, PerPeerLimit), + true; + {DiscoveredPeers, _} when is_list(CurrentPeers) -> + ?LOG_INFO( + "Type=~w 
discovery old_peers=~w new_peers=~w", + [Type, length(CurrentPeers), length(DiscoveredPeers)]), + riak_kv_replrtq_snk:add_snkqueue( + QueueName, DiscoveredPeers, SnkWorkerCount, PerPeerLimit), + true; + {_, _} -> + ?LOG_INFO("Type=~w discovery led to no change", [Type]), + false end. -spec discover_peers(list(riak_kv_replrtq_snk:peer_info()), pos_integer()) diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index 8120e067a..ad7d6ab1b 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -31,6 +31,7 @@ handle_call/3, handle_cast/2, handle_info/2, + handle_continue/2, terminate/2, code_change/3]). @@ -67,7 +68,7 @@ -define(STARTING_DELAYMS, 8). -define(MAX_SUCCESS_DELAYMS, 1024). -define(ON_ERROR_DELAYMS, 65536). --define(INACTIVITY_TIMEOUT_MS, 60000). +-define(INITIAL_TIMEOUT_MS, 60000). -define(DEFAULT_WORKERCOUNT, 1). -record(sink_work, {queue_name :: queue_name(), @@ -140,10 +141,10 @@ % Modified time by bucket - second, minute, hour, day, longer} -type reply_tuple() :: - {queue_empty, non_neg_integer()} | - {tomb, non_neg_integer(), non_neg_integer(), non_neg_integer()} | - {object, non_neg_integer(), non_neg_integer(), non_neg_integer()} | - {error, any(), any()}. + {queue_empty, non_neg_integer()}| + {tomb, non_neg_integer(), non_neg_integer(), non_neg_integer()}| + {object, non_neg_integer(), non_neg_integer(), non_neg_integer()}| + {error, any(), any()}. -export_type([peer_info/0, queue_name/0]). @@ -210,19 +211,22 @@ add_snkqueue(QueueName, Peers, WorkerCount) -> %% number of workers overall -spec add_snkqueue(queue_name(), list(peer_info()), pos_integer(), pos_integer()) -> ok. -add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit) - when PerPeerLimit =< WorkerCount -> - gen_server:call(?MODULE, - {add, QueueName, Peers, WorkerCount, PerPeerLimit}). 
+add_snkqueue( + QueueName, Peers, WorkerCount, PerPeerLimit) + when PerPeerLimit =< WorkerCount -> + gen_server:call( + ?MODULE, + {add, QueueName, Peers, WorkerCount, PerPeerLimit}, + infinity). %% @doc %% Return the current list of peers being used by this snk host, and the %% settings currently being used for this host and he workers per peer. %% Returns undefined if there are currently no peers defined. --spec current_peers(queue_name()) -> list(peer_info())|undefined. +-spec current_peers(queue_name()) -> list(peer_info())|suspended|disabled. current_peers(QueueName) -> - gen_server:call(?MODULE, {current_peers, QueueName}). + gen_server:call(?MODULE, {current_peers, QueueName}, infinity). %% @doc @@ -238,47 +242,27 @@ set_workercount(QueueName, WorkerCount) -> %% @doc %% Change the number of concurrent workers whilst limiting the number of %% workers per peer --spec set_workercount(queue_name(), pos_integer(), pos_integer()) - -> ok|not_found. -set_workercount(QueueName, WorkerCount, PerPeerLimit) - when PerPeerLimit =< WorkerCount -> - gen_server:call(?MODULE, - {worker_count, QueueName, WorkerCount, PerPeerLimit}). +-spec set_workercount( + queue_name(), pos_integer(), pos_integer()) -> ok|not_found. +set_workercount( + QueueName, WorkerCount, PerPeerLimit) + when PerPeerLimit =< WorkerCount -> + gen_server:call( + ?MODULE, + {worker_count, QueueName, WorkerCount, PerPeerLimit}, + infinity + ). 
+ %%%============================================================================ %%% gen_server callbacks %%%============================================================================ init([]) -> - SinkEnabled = - app_helper:get_env(riak_kv, replrtq_enablesink, false), + SinkEnabled = app_helper:get_env(riak_kv, replrtq_enablesink, false), case SinkEnabled of true -> - SinkPeers = - app_helper:get_env(riak_kv, replrtq_sinkpeers, ""), - DefaultQueue = - app_helper:get_env(riak_kv, replrtq_sinkqueue), - SnkQueuePeerInfo = tokenise_peers(DefaultQueue, SinkPeers), - {SnkWorkerCount, PerPeerLimit} = get_worker_counts(), - Iteration = 1, - MapPeerInfoFun = - fun({SnkQueueName, SnkPeerInfo}) -> - {SnkQueueLength, SnkWorkQueue} = - determine_workitems(SnkQueueName, - Iteration, - SnkPeerInfo, - SnkWorkerCount, - min(SnkWorkerCount, PerPeerLimit)), - SnkW = - #sink_work{queue_name = SnkQueueName, - work_queue = SnkWorkQueue, - minimum_queue_length = SnkQueueLength, - peer_list = SnkPeerInfo, - max_worker_count = SnkWorkerCount}, - {SnkQueueName, Iteration, SnkW} - end, - Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo), - {ok, #state{enabled = true, work = Work}, ?INACTIVITY_TIMEOUT_MS}; + {ok, #state{}, {continue, initialise_work}}; false -> {ok, #state{}} end. 
@@ -358,11 +342,21 @@ handle_call({worker_count, QueueN, WorkerCount, PerPeerLimit}, _From, State) -> {reply, ok, State#state{work = W0, iteration = Iteration}} end; handle_call({current_peers, QueueN}, _From, State) -> - case lists:keyfind(QueueN, 1, State#state.work) of - false -> - {reply, undefined, State}; - {QueueN, _I, SinkWork} -> - {reply, SinkWork#sink_work.peer_list, State} + case State#state.enabled of + true -> + case lists:keyfind(QueueN, 1, State#state.work) of + false -> + {reply, [], State}; + {QueueN, _I, SinkWork} -> + case SinkWork#sink_work.suspended of + false -> + {reply, SinkWork#sink_work.peer_list, State}; + _ -> + {reply, suspended, State} + end + end; + _ -> + {reply, disabled, State} end. @@ -416,7 +410,7 @@ handle_cast({requeue_work, WorkItem}, State) -> {noreply, State} end. -handle_info(timeout, State) -> +handle_info(deferred_start, State) -> prompt_work(), erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats), {noreply, State}; @@ -434,6 +428,34 @@ handle_info({prompt_requeue, WorkItem}, State) -> requeue_work(WorkItem), {noreply, State}. 
+handle_continue(initialise_work, State) -> + SinkPeers = + app_helper:get_env(riak_kv, replrtq_sinkpeers, ""), + DefaultQueue = + app_helper:get_env(riak_kv, replrtq_sinkqueue), + SnkQueuePeerInfo = tokenise_peers(DefaultQueue, SinkPeers), + {SnkWorkerCount, PerPeerLimit} = get_worker_counts(), + Iteration = 1, + MapPeerInfoFun = + fun({SnkQueueName, SnkPeerInfo}) -> + {SnkQueueLength, SnkWorkQueue} = + determine_workitems(SnkQueueName, + Iteration, + SnkPeerInfo, + SnkWorkerCount, + min(SnkWorkerCount, PerPeerLimit)), + SnkW = + #sink_work{queue_name = SnkQueueName, + work_queue = SnkWorkQueue, + minimum_queue_length = SnkQueueLength, + peer_list = SnkPeerInfo, + max_worker_count = SnkWorkerCount}, + {SnkQueueName, Iteration, SnkW} + end, + Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo), + erlang:send_after(?INITIAL_TIMEOUT_MS, self(), deferred_start), + {noreply, State#state{enabled = true, work = Work}}. + terminate(_Reason, State) -> WorkItems = lists:map(fun(SW) -> element(3, SW) end, State#state.work), CloseFun = @@ -714,9 +736,9 @@ repl_fetcher(WorkItem) -> SWFetched = os:timestamp(), {ok, LMD} = riak_client:push(RObj, false, [], LocalClient), SWPushed = os:timestamp(), - ModSplit = timer:now_diff(SWPushed, LMD), FetchSplit = timer:now_diff(SWFetched, SW), PushSplit = timer:now_diff(SWPushed, SWFetched), + ModSplit = timer:now_diff(SWPushed, LMD), ok = riak_kv_stat:update(ngrrepl_object), done_work(WorkItem, true, {object, FetchSplit, PushSplit, ModSplit}); @@ -741,9 +763,16 @@ repl_fetcher(WorkItem) -> done_work(UpdWorkItem, false, {error, error, remote_error}) end catch - Type:Exception -> - ?LOG_WARNING("Snk worker failed at Peer ~w due to ~w error ~w", - [Peer, Type, Exception]), + Type:Exception:Stk -> + ?LOG_WARNING( + "Snk worker failed at Peer ~w due to ~w error ~w", + [Peer, Type, Exception]), + case app_helper:get_env(riak_kv, log_snk_stacktrace, false) of + true -> + ?LOG_WARNING("Snk worker failed due to ~p", [Stk]); + _ -> + ok + end, 
RemoteFun(close), UpdWorkItem0 = setelement(3, WorkItem, RenewClientFun()), ok = riak_kv_stat:update(ngrrepl_error), @@ -788,8 +817,9 @@ add_success({{success, Success}, F, FT, PT, RT, MT}) -> add_failure({S, {failure, Failure}, FT, PT, RT, MT}) -> {S, {failure, Failure + 1}, FT, PT, RT, MT}. --spec add_repltime(queue_stats(), - {integer(), integer(), integer()}) -> queue_stats(). +-spec add_repltime( + queue_stats(), {non_neg_integer(), non_neg_integer(), non_neg_integer()}) + -> queue_stats(). add_repltime({S, F, {replfetch_time, FT}, {replpush_time, PT}, {replmod_time, RT}, @@ -801,7 +831,7 @@ add_repltime({S, {replmod_time, RT + RT0}, MT}. --spec add_modtime(queue_stats(), integer()) -> queue_stats(). +-spec add_modtime(queue_stats(), non_neg_integer()) -> queue_stats(). add_modtime({S, F, FT, PT, RT, MT}, ModTime) -> E = mod_split_element(ModTime div 1000) + 1, C = element(E, MT), diff --git a/src/riak_kv_replrtq_src.erl b/src/riak_kv_replrtq_src.erl index 20d9336cb..89a6f613d 100644 --- a/src/riak_kv_replrtq_src.erl +++ b/src/riak_kv_replrtq_src.erl @@ -35,6 +35,7 @@ handle_call/3, handle_cast/2, handle_info/2, + handle_continue/2, terminate/2, code_change/3, format_status/2]). @@ -44,6 +45,7 @@ replrtq_aaefold/2, replrtq_ttaaefs/2, replrtq_coordput/1, + replrtq_reap/4, register_rtq/2, delist_rtq/1, suspend_rtq/1, @@ -115,9 +117,13 @@ -type object_ref() :: {tomb, riak_object:riak_object()}| {object, riak_object:riak_object()}| + {reap, erlang:timestamp()}| to_fetch. % The object reference can be the actual object or a request to fetch the % actual object using the Bucket, Key and Clock in the repl_entry + % If the replicated operation is a reap the future vector clock should be + % the mpty list (as there will be no object) and the delete hash should be + % passed for validation (as required by riak_kv_vnode:final_delete/3). -type repl_entry() :: {riak_object:bucket(), riak_object:key(), vclock:vclock(), object_ref()}. 
% If the object is a tombstone which had been PUT, then the actual @@ -223,12 +229,23 @@ replrtq_ttaaefs(QueueName, ReplEntries) -> %% @doc %% Add a single repl_entry associated with a PUT coordinated on this node. -%% Never wait for the response or backoff - replictaion should be asynchronous +%% Never wait for the response or backoff - replication should be asynchronous %% and never slow the PUT path on the src cluster. -spec replrtq_coordput(repl_entry()) -> ok. replrtq_coordput({Bucket, _, _, _} = ReplEntry) when is_binary(Bucket); is_tuple(Bucket) -> gen_server:cast(?MODULE, {rtq_coordput, Bucket, ReplEntry}). + +%% @doc +%% Add a single reference to a reap to be replicated. This is a call to +%% prevent queued reaps from overloading the mailbox of the real-time queue. +-spec replrtq_reap( + riak_object:bucket(), riak_object:key(), + vclock:vclock(), erlang:timestamp()) -> ok. +replrtq_reap(Bucket, Key, TombClock, LMD) -> + gen_server:call( + ?MODULE, {rtq_reap, Bucket, Key, TombClock, LMD}, infinity). + %% @doc %% Setup a queue with a given queuename, which will take coordput repl_entries %% that pass the given filter. 
@@ -392,6 +409,10 @@ handle_call({bulk_add, Priority, QueueName, ReplEntries}, _From, State) -> _ -> {reply, ok, State} end; +handle_call({rtq_reap, Bucket, Key, TombClock, LMD}, _From, State) -> + QueueNames = find_queues(Bucket, State#state.queue_filtermap, []), + ReapRef = {Bucket, Key, TombClock, {reap, LMD}}, + {reply, ok, State, {continue, {repl, ReapRef, QueueNames}}}; handle_call({length_rtq, QueueName}, _From, State) -> case lists:keyfind(QueueName, 1, State#state.queue_local) of {QueueName, LocalQueues} -> @@ -602,61 +623,9 @@ handle_call(stop, _From, State) -> handle_cast({rtq_coordput, Bucket, ReplEntry}, State) -> - QueueNames = - find_queues(Bucket, State#state.queue_filtermap, []), - AddFun = - fun(QueueName, AccState) -> - {QueueName, LQ} = - lists:keyfind(QueueName, 1, AccState#state.queue_local), - case element(?RTQ_PRIORITY, LQ) of - {_Q, LC, OC} when (LC + OC) >= State#state.queue_limit -> - _ = riak_kv_stat:update({ngrrepl_srcdiscard, 1}), - AccState; - {Q, LC, OC} when LC >= State#state.object_limit; OC > 0 -> - {QueueName, OverflowQ} = - lists:keyfind( - QueueName, - 1, - AccState#state.queue_overflow), - UpdOverflowQ = - riak_kv_overflow_queue:addto_queue( - ?RTQ_PRIORITY, - filter_on_objectlimit(ReplEntry), - OverflowQ), - UpdOverflowQueues = - lists:keyreplace( - QueueName, - 1, - AccState#state.queue_overflow, - {QueueName, UpdOverflowQ}), - UpdLQs = - lists:keyreplace( - QueueName, - 1, - AccState#state.queue_local, - {QueueName, - setelement( - ?RTQ_PRIORITY, - LQ, - {Q, LC, OC + 1})}), - AccState#state{ - queue_overflow = UpdOverflowQueues, - queue_local = UpdLQs}; - {Q, LC, 0} -> - UpdLQs = - lists:keyreplace( - QueueName, - 1, - AccState#state.queue_local, - {QueueName, - setelement( - ?RTQ_PRIORITY, - LQ, - {queue:in(ReplEntry, Q), LC + 1, 0})}), - AccState#state{queue_local = UpdLQs} - end - end, - {noreply, lists:foldl(AddFun, State, QueueNames)}. 
+ QueueNames = find_queues(Bucket, State#state.queue_filtermap, []), + {noreply, State, {continue, {repl, ReplEntry, QueueNames}}}. + handle_info(log_queue, State) -> LogFun = @@ -678,6 +647,55 @@ handle_info(log_queue, State) -> erlang:send_after(State#state.log_frequency_in_ms, self(), log_queue), {noreply, State}. + +handle_continue({repl, _ReplEntry, []}, State) -> + {noreply, State}; +handle_continue({repl, ReplEntry, [QueueName|OtherQueues]}, State) -> + {QueueName, LQ} = lists:keyfind(QueueName, 1, State#state.queue_local), + case element(?RTQ_PRIORITY, LQ) of + {_Q, LC, OC} when (LC + OC) >= State#state.queue_limit -> + _ = riak_kv_stat:update({ngrrepl_srcdiscard, 1}), + {noreply, State, {continue, {repl, ReplEntry, OtherQueues}}}; + {Q, LC, OC} when LC >= State#state.object_limit; OC > 0 -> + {QueueName, OverflowQ} = + lists:keyfind(QueueName, 1, State#state.queue_overflow), + UpdOverflowQ = + riak_kv_overflow_queue:addto_queue( + ?RTQ_PRIORITY, + filter_on_objectlimit(ReplEntry), + OverflowQ), + UpdOverflowQueues = + lists:keyreplace( + QueueName, + 1, + State#state.queue_overflow, + {QueueName, UpdOverflowQ}), + UpdCount = + {QueueName, setelement(?RTQ_PRIORITY, LQ, {Q, LC, OC + 1})}, + UpdLQs = + lists:keyreplace( + QueueName, 1, State#state.queue_local, UpdCount), + {noreply, + State#state{ + queue_overflow = UpdOverflowQueues, + queue_local = UpdLQs}, + {continue, {repl, ReplEntry, OtherQueues}}}; + {Q, LC, 0} -> + UpdLQs = + lists:keyreplace( + QueueName, + 1, + State#state.queue_local, + {QueueName, + setelement( + ?RTQ_PRIORITY, + LQ, + {queue:in(ReplEntry, Q), LC + 1, 0})}), + {noreply, + State#state{queue_local = UpdLQs}, + {continue, {repl, ReplEntry, OtherQueues}}} + end. 
+ format_status(normal, [_PDict, State]) -> State; format_status(terminate, [_PDict, State]) -> diff --git a/src/riak_kv_ttaaefs_manager.erl b/src/riak_kv_ttaaefs_manager.erl index e2429a029..c435274a8 100644 --- a/src/riak_kv_ttaaefs_manager.erl +++ b/src/riak_kv_ttaaefs_manager.erl @@ -411,7 +411,8 @@ handle_cast({reply_complete, ReqID, Result}, State) -> riak_kv_stat:update({ttaaefs, sync_fail, Duration}), {?CRASH_TIMEOUT, State#state{previous_success = false}}; {SyncState, 0} when SyncState == root_compare; - SyncState == branch_compare -> + SyncState == branch_compare; + SyncState == tree_compare -> riak_kv_stat:update({ttaaefs, sync_sync, Duration}), ?LOG_INFO( "exchange=~w complete result=~w in duration=~w s" ++ diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 04dbf17d7..491421475 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -1963,14 +1963,14 @@ handle_aaefold({find_tombs, IndexNs, Filtered, ReturnFun, Cntrl, Sender, State) -> FoldFun = - fun(BF, KF, EFs, TombHashAcc) -> + fun(BF, KF, EFs, TombClockAcc) -> {md, MD} = lists:keyfind(md, 1, EFs), case riak_object:is_aae_object_deleted(MD, false) of {true, undefined} -> {clock, VV} = lists:keyfind(clock, 1, EFs), - [{BF, KF, riak_object:delete_hash(VV)}|TombHashAcc]; + [{BF, KF, VV}|TombClockAcc]; {false, undefined} -> - TombHashAcc + TombClockAcc end end, WrappedFoldFun = aaefold_withcoveragecheck(FoldFun, IndexNs, Filtered), @@ -1999,25 +1999,24 @@ handle_aaefold({reap_tombs, case riak_object:is_aae_object_deleted(MD, false) of {true, undefined} -> {clock, VV} = lists:keyfind(clock, 1, EFs), - DH = riak_object:delete_hash(VV), case TombHashAcc of {BatchList, Count, local} -> NewCount = Count + 1, - case NewCount div ?REAPER_BATCH_SIZE of + case NewCount rem ?REAPER_BATCH_SIZE of 0 -> riak_kv_reaper:bulk_request_reap( - [{{BF, KF}, DH}|BatchList] + [{{BF, KF}, VV, true}|BatchList] ), {[], NewCount, local}; _ -> - {[{{BF, KF}, DH}|BatchList], + {[{{BF, KF}, VV, 
true}|BatchList], NewCount, local} end; {BatchList, Count, count} -> {BatchList, Count + 1, count}; {BatchList, Count, Job} -> - {[{{BF, KF}, DH}|BatchList], Count + 1, Job} + {[{{BF, KF}, VV, true}|BatchList], Count + 1, Job} end; {false, undefined} -> TombHashAcc @@ -2059,7 +2058,7 @@ handle_aaefold({erase_keys, case EraseKeyAcc of {BatchList, Count, local} -> NewCount = Count + 1, - case NewCount div ?ERASER_BATCH_SIZE of + case NewCount rem ?ERASER_BATCH_SIZE of 0 -> riak_kv_eraser:bulk_request_delete( [{{BF, KF}, VV}|BatchList]), diff --git a/src/riak_kv_wm_queue.erl b/src/riak_kv_wm_queue.erl index 48cc7f289..71c90fc02 100644 --- a/src/riak_kv_wm_queue.erl +++ b/src/riak_kv_wm_queue.erl @@ -311,6 +311,16 @@ format_response(_, {ok, queue_empty}, RD, Ctx) -> format_response(_, {error, Reason}, RD, Ctx) -> ?LOG_WARNING("Fetch error ~w", [Reason]), {{error, Reason}, RD, Ctx}; +format_response(internal_aaehash, {ok, {reap, {B, K, TC, LMD}}}, RD, Ctx) -> + BK = make_binarykey(B, K), + {SegmentID, SegmentHash} = + leveled_tictac:tictac_hash(BK, lists:sort(TC)), + SuccessMark = <<1:8/integer>>, + IsTombstone = <<0:8/integer>>, + ObjBin = encode_riakobject({reap, {B, K, TC, LMD}}), + {<>, RD, Ctx}; format_response(internal_aaehash, {ok, {deleted, TombClock, RObj}}, RD, Ctx) -> BK = make_binarykey(riak_object:bucket(RObj), riak_object:key(RObj)), {SegmentID, SegmentHash} = @@ -334,6 +344,12 @@ format_response(internal_aaehash, {ok, RObj}, RD, Ctx) -> {<>, RD, Ctx}; +format_response(internal, {ok, {reap, {B, K, TC, LMD}}}, RD, Ctx) -> + SuccessMark = <<1:8/integer>>, + IsTombstone = <<0:8/integer>>, + ObjBin = encode_riakobject({reap, {B, K, TC, LMD}}), + {<>, RD, Ctx}; format_response(internal, {ok, {deleted, TombClock, RObj}}, RD, Ctx) -> SuccessMark = <<1:8/integer>>, IsTombstone = <<1:8/integer>>, diff --git a/src/riak_object.erl b/src/riak_object.erl index 318f89b23..6b04dcfcd 100644 --- a/src/riak_object.erl +++ b/src/riak_object.erl @@ -1190,24 +1190,26 @@ 
to_binary_version(Vsn, _B, _K, Obj = #r_object{}) -> binary_version(<<131,_/binary>>) -> v0; binary_version(<>) -> v1. +-type repl_ref() :: + {reap, + {riak_object:bucket(), riak_object:key(), + vclock:vclock(), erlang:timestamp()}}. + %% @doc Encode for nextgen_repl --spec nextgenrepl_encode(repl_v1, riak_object(), boolean()) -> binary(). +-spec nextgenrepl_encode( + repl_v1, riak_object()|repl_ref(), boolean()) -> binary(). +nextgenrepl_encode(repl_v1, {reap, {B, K, TC, LMD}}, _ToCompress) -> + ObjBK = nextgenrepl_binarykey(B, K), + TCBin = term_to_binary(TC), + {Mega, Secs, Micro} = LMD, + <<1:4/integer, 0:1/integer, 1:1/integer, 0:2/integer, + ObjBK/binary, + Mega:32/integer, Secs:32/integer, Micro:32/integer, + TCBin/binary>>; nextgenrepl_encode(repl_v1, RObj, ToCompress) -> B = riak_object:bucket(RObj), K = riak_object:key(RObj), - KS = byte_size(K), - ObjBK = - case B of - {T, B0} -> - TS = byte_size(T), - B0S = byte_size(B0), - <>; - B0 -> - B0S = byte_size(B0), - <<0:32/integer, B0S:32/integer, B0/binary, - KS:32/integer, K/binary>> - end, + ObjBK = nextgenrepl_binarykey(B, K), {Version, ObjBin} = case ToCompress of true -> @@ -1219,22 +1221,40 @@ nextgenrepl_encode(repl_v1, RObj, ToCompress) -> end, <>. -%% @doc Deocde for nextgen_repl --spec nextgenrepl_decode(binary()) -> riak_object(). -nextgenrepl_decode(<<1:4/integer, C:1/integer, _:3/integer, +-spec nextgenrepl_binarykey( + riak_object:bucket(), riak_object:key()) -> binary(). +nextgenrepl_binarykey({T, B}, K) -> + TS = byte_size(T), + BS = byte_size(B), + KS = byte_size(K), + <>; +nextgenrepl_binarykey(B, K) -> + BS = byte_size(B), + KS = byte_size(K), + <<0:32/integer, BS:32/integer, B/binary, KS:32/integer, K/binary>>. + +%% @doc Deocde for nextgenrepl +-spec nextgenrepl_decode(binary()) -> riak_object()|repl_ref(). 
+nextgenrepl_decode(<<1:4/integer, C:1/integer, R:1/integer, _:2/integer, 0:32/integer, BL:32/integer, B:BL/binary, KL:32/integer, K:KL/binary, ObjBin/binary>>) -> - nextgenrepl_decode(B, K, C == 1, ObjBin); -nextgenrepl_decode(<<1:4/integer, C:1/integer, _:3/integer, + nextgenrepl_decode(B, K, C == 1, R == 1, ObjBin); +nextgenrepl_decode(<<1:4/integer, C:1/integer, R:1/integer, _:2/integer, TL:32/integer, T:TL/binary, BL:32/integer, B:BL/binary, KL:32/integer, K:KL/binary, ObjBin/binary>>) -> - nextgenrepl_decode({T, B}, K, C == 1, ObjBin). - -nextgenrepl_decode(B, K, true, ObjBin) -> - nextgenrepl_decode(B, K, false, zlib:uncompress(ObjBin)); -nextgenrepl_decode(B, K, false, ObjBin) -> + nextgenrepl_decode({T, B}, K, C == 1, R == 1, ObjBin). + +nextgenrepl_decode(B, K, _, true, MetaBin) -> + <> = + MetaBin, + {reap, {B, K, binary_to_term(TClockBin), {Mega, Secs, Micro}}}; +nextgenrepl_decode(B, K, true, false, ObjBin) -> + nextgenrepl_decode(B, K, false, false, zlib:uncompress(ObjBin)); +nextgenrepl_decode(B, K, false, false, ObjBin) -> riak_object:from_binary(B, K, ObjBin). %% @doc Convert binary object to riak object @@ -1264,23 +1284,21 @@ from_binary(_B, _K, Obj = #r_object{}) -> Obj. --spec summary_from_binary(binary()) -> +-spec summary_from_binary(binary()|riak_object()) -> {vclock:vclock(), integer(), integer(), list(erlang:timestamp())|undefined, binary()}. 
%% @doc -%% Extract only sumarry infromation from the binary - the vector, the object +%% Extract only summary information from the binary - the vector, the object %% size and the sibling count -summary_from_binary(<<131, _Rest/binary>>=ObjBin) -> - case binary_to_term(ObjBin) of +summary_from_binary(<>=ObjBin) -> + summary_from_binary(ObjBin, byte_size(ObjBin)); +summary_from_binary(TermToBin) when is_binary(TermToBin) -> + case binary_to_term(TermToBin) of {proxy_object, HeadBin, ObjSize, _Fetcher} -> summary_from_binary(HeadBin, ObjSize); - T -> - {vclock(T), byte_size(ObjBin), value_count(T), - undefined, <<>>} - % Legacy object version will end up with dummy details + Objv0 -> + summary_from_binary(Objv0) end; -summary_from_binary(ObjBin) when is_binary(ObjBin) -> - summary_from_binary(ObjBin, byte_size(ObjBin)); summary_from_binary(Object = #r_object{}) -> % Unexpected scenario but included for parity with from_binary % Calculating object size is expensive (relatively to other branches) @@ -2259,7 +2277,17 @@ nextgenrepl() -> ACZ = riak_object:reconcile([A, C, Z], true), ACZ0 = nextgenrepl_decode(nextgenrepl_encode(repl_v1, ACZ, false)), ACZ0 = nextgenrepl_decode(nextgenrepl_encode(repl_v1, ACZ, true)), - ?assertEqual(ACZ0, ACZ). + ?assertEqual(ACZ, ACZ0), + LMD = os:timestamp(), + {reap, {B, K, ACZ1, LMD}} = + nextgenrepl_decode( + nextgenrepl_encode(repl_v1, {reap, {B, K, ACZ, LMD}}, false)), + {reap, {B, K, ACZ1, LMD}} = + nextgenrepl_decode( + nextgenrepl_encode(repl_v1, {reap, {B, K, ACZ, LMD}}, true)), + ?assertEqual(ACZ, ACZ1). + + verify_contents([], []) ->