Skip to content

Commit

Permalink
Use fillmore-labs.com/async
Browse files Browse the repository at this point in the history
Signed-off-by: Oliver Eikemeier <eikemeier@fillmore-labs.com>
  • Loading branch information
eikemeier committed Apr 29, 2024
1 parent a69eb44 commit 30c3ee9
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 66 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ jobs:
- name: 🧸 golangci-lint
uses: golangci/golangci-lint-action@v4
with:
version: v1.56.2
version: v1.57.2
- name: 🔨 Test
run: go test -race ./...
12 changes: 11 additions & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,23 @@ linters:
- depguard
- exhaustruct
- forbidigo
- ireturn
- nonamedreturns
- varnamelen
- wrapcheck
- wsl
# Go 1.22
- copyloopvar
- intrange
linters-settings:
testifylint:
enable-all: true
disable:
- require-error
ireturn:
allow:
- error
- generic
issues:
exclude-generated-strict: true
exclude-files:
- _mock\.go$
38 changes: 19 additions & 19 deletions batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,31 @@ import (
"errors"
"time"

"fillmore-labs.com/async"
"fillmore-labs.com/microbatch/internal/processor"
"fillmore-labs.com/microbatch/internal/timer"
internal "fillmore-labs.com/microbatch/internal/types"
"fillmore-labs.com/promise"
)

// Batcher handles submitting requests in batches and returning results through channels.
type Batcher[Q, R any] struct {
// process processes batches of requests.
process func(requests []internal.BatchRequest[Q, R])

// queue holds the collected requests until processing.
queue chan []internal.BatchRequest[Q, R]

// batchSize is the maximum number of requests per batch or zero, when unlimited.
batchSize int

// batchDuration is the maximum time a batch can collect before processing or zero, when unlimited.
batchDuration time.Duration
// process processes batches of requests.
process func(requests []internal.BatchRequest[Q, R])

// timer tracks the batch duration and signals when it expires.
timer timer.Timer

// newTimer creates a new timer or a mock for testing
newTimer func(d time.Duration, f func(sent *bool)) timer.Timer

// batchDuration is the maximum time a batch can collect before processing or zero, when unlimited.
batchDuration time.Duration

// batchSize is the maximum number of requests per batch or zero, when unlimited.
batchSize int
}

