Skip to content

Commit

Permalink
Merge pull request #26 from castaneai/fix-total-tickets-metrics
Browse files Browse the repository at this point in the history
add GetTicketCount
  • Loading branch information
castaneai authored Mar 19, 2024
2 parents a48f110 + 94a3a06 commit 1b8e6f9
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 49 deletions.
11 changes: 3 additions & 8 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewBackend(store statestore.StateStore, assigner Assigner, opts ...BackendO
for _, opt := range opts {
opt.apply(options)
}
metrics, err := newBackendMetrics(options.meterProvider)
metrics, err := newBackendMetrics(options.meterProvider, store)
if err != nil {
return nil, fmt.Errorf("failed to create backend metrics: %w", err)
}
Expand Down Expand Up @@ -157,18 +157,13 @@ func (b *Backend) Tick(ctx context.Context) error {

func (b *Backend) fetchActiveTickets(ctx context.Context, limit int64) ([]*pb.Ticket, error) {
start := time.Now()
activeTicketIDs, err := b.store.GetActiveTicketIDs(ctx)
activeTicketIDs, err := b.store.GetActiveTicketIDs(ctx, limit)
if err != nil {
return nil, fmt.Errorf("failed to fetch active ticket IDs: %w", err)
}
activeTicketCount := int64(len(activeTicketIDs))
b.metrics.recordTicketCountActive(ctx, activeTicketCount)
if activeTicketCount == 0 {
if len(activeTicketIDs) == 0 {
return nil, nil
}
if activeTicketCount > limit {
activeTicketIDs = activeTicketIDs[:limit]
}
tickets, err := b.store.GetTickets(ctx, activeTicketIDs)
if err != nil {
return nil, fmt.Errorf("failed to fetch active tickets: %w", err)
Expand Down
41 changes: 19 additions & 22 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package minimatch

import (
"context"
"sync/atomic"
"fmt"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"open-match.dev/open-match/pkg/pb"

"github.com/castaneai/minimatch/pkg/statestore"
)

const (
Expand All @@ -19,8 +21,6 @@ var (
defaultHistogramBuckets = []float64{
.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10,
}
keyTicketStatus = attribute.Key("status")
attributeActiveTicket = keyTicketStatus.String("active")
)

type backendMetrics struct {
Expand All @@ -32,11 +32,9 @@ type backendMetrics struct {
matchFunctionLatency metric.Float64Histogram
assignerLatency metric.Float64Histogram
assignToRedisLatency metric.Float64Histogram

ticketCountActive atomic.Int64
}

func newBackendMetrics(provider metric.MeterProvider) (*backendMetrics, error) {
func newBackendMetrics(provider metric.MeterProvider, store statestore.StateStore) (*backendMetrics, error) {
meter := provider.Meter(metricsScopeName)
ticketsFetched, err := meter.Int64Counter("minimatch.backend.tickets_fetched")
if err != nil {
Expand Down Expand Up @@ -70,26 +68,29 @@ func newBackendMetrics(provider metric.MeterProvider) (*backendMetrics, error) {
if err != nil {
return nil, err
}
metrics := &backendMetrics{
meter: meter,
ticketsFetched: ticketsFetched,
ticketsAssigned: ticketsAssigned,
fetchTicketsLatency: fetchTicketsLatency,
matchFunctionLatency: matchFunctionLatency,
assignerLatency: assignerLatency,
assignToRedisLatency: assignToRedisLatency,
}
ticketCount, err := meter.Int64ObservableUpDownCounter("minimatch.store.tickets.count",
metric.WithDescription("Total number of tickets. Do not sum this counter, as a single backend counts all tickets."),
metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
o.Observe(metrics.ticketCountActive.Load(), metric.WithAttributes(attributeActiveTicket))
count, err := store.GetTicketCount(ctx)
if err != nil {
return fmt.Errorf("failed to get ticket count from store: %w", err)
}
o.Observe(count)
return nil
}))
if err != nil {
return nil, err
}
metrics.ticketCount = ticketCount
return metrics, nil
return &backendMetrics{
meter: meter,
ticketsFetched: ticketsFetched,
ticketsAssigned: ticketsAssigned,
fetchTicketsLatency: fetchTicketsLatency,
matchFunctionLatency: matchFunctionLatency,
assignerLatency: assignerLatency,
assignToRedisLatency: assignToRedisLatency,
ticketCount: ticketCount,
}, nil
}

func (m *backendMetrics) recordMatchFunctionLatency(ctx context.Context, seconds float64, matchProfile *pb.MatchProfile) {
Expand All @@ -116,10 +117,6 @@ func (m *backendMetrics) recordAssignToRedisLatency(ctx context.Context, latency
m.assignToRedisLatency.Record(ctx, latency.Seconds())
}

func (m *backendMetrics) recordTicketCountActive(ctx context.Context, count int64) {
m.ticketCountActive.Store(count)
}

type matchFunctionWithMetrics struct {
mmf MatchFunction
metrics *backendMetrics
Expand Down
20 changes: 16 additions & 4 deletions pkg/statestore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (s *RedisStore) GetAssignment(ctx context.Context, ticketID string) (*pb.As
// GetActiveTicketIDs may also retrieve tickets deleted by TTL.
// This is because the ticket index and Ticket data are stored in separate keys.
// The next `GetTicket` or `GetTickets` call will resolve this inconsistency.
func (s *RedisStore) GetActiveTicketIDs(ctx context.Context) ([]string, error) {
func (s *RedisStore) GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error) {
// Acquire a lock to prevent multiple backends from fetching the same Ticket.
// To avoid race conditions with other Ticket Index changes, getting tickets and setting them to pending state must be done atomically.
lockedCtx, unlock, err := s.locker.WithContext(ctx, redisKeyFetchTicketsLock(s.opts.keyPrefix))
Expand All @@ -214,7 +214,7 @@ func (s *RedisStore) GetActiveTicketIDs(ctx context.Context) ([]string, error) {
}
defer unlock()

allTicketIDs, err := s.getAllTicketIDs(lockedCtx)
allTicketIDs, err := s.getAllTicketIDs(lockedCtx, limit)
if err != nil {
return nil, fmt.Errorf("failed to get all ticket IDs: %w", err)
}
Expand All @@ -235,8 +235,8 @@ func (s *RedisStore) GetActiveTicketIDs(ctx context.Context) ([]string, error) {
return activeTicketIDs, nil
}

func (s *RedisStore) getAllTicketIDs(ctx context.Context) ([]string, error) {
resp := s.client.Do(ctx, s.client.B().Smembers().Key(redisKeyTicketIndex(s.opts.keyPrefix)).Build())
func (s *RedisStore) getAllTicketIDs(ctx context.Context, limit int64) ([]string, error) {
resp := s.client.Do(ctx, s.client.B().Srandmember().Key(redisKeyTicketIndex(s.opts.keyPrefix)).Count(limit).Build())
if err := resp.Error(); err != nil {
if rueidis.IsRedisNil(err) {
return nil, nil
Expand Down Expand Up @@ -322,6 +322,18 @@ func (s *RedisStore) AssignTickets(ctx context.Context, asgs []*pb.AssignmentGro
return nil
}

// GetTicketCount returns the cardinality of the ticket index set, obtained
// with a single SCARD command on the ticket index key.
//
// The value is the size of the index, not of the ticket data: as noted on
// GetActiveTicketIDs, the index and the ticket data live in separate keys, so
// IDs of tickets already removed by TTL may still be counted until they are
// cleaned out of the index. No fetch-tickets lock is taken here.
//
// It returns an error if the command fails or if the reply cannot be decoded
// as an int64.
func (s *RedisStore) GetTicketCount(ctx context.Context) (int64, error) {
	indexKey := redisKeyTicketIndex(s.opts.keyPrefix)
	scard := s.client.B().Scard().Key(indexKey).Build()

	result := s.client.Do(ctx, scard)
	if cmdErr := result.Error(); cmdErr != nil {
		return 0, fmt.Errorf("failed to count tickets index: %w", cmdErr)
	}

	total, decodeErr := result.AsInt64()
	if decodeErr != nil {
		return 0, fmt.Errorf("failed to decode redis response of SCARD as int64: %w", decodeErr)
	}
	return total, nil
}

func (s *RedisStore) getTicket(ctx context.Context, client rueidis.Client, ticketID string) (*pb.Ticket, error) {
resp := client.Do(ctx, client.B().Get().Key(redisKeyTicketData(s.opts.keyPrefix, ticketID)).Build())
if err := resp.Error(); err != nil {
Expand Down
28 changes: 16 additions & 12 deletions pkg/statestore/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
"open-match.dev/open-match/pkg/pb"
)

const (
// defaultGetTicketLimit is the limit argument the tests in this file pass to
// RedisStore.GetActiveTicketIDs.
// NOTE(review): presumably chosen to be larger than the number of tickets any
// single test creates, so the limit never truncates results — confirm.
defaultGetTicketLimit = 10000
)

func newTestRedisStore(t *testing.T, addr string, opts ...RedisOption) *RedisStore {
copt := rueidis.ClientOption{InitAddress: []string{addr}, DisableCache: true}
locker, err := rueidislock.NewLocker(rueidislock.LockerOption{ClientOption: copt})
Expand All @@ -42,18 +46,18 @@ func TestPendingRelease(t *testing.T) {

require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test1"}))
require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test2"}))
activeTicketIDs, err := store.GetActiveTicketIDs(ctx)
activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
require.NoError(t, err)
require.ElementsMatch(t, activeTicketIDs, []string{"test1", "test2"})

activeTicketIDs, err = store.GetActiveTicketIDs(ctx)
activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
require.NoError(t, err)
require.Empty(t, activeTicketIDs)

// release one ticket
require.NoError(t, store.ReleaseTickets(ctx, []string{"test1"}))

activeTicketIDs, err = store.GetActiveTicketIDs(ctx)
activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
require.NoError(t, err)
require.ElementsMatch(t, activeTicketIDs, []string{"test1"})
}
Expand All @@ -68,12 +72,12 @@ func TestPendingReleaseTimeout(t *testing.T) {
require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test"}))

// get active tickets for proposal (active -> pending)
activeTicketIDs, err := store.GetActiveTicketIDs(ctx)
activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
require.NoError(t, err)
require.Len(t, activeTicketIDs, 1)

// 0 active ticket
activeTicketIDs, err = store.GetActiveTicketIDs(ctx)
activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
require.NoError(t, err)
require.Len(t, activeTicketIDs, 0)

Expand All @@ -82,7 +86,7 @@ func TestPendingReleaseTimeout(t *testing.T) {
require.NoError(t, err)

// 1 active ticket
activeTicketIDs, err = store.GetActiveTicketIDs(ctx)
activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
require.NoError(t, err)
require.Len(t, activeTicketIDs, 1)
}
Expand All @@ -94,7 +98,7 @@ func TestAssignedDeleteTimeout(t *testing.T) {

require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test1"}))
require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test2"}))
activeTicketIDs, err := store.GetActiveTicketIDs(ctx)
activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
require.NoError(t, err)
require.ElementsMatch(t, activeTicketIDs, []string{"test1", "test2"})

Expand Down Expand Up @@ -155,7 +159,7 @@ func TestTicketTTL(t *testing.T) {
_, err = store.GetTicket(ctx, "test1")
require.Error(t, err, ErrTicketNotFound)

activeTicketIDs, err := store.GetActiveTicketIDs(ctx)
activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
require.NoError(t, err)
require.NotContains(t, activeTicketIDs, "test1")

Expand All @@ -176,7 +180,7 @@ func TestTicketTTL(t *testing.T) {
// This is because the ticket index and Ticket data are stored in separate keys.

// In this example, "test2" and "test3" were deleted by TTL, but remain in the ticket index.
activeTicketIDs, err = store.GetActiveTicketIDs(ctx)
activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
require.NoError(t, err)
require.ElementsMatch(t, activeTicketIDs, []string{"test2", "test3", "test4"})
err = store.ReleaseTickets(ctx, []string{"test2", "test3", "test4"})
Expand All @@ -189,7 +193,7 @@ func TestTicketTTL(t *testing.T) {

// Because we called GetTickets, "test2" and "test3" which were deleted by TTL,
// were deleted from the ticket index as well.
activeTicketIDs, err = store.GetActiveTicketIDs(ctx)
activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
require.NoError(t, err)
require.ElementsMatch(t, activeTicketIDs, []string{"test4"})
}
Expand All @@ -211,7 +215,7 @@ func TestConcurrentFetchActiveTickets(t *testing.T) {
duplicateMap := map[string]struct{}{}
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
ticketIDs, err := store.GetActiveTicketIDs(ctx)
ticketIDs, err := store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
if err != nil {
return err
}
Expand Down Expand Up @@ -248,7 +252,7 @@ func TestConcurrentFetchAndAssign(t *testing.T) {
eg, _ := errgroup.WithContext(ctx)
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
ticketIDs, err := store.GetActiveTicketIDs(ctx)
ticketIDs, err := store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/statestore/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type StateStore interface {
GetTicket(ctx context.Context, ticketID string) (*pb.Ticket, error)
GetTickets(ctx context.Context, ticketIDs []string) ([]*pb.Ticket, error)
GetAssignment(ctx context.Context, ticketID string) (*pb.Assignment, error)
GetActiveTicketIDs(ctx context.Context) ([]string, error)
GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error)
GetTicketCount(ctx context.Context) (int64, error)
ReleaseTickets(ctx context.Context, ticketIDs []string) error
AssignTickets(ctx context.Context, asgs []*pb.AssignmentGroup) error
}
8 changes: 6 additions & 2 deletions pkg/statestore/ticketcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func (s *StoreWithTicketCache) GetAssignment(ctx context.Context, ticketID strin
return s.origin.GetAssignment(ctx, ticketID)
}

func (s *StoreWithTicketCache) GetActiveTicketIDs(ctx context.Context) ([]string, error) {
return s.origin.GetActiveTicketIDs(ctx)
func (s *StoreWithTicketCache) GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error) {
return s.origin.GetActiveTicketIDs(ctx, limit)
}

func (s *StoreWithTicketCache) ReleaseTickets(ctx context.Context, ticketIDs []string) error {
Expand All @@ -112,3 +112,7 @@ func (s *StoreWithTicketCache) ReleaseTickets(ctx context.Context, ticketIDs []s
func (s *StoreWithTicketCache) AssignTickets(ctx context.Context, asgs []*pb.AssignmentGroup) error {
return s.origin.AssignTickets(ctx, asgs)
}

// GetTicketCount returns the ticket count reported by the wrapped origin
// store. The call is forwarded as-is; the ticket cache is neither consulted
// nor updated, and the origin's result and error are returned unchanged.
func (s *StoreWithTicketCache) GetTicketCount(ctx context.Context) (int64, error) {
return s.origin.GetTicketCount(ctx)
}

0 comments on commit 1b8e6f9

Please sign in to comment.