diff --git a/abios_sdk.go b/abios_sdk.go
index 3d15c81..0309e95 100644
--- a/abios_sdk.go
+++ b/abios_sdk.go
@@ -111,8 +111,8 @@ func New(username, password string) *client {
 // SetRate sets the outgoing rate to "second" requests per second and "minute" requests
 // per minte. A value less than or equal to 0 means previous
 // value is kept. Default values are (5, 300)
-func (a *client) SetRate(second, minute int) {
-	a.handler.setRate(int32(second), int32(minute))
+func (a *client) SetRate(second, minute uint) {
+	a.handler.setRate(second, minute)
 }
 
 // authenticate queries the /oauth/access_token endpoint with the given credentials and
diff --git a/request_handler.go b/request_handler.go
index 7f0e552..529e30d 100644
--- a/request_handler.go
+++ b/request_handler.go
@@ -2,14 +2,14 @@ package abios
 
 import (
 	"net/url"
-	"sync/atomic"
+	"sync"
 	"time"
 )
 
 // Default values for the outgoing rate and size of request buffer.
 const (
-	default_requests_per_second int32 = 5
-	default_requests_per_minute int32 = 300
+	default_requests_per_second uint = 5
+	default_requests_per_minute uint = 300
 
 	// Buffer one minutes worth of requests (this can not be changed at runtime)
 	default_request_buffer_size = default_requests_per_minute
@@ -56,8 +56,8 @@ type result struct {
 
 // requestHandler buffers requests and sends them out at a user-specified rate.
 type requestHandler struct {
-	requests_per_second int32            // How many requests can be performed per second.
-	requests_per_minute int32            // How many requests can be performed per minute.
+	requests_per_second uint             // How many requests can be performed per second.
+	requests_per_minute uint             // How many requests can be performed per minute.
 	queue               chan *request    // The queue of requests.
 	override            responseOverride // Do we need to override the expected responses?
 }
@@ -96,7 +96,7 @@ func newRequestHandler() *requestHandler {
 }
 
 // SetRate sets the outgoing rate according to the give parameters. 0 or less means do nothing.
-func (r *requestHandler) setRate(second, minute int32) {
+func (r *requestHandler) setRate(second, minute uint) {
 	if 0 < second {
 		r.requests_per_second = second
 	}
@@ -112,9 +112,37 @@ func (r *requestHandler) setRate(second, minute int32) {
 }
 
+type resetable_counter struct {
+	count uint
+	mutex sync.Mutex
+}
+
+func (r *resetable_counter) add(i uint) {
+	r.mutex.Lock()
+	r.count += i
+	r.mutex.Unlock()
+}
+
+func (r *resetable_counter) increment() {
+	r.add(1)
+}
+
+func (r *resetable_counter) get() uint {
+	r.mutex.Lock()
+	tmp := r.count
+	r.mutex.Unlock()
+	return tmp
+}
+
+func (r *resetable_counter) reset() {
+	r.mutex.Lock()
+	r.count = 0
+	r.mutex.Unlock()
+}
+
 // dispatcher does requestHandler.Rate api-calls every requestHandler.ResetInterval
 func (r *requestHandler) dispatcher() {
-	var requests_this_minute int32
+	var counter resetable_counter
 
 	ticker_second := time.NewTicker(time.Second)
 	ticker_minute := time.NewTicker(time.Minute)
@@ -127,42 +155,43 @@ func (r *requestHandler) dispatcher() {
 		case <-ticker_minute.C:
 			//if requests_today < r.requests_per_day // Also example
 			// Allow for more requests this minute if we still have requests left today
-			atomic.AddInt32(&requests_this_minute, -requests_this_minute)
+			counter.reset()
 		case <-ticker_second.C:
 			// Allow for more requests this second if we still have requests left this minute
-			if requests_this_minute < r.requests_per_minute {
+			if counter.get() < r.requests_per_minute {
 				go func() {
 					number_to_send := r.requests_per_second
 
 					// If there are less requests left this minute than the specified rate per second
 					// then send the lesser amount.
-					left_this_minute := r.requests_per_minute - requests_this_minute // requests left this minute
+					left_this_minute := r.requests_per_minute - counter.get() // requests left this minute
 					if left_this_minute < number_to_send {
 						number_to_send = left_this_minute
 					}
 
-					// If there are less items in the queue than the current "number_to_send" then
-					// send the lesser amount.
-					if int32(len(r.queue)) < number_to_send {
-						number_to_send = int32(len(r.queue))
-					}
-
-					// Consider the requests sent
-					atomic.AddInt32(&requests_this_minute, number_to_send)
-					for i := int32(0); i < number_to_send; i++ {
-						// One go-routine per request
-						go func() {
-							currentRequest := <-r.queue
-							re := result{}
-
-							// Do we have to override the response?
-							if r.override.override {
-								currentRequest.ch <- r.override.data
-							} else {
-								re.statuscode, re.body = performRequest(currentRequest.url, currentRequest.params)
-								currentRequest.ch <- re
-							}
-						}()
+					// Send the requests in a non-blocking way, so in case the queue is empty we break
+					// the loop. I.e never create more routines than what is in the queue
+					for i := uint(0); i < number_to_send; i++ {
+						select {
+						case req := <-r.queue:
+							go func(currentRequest *request) {
+								re := result{}
+
+								// Do we have to override the response?
+								if r.override.override {
+									currentRequest.ch <- r.override.data
+								} else {
+									re.statuscode, re.body = performRequest(currentRequest.url, currentRequest.params)
+									currentRequest.ch <- re
+								}
+
+							}(req)
+							counter.increment()
+						default:
+							// The default case is when there are no more requests in the channel, in
+							// which case we break the loop
+							break
+						}
 					}
 				}()
 			}
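For context, a small self-contained sketch (not part of the diff) of the two patterns the change relies on: a mutex-guarded counter in place of the atomic int32 arithmetic, and a non-blocking select/default drain of the request queue so no goroutine is started when the queue is empty. The queue contents, the budget value, and the package main wrapper are invented for illustration; note that leaving the loop from inside the select needs a labeled break, since a plain break only exits the select.

// Sketch only: standalone illustration of the mutex-guarded counter and the
// non-blocking select/default drain used by the new dispatcher.
package main

import (
	"fmt"
	"sync"
)

type resetableCounter struct {
	count uint
	mutex sync.Mutex
}

func (r *resetableCounter) increment() {
	r.mutex.Lock()
	r.count++
	r.mutex.Unlock()
}

func (r *resetableCounter) reset() {
	r.mutex.Lock()
	r.count = 0
	r.mutex.Unlock()
}

func (r *resetableCounter) get() uint {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	return r.count
}

func main() {
	queue := make(chan string, 8)
	queue <- "req-1"
	queue <- "req-2"

	var counter resetableCounter
	budget := uint(5) // hypothetical per-second budget; the SDK derives this from its rate settings

drain:
	for i := uint(0); i < budget; i++ {
		select {
		case req := <-queue:
			fmt.Println("dispatching", req) // the SDK starts a goroutine per request here
			counter.increment()
		default:
			break drain // a plain break would only leave the select, not the loop
		}
	}

	fmt.Println("requests sent this tick:", counter.get())
	counter.reset() // the dispatcher does this once per minute
}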