Skip to content

Commit

Permalink
Fix flakes in ./topdown/cache
Browse files Browse the repository at this point in the history
Signed-off-by: Evan Anderson <evan@stacklok.com>
  • Loading branch information
evankanderson committed Nov 22, 2024
1 parent d55dfb0 commit 78fd357
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 14 deletions.
10 changes: 6 additions & 4 deletions topdown/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,18 @@ func NewInterQueryCache(config *Config) InterQueryCache {
func NewInterQueryCacheWithContext(ctx context.Context, config *Config) InterQueryCache {
iqCache := newCache(config)
if iqCache.staleEntryEvictionTimePeriodSeconds() > 0 {
cleanupTicker := time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second)
go func() {
// Tick twice as fast as the eviction time, mostly to help tests run faster
// Elements will actually be removed between 0 and 0.5 * staleEntryEvictionTimePeriodSeconds
// _after_ their nominal expiry. This avoids needing to fetch the time on every Get,
// at the cost that elements live somewhat longer than their expiry time.
cleanupTicker := time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()*500) * time.Millisecond)
defer cleanupTicker.Stop()
for {
select {
case <-cleanupTicker.C:
cleanupTicker.Stop()
iqCache.cleanStaleValues()
cleanupTicker = time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second)
case <-ctx.Done():
cleanupTicker.Stop()
return
}
}
Expand Down
36 changes: 26 additions & 10 deletions topdown/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,18 @@ func TestInsertWithExpiryAndEviction(t *testing.T) {
t.Fatalf("Unexpected error %v", err)
}

cache := NewInterQueryCacheWithContext(context.Background(), config)
// This starts a background ticker at 0.5 stale_entry_eviction_period_seconds to clean up items.
// This means that we need to add around 600ms to any sleeps to ensure that the cleanup routine runs.
ctx, cancel := context.WithCancel(context.Background())
cache := NewInterQueryCacheWithContext(ctx, config)
t.Cleanup(cancel)

cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20)
cache.InsertWithExpiry(ast.StringTerm("force_evicted_foo").Value, cacheValue, time.Now().Add(100*time.Second))
if fetchedCacheValue, found := cache.Get(ast.StringTerm("force_evicted_foo").Value); !found {
t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue)
}
cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(1*time.Second))
cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(900*time.Millisecond))
if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found {
t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue)
}
Expand All @@ -425,7 +429,7 @@ func TestInsertWithExpiryAndEviction(t *testing.T) {
}

// Ensure stale entries clean up routine runs at least once
time.Sleep(2 * time.Second)
time.Sleep(1100 * time.Millisecond)

// Entry deleted even though not expired because force evicted when foo is inserted
if fetchedCacheValue, found := cache.Get(ast.StringTerm("force_evicted_foo").Value); found {
Expand Down Expand Up @@ -454,20 +458,24 @@ func TestInsertHighTTLWithStaleEntryCleanup(t *testing.T) {
t.Fatalf("Unexpected error %v", err)
}

cache := NewInterQueryCacheWithContext(context.Background(), config)
// This starts a background ticker at 0.5 stale_entry_eviction_period_seconds to clean up items.
// This means that we need to add around 600ms to any sleeps to ensure that the cleanup routine runs.
ctx, cancel := context.WithCancel(context.Background())
cache := NewInterQueryCacheWithContext(ctx, config)
t.Cleanup(cancel)

cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20)
cache.InsertWithExpiry(ast.StringTerm("high_ttl_foo").Value, cacheValue, time.Now().Add(100*time.Second))
if fetchedCacheValue, found := cache.Get(ast.StringTerm("high_ttl_foo").Value); !found {
t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue)
}
cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(1*time.Second))
cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(900*time.Millisecond))
if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found {
t.Fatalf("Expected cache entry with value %v, found no entry", fetchedCacheValue)
}

// Ensure stale entries clean up routine runs at least once
time.Sleep(2 * time.Second)
time.Sleep(1100 * time.Millisecond)

cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Now().Add(10*time.Second))
if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found {
Expand Down Expand Up @@ -497,7 +505,11 @@ func TestInsertHighTTLWithoutStaleEntryCleanup(t *testing.T) {
t.Fatalf("Unexpected error %v", err)
}

cache := NewInterQueryCacheWithContext(context.Background(), config)
// This starts a background ticker at 0.5 stale_entry_eviction_period_seconds to clean up items.
// This means that we need to add around 600ms to any sleeps to ensure that the cleanup routine runs.
ctx, cancel := context.WithCancel(context.Background())
cache := NewInterQueryCacheWithContext(ctx, config)
t.Cleanup(cancel)

cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20)
cache.InsertWithExpiry(ast.StringTerm("high_ttl_foo").Value, cacheValue, time.Now().Add(100*time.Second))
Expand Down Expand Up @@ -537,14 +549,18 @@ func TestZeroExpiryTime(t *testing.T) {
t.Fatalf("Unexpected error %v", err)
}

cache := NewInterQueryCacheWithContext(context.Background(), config)
// This starts a background ticker at 0.5 stale_entry_eviction_period_seconds to clean up items.
// This means that we need to add around 600ms to any sleeps to ensure that the cleanup routine runs.
ctx, cancel := context.WithCancel(context.Background())
cache := NewInterQueryCacheWithContext(ctx, config)
t.Cleanup(cancel)
cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20)
cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Time{})
if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found {
t.Fatalf("Expected cache entry with value %v for foo, found %v", cacheValue, fetchedCacheValue)
}

time.Sleep(2 * time.Second)
time.Sleep(600 * time.Millisecond)

// Stale entry cleanup routine skips zero time cache entries
if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found {
Expand Down Expand Up @@ -574,7 +590,7 @@ func TestCancelNewInterQueryCacheWithContext(t *testing.T) {
}

cancel()
time.Sleep(2 * time.Second)
time.Sleep(600 * time.Millisecond)

// Stale entry cleanup routine stopped as context was cancelled
if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found {
Expand Down

0 comments on commit 78fd357

Please sign in to comment.