I didn't notice it at first, since I'd been using an idiomatic channel approach for a while, but when I got stuck at 100k msg/sec I took another look at your ring buffer.
In general I really like your buffer (and it's amazingly fast 👍), but I've found some serious race conditions around your cursor locks.
They were hard to find because they occur really rarely on my machine (about 1 out of 10k messages in a strictly serial 1-write-1-read test), but then your broker explodes.
Just take a look at ReadWait and WriteCommit.
func (this *buffer) WriteCommit(n int) (int, error) {
    start, cnt, err := this.waitForWriteSpace(n)
    if err != nil {
        return 0, err
    }

    // If we are here then there's enough bytes to commit
    this.pseq.set(start + int64(cnt))

    this.ccond.L.Lock()
    this.ccond.Broadcast()
    this.ccond.L.Unlock()

    return cnt, nil
}
func (this *buffer) ReadWait(n int) ([]byte, error) {
    if int64(n) > this.size {
        return nil, bufio.ErrBufferFull
    }

    if n < 0 {
        return nil, bufio.ErrNegativeCount
    }

    cpos := this.cseq.get()
    ppos := this.pseq.get()

    // This is the magic read-to position. The producer position must be equal or
    // greater than the next position we read to.
    next := cpos + int64(n)

    // If there's no data, then let's wait until there is some data
    this.ccond.L.Lock()
    for ; next > ppos; ppos = this.pseq.get() {
        if this.isDone() {
            return nil, io.EOF
        }

        this.ccond.Wait()
    }
    this.ccond.L.Unlock()

    // If we are here that means we have at least n bytes of data available.
    cindex := cpos & this.mask

    // If cindex (index relative to buffer) + n is more than buffer size, that means
    // the data wrapped
    if cindex+int64(n) > this.size {
        // reset the tmp buffer
        this.tmp = this.tmp[0:0]

        l := len(this.buf[cindex:])
        this.tmp = append(this.tmp, this.buf[cindex:]...)
        this.tmp = append(this.tmp, this.buf[0:n-l]...)
        return this.tmp[:n], nil
    }

    return this.buf[cindex : cindex+int64(n)], nil
}
The race scenario:

1. ReadWait reads the producer position atomically with ppos := this.pseq.get()
2. Context switch
3. WriteCommit advances the producer position (pseq), locks ccond, broadcasts to the sleeping consumers and finishes
4. Context switch
5. ReadWait calculates next from the stale ppos, locks ccond, goes to sleep and never wakes up
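To make the pattern concrete, here's a minimal, self-contained demo (my own sketch, not your code) of the same lost wakeup: the waiter samples the shared position before taking the lock, so a broadcast that fires in between is gone for good.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

func main() {
    var pos int64 // shared "producer position"
    cond := sync.NewCond(&sync.Mutex{})
    done := make(chan struct{})

    go func() {
        // Consumer: mirrors the racy ReadWait.
        ppos := atomic.LoadInt64(&pos) // stale read, outside the lock

        // ... context switch to the producer happens here ...
        time.Sleep(10 * time.Millisecond)

        cond.L.Lock()
        for ppos < 1 { // condition checked against the stale value
            cond.Wait() // the Broadcast already fired: sleeps forever
            ppos = atomic.LoadInt64(&pos)
        }
        cond.L.Unlock()
        close(done)
    }()

    // Producer: mirrors WriteCommit. Advances the position and broadcasts
    // while the consumer sits between its stale read and its Wait.
    atomic.StoreInt64(&pos, 1)
    cond.L.Lock()
    cond.Broadcast()
    cond.L.Unlock()

    select {
    case <-done:
        fmt.Println("consumer woke up")
    case <-time.After(time.Second):
        fmt.Println("lost wakeup: consumer sleeps forever")
    }
}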
Simple fix:
Since you acquire the consumer lock anyway, you should read and write the producer position while holding that lock; see the sketch below.
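Roughly like this (just a sketch against the methods as quoted above, assuming pseq.get/pseq.set keep their current signatures): the position update, the broadcast, and the wait-loop check all happen under ccond.L.

// WriteCommit: advance the producer position while holding ccond.L, so a
// consumer can never decide to sleep against a position it hasn't seen yet.
func (this *buffer) WriteCommit(n int) (int, error) {
    start, cnt, err := this.waitForWriteSpace(n)
    if err != nil {
        return 0, err
    }

    this.ccond.L.Lock()
    this.pseq.set(start + int64(cnt))
    this.ccond.Broadcast()
    this.ccond.L.Unlock()

    return cnt, nil
}

// ReadWait's wait loop: take the lock first, then read the producer position,
// so the check and the Wait are atomic with respect to WriteCommit.
this.ccond.L.Lock()
for ppos = this.pseq.get(); next > ppos; ppos = this.pseq.get() {
    if this.isDone() {
        this.ccond.L.Unlock() // don't return while still holding the lock
        return nil, io.EOF
    }
    this.ccond.Wait()
}
this.ccond.L.Unlock()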
I haven't checked the other methods yet, but chances are good that there are more races like this one, since you always use atomic operations without locking out the other party.
Yep, there are more situations like this: everywhere you advance the producer/consumer position and then lock the condition variable for the broadcast, or rely on the broadcast to wake you up.