Skip to content

Commit

Permalink
Merge pull request #28 from castaneai/check-ticket-exists-after-match
Browse files Browse the repository at this point in the history
backend: Check for existence before assigning tickets.
  • Loading branch information
castaneai authored Aug 28, 2024
2 parents fdfd034 + 2bf475c commit d2755f1
Show file tree
Hide file tree
Showing 23 changed files with 456 additions and 144 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ See [Differences from Open Match](./docs/differences.md) for details.
Is minimatch really just a mini? No, it is not! Despite its name, minimatch has scalability.
Please see [Scalable minimatch](./docs/scalable.md).

## Consistency and performance

Please see the following docs for consistency and performance to consider in minimatch.

[Consistency and performance](./docs/consistency.md)

## Metrics

minimatch Backend exposes metrics in OpenTelemetry format to help monitor performance.
Expand Down
96 changes: 80 additions & 16 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
)

type Backend struct {
store statestore.StateStore
store statestore.BackendStore
mmfs map[*pb.MatchProfile]MatchFunction
mmfMu sync.RWMutex
assigner Assigner
Expand All @@ -39,18 +39,20 @@ func (f BackendOptionFunc) apply(options *backendOptions) {
}

type backendOptions struct {
evaluator Evaluator
fetchTicketsLimit int64
meterProvider metric.MeterProvider
logger *slog.Logger
evaluator Evaluator
fetchTicketsLimit int64
meterProvider metric.MeterProvider
logger *slog.Logger
validateTicketsBeforeAssign bool
}

func defaultBackendOptions() *backendOptions {
return &backendOptions{
evaluator: nil,
fetchTicketsLimit: defaultFetchTicketsLimit,
meterProvider: otel.GetMeterProvider(),
logger: slog.Default(),
evaluator: nil,
fetchTicketsLimit: defaultFetchTicketsLimit,
meterProvider: otel.GetMeterProvider(),
logger: slog.Default(),
validateTicketsBeforeAssign: true,
}
}

Expand All @@ -66,6 +68,8 @@ func WithBackendMeterProvider(provider metric.MeterProvider) BackendOption {
})
}

