Skip to content

Commit

Permalink
Remove "atomic" package in favour of sync.Mutex
Browse files Browse the repository at this point in the history
Reseting the counter with the "atomic" package is a little shaky and
probably doesn't work properly. Replace with sync.Mutex

Also simplify the case when we want to send more requests than what is
in the queue by using a non-blocking read.
  • Loading branch information
patstrom committed Aug 9, 2018
1 parent dde99f2 commit 361324b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 34 deletions.
4 changes: 2 additions & 2 deletions abios_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func New(username, password string) *client {
// SetRate sets the outgoing rate to "second" requests per second and "minute" requests
// per minte. A value less than or equal to 0 means previous
// value is kept. Default values are (5, 300)
func (a *client) SetRate(second, minute int) {
a.handler.setRate(int32(second), int32(minute))
func (a *client) SetRate(second, minute uint) {
a.handler.setRate(second, minute)
}

// authenticate queries the /oauth/access_token endpoint with the given credentials and
Expand Down
93 changes: 61 additions & 32 deletions request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package abios

import (
"net/url"
"sync/atomic"
"sync"
"time"
)

// Default values for the outgoing rate and size of request buffer.
const (
default_requests_per_second int32 = 5
default_requests_per_minute int32 = 300
default_requests_per_second uint = 5
default_requests_per_minute uint = 300

// Buffer one minutes worth of requests (this can not be changed at runtime)
default_request_buffer_size = default_requests_per_minute
Expand Down Expand Up @@ -56,8 +56,8 @@ type result struct {

// requestHandler buffers requests and sends them out at a user-specified rate.
type requestHandler struct {
requests_per_second int32 // How many requests can be performed per second.
requests_per_minute int32 // How many requests can be performed per minute.
requests_per_second uint // How many requests can be performed per second.
requests_per_minute uint // How many requests can be performed per minute.
queue chan *request // The queue of requests.
override responseOverride // Do we need to override the expected responses?
}
Expand Down Expand Up @@ -96,7 +96,7 @@ func newRequestHandler() *requestHandler {
}

// SetRate sets the outgoing rate according to the give parameters. 0 or less means do nothing.
func (r *requestHandler) setRate(second, minute int32) {
func (r *requestHandler) setRate(second, minute uint) {
if 0 < second {
r.requests_per_second = second
}
Expand All @@ -112,9 +112,37 @@ func (r *requestHandler) setRate(second, minute int32) {

}

type resetable_counter struct {
count uint
mutex sync.Mutex
}

func (r *resetable_counter) add(i uint) {
r.mutex.Lock()
r.count += i
r.mutex.Unlock()
}

func (r *resetable_counter) increment() {
r.add(1)
}

func (r *resetable_counter) get() uint {
r.mutex.Lock()
tmp := r.count
r.mutex.Unlock()
return tmp
}

func (r *resetable_counter) reset() {
r.mutex.Lock()
r.count = 0
r.mutex.Unlock()
}

// dispatcher does requestHandler.Rate api-calls every requestHandler.ResetInterval
func (r *requestHandler) dispatcher() {
var requests_this_minute int32
var counter resetable_counter

ticker_second := time.NewTicker(time.Second)
ticker_minute := time.NewTicker(time.Minute)
Expand All @@ -127,42 +155,43 @@ func (r *requestHandler) dispatcher() {
case <-ticker_minute.C:
//if requests_today < r.requests_per_day // Also example
// Allow for more requests this minute if we still have requests left today
atomic.AddInt32(&requests_this_minute, -requests_this_minute)
counter.reset()
case <-ticker_second.C:
// Allow for more requests this second if we still have requests left this minute
if requests_this_minute < r.requests_per_minute {
if counter.get() < r.requests_per_minute {
go func() {
number_to_send := r.requests_per_second

// If there are less requests left this minute than the specified rate per second
// then send the lesser amount.
left_this_minute := r.requests_per_minute - requests_this_minute // requests left this minute
left_this_minute := r.requests_per_minute - counter.get() // requests left this minute
if left_this_minute < number_to_send {
number_to_send = left_this_minute
}

// If there are less items in the queue than the current "number_to_send" then
// send the lesser amount.
if int32(len(r.queue)) < number_to_send {
number_to_send = int32(len(r.queue))
}

// Consider the requests sent
atomic.AddInt32(&requests_this_minute, number_to_send)
for i := int32(0); i < number_to_send; i++ {
// One go-routine per request
go func() {
currentRequest := <-r.queue
re := result{}

// Do we have to override the response?
if r.override.override {
currentRequest.ch <- r.override.data
} else {
re.statuscode, re.body = performRequest(currentRequest.url, currentRequest.params)
currentRequest.ch <- re
}
}()
// Send the requests in a non-blocking way, so in case the queue is empty we break
// the loop. I.e never create more routines than what is in the queue
for i := uint(0); i < number_to_send; i++ {
select {
case req := <-r.queue:
go func(currentRequest *request) {
re := result{}

// Do we have to override the response?
if r.override.override {
currentRequest.ch <- r.override.data
} else {
re.statuscode, re.body = performRequest(currentRequest.url, currentRequest.params)
currentRequest.ch <- re
}

}(req)
counter.increment()
default:
// The default case is when there are no more requests in the channel, in
// which case we break the loop
break
}
}
}()
}
Expand Down

0 comments on commit 361324b

Please sign in to comment.