diff --git a/README.md b/README.md index c1bdb7e..12fee5c 100644 --- a/README.md +++ b/README.md @@ -116,6 +116,12 @@ See [Differences from Open Match](./docs/differences.md) for details. Is minimatch really just a mini? No, it is not! Despite its name, minimatch has scalability. Please see [Scalable minimatch](./docs/scalable.md). +## Consistency and performance + +Please see the following docs for consistency and performance to consider in minimatch. + +[Consistency and performance](./docs/consistency.md) + ## Metrics minimatch Backend exposes metrics in OpenTelemetry format to help monitor performance. diff --git a/backend.go b/backend.go index 8570b85..0ef7096 100644 --- a/backend.go +++ b/backend.go @@ -20,7 +20,7 @@ const ( ) type Backend struct { - store statestore.StateStore + store statestore.BackendStore mmfs map[*pb.MatchProfile]MatchFunction mmfMu sync.RWMutex assigner Assigner @@ -39,18 +39,20 @@ func (f BackendOptionFunc) apply(options *backendOptions) { } type backendOptions struct { - evaluator Evaluator - fetchTicketsLimit int64 - meterProvider metric.MeterProvider - logger *slog.Logger + evaluator Evaluator + fetchTicketsLimit int64 + meterProvider metric.MeterProvider + logger *slog.Logger + validateTicketsBeforeAssign bool } func defaultBackendOptions() *backendOptions { return &backendOptions{ - evaluator: nil, - fetchTicketsLimit: defaultFetchTicketsLimit, - meterProvider: otel.GetMeterProvider(), - logger: slog.Default(), + evaluator: nil, + fetchTicketsLimit: defaultFetchTicketsLimit, + meterProvider: otel.GetMeterProvider(), + logger: slog.Default(), + validateTicketsBeforeAssign: true, } } @@ -66,6 +68,8 @@ func WithBackendMeterProvider(provider metric.MeterProvider) BackendOption { }) } +// FetchTicketsLimit prevents OOM Kill by limiting the number of tickets retrieved at one time. +// The default is 10000. func WithFetchTicketsLimit(limit int64) BackendOption { return BackendOptionFunc(func(options *backendOptions) { options.fetchTicketsLimit = limit @@ -78,7 +82,15 @@ func WithBackendLogger(logger *slog.Logger) BackendOption { }) } -func NewBackend(store statestore.StateStore, assigner Assigner, opts ...BackendOption) (*Backend, error) { +// WithTicketValidationBeforeAssign specifies whether to enable to check for the existence of tickets before assigning them. +// See docs/consistency.md for details. +func WithTicketValidationBeforeAssign(enabled bool) BackendOption { + return BackendOptionFunc(func(options *backendOptions) { + options.validateTicketsBeforeAssign = enabled + }) +} + +func NewBackend(store statestore.BackendStore, assigner Assigner, opts ...BackendOption) (*Backend, error) { options := defaultBackendOptions() for _, opt := range opts { opt.apply(options) @@ -145,10 +157,6 @@ func (b *Backend) Tick(ctx context.Context) error { } if len(matches) > 0 { if err := b.assign(ctx, matches); err != nil { - unmatchedTicketIDs = ticketIDsFromMatches(matches) - if err := b.store.ReleaseTickets(ctx, unmatchedTicketIDs); err != nil { - return fmt.Errorf("failed to release unmatched tickets: %w", err) - } return err } } @@ -209,13 +217,32 @@ func (b *Backend) makeMatches(ctx context.Context, activeTickets []*pb.Ticket) ( } func (b *Backend) assign(ctx context.Context, matches []*pb.Match) error { + var ticketIDsToRelease []string + defer func() { + if len(ticketIDsToRelease) > 0 { + _ = b.store.ReleaseTickets(ctx, ticketIDsToRelease) + } + }() + asgs, err := b.assigner.Assign(ctx, matches) if err != nil { + ticketIDsToRelease = append(ticketIDsToRelease, ticketIDsFromMatches(matches)...) return fmt.Errorf("failed to assign matches: %w", err) } if len(asgs) > 0 { + if b.options.validateTicketsBeforeAssign { + filteredAsgs, notAssigned, err := b.validateTicketsBeforeAssign(ctx, asgs) + ticketIDsToRelease = append(ticketIDsToRelease, notAssigned...) + if err != nil { + return fmt.Errorf("failed to validate ticket before assign: %w", err) + } + asgs = filteredAsgs + } + start := time.Now() - if err := b.store.AssignTickets(ctx, asgs); err != nil { + notAssigned, err := b.store.AssignTickets(ctx, asgs) + ticketIDsToRelease = append(ticketIDsToRelease, notAssigned...) + if err != nil { return fmt.Errorf("failed to assign tickets: %w", err) } b.metrics.recordAssignToRedisLatency(ctx, time.Since(start)) @@ -223,6 +250,35 @@ func (b *Backend) assign(ctx context.Context, matches []*pb.Match) error { return nil } +func (b *Backend) validateTicketsBeforeAssign(ctx context.Context, asgs []*pb.AssignmentGroup) ([]*pb.AssignmentGroup, []string, error) { + allTicketIDs := ticketIDsFromAssignmentGroups(asgs) + tickets, err := b.store.GetTickets(ctx, allTicketIDs) + if err != nil { + return nil, nil, fmt.Errorf("failed to fetch tickets: %w", err) + } + existsMap := map[string]struct{}{} + for _, existingTicketID := range ticketIDsFromTickets(tickets) { + existsMap[existingTicketID] = struct{}{} + } + + validAsgs := make([]*pb.AssignmentGroup, 0, len(asgs)) + var notAssignedTicketIDs []string + for _, asg := range asgs { + isValidAsg := true + for _, ticketID := range asg.TicketIds { + if _, ok := existsMap[ticketID]; !ok { + isValidAsg = false + } + } + if isValidAsg { + validAsgs = append(validAsgs, asg) + } else { + notAssignedTicketIDs = append(notAssignedTicketIDs, asg.TicketIds...) + } + } + return validAsgs, notAssignedTicketIDs, nil +} + func evaluateMatches(ctx context.Context, evaluator Evaluator, matches []*pb.Match) ([]*pb.Match, error) { evaluatedMatches := make([]*pb.Match, 0, len(matches)) evaluatedMatchIDs, err := evaluator.Evaluate(ctx, matches) @@ -263,7 +319,7 @@ func filterTickets(profile *pb.MatchProfile, tickets []*pb.Ticket) (map[string][ func filterUnmatchedTicketIDs(allTickets []*pb.Ticket, matches []*pb.Match) []string { matchedTickets := map[string]struct{}{} for _, match := range matches { - for _, ticketID := range ticketIDs(match.Tickets) { + for _, ticketID := range ticketIDsFromTickets(match.Tickets) { matchedTickets[ticketID] = struct{}{} } } @@ -286,3 +342,11 @@ func ticketIDsFromMatches(matches []*pb.Match) []string { } return ticketIDs } + +func ticketIDsFromAssignmentGroups(asgs []*pb.AssignmentGroup) []string { + var ticketIDs []string + for _, asg := range asgs { + ticketIDs = append(ticketIDs, asg.TicketIds...) + } + return ticketIDs +} diff --git a/backend_test.go b/backend_test.go new file mode 100644 index 0000000..a492338 --- /dev/null +++ b/backend_test.go @@ -0,0 +1,124 @@ +package minimatch + +import ( + "context" + "log" + "testing" + + "github.com/bojand/hri" + "github.com/stretchr/testify/require" + "open-match.dev/open-match/pkg/pb" + + "github.com/castaneai/minimatch/pkg/statestore" +) + +func TestValidateTicketExistenceBeforeAssign(t *testing.T) { + frontStore, backStore, _ := NewStateStoreWithMiniRedis(t) + ctx := context.Background() + + t.Run("ValidationEnabled", func(t *testing.T) { + backend, err := NewBackend(backStore, AssignerFunc(dummyAssign), WithTicketValidationBeforeAssign(true)) + require.NoError(t, err) + backend.AddMatchFunction(anyProfile, MatchFunctionSimple1vs1) + + err = frontStore.CreateTicket(ctx, &pb.Ticket{Id: "t1"}, defaultTicketTTL) + require.NoError(t, err) + err = frontStore.CreateTicket(ctx, &pb.Ticket{Id: "t2"}, defaultTicketTTL) + require.NoError(t, err) + + activeTickets, err := backend.fetchActiveTickets(ctx, defaultFetchTicketsLimit) + require.NoError(t, err) + require.Len(t, activeTickets, 2) + + matches, err := backend.makeMatches(ctx, activeTickets) + require.NoError(t, err) + require.Len(t, matches, 1) + + // Delete "t1" after match is established + err = frontStore.DeleteTicket(ctx, "t1") + require.NoError(t, err) + + // If any ticket is lost in any part of a match, all matched tickets will not be assigned. + err = backend.assign(ctx, matches) + require.NoError(t, err) + + _, err = frontStore.GetAssignment(ctx, "t1") + require.ErrorIs(t, err, statestore.ErrAssignmentNotFound) + _, err = frontStore.GetAssignment(ctx, "t2") + require.ErrorIs(t, err, statestore.ErrAssignmentNotFound) + + // Any remaining tickets for which no assignment is made will return to active again. + activeTickets, err = backend.fetchActiveTickets(ctx, defaultFetchTicketsLimit) + require.NoError(t, err) + require.Len(t, activeTickets, 1) + require.Equal(t, "t2", activeTickets[0].Id) + }) + + t.Run("ValidationDisabled", func(t *testing.T) { + backend, err := NewBackend(backStore, AssignerFunc(dummyAssign), WithTicketValidationBeforeAssign(false)) + require.NoError(t, err) + backend.AddMatchFunction(anyProfile, MatchFunctionSimple1vs1) + + err = frontStore.CreateTicket(ctx, &pb.Ticket{Id: "t3"}, defaultTicketTTL) + require.NoError(t, err) + err = frontStore.CreateTicket(ctx, &pb.Ticket{Id: "t4"}, defaultTicketTTL) + require.NoError(t, err) + + activeTickets, err := backend.fetchActiveTickets(ctx, defaultFetchTicketsLimit) + require.NoError(t, err) + require.Len(t, activeTickets, 2) + + matches, err := backend.makeMatches(ctx, activeTickets) + require.NoError(t, err) + require.Len(t, matches, 1) + + // Delete "t3" after match is established + err = frontStore.DeleteTicket(ctx, "t3") + require.NoError(t, err) + + // Assignment is created even if the ticket disappears because the validation is invalid. + err = backend.assign(ctx, matches) + require.NoError(t, err) + + as1, err := frontStore.GetAssignment(ctx, "t3") + require.NoError(t, err) + require.NotNil(t, as1) + as2, err := frontStore.GetAssignment(ctx, "t4") + require.NoError(t, err) + require.NotNil(t, as2) + require.Equal(t, as1.Connection, as2.Connection) + + activeTickets, err = backend.fetchActiveTickets(ctx, defaultFetchTicketsLimit) + require.NoError(t, err) + require.Empty(t, activeTickets) + }) +} + +var anyProfile = &pb.MatchProfile{ + Name: "test-profile", + Pools: []*pb.Pool{ + {Name: "test-pool"}, + }, +} + +func dummyAssign(ctx context.Context, matches []*pb.Match) ([]*pb.AssignmentGroup, error) { + var asgs []*pb.AssignmentGroup + for _, match := range matches { + tids := ticketIDsFromMatch(match) + conn := hri.Random() + log.Printf("assign '%s' to tickets: %v", conn, tids) + asgs = append(asgs, &pb.AssignmentGroup{ + TicketIds: tids, + Assignment: &pb.Assignment{Connection: conn}, + }) + } + return asgs, nil +} + +func ticketIDsFromMatch(match *pb.Match) []string { + var ids []string + for _, ticket := range match.Tickets { + ids = append(ids, ticket.Id) + } + return ids +} diff --git a/docs/backend_timeline.png b/docs/backend_timeline.png index 1f3a9e4..193037f 100644 Binary files a/docs/backend_timeline.png and b/docs/backend_timeline.png differ diff --git a/docs/consistency.md b/docs/consistency.md new file mode 100644 index 0000000..2511a7d --- /dev/null +++ b/docs/consistency.md @@ -0,0 +1,32 @@ +# Consistency and performance + +This document describes some of the consistency and performance points to consider with minimatch. + +In general, there is a trade-off between consistency and performance. As a distributed system over the Internet, we must accept a certain amount of inconsistency. + +## Invalid Assignment + +If some or all of the tickets in an Assignment are deleted, it becomes invalid. + +minimatch backend processes in the following order in one tick. + +1. fetch active tickets +2. matchmaking +3. allocating resources to established matches +4. assigning the successful match to a ticket (creating an Assignment) +5. the user retrieves the ticket's Assignment through minimatch Frontend + +If a ticket is deleted between 1 and 4 due to expiration or other request cancellation, an invalid Assignment will result. +To prevent this, minimatch Backend checks the existence of the ticket again at step 4. + +**It is important to note that** even with this validation enabled, invalid Assignment cannot be completely prevented. +For example, if a ticket is deleted during step 5, an invalid Assignment will still occur. +Checking for the existence of tickets only reduces the possibility of invalid assignments, but does not prevent them completely. + +In addition, this validation has some impact on performance. +If an invalid Assignment can be handled by the application and the ticket existence check is not needed, +it can be disabled by `WithTicketValidationBeforeAssign(false)`. + +```go +backend, err := minimatch.NewBackend(store, assigner, minimatch.WithTicketValidationBeforeAssign(false)) +``` diff --git a/docs/metrics.md b/docs/metrics.md index a7a7393..9e30d19 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -18,7 +18,7 @@ minimatch Backend exposes metrics in OpenTelemetry format to help monitor perfor The following timeline shows the process flow of minimatch Backend and the corresponding metrics. -![](./backend_timeline.png) +![](./metrics.png) ## Meter provider diff --git a/docs/metrics.png b/docs/metrics.png new file mode 100644 index 0000000..1f3a9e4 Binary files /dev/null and b/docs/metrics.png differ diff --git a/frontend.go b/frontend.go index e7bc84e..0154ef0 100644 --- a/frontend.go +++ b/frontend.go @@ -10,24 +10,60 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" + "google.golang.org/protobuf/types/known/wrapperspb" "open-match.dev/open-match/pkg/pb" "github.com/castaneai/minimatch/pkg/statestore" ) const ( - watchAssignmentInterval = 100 * time.Millisecond + watchAssignmentInterval = 100 * time.Millisecond + defaultTicketTTL = 10 * time.Minute + persistentFieldKeyTicketTTL = "ttl" ) +type FrontendOption interface { + apply(options *frontendOptions) +} + +type FrontendOptionFunc func(options *frontendOptions) + +func (f FrontendOptionFunc) apply(options *frontendOptions) { + f(options) +} + +type frontendOptions struct { + ticketTTL time.Duration +} + +func defaultFrontendOptions() *frontendOptions { + return &frontendOptions{ + ticketTTL: defaultTicketTTL, + } +} + +func WithTicketTTL(ticketTTL time.Duration) FrontendOption { + return FrontendOptionFunc(func(options *frontendOptions) { + options.ticketTTL = ticketTTL + }) +} + type FrontendService struct { - store statestore.StateStore + store statestore.FrontendStore + options *frontendOptions } -func NewFrontendService(store statestore.StateStore) *FrontendService { +func NewFrontendService(store statestore.FrontendStore, opts ...FrontendOption) *FrontendService { + options := defaultFrontendOptions() + for _, opt := range opts { + opt.apply(options) + } return &FrontendService{ - store: store, + store: store, + options: options, } } @@ -38,7 +74,14 @@ func (s *FrontendService) CreateTicket(ctx context.Context, req *pb.CreateTicket } ticket.Id = xid.New().String() ticket.CreateTime = timestamppb.Now() - if err := s.store.CreateTicket(ctx, ticket); err != nil { + ttlVal, err := anypb.New(wrapperspb.Int64(s.options.ticketTTL.Nanoseconds())) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to create ttl value") + } + ticket.PersistentField = map[string]*anypb.Any{ + persistentFieldKeyTicketTTL: ttlVal, + } + if err := s.store.CreateTicket(ctx, ticket, s.options.ticketTTL); err != nil { return nil, err } return ticket, nil diff --git a/loadtest/README.md b/loadtest/README.md index 6aed220..f5172fb 100644 --- a/loadtest/README.md +++ b/loadtest/README.md @@ -19,6 +19,10 @@ export SKAFFOLD_DEFAULT_REPO=xxx-docker.pkg.dev/yyy # Start loadtest skaffold dev --tail=false +# Open Grafana to check the metrics. +# Log in as user: admin, password: test. +open http://127.0.0.1:8080/d/ea66819b-b3f1-4068-808e-4d1c650a46b2/minimatch-loadtest?orgId=1&refresh=5s + # Scale attacker vim skaffold.yaml ``` diff --git a/loadtest/cmd/backend/main.go b/loadtest/cmd/backend/main.go index 2e73883..71f34c7 100644 --- a/loadtest/cmd/backend/main.go +++ b/loadtest/cmd/backend/main.go @@ -64,7 +64,7 @@ func main() { log.Fatalf("failed to create redis store: %+v", err) } ticketCache := cache.New[string, *pb.Ticket]() - store := statestore.NewStoreWithTicketCache(redisStore, ticketCache, + store := statestore.NewBackendStoreWithTicketCache(redisStore, ticketCache, statestore.WithTicketCacheTTL(conf.TicketCacheTTL)) assigner, err := newAssigner(&conf, meterProvider) backend, err := minimatch.NewBackend(store, assigner) @@ -99,7 +99,7 @@ func newAssigner(conf *config, provider metric.MeterProvider) (minimatch.Assigne return assigner, nil } -func newRedisStateStore(conf *config) (statestore.StateStore, error) { +func newRedisStateStore(conf *config) (statestore.BackendStore, error) { copt := rueidis.ClientOption{ InitAddress: []string{conf.RedisAddr}, DisableCache: true, diff --git a/loadtest/cmd/frontend/main.go b/loadtest/cmd/frontend/main.go index e56194c..92ffdc1 100644 --- a/loadtest/cmd/frontend/main.go +++ b/loadtest/cmd/frontend/main.go @@ -46,7 +46,7 @@ func main() { log.Fatalf("failed to create redis store: %+v", err) } ticketCache := cache.New[string, *pb.Ticket]() - store := statestore.NewStoreWithTicketCache(redisStore, ticketCache, + store := statestore.NewFrontendStoreWithTicketCache(redisStore, ticketCache, statestore.WithTicketCacheTTL(conf.TicketCacheTTL)) sv := grpc.NewServer() @@ -63,7 +63,7 @@ func main() { } } -func newRedisStateStore(conf *config) (statestore.StateStore, error) { +func newRedisStateStore(conf *config) (statestore.FrontendStore, error) { copt := rueidis.ClientOption{ InitAddress: []string{conf.RedisAddr}, DisableCache: true, diff --git a/metrics.go b/metrics.go index 91878b8..bf71f79 100644 --- a/metrics.go +++ b/metrics.go @@ -34,7 +34,7 @@ type backendMetrics struct { assignToRedisLatency metric.Float64Histogram } -func newBackendMetrics(provider metric.MeterProvider, store statestore.StateStore) (*backendMetrics, error) { +func newBackendMetrics(provider metric.MeterProvider, store statestore.BackendStore) (*backendMetrics, error) { meter := provider.Meter(metricsScopeName) ticketsFetched, err := meter.Int64Counter("minimatch.backend.tickets_fetched") if err != nil { diff --git a/minimatch.go b/minimatch.go index 1b2499e..546884f 100644 --- a/minimatch.go +++ b/minimatch.go @@ -17,10 +17,11 @@ import ( ) type MiniMatch struct { - store statestore.StateStore - mmfs map[*pb.MatchProfile]MatchFunction - backend *Backend - mu sync.RWMutex + frontendStore statestore.FrontendStore + backendStore statestore.BackendStore + mmfs map[*pb.MatchProfile]MatchFunction + backend *Backend + mu sync.RWMutex } func NewMiniMatchWithRedis(opts ...statestore.RedisOption) (*MiniMatch, error) { @@ -40,14 +41,15 @@ func NewMiniMatchWithRedis(opts ...statestore.RedisOption) (*MiniMatch, error) { return nil, fmt.Errorf("failed to new rueidis locker: %w", err) } store := statestore.NewRedisStore(rc, locker, opts...) - return NewMiniMatch(store), nil + return NewMiniMatch(store, store), nil } -func NewMiniMatch(store statestore.StateStore) *MiniMatch { +func NewMiniMatch(frontendStore statestore.FrontendStore, backendStore statestore.BackendStore) *MiniMatch { return &MiniMatch{ - store: store, - mmfs: map[*pb.MatchProfile]MatchFunction{}, - mu: sync.RWMutex{}, + frontendStore: frontendStore, + backendStore: backendStore, + mmfs: map[*pb.MatchProfile]MatchFunction{}, + mu: sync.RWMutex{}, } } @@ -56,7 +58,7 @@ func (m *MiniMatch) AddMatchFunction(profile *pb.MatchProfile, mmf MatchFunction } func (m *MiniMatch) FrontendService() pb.FrontendServiceServer { - return NewFrontendService(m.store) + return NewFrontendService(m.frontendStore) } func (m *MiniMatch) StartFrontend(listenAddr string) error { @@ -70,7 +72,7 @@ func (m *MiniMatch) StartFrontend(listenAddr string) error { } func (m *MiniMatch) StartBackend(ctx context.Context, assigner Assigner, tickRate time.Duration, opts ...BackendOption) error { - backend, err := NewBackend(m.store, assigner, opts...) + backend, err := NewBackend(m.backendStore, assigner, opts...) if err != nil { return fmt.Errorf("failed to create minimatch backend: %w", err) } @@ -109,15 +111,15 @@ var MatchFunctionSimple1vs1 = MatchFunctionFunc(func(ctx context.Context, profil func newMatch(profile *pb.MatchProfile, tickets []*pb.Ticket) *pb.Match { return &pb.Match{ - MatchId: fmt.Sprintf("%s_%v", profile.Name, ticketIDs(tickets)), + MatchId: fmt.Sprintf("%s_%v", profile.Name, ticketIDsFromTickets(tickets)), MatchProfile: profile.Name, MatchFunction: "Simple1vs1", Tickets: tickets, } } -func ticketIDs(tickets []*pb.Ticket) []string { - var ids []string +func ticketIDsFromTickets(tickets []*pb.Ticket) []string { + ids := make([]string, 0, len(tickets)) for _, ticket := range tickets { ids = append(ids, ticket.Id) } diff --git a/pkg/statestore/backend.go b/pkg/statestore/backend.go new file mode 100644 index 0000000..b4c4029 --- /dev/null +++ b/pkg/statestore/backend.go @@ -0,0 +1,16 @@ +package statestore + +import ( + "context" + + "open-match.dev/open-match/pkg/pb" +) + +type BackendStore interface { + GetTickets(ctx context.Context, ticketIDs []string) ([]*pb.Ticket, error) + GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error) + GetTicketCount(ctx context.Context) (int64, error) + ReleaseTickets(ctx context.Context, ticketIDs []string) error + // AssignTickets Returns a list of ticket IDs that have failed assignments; you will need to check that list when err occurs. + AssignTickets(ctx context.Context, asgs []*pb.AssignmentGroup) ([]string, error) +} diff --git a/pkg/statestore/errors.go b/pkg/statestore/errors.go new file mode 100644 index 0000000..b8e951d --- /dev/null +++ b/pkg/statestore/errors.go @@ -0,0 +1,8 @@ +package statestore + +import "errors" + +var ( + ErrTicketNotFound = errors.New("ticket not found") + ErrAssignmentNotFound = errors.New("assignment not found") +) diff --git a/pkg/statestore/frontend.go b/pkg/statestore/frontend.go new file mode 100644 index 0000000..924d800 --- /dev/null +++ b/pkg/statestore/frontend.go @@ -0,0 +1,15 @@ +package statestore + +import ( + "context" + "time" + + "open-match.dev/open-match/pkg/pb" +) + +type FrontendStore interface { + CreateTicket(ctx context.Context, ticket *pb.Ticket, ttl time.Duration) error + DeleteTicket(ctx context.Context, ticketID string) error + GetTicket(ctx context.Context, ticketID string) (*pb.Ticket, error) + GetAssignment(ctx context.Context, ticketID string) (*pb.Assignment, error) +} diff --git a/pkg/statestore/redis.go b/pkg/statestore/redis.go index 04901bf..9192144 100644 --- a/pkg/statestore/redis.go +++ b/pkg/statestore/redis.go @@ -14,7 +14,6 @@ import ( ) const ( - defaultTicketTTL = 10 * time.Minute defaultPendingReleaseTimeout = 1 * time.Minute defaultAssignedDeleteTimeout = 1 * time.Minute ) @@ -26,7 +25,6 @@ type RedisStore struct { } type redisOpts struct { - ticketTTL time.Duration pendingReleaseTimeout time.Duration assignedDeleteTimeout time.Duration // common key prefix in redis @@ -39,7 +37,6 @@ type redisOpts struct { func defaultRedisOpts() *redisOpts { return &redisOpts{ - ticketTTL: defaultTicketTTL, pendingReleaseTimeout: defaultPendingReleaseTimeout, assignedDeleteTimeout: defaultAssignedDeleteTimeout, keyPrefix: "", @@ -58,12 +55,6 @@ func (f RedisOptionFunc) apply(opts *redisOpts) { f(opts) } -func WithTicketTTL(ticketTTL time.Duration) RedisOption { - return RedisOptionFunc(func(opts *redisOpts) { - opts.ticketTTL = ticketTTL - }) -} - func WithPendingReleaseTimeout(pendingReleaseTimeout time.Duration) RedisOption { return RedisOptionFunc(func(opts *redisOpts) { opts.pendingReleaseTimeout = pendingReleaseTimeout @@ -106,7 +97,7 @@ func NewRedisStore(client rueidis.Client, locker rueidislock.Locker, opts ...Red } } -func (s *RedisStore) CreateTicket(ctx context.Context, ticket *pb.Ticket) error { +func (s *RedisStore) CreateTicket(ctx context.Context, ticket *pb.Ticket, ttl time.Duration) error { data, err := encodeTicket(ticket) if err != nil { return err @@ -115,7 +106,7 @@ func (s *RedisStore) CreateTicket(ctx context.Context, ticket *pb.Ticket) error s.client.B().Set(). Key(redisKeyTicketData(s.opts.keyPrefix, ticket.Id)). Value(rueidis.BinaryString(data)). - Ex(s.opts.ticketTTL). + Ex(ttl). Build(), s.client.B().Sadd(). Key(redisKeyTicketIndex(s.opts.keyPrefix)). @@ -292,8 +283,8 @@ func (s *RedisStore) ReleaseTickets(ctx context.Context, ticketIDs []string) err return nil } -func (s *RedisStore) AssignTickets(ctx context.Context, asgs []*pb.AssignmentGroup) error { - var assignedTicketIDs []string +func (s *RedisStore) AssignTickets(ctx context.Context, asgs []*pb.AssignmentGroup) ([]string, error) { + var assignedTicketIDs, notAssignedTicketIDs []string for _, asg := range asgs { if len(asg.TicketIds) == 0 { continue @@ -304,20 +295,21 @@ func (s *RedisStore) AssignTickets(ctx context.Context, asgs []*pb.AssignmentGro redis = s.opts.assignmentSpaceClient } if err := s.setAssignmentToTickets(ctx, redis, asg.TicketIds, asg.Assignment); err != nil { - return err + notAssignedTicketIDs = append(notAssignedTicketIDs, asg.TicketIds...) + return notAssignedTicketIDs, err } assignedTicketIDs = append(assignedTicketIDs, asg.TicketIds...) } if len(assignedTicketIDs) > 0 { // de-index assigned tickets if err := s.deIndexTickets(ctx, assignedTicketIDs); err != nil { - return fmt.Errorf("failed to deindex assigned tickets: %w", err) + return notAssignedTicketIDs, fmt.Errorf("failed to deindex assigned tickets: %w", err) } if err := s.setTicketsExpiration(ctx, assignedTicketIDs, s.opts.assignedDeleteTimeout); err != nil { - return err + return notAssignedTicketIDs, err } } - return nil + return notAssignedTicketIDs, nil } func (s *RedisStore) GetTicketCount(ctx context.Context) (int64, error) { diff --git a/pkg/statestore/redis_test.go b/pkg/statestore/redis_test.go index 331ee61..0267fd3 100644 --- a/pkg/statestore/redis_test.go +++ b/pkg/statestore/redis_test.go @@ -19,6 +19,7 @@ import ( const ( defaultGetTicketLimit = 10000 + defaultTicketTTL = 10 * time.Minute ) func newTestRedisStore(t *testing.T, addr string, opts ...RedisOption) *RedisStore { @@ -44,8 +45,8 @@ func TestPendingRelease(t *testing.T) { store := newTestRedisStore(t, mr.Addr()) ctx := context.Background() - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test1"})) - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test2"})) + require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test1"}, defaultTicketTTL)) + require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test2"}, defaultTicketTTL)) activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultGetTicketLimit) require.NoError(t, err) require.ElementsMatch(t, activeTicketIDs, []string{"test1", "test2"}) @@ -69,7 +70,7 @@ func TestPendingReleaseTimeout(t *testing.T) { ctx := context.Background() // 1 active ticket - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test"})) + require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test"}, defaultTicketTTL)) // get active tickets for proposal (active -> pending) activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultGetTicketLimit) @@ -96,8 +97,8 @@ func TestAssignedDeleteTimeout(t *testing.T) { store := newTestRedisStore(t, mr.Addr()) ctx := context.Background() - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test1"})) - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test2"})) + require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test1"}, defaultTicketTTL)) + require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test2"}, defaultTicketTTL)) activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultGetTicketLimit) require.NoError(t, err) require.ElementsMatch(t, activeTicketIDs, []string{"test1", "test2"}) @@ -108,9 +109,10 @@ func TestAssignedDeleteTimeout(t *testing.T) { require.Error(t, err, ErrAssignmentNotFound) as := &pb.Assignment{Connection: "test-assign"} - require.NoError(t, store.AssignTickets(ctx, []*pb.AssignmentGroup{ + _, err = store.AssignTickets(ctx, []*pb.AssignmentGroup{ {TicketIds: []string{"test1", "test2"}, Assignment: as}, - })) + }) + require.NoError(t, err) for i := 0; i < 3; i++ { as1, err := store.GetAssignment(ctx, "test1") require.NoError(t, err) @@ -138,11 +140,11 @@ func TestAssignedDeleteTimeout(t *testing.T) { func TestTicketTTL(t *testing.T) { mr := miniredis.RunT(t) ticketTTL := 5 * time.Second - store := newTestRedisStore(t, mr.Addr(), WithTicketTTL(ticketTTL)) + store := newTestRedisStore(t, mr.Addr()) ctx := context.Background() mustCreateTicket := func(id string) { - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: id})) + require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: id}, ticketTTL)) ticket, err := store.GetTicket(ctx, id) require.NoError(t, err) require.Equal(t, id, ticket.Id) @@ -191,7 +193,7 @@ func TestConcurrentFetchActiveTickets(t *testing.T) { ticketCount := 1000 concurrency := 1000 for i := 0; i < ticketCount; i++ { - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: xid.New().String()})) + require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: xid.New().String()}, defaultTicketTTL)) } eg, _ := errgroup.WithContext(ctx) @@ -228,7 +230,7 @@ func TestConcurrentFetchAndAssign(t *testing.T) { concurrency := 1000 for i := 0; i < ticketCount; i++ { ticket := &pb.Ticket{Id: xid.New().String()} - require.NoError(t, store.CreateTicket(ctx, ticket)) + require.NoError(t, store.CreateTicket(ctx, ticket, defaultTicketTTL)) } var mu sync.Mutex @@ -258,7 +260,7 @@ func TestConcurrentFetchAndAssign(t *testing.T) { mu.Unlock() } } - if err := store.AssignTickets(ctx, asgs); err != nil { + if _, err := store.AssignTickets(ctx, asgs); err != nil { return err } return nil @@ -275,21 +277,21 @@ func TestReadReplica(t *testing.T) { store := newTestRedisStore(t, mr.Addr(), WithRedisReadReplicaClient(newRedisClient(t, readReplica.Addr()))) replicaStore := newTestRedisStore(t, readReplica.Addr()) - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "t1"})) + require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "t1"}, defaultTicketTTL)) t1, err := store.GetTicket(ctx, "t1") require.NoError(t, err) require.Equal(t, "t1", t1.Id) // emulate replication - require.NoError(t, replicaStore.CreateTicket(ctx, &pb.Ticket{Id: "t1"})) + require.NoError(t, replicaStore.CreateTicket(ctx, &pb.Ticket{Id: "t1"}, defaultTicketTTL)) t1, err = store.GetTicket(ctx, "t1") require.NoError(t, err) require.Equal(t, "t1", t1.Id) // t2 is replicated but have different params - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "t2", SearchFields: &pb.SearchFields{Tags: []string{"primary"}}})) - require.NoError(t, replicaStore.CreateTicket(ctx, &pb.Ticket{Id: "t2", SearchFields: &pb.SearchFields{Tags: []string{"replica"}}})) + require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "t2", SearchFields: &pb.SearchFields{Tags: []string{"primary"}}}, defaultTicketTTL)) + require.NoError(t, replicaStore.CreateTicket(ctx, &pb.Ticket{Id: "t2", SearchFields: &pb.SearchFields{Tags: []string{"replica"}}}, defaultTicketTTL)) // t3 is not replicated - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "t3"})) + require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "t3"}, defaultTicketTTL)) tickets, err := replicaStore.GetTickets(ctx, []string{"t1", "t2", "t3"}) require.NoError(t, err) diff --git a/pkg/statestore/statestore.go b/pkg/statestore/statestore.go deleted file mode 100644 index ee8ebb9..0000000 --- a/pkg/statestore/statestore.go +++ /dev/null @@ -1,27 +0,0 @@ -package statestore - -import ( - "context" - "errors" - - "open-match.dev/open-match/pkg/pb" -) - -var ( - ErrTicketNotFound = errors.New("ticket not found") - ErrAssignmentNotFound = errors.New("assignment not found") -) - -type StateStore interface { - CreateTicket(ctx context.Context, ticket *pb.Ticket) error - DeleteTicket(ctx context.Context, ticketID string) error - // GetTicket is an API to retrieve the status of a single ticket and is called from Frontend. - GetTicket(ctx context.Context, ticketID string) (*pb.Ticket, error) - // GetTickets on the other hand, retrieves multiple tickets at once and is intended to be called from Backend. - GetTickets(ctx context.Context, ticketIDs []string) ([]*pb.Ticket, error) - GetAssignment(ctx context.Context, ticketID string) (*pb.Assignment, error) - GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error) - GetTicketCount(ctx context.Context) (int64, error) - ReleaseTickets(ctx context.Context, ticketIDs []string) error - AssignTickets(ctx context.Context, asgs []*pb.AssignmentGroup) error -} diff --git a/pkg/statestore/ticketcache.go b/pkg/statestore/ticketcache.go index 016f057..6e2733e 100644 --- a/pkg/statestore/ticketcache.go +++ b/pkg/statestore/ticketcache.go @@ -28,41 +28,43 @@ func (f ticketCacheOptionFunc) apply(options *ticketCacheOptions) { f(options) } +// WithTicketCacheTTL specifies the time to cache ticket data in-memory. +// The default is 10 seconds. func WithTicketCacheTTL(ttl time.Duration) TicketCacheOption { return ticketCacheOptionFunc(func(options *ticketCacheOptions) { options.ttl = ttl }) } -// StoreWithTicketCache caches GetTicket results in-memory with TTL -type StoreWithTicketCache struct { - origin StateStore +// FrontendStoreWithTicketCache caches GetTicket results in-memory with TTL +type FrontendStoreWithTicketCache struct { + origin FrontendStore ticketCache *cache.Cache[string, *pb.Ticket] options *ticketCacheOptions } -func NewStoreWithTicketCache(origin StateStore, ticketCache *cache.Cache[string, *pb.Ticket], opts ...TicketCacheOption) *StoreWithTicketCache { +func NewFrontendStoreWithTicketCache(origin FrontendStore, ticketCache *cache.Cache[string, *pb.Ticket], opts ...TicketCacheOption) *FrontendStoreWithTicketCache { options := defaultTicketCacheOptions() for _, o := range opts { o.apply(options) } - return &StoreWithTicketCache{ + return &FrontendStoreWithTicketCache{ origin: origin, ticketCache: ticketCache, options: options, } } -func (s *StoreWithTicketCache) CreateTicket(ctx context.Context, ticket *pb.Ticket) error { - return s.origin.CreateTicket(ctx, ticket) +func (s *FrontendStoreWithTicketCache) CreateTicket(ctx context.Context, ticket *pb.Ticket, ttl time.Duration) error { + return s.origin.CreateTicket(ctx, ticket, ttl) } -func (s *StoreWithTicketCache) DeleteTicket(ctx context.Context, ticketID string) error { +func (s *FrontendStoreWithTicketCache) DeleteTicket(ctx context.Context, ticketID string) error { s.ticketCache.Delete(ticketID) return s.origin.DeleteTicket(ctx, ticketID) } -func (s *StoreWithTicketCache) GetTicket(ctx context.Context, ticketID string) (*pb.Ticket, error) { +func (s *FrontendStoreWithTicketCache) GetTicket(ctx context.Context, ticketID string) (*pb.Ticket, error) { if ticket, hit := s.ticketCache.Get(ticketID); hit { return ticket, nil } @@ -74,7 +76,30 @@ func (s *StoreWithTicketCache) GetTicket(ctx context.Context, ticketID string) ( return ticket, nil } -func (s *StoreWithTicketCache) GetTickets(ctx context.Context, ticketIDs []string) ([]*pb.Ticket, error) { +func (s *FrontendStoreWithTicketCache) GetAssignment(ctx context.Context, ticketID string) (*pb.Assignment, error) { + return s.origin.GetAssignment(ctx, ticketID) +} + +// BackendStoreWithTicketCache caches GetTickets results in-memory with TTL +type BackendStoreWithTicketCache struct { + origin BackendStore + ticketCache *cache.Cache[string, *pb.Ticket] + options *ticketCacheOptions +} + +func NewBackendStoreWithTicketCache(origin BackendStore, ticketCache *cache.Cache[string, *pb.Ticket], opts ...TicketCacheOption) *BackendStoreWithTicketCache { + options := defaultTicketCacheOptions() + for _, o := range opts { + o.apply(options) + } + return &BackendStoreWithTicketCache{ + origin: origin, + ticketCache: ticketCache, + options: options, + } +} + +func (s *BackendStoreWithTicketCache) GetTickets(ctx context.Context, ticketIDs []string) ([]*pb.Ticket, error) { var noCachedTicketIDs []string tickets := make([]*pb.Ticket, 0, len(ticketIDs)) for _, ticketID := range ticketIDs { @@ -97,22 +122,18 @@ func (s *StoreWithTicketCache) GetTickets(ctx context.Context, ticketIDs []strin return tickets, nil } -func (s *StoreWithTicketCache) GetAssignment(ctx context.Context, ticketID string) (*pb.Assignment, error) { - return s.origin.GetAssignment(ctx, ticketID) -} - -func (s *StoreWithTicketCache) GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error) { +func (s *BackendStoreWithTicketCache) GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error) { return s.origin.GetActiveTicketIDs(ctx, limit) } -func (s *StoreWithTicketCache) ReleaseTickets(ctx context.Context, ticketIDs []string) error { +func (s *BackendStoreWithTicketCache) ReleaseTickets(ctx context.Context, ticketIDs []string) error { return s.origin.ReleaseTickets(ctx, ticketIDs) } -func (s *StoreWithTicketCache) AssignTickets(ctx context.Context, asgs []*pb.AssignmentGroup) error { +func (s *BackendStoreWithTicketCache) AssignTickets(ctx context.Context, asgs []*pb.AssignmentGroup) ([]string, error) { return s.origin.AssignTickets(ctx, asgs) } -func (s *StoreWithTicketCache) GetTicketCount(ctx context.Context) (int64, error) { +func (s *BackendStoreWithTicketCache) GetTicketCount(ctx context.Context) (int64, error) { return s.origin.GetTicketCount(ctx) } diff --git a/pkg/statestore/ticketcache_test.go b/pkg/statestore/ticketcache_test.go index c6786dc..fc4ff05 100644 --- a/pkg/statestore/ticketcache_test.go +++ b/pkg/statestore/ticketcache_test.go @@ -16,24 +16,25 @@ func TestTicketCache(t *testing.T) { ticketCache := cache.New[string, *pb.Ticket]() ttl := 500 * time.Millisecond redisStore := newTestRedisStore(t, mr.Addr()) - store := NewStoreWithTicketCache(redisStore, ticketCache, WithTicketCacheTTL(ttl)) + frontendStore := NewFrontendStoreWithTicketCache(redisStore, ticketCache, WithTicketCacheTTL(ttl)) + backendStore := NewBackendStoreWithTicketCache(redisStore, ticketCache, WithTicketCacheTTL(ttl)) ctx := context.Background() - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "t1"})) - t1, err := store.GetTicket(ctx, "t1") + require.NoError(t, frontendStore.CreateTicket(ctx, &pb.Ticket{Id: "t1"}, defaultTicketTTL)) + t1, err := frontendStore.GetTicket(ctx, "t1") require.NoError(t, err) require.Equal(t, "t1", t1.Id) require.NoError(t, redisStore.DeleteTicket(ctx, "t1")) // it can be retrieved from the cache even if deleted - t1, err = store.GetTicket(ctx, "t1") + t1, err = frontendStore.GetTicket(ctx, "t1") require.NoError(t, err) require.Equal(t, "t1", t1.Id) time.Sleep(ttl + 10*time.Millisecond) - _, err = store.GetTicket(ctx, "t1") + _, err = frontendStore.GetTicket(ctx, "t1") require.Error(t, err, ErrTicketNotFound) getTicketIDs := func(l []*pb.Ticket) []string { @@ -44,10 +45,10 @@ func TestTicketCache(t *testing.T) { return tids } - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "t2"})) - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "t3"})) - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "t4"})) - ts, err := store.GetTickets(ctx, []string{"t2", "t3", "t4", "t5"}) + require.NoError(t, frontendStore.CreateTicket(ctx, &pb.Ticket{Id: "t2"}, defaultTicketTTL)) + require.NoError(t, frontendStore.CreateTicket(ctx, &pb.Ticket{Id: "t3"}, defaultTicketTTL)) + require.NoError(t, frontendStore.CreateTicket(ctx, &pb.Ticket{Id: "t4"}, defaultTicketTTL)) + ts, err := backendStore.GetTickets(ctx, []string{"t2", "t3", "t4", "t5"}) require.NoError(t, err) require.ElementsMatch(t, []string{"t2", "t3", "t4"}, getTicketIDs(ts)) @@ -55,14 +56,14 @@ func TestTicketCache(t *testing.T) { require.NoError(t, redisStore.DeleteTicket(ctx, "t3")) // "t3" is still in cache - ts, err = store.GetTickets(ctx, []string{"t2", "t3", "t4"}) + ts, err = backendStore.GetTickets(ctx, []string{"t2", "t3", "t4"}) require.NoError(t, err) require.ElementsMatch(t, []string{"t2", "t3", "t4"}, getTicketIDs(ts)) // expires "t3" cache time.Sleep(ttl + 10*time.Millisecond) - ts, err = store.GetTickets(ctx, []string{"t2", "t3", "t4"}) + ts, err = backendStore.GetTickets(ctx, []string{"t2", "t3", "t4"}) require.NoError(t, err) require.ElementsMatch(t, []string{"t2", "t4"}, getTicketIDs(ts)) } diff --git a/testing.go b/testing.go index 2800a6a..edc0cda 100644 --- a/testing.go +++ b/testing.go @@ -44,6 +44,12 @@ func WithTestServerBackendTick(tick time.Duration) TestServerOption { }) } +func WithTestServerFrontendOptions(frontendOptions ...FrontendOption) TestServerOption { + return TestServerOptionFunc(func(opts *testServerOptions) { + opts.frontendOptions = frontendOptions + }) +} + func WithTestServerBackendOptions(backendOptions ...BackendOption) TestServerOption { return TestServerOptionFunc(func(opts *testServerOptions) { opts.backendOptions = backendOptions @@ -51,15 +57,17 @@ func WithTestServerBackendOptions(backendOptions ...BackendOption) TestServerOpt } type testServerOptions struct { + frontendListenAddr string + frontendOptions []FrontendOption backendTick time.Duration backendOptions []BackendOption - frontendListenAddr string } func defaultTestServerOpts() *testServerOptions { return &testServerOptions{ - backendTick: 1 * time.Second, frontendListenAddr: "127.0.0.1:0", // random port + frontendOptions: nil, + backendTick: 1 * time.Second, backendOptions: nil, } } @@ -94,7 +102,7 @@ func (ts *TestFrontendServer) Stop() { ts.sv.Stop() } -func NewTestFrontendServer(t *testing.T, store statestore.StateStore, addr string) *TestFrontendServer { +func NewTestFrontendServer(t *testing.T, store statestore.FrontendStore, addr string, opts ...FrontendOption) *TestFrontendServer { // start frontend lis, err := net.Listen("tcp", addr) if err != nil { @@ -102,7 +110,7 @@ func NewTestFrontendServer(t *testing.T, store statestore.StateStore, addr strin } t.Cleanup(func() { _ = lis.Close() }) sv := grpc.NewServer() - pb.RegisterFrontendServiceServer(sv, NewFrontendService(store)) + pb.RegisterFrontendServiceServer(sv, NewFrontendService(store, opts...)) ts := &TestFrontendServer{ sv: sv, lis: lis, @@ -118,13 +126,13 @@ func RunTestServer(t *testing.T, matchFunctions map[*pb.MatchProfile]MatchFuncti for _, o := range opts { o.apply(options) } - store, _ := NewStateStoreWithMiniRedis(t) - mm := NewMiniMatch(store) + front, back, _ := NewStateStoreWithMiniRedis(t) + mm := NewMiniMatch(front, back) for profile, mmf := range matchFunctions { mm.AddMatchFunction(profile, mmf) } - frontend := NewTestFrontendServer(t, store, options.frontendListenAddr) + frontend := NewTestFrontendServer(t, front, options.frontendListenAddr) ts := &TestServer{mm: mm, frontend: frontend, options: options} // start backend @@ -194,7 +202,7 @@ func waitForTCPServerReady(t *testing.T, addr string, timeout time.Duration) { } } -func NewStateStoreWithMiniRedis(t *testing.T) (statestore.StateStore, *miniredis.Miniredis) { +func NewStateStoreWithMiniRedis(t *testing.T) (statestore.FrontendStore, statestore.BackendStore, *miniredis.Miniredis) { mr := miniredis.RunT(t) copt := rueidis.ClientOption{InitAddress: []string{mr.Addr()}, DisableCache: true} redis, err := rueidis.NewClient(copt) @@ -207,5 +215,6 @@ func NewStateStoreWithMiniRedis(t *testing.T) (statestore.StateStore, *miniredis if err != nil { t.Fatalf("failed to create rueidis locker: %+v", err) } - return statestore.NewRedisStore(redis, locker), mr + redisStore := statestore.NewRedisStore(redis, locker) + return redisStore, redisStore, mr } diff --git a/tests/intergration_test.go b/tests/intergration_test.go index 8c6ed74..f575567 100644 --- a/tests/intergration_test.go +++ b/tests/intergration_test.go @@ -276,21 +276,21 @@ func TestEvaluator(t *testing.T) { } func TestAssignerError(t *testing.T) { - store, _ := minimatch.NewStateStoreWithMiniRedis(t) + frontStore, backStore, _ := minimatch.NewStateStoreWithMiniRedis(t) invalidAssigner := minimatch.AssignerFunc(func(ctx context.Context, matches []*pb.Match) ([]*pb.AssignmentGroup, error) { return nil, errors.New("error") }) - invalidBackend, err := minimatch.NewBackend(store, invalidAssigner) + invalidBackend, err := minimatch.NewBackend(backStore, invalidAssigner) require.NoError(t, err) invalidBackend.AddMatchFunction(anyProfile, minimatch.MatchFunctionSimple1vs1) validAssigner := minimatch.AssignerFunc(dummyAssign) - validBackend, err := minimatch.NewBackend(store, validAssigner) + validBackend, err := minimatch.NewBackend(backStore, validAssigner) require.NoError(t, err) validBackend.AddMatchFunction(anyProfile, minimatch.MatchFunctionSimple1vs1) ctx := context.Background() - frontend := minimatch.NewTestFrontendServer(t, store, "127.0.0.1:0") + frontend := minimatch.NewTestFrontendServer(t, frontStore, "127.0.0.1:0") frontend.Start(t) fc := frontend.Dial(t) t1, err := fc.CreateTicket(ctx, &pb.CreateTicketRequest{Ticket: &pb.Ticket{}})