// FetchTicketsLimit prevents OOM Kill by limiting the number of tickets retrieved at one time.
// The default is 10000.
func WithFetchTicketsLimit(limit int64) BackendOption {
return BackendOptionFunc(func(options *backendOptions) {
options.fetchTicketsLimit = limit
Expand All @@ -78,7 +82,15 @@ func WithBackendLogger(logger *slog.Logger) BackendOption {
})
}

func NewBackend(store statestore.StateStore, assigner Assigner, opts ...BackendOption) (*Backend, error) {
// WithTicketValidationBeforeAssign specifies whether to enable to check for the existence of tickets before assigning them.
// See docs/consistency.md for details.
func WithTicketValidationBeforeAssign(enabled bool) BackendOption {
return BackendOptionFunc(func(options *backendOptions) {
options.validateTicketsBeforeAssign = enabled
})
}

func NewBackend(store statestore.BackendStore, assigner Assigner, opts ...BackendOption) (*Backend, error) {
options := defaultBackendOptions()
for _, opt := range opts {
opt.apply(options)
Expand Down Expand Up @@ -145,10 +157,6 @@ func (b *Backend) Tick(ctx context.Context) error {
}
if len(matches) > 0 {
if err := b.assign(ctx, matches); err != nil {
unmatchedTicketIDs = ticketIDsFromMatches(matches)
if err := b.store.ReleaseTickets(ctx, unmatchedTicketIDs); err != nil {
return fmt.Errorf("failed to release unmatched tickets: %w", err)
}
return err
}
}
Expand Down Expand Up @@ -209,20 +217,68 @@ func (b *Backend) makeMatches(ctx context.Context, activeTickets []*pb.Ticket) (
}

func (b *Backend) assign(ctx context.Context, matches []*pb.Match) error {
var ticketIDsToRelease []string
defer func() {
if len(ticketIDsToRelease) > 0 {
_ = b.store.ReleaseTickets(ctx, ticketIDsToRelease)
}
}()

asgs, err := b.assigner.Assign(ctx, matches)
if err != nil {
ticketIDsToRelease = append(ticketIDsToRelease, ticketIDsFromMatches(matches)...)
return fmt.Errorf("failed to assign matches: %w", err)
}
if len(asgs) > 0 {
if b.options.validateTicketsBeforeAssign {
filteredAsgs, notAssigned, err := b.validateTicketsBeforeAssign(ctx, asgs)
ticketIDsToRelease = append(ticketIDsToRelease, notAssigned...)
if err != nil {
return fmt.Errorf("failed to validate ticket before assign: %w", err)
}
asgs = filteredAsgs
}

start := time.Now()
if err := b.store.AssignTickets(ctx, asgs); err != nil {
notAssigned, err := b.store.AssignTickets(ctx, asgs)
ticketIDsToRelease = append(ticketIDsToRelease, notAssigned...)
if err != nil {
return fmt.Errorf("failed to assign tickets: %w", err)
}
b.metrics.recordAssignToRedisLatency(ctx, time.Since(start))
}
return nil
}

func (b *Backend) validateTicketsBeforeAssign(ctx context.Context, asgs []*pb.AssignmentGroup) ([]*pb.AssignmentGroup, []string, error) {
allTicketIDs := ticketIDsFromAssignmentGroups(asgs)
tickets, err := b.store.GetTickets(ctx, allTicketIDs)
if err != nil {
return nil, nil, fmt.Errorf("failed to fetch tickets: %w", err)
}
existsMap := map[string]struct{}{}
for _, existingTicketID := range ticketIDsFromTickets(tickets) {
existsMap[existingTicketID] = struct{}{}
}

validAsgs := make([]*pb.AssignmentGroup, 0, len(asgs))
var notAssignedTicketIDs []string
for _, asg := range asgs {
isValidAsg := true
for _, ticketID := range asg.TicketIds {
if _, ok := existsMap[ticketID]; !ok {
isValidAsg = false
}
}
if isValidAsg {
validAsgs = append(validAsgs, asg)
} else {
notAssignedTicketIDs = append(notAssignedTicketIDs, asg.TicketIds...)
}
}
return validAsgs, notAssignedTicketIDs, nil
}

func evaluateMatches(ctx context.Context, evaluator Evaluator, matches []*pb.Match) ([]*pb.Match, error) {
evaluatedMatches := make([]*pb.Match, 0, len(matches))
evaluatedMatchIDs, err := evaluator.Evaluate(ctx, matches)
Expand Down Expand Up @@ -263,7 +319,7 @@ func filterTickets(profile *pb.MatchProfile, tickets []*pb.Ticket) (map[string][
func filterUnmatchedTicketIDs(allTickets []*pb.Ticket, matches []*pb.Match) []string {
matchedTickets := map[string]struct{}{}
for _, match := range matches {
for _, ticketID := range ticketIDs(match.Tickets) {
for _, ticketID := range ticketIDsFromTickets(match.Tickets) {
matchedTickets[ticketID] = struct{}{}
}
}
Expand All @@ -286,3 +342,11 @@ func ticketIDsFromMatches(matches []*pb.Match) []string {
}
return ticketIDs
}

func ticketIDsFromAssignmentGroups(asgs []*pb.AssignmentGroup) []string {
var ticketIDs []string
for _, asg := range asgs {
ticketIDs = append(ticketIDs, asg.TicketIds...)
}
return ticketIDs
}
124 changes: 124 additions & 0 deletions backend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package minimatch

import (
"context"
"log"
"testing"

"github.com/bojand/hri"
"github.com/stretchr/testify/require"
"open-match.dev/open-match/pkg/pb"

"github.com/castaneai/minimatch/pkg/statestore"
)

func TestValidateTicketExistenceBeforeAssign(t *testing.T) {
frontStore, backStore, _ := NewStateStoreWithMiniRedis(t)
ctx := context.Background()

t.Run("ValidationEnabled", func(t *testing.T) {
backend, err := NewBackend(backStore, AssignerFunc(dummyAssign), WithTicketValidationBeforeAssign(true))
require.NoError(t, err)
backend.AddMatchFunction(anyProfile, MatchFunctionSimple1vs1)

err = frontStore.CreateTicket(ctx, &pb.Ticket{Id: "t1"}, defaultTicketTTL)
require.NoError(t, err)
err = frontStore.CreateTicket(ctx, &pb.Ticket{Id: "t2"}, defaultTicketTTL)
require.NoError(t, err)

activeTickets, err := backend.fetchActiveTickets(ctx, defaultFetchTicketsLimit)
require.NoError(t, err)
require.Len(t, activeTickets, 2)

matches, err := backend.makeMatches(ctx, activeTickets)
require.NoError(t, err)
require.Len(t, matches, 1)

// Delete "t1" after match is established
err = frontStore.DeleteTicket(ctx, "t1")
require.NoError(t, err)

// If any ticket is lost in any part of a match, all matched tickets will not be assigned.
err = backend.assign(ctx, matches)
require.NoError(t, err)

_, err = frontStore.GetAssignment(ctx, "t1")
require.ErrorIs(t, err, statestore.ErrAssignmentNotFound)
_, err = frontStore.GetAssignment(ctx, "t2")
require.ErrorIs(t, err, statestore.ErrAssignmentNotFound)

// Any remaining tickets for which no assignment is made will return to active again.
activeTickets, err = backend.fetchActiveTickets(ctx, defaultFetchTicketsLimit)
require.NoError(t, err)
require.Len(t, activeTickets, 1)
require.Equal(t, "t2", activeTickets[0].Id)
})

t.Run("ValidationDisabled", func(t *testing.T) {
backend, err := NewBackend(backStore, AssignerFunc(dummyAssign), WithTicketValidationBeforeAssign(false))
require.NoError(t, err)
backend.AddMatchFunction(anyProfile, MatchFunctionSimple1vs1)

err = frontStore.CreateTicket(ctx, &pb.Ticket{Id: "t3"}, defaultTicketTTL)
require.NoError(t, err)
err = frontStore.CreateTicket(ctx, &pb.Ticket{Id: "t4"}, defaultTicketTTL)
require.NoError(t, err)

activeTickets, err := backend.fetchActiveTickets(ctx, defaultFetchTicketsLimit)
require.NoError(t, err)
require.Len(t, activeTickets, 2)

matches, err := backend.makeMatches(ctx, activeTickets)
require.NoError(t, err)
require.Len(t, matches, 1)

// Delete "t3" after match is established
err = frontStore.DeleteTicket(ctx, "t3")
require.NoError(t, err)

// Assignment is created even if the ticket disappears because the validation is invalid.
err = backend.assign(ctx, matches)
require.NoError(t, err)

as1, err := frontStore.GetAssignment(ctx, "t3")
require.NoError(t, err)
require.NotNil(t, as1)
as2, err := frontStore.GetAssignment(ctx, "t4")
require.NoError(t, err)
require.NotNil(t, as2)
require.Equal(t, as1.Connection, as2.Connection)

activeTickets, err = backend.fetchActiveTickets(ctx, defaultFetchTicketsLimit)
require.NoError(t, err)
require.Empty(t, activeTickets)
})
}

var anyProfile = &pb.MatchProfile{
Name: "test-profile",
Pools: []*pb.Pool{
{Name: "test-pool"},
},
}

func dummyAssign(ctx context.Context, matches []*pb.Match) ([]*pb.AssignmentGroup, error) {
var asgs []*pb.AssignmentGroup
for _, match := range matches {
tids := ticketIDsFromMatch(match)
conn := hri.Random()
log.Printf("assign '%s' to tickets: %v", conn, tids)
asgs = append(asgs, &pb.AssignmentGroup{
TicketIds: tids,
Assignment: &pb.Assignment{Connection: conn},
})
}
return asgs, nil
}

func ticketIDsFromMatch(match *pb.Match) []string {
var ids []string
for _, ticket := range match.Tickets {
ids = append(ids, ticket.Id)
}
return ids
}
Binary file modified docs/backend_timeline.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
32 changes: 32 additions & 0 deletions docs/consistency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Consistency and performance

This document describes some of the consistency and performance points to consider with minimatch.

In general, there is a trade-off between consistency and performance. As a distributed system over the Internet, we must accept a certain amount of inconsistency.

## Invalid Assignment

If some or all of the tickets in an Assignment are deleted, it becomes invalid.

minimatch backend processes in the following order in one tick.

1. fetch active tickets
2. matchmaking
3. allocating resources to established matches
4. assigning the successful match to a ticket (creating an Assignment)
5. the user retrieves the ticket's Assignment through minimatch Frontend

If a ticket is deleted between 1 and 4 due to expiration or other request cancellation, an invalid Assignment will result.
To prevent this, minimatch Backend checks the existence of the ticket again at step 4.

**It is important to note that** even with this validation enabled, invalid Assignment cannot be completely prevented.
For example, if a ticket is deleted during step 5, an invalid Assignment will still occur.
Checking for the existence of tickets only reduces the possibility of invalid assignments, but does not prevent them completely.

In addition, this validation has some impact on performance.
If an invalid Assignment can be handled by the application and the ticket existence check is not needed,
it can be disabled by `WithTicketValidationBeforeAssign(false)`.

```go
backend, err := minimatch.NewBackend(store, assigner, minimatch.WithTicketValidationBeforeAssign(false))
```
2 changes: 1 addition & 1 deletion docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ minimatch Backend exposes metrics in OpenTelemetry format to help monitor perfor

The following timeline shows the process flow of minimatch Backend and the corresponding metrics.

![](./backend_timeline.png)
![](./metrics.png)


## Meter provider
Expand Down
Binary file added docs/metrics.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit d2755f1

Please sign in to comment.