var (
Expand Down Expand Up @@ -88,13 +88,13 @@ func NewBatcher[Q, R any, C comparable, QQ ~[]Q, RR ~[]R](
queue <- nil

return &Batcher[Q, R]{
process: p.Process,
queue: queue,

batchSize: option.size,
batchDuration: option.timeout,
process: p.Process,

newTimer: timer.New,

batchDuration: option.timeout,
batchSize: option.size,
}
}

Expand All @@ -110,7 +110,7 @@ type Option interface {
}

// WithSize is an option to configure the batch size.
func WithSize(size int) Option {
func WithSize(size int) Option { //nolint:ireturn
return sizeOption{size: size}
}

Expand All @@ -123,7 +123,7 @@ func (o sizeOption) apply(opts *options) {
}

// WithTimeout is an option to configure the batch timeout.
func WithTimeout(timeout time.Duration) Option {
func WithTimeout(timeout time.Duration) Option { //nolint:ireturn
return timeoutOption{timeout: timeout}
}

Expand All @@ -136,16 +136,16 @@ func (o timeoutOption) apply(opts *options) {
}

// Submit submits a job without waiting for the result.
func (b *Batcher[Q, R]) Submit(request Q) promise.Future[R] {
result, future := promise.New[R]()
func (b *Batcher[Q, R]) Submit(request Q) *async.Future[R] {
var response async.Promise[R]
batchRequest := internal.BatchRequest[Q, R]{
Request: request,
Result: result,
Result: &response,
}

b.enqueue(batchRequest)

return future
return response.Future()
}

// Execute submits a job and waits for the result.
Expand Down
20 changes: 14 additions & 6 deletions batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ package microbatch_test

import (
"context"
"math/rand"
"math/rand/v2"
"reflect"
"strconv"
"sync"
"testing"
"time"

"fillmore-labs.com/async"
"fillmore-labs.com/microbatch"
"fillmore-labs.com/microbatch/internal/mocks"
"fillmore-labs.com/promise"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -73,16 +73,24 @@ func (s *BatcherTestSuite) TestBatcher() {
s.BatchProcessor.EXPECT().ProcessJobs(mock.Anything).Return(returned, nil).Once()

// when
futures := make([]promise.Future[string], iterations)
futures := make([]*async.Future[string], iterations)
for i := 0; i < iterations; i++ {
futures[i] = s.Batcher.Submit(i + 1)
}
s.Batcher.Send()

ctx := context.Background()
results, err := promise.AwaitAllValues(ctx, futures...)

s.Batcher.Send()
results := make([]string, 0, len(futures))
var err error
for _, f := range futures {
result, e := f.Await(ctx)
if e != nil {
err = e

break
}
results = append(results, result)
}

// then
if s.NoErrorf(err, "Unexpected error executing jobs") {
Expand Down
22 changes: 11 additions & 11 deletions dataloader/dataloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package dataloader
import (
"sync"

"fillmore-labs.com/async"
"fillmore-labs.com/microbatch"
"fillmore-labs.com/promise"
)

// DataLoader demonstrates how to use [microbatch.Batcher] to implement a simple Facebook [DataLoader].
Expand All @@ -29,8 +29,8 @@ import (
// [DataLoader]: https://www.youtube.com/watch?v=OQTnXNCDywA
type DataLoader[K comparable, R any] struct {
mu sync.RWMutex
cache map[K]*promise.Memoizer[R] // cache stores keys mapped to results
batcher *microbatch.Batcher[K, R] // batcher batches keys and retrieves results
cache map[K]*async.Future[R] // cache stores keys mapped to results
batcher *microbatch.Batcher[K, R] // batcher batches keys and retrieves results
}

// NewDataLoader create a new [DataLoader].
Expand All @@ -47,29 +47,29 @@ func NewDataLoader[K comparable, R any, KK ~[]K, RR ~[]R](
)

return &DataLoader[K, R]{
cache: make(map[K]*promise.Memoizer[R]),
cache: make(map[K]*async.Future[R]),
batcher: batcher,
}
}

// Load retrieves a value from the cache or loads it asynchronously.
func (d *DataLoader[K, R]) Load(key K) *promise.Memoizer[R] {
func (d *DataLoader[K, R]) Load(key K) *async.Future[R] {
d.mu.RLock()
memoizer, ok := d.cache[key]
future, ok := d.cache[key]
d.mu.RUnlock()
if ok {
return memoizer
return future
}

d.mu.Lock()
defer d.mu.Unlock()
memoizer, ok = d.cache[key]
future, ok = d.cache[key]
if !ok {
memoizer = d.batcher.Submit(key).Memoize()
d.cache[key] = memoizer
future = d.batcher.Submit(key)
d.cache[key] = future
}

return memoizer
return future
}

// Send loads all submitted keys.
Expand Down
4 changes: 2 additions & 2 deletions dataloader/dataloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"fmt"
"sync/atomic"

"fillmore-labs.com/async"
"fillmore-labs.com/microbatch"
"fillmore-labs.com/microbatch/dataloader"
"fillmore-labs.com/promise"
)

type DataProcessor struct {
Expand Down Expand Up @@ -63,7 +63,7 @@ func Example() {
)

queries := [11]int{1, 2, 1, 2, 3, 3, 4, 1, 2, 3, 5}
results := make([]*promise.Memoizer[QueryResult], len(queries))
results := make([]*async.Future[QueryResult], len(queries))
for i, query := range queries {
results[i] = d.Load(query)
}
Expand Down
1 change: 0 additions & 1 deletion examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ func Example_blocking() {

// Shut down
wg.Wait()
batcher.Send()
// Unordered output:
// Processed job 1
// Processed job 2
Expand Down
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ module fillmore-labs.com/microbatch

go 1.21

toolchain go1.22.1
toolchain go1.22.2

require (
fillmore-labs.com/promise v0.0.2
fillmore-labs.com/async v0.0.2
github.com/stretchr/testify v1.9.0
)

require (
fillmore-labs.com/lazydone v0.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
20 changes: 9 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
fillmore-labs.com/promise v0.0.2 h1:DocbRu3sgTgGmrnh5Mn0/Ea5W7/T0LPujm734z8zhCc=
fillmore-labs.com/promise v0.0.2/go.mod h1:uRXW66RLl25mQXnTaEIyJOxckSkmHw+AfqwiU1/j38A=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
fillmore-labs.com/async v0.0.2 h1:6V8WkbYExgfDuILbGBBxi58HDah/znIEhM1z+TaxU+Q=
fillmore-labs.com/async v0.0.2/go.mod h1:ZeYeicLodp+2b4VT9V1c+ix7lGPD00W7tRXrhc6Cv0g=
fillmore-labs.com/lazydone v0.0.2 h1:y6LZlS32691K2T3mvmbYJ7qnwOTQgWG6NeG6E023EOI=
fillmore-labs.com/lazydone v0.0.2/go.mod h1:/v+72piFl79m/Ik8noCymbQmxWD/WtJm5FCeafWdkD0=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
6 changes: 3 additions & 3 deletions internal/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"fmt"
"log/slog"

"fillmore-labs.com/async"
internal "fillmore-labs.com/microbatch/internal/types"
"fillmore-labs.com/promise"
)

// Processor handles batch processing of jobs and results.
Expand All @@ -42,8 +42,8 @@ type Processor[Q, R any, C comparable, QQ ~[]Q, RR ~[]R] struct {
ErrDuplicateID error
}

// resultMap is map from correlation ID to result [promise.Promise].
type resultMap[R any, C comparable] map[C]promise.Promise[R]
// resultMap is map from correlation ID to result [async.Promise].
type resultMap[R any, C comparable] map[C]*async.Promise[R]

// Process takes a batch of requests and handles processing.
func (p *Processor[Q, R, _, _, _]) Process(requests []internal.BatchRequest[Q, R]) {
Expand Down
9 changes: 5 additions & 4 deletions internal/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"strconv"
"testing"

"fillmore-labs.com/async"
"fillmore-labs.com/microbatch/internal/mocks"
"fillmore-labs.com/microbatch/internal/processor"
internal "fillmore-labs.com/microbatch/internal/types"
"fillmore-labs.com/promise"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -132,11 +132,12 @@ func (s *ProcessorTestSuite) TestProcessorDuplicateUncorrelated() {
}
}

func makeRequestsResults[Q, R any](ids []Q) ([]internal.BatchRequest[Q, R], []promise.Future[R]) {
func makeRequestsResults[Q, R any](ids []Q) ([]internal.BatchRequest[Q, R], []*async.Future[R]) {
requests := make([]internal.BatchRequest[Q, R], len(ids))
results := make([]promise.Future[R], len(ids))
results := make([]*async.Future[R], len(ids))
for i, id := range ids {
p, f := promise.New[R]()
p := &async.Promise[R]{}
f := p.Future()
requests[i] = internal.BatchRequest[Q, R]{Request: id, Result: p}
results[i] = f
}
Expand Down
2 changes: 1 addition & 1 deletion internal/timer/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (t timer) Stop() {
}
}

func New(d time.Duration, f func(sent *bool)) Timer {
func New(d time.Duration, f func(sent *bool)) Timer { //nolint:ireturn
sent := new(bool)

return timer{
Expand Down
4 changes: 2 additions & 2 deletions internal/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package types

import "fillmore-labs.com/promise"
import "fillmore-labs.com/async"

// BatchRequest represents a single request submitted to the [Batcher], along with the channel to return the result on.
type BatchRequest[Q, R any] struct {
Request Q
Result promise.Promise[R]
Result *async.Promise[R]
}

0 comments on commit 30c3ee9

Please sign in to comment.