-
Notifications
You must be signed in to change notification settings - Fork 120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Asychronous RPC requests #1751
base: dev
Are you sure you want to change the base?
Asychronous RPC requests #1751
Conversation
This adds a generic interface for making asychronous RPC request handlers, and applies these to the reward and exit signature endpoints. The RPC interface adds an alternative the `invoke(...)` method to optionally take a new, third argument of a `shared_ptr<response>`: when such a version of the invoke method is present then response is not send until the destruction of the shared_ptr, and so an asychronous request uses these, keeps the shared_ptr alive until the response is available, then lets it destruct which then triggers the response. This then pushes the async approach through the reward and exit signature requests, and gets everything working through asychronous callbacks rather than blocking requests. This also makes some small changes to signature handling: - Allow exit/liquidation RPC requests to be made by either SN pubkey (as currently) or BLS pubkey (new). - Allow liquidation of oxend-non-existent BLS pubkeys (i.e. liquidating a BLS pubkey that doesn't match any SNs oxen knows of); without this it isn't possible to remove a "bad" contract node that oxend didn't accept the registration of for whatever reason. - Add code to remove unwanted node extra signatures from produced signatures. Previously our concept of "non-signers" only included IDs to pass to the contract, but there is also a reverse failure were we collect a signature from a SN that isn't in the contract anymore (for example: a recently removed node with an incoming, but unconfirmed, exit event). This amends the signing code to detect any such signers and subtract the signatures of any such SNs from the aggregate before returning it. - Removes the walk-the-snode-linked-list contract handling code as it is not used anymore.
Fixes debug build.
There are (currently) two places invoking the callback, one of which wasn't catching exceptions; this genericizes the callback invocation to fix it, and adds a logging try/catch around the final_callback invocation as well.
src/bls/bls_aggregator.cpp
Outdated
cryptonote::core& core, std::unordered_map<bls_public_key, bls_signature> signatures) { | ||
|
||
if (log::get_level(logcat) <= log::Level::debug) | ||
core.l2_tracker().get_all_service_node_ids( | ||
std::nullopt, | ||
[&core, signatures = std::move(signatures)]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cryptonote::core& core, std::unordered_map<bls_public_key, bls_signature> signatures) { | |
if (log::get_level(logcat) <= log::Level::debug) | |
core.l2_tracker().get_all_service_node_ids( | |
std::nullopt, | |
[&core, signatures = std::move(signatures)]( | |
cryptonote::core& core, const std::unordered_map<bls_public_key, bls_signature>& signatures) { | |
if (log::get_level(logcat) <= log::Level::debug) | |
core.l2_tracker().get_all_service_node_ids( | |
std::nullopt, | |
[&core, signatures]( |
Take by ref and only produce a copy if in the lambda, e.g. in debug logging mode only.
MAX_CONNECTIONS); | ||
std::lock_guard lock{connection_mutex}; | ||
|
||
while (active_connections < MAX_CONNECTIONS && next_snode < snodes.size()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we need this MAX_CONNECTIONS
? Internally OMQ has a thread pool from my understanding, is there a need to rate limit how many jobs we dispatch? Maybe there's a queue limit in OMQ we have to respect, but if we omit this then we don't need the recursion which would simplify the algorithm.
If there was a queue limit though I feel like we should be querying that from OMQ rather than an ad-hoc check here (because multiple BLS requests will basically fill it up without us being able to rate limit ourselves)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On second review, I see that essentially the threads allocated for the RPC server are only for parsing and submitting the request into a queue. It returns immediately so that the handler can service more requests. It's then OMQ's responsibility to dequeue and submit the network request and this is done recursively using the thread allocated for the network request from their thread pool. The consequences of this is fairly benign because after the network request is done it recurses into establish()
to dispatch more network requests and exits ASAP after that.
The work load the thread has to do post-response (recursive establish()
and friends) is fairly small so the window of time it actually blocks other network requests in OMQ waiting for the post-amble to finish is small.
This is the same pattern AFAICT is in L2Tracker, Ethyl but also the CURL revamp in the storage server. But I have issues in this pattern in that the control flow and the complexity for this is very high because there's a need to smuggle the response lifetime across multiple levels of callbacks and that asynchronous implementation gets leaked into the RPC implementation code.
The top level goal here being that something has to release the RPC server from waiting for the response because it doesn't want to block the handling of other RPC request. That something we're punting this responsibility to is the OMQ threads (but only if the request requires recursion, e.g. the max connections > 900).
Is there a simpler solution here that we just spawn a thread to handle the RPC request, capture the response context into the thread and let it run detached? This abstracts the concept that the RPC implementation has to be aware that they may be operating in an asynchronous manner. RPC code can be written synchronously where it's then the invoker's responsibility to prep to execute the RPC code asynchronously. That drops the recursion and the helper data structure rpc_responder
and auxiliary code to process that.
One of the characteristics we did gain from the current implementation is that the number of threads concurrently active by the process is bounded to the number of threads allocated to the RPC server. If we begin spawning a thread for each dispatched request we lose that but that is easily handled by just having some atomic counter to cap the number of threads spawned by that system.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The MAX_CONNECTIONS
isn't about processing concurrency, but rather about the number of simultaneous TCP connections (and thus file descriptors) that we will open at a time to issue these requests. 900 is a bit under the typical 1024 file descriptor limit, and we close these connections immediately as we get responses returned so that even if we open new ones, we should stay at a reasonably safe number.
I do worry that 900 seems a bit high since any incoming HTTP RPC requests are also using a file descriptor; I think we could reasonably lower it to something like 300-500 without impacting total request time too much.
The top level goal here being that something has to release the RPC server from waiting for the response because it doesn't want to block the handling of other RPC request. That something we're punting this responsibility to is the OMQ threads (but only if the request requires recursion, e.g. the max connections > 900).
That's not quite how this works: we are always (*
almost) releasing the RPC thread from responsibility until we have accumulated every response from the remotes, regardless of the number of connections. Even if there's just one request, we don't want to block waiting for that remote request to come back: and so we create the request state, initiate the connection and request on that connection, then return immediately, surrendering control. Control resumes when a response comes back, at which point we continue processing, accumulating the single result, possibly establishing new connections if more are needed, or if we got the very last response, fulfilling the RPC response (and release our state). That entire process could happen in a second, or might be a dozen seconds, depending on network latencies and timeouts.
*
-- well, not quite always: if we serve a repeated request it from cache then there's no async control surrender at all.
Is there a simpler solution here that we just spawn a thread to handle the RPC request, capture the response context into the thread and let it run detached?
The explicit intention of this PR is to get away from that.
One of the characteristics we did gain from the current implementation is that the number of threads concurrently active by the process is bounded to the number of threads allocated to the RPC server. If we begin spawning a thread for each dispatched request we lose that but that is easily handled by just having some atomic counter to cap the number of threads spawned by that system.
Doing so would just be pushing the bottleneck to a different place, but you still end up with a limit based on uncontrollable external network latencies (and timeouts) rather than local resources, and you can still end up in a situation where some network event (in this case: waiting for timeouts from slow or offline nodes) can still back up your request queue to the point that it is prevent from accepting new requests, and then you have the issue of what to do if a signing request comes in and all your threads are currently blocked waiting on responses: do you reply with an RPC failure? Do you push the blockage further back (effectively blocking other RPC requests)? Neither of these are particularly desirable.
Using threads to make blocking requests feels like an anti-pattern to me; yes non-blocking code is more complicated in general (whatever the domain), but it's also considerably more efficient in that your internal code is idle until something triggers it with something to do, rather than having multiple blocked threads waiting on responses from some unpredictable external factor.
With this async approach, we have no practical limit (other than system resources) on how many requests we can handle: the only time any resources are occupied is when processing the rpc request, initiating outbound connections, or processing the response to requests on this connections, but otherwise resources are idle and available for other tasks.
This is also an issue we have to deal with in multiple places already, in storage-server and lokinet but also inside existing oxen-core http rpc request processing, just lower down: uWebSockets is asynchronous by design, where there just isn't a single request callback because multiple requests are in various states of delivering data at any given time. All of these use a similar approach of stashing the ongoing state into an object inside a callback to get completed later.
Direct OMQ or QUIC requests make this much easier (you can just grab a msg.send_later()
and store that until you're ready to reply), but the RPC interface in core -- where you don't reply, you just set up the response in an object already waiting for you -- made it a bit messier here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not quite how this works: we are always (* almost) releasing the RPC thread from responsibility until we have accumulated every response from the remotes, regardless of the number of connections. Even if there's just one request, we don't want to block waiting for that remote request to come back: and so we create the request state, initiate the connection and request on that connection, then return immediately, surrendering control. Control resumes when a response comes back, at which point we continue processing, accumulating the single result, possibly establishing new connections if more are needed, or if we got the very last response, fulfilling the RPC response (and release our state). That entire process could happen in a second, or might be a dozen seconds, depending on network latencies and timeouts.
Yep this is what I meant. As in control is relinquished from the RPC thread but it doesn't disappear, we moved control to the OMQ thread that is handling the request, when that eventually finishes the callback is triggered, on the OMQ thread that is serving the last request in the aggregation (correct me if I'm wrong, that thread is sleeping and blocked).
So to rephrase more aptly, it's a coroutine implemented using asynchronous lambdas that proceeds the BLS aggregation state machine. This has complexities as we've mentioned. The complexity is avoidable using a different pattern like a coordinating thread that selectively forwards the request to OMQ and waits for the response on the same thread.
Doing so would just be pushing the bottleneck to a different place, but you still end up with a limit based on uncontrollable external network latencies (and timeouts) rather than local resources, and you can still end up in a situation where some network event (in this case: waiting for timeouts from slow or offline nodes) can still back up your request queue to the point that it is prevent from accepting new requests, and then you have the issue of what to do if a signing request comes in and all your threads are currently blocked waiting on responses: do you reply with an RPC failure? Do you push the blockage further back (effectively blocking other RPC requests)? Neither of these are particularly desirable.
So I'm identifying 2 problems I want to solve based off the discussion so far.
- Is it possible to get rid of the async complexity from the code
- How do we gracefully handle the limits of the system (new concern you've brought up)
A simple scenario that breaks the limits of our proposed systems. Submit 2x BLS aggregations on the same tick to the RPC endpoint. On mainnet with 2k nodes, a BLS aggregation will generate 1800 (max 900 per BLS request atm) requests to the OMQ queue. This will most likely break with the async/detached thread method because they act independently to each other and try to jam the queue without considering each other (and anecdotally I've seem OMQ freeze up with a 1k job limit, and yes we can bump this higher and move the can down the road a bit further).
Using threads to make blocking requests feels like an anti-pattern to me; yes non-blocking code is more complicated in general (whatever the domain), but it's also considerably more efficient in that your internal code is idle until something triggers it with something to do, rather than having multiple blocked threads waiting on responses from some unpredictable external factor.
I'm not sure about this concern because we are blocking threads, we're blocking in OMQ, or whatever mechanism is in there to wait for the response to return. Something is waiting for epoll/kqueue/io_uring, it's never free. It's that if we move the blocking out of OMQ into essentially userland code (Oxen) we have granular control over when they are forwarded into OMQ.
--
So for 1) initially my first draft was to suggest a detached thread because it's easy to implement that. That in itself removes the need for recursive lambdas because you have a thread that will sit around waiting for the response.
But for 2) it's insufficient in actually controlling when the request is forwarded onto OMQ. The next step for that builds on that by actually having a coordinating thread that dispatches those jobs to OMQ. This is similar in to how io_uring works.
The reason for that is you only want to dequeue requests when the underlying system, the coordinating thread knows that OMQ has the capacity to serve the request. So requests come in, it stores them into a queue, RPC thread returns so it can await more requests, the requests gets dequeued and executed when it knows OMQ has capacity to handle it.
Then the amount of requests we can service is limited to the coordinating thread's queue size. It's tweakable, it can be scaled up, it can use a fixed amount of resources, it can rate limit how many requests are pushed into OMQ e.t.c.
This adds a generic interface for making asychronous RPC request
handlers, and applies these to the reward and exit signature endpoints.
The RPC interface adds an alternative the
invoke(...)
method tooptionally take a new, third argument of a
shared_ptr<response>
: whensuch a version of the invoke method is present then response is not send
until the destruction of the shared_ptr, and so an asychronous request
uses these, keeps the shared_ptr alive until the response is available,
then lets it destruct which then triggers the response.
This then pushes the async approach through the reward and exit
signature requests, and gets everything working through asychronous
callbacks rather than blocking requests.
This also makes some small changes to signature handling:
Allow exit/liquidation RPC requests to be made by either SN pubkey (as
currently) or BLS pubkey (new).
Allow liquidation of oxend-non-existent BLS pubkeys (i.e. liquidating
a BLS pubkey that doesn't match any SNs oxen knows of); without this
it isn't possible to remove a "bad" contract node that oxend didn't
accept the registration of for whatever reason.
Add code to remove unwanted node extra signatures from produced
signatures. Previously our concept of "non-signers" only included IDs
to pass to the contract, but there is also a reverse failure were we
collect a signature from a SN that isn't in the contract anymore (for
example: a recently removed node with an incoming, but unconfirmed,
exit event). This amends the signing code to detect any such signers
and subtract the signatures of any such SNs from the aggregate before
returning it.
Removes the walk-the-snode-linked-list contract handling code as it is
not used anymore.