Skip to content

Commit

Permalink
Move to channel-less implementation. (#11)
Browse files Browse the repository at this point in the history
* Working rough cut

Signed-off-by: Ed Warnicke <hagbard@gmail.com>

* Move to channel-less implementation.

When multiple go routines queue to send into a channel, the order of their entry to the channel is unknowable:

https://groups.google.com/g/golang-nuts/c/PWt4r9b40bc/m/lC59KD5TQCwJ

Therefore, we cannot rely on a channel for ordering, as if we overrun its size, order will be non-deterministic.

This PR moves to using a strict ordering by atomically ordered ticket.

Signed-off-by: Ed Warnicke <hagbard@gmail.com>
  • Loading branch information
edwarnicke authored Oct 25, 2020
1 parent 009feaf commit 2dc42a7
Showing 1 changed file with 65 additions and 40 deletions.
105 changes: 65 additions & 40 deletions serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,60 +20,85 @@
package serialize

import (
"sort"
"sync"
"sync/atomic"
)

const (
channelSize = 256 // 256 is chosen because 256*8 = 2kb, or about the cost of a go routine
)
type job struct {
f func()
done chan struct{}
ticket uintptr
}

// Executor - a struct that can be used to guarantee exclusive, in order execution of functions.
type Executor struct {
orderCh chan func()
buffer []func()
bufferMutex sync.Mutex
init sync.Once
count int32
buffer []*job
mu sync.Mutex
ticket uintptr
}

// AsyncExec - guarantees f() will be executed Exclusively and in the Order submitted.
// It immediately returns a channel that will be closed when f() has completed execution.
func (e *Executor) AsyncExec(f func()) <-chan struct{} {
e.init.Do(func() {
e.orderCh = make(chan func(), channelSize)
})
// Start go routine if we don't have one
if atomic.AddInt32(&e.count, 1) == 1 {
result := make(chan struct{})
go func() {
f()
close(result)
if atomic.AddInt32(&e.count, -1) == 0 {
return
}
for {
e.bufferMutex.Lock()
buf := e.buffer[0:]
e.buffer = e.buffer[len(e.buffer):]
e.bufferMutex.Unlock()
for _, f := range buf {
f()
}
if len(buf) > 0 && atomic.AddInt32(&e.count, -int32(len(buf))) == 0 {
// Get a ticket. The ticket established absolute order.
ticket := atomic.AddUintptr(&e.ticket, 1)
if ticket == 0 {
panic("ticket == 0 - you've overrun a uintptr counter of jobs and are probably deadlocked")
}
// Create the job
jb := &job{
f: f,
done: make(chan struct{}),
ticket: ticket,
}
// The first ticket fires off processing
if ticket == 1 {
go e.process(jb)
return jb.done
}
// queue up the job in the buffer (note: buffer order itself does not guarantee order, job.ticket does)
e.mu.Lock()
e.buffer = append(e.buffer, jb)
e.mu.Unlock()
return jb.done
}

func (e *Executor) process(jb *job) {
// Run the first job inline with processing. This is a performance optimization
jb.f()
close(jb.done)
// If there are no more jobs, exit
if atomic.CompareAndSwapUintptr(&e.ticket, 1, 0) {
return
}
// Starting from ticket == 2 (because we processed ticket == 1 already)
ticket := uintptr(2)
var buf []*job
for {
// Drain the buffer
e.mu.Lock()
buf = append(buf, e.buffer[0:]...)
e.buffer = e.buffer[len(e.buffer):]
e.mu.Unlock()
// Sort the buffer
sort.SliceStable(buf, func(i, j int) bool {
return buf[i].ticket < buf[j].ticket
})
for len(buf) > 0 {
// If buf[0].ticket == ticket (the next ticket to process), process it
if buf[0].ticket == ticket {
buf[0].f()
close(buf[0].done)
if atomic.CompareAndSwapUintptr(&e.ticket, ticket, 0) {
return
}
buf = buf[1:]
ticket++
continue
}
}()
return result
}
result := make(chan struct{})
e.orderCh <- func() {
f()
close(result)
// If buf[0] != ticket, we have yet to receive the next job and need to drain the buffer again
break
}
}
e.bufferMutex.Lock()
e.buffer = append(e.buffer, <-e.orderCh)
e.bufferMutex.Unlock()
return result
}

0 comments on commit 2dc42a7

Please sign in to comment.