Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add historic storage for keeping track of flagsets #242

Merged
merged 4 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/splitio/gincache v1.0.1
github.com/splitio/go-split-commons/v5 v5.0.1-0.20230926022914-2101c4dc74c0
github.com/splitio/go-toolkit/v5 v5.3.2-0.20230920032539-d08915cf020a
github.com/stretchr/testify v1.8.4
go.etcd.io/bbolt v1.3.6
)

Expand All @@ -19,6 +20,7 @@ require (
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
Expand All @@ -34,6 +36,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.0.4 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg=
Expand Down
205 changes: 205 additions & 0 deletions splitio/proxy/storage/optimized/historic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package optimized

import (
"slices"
"sort"
"strings"
"sync"

"github.com/splitio/go-split-commons/v5/dtos"
)

type HistoricChanges struct {
data []FeatureView
mutex sync.RWMutex
}

func (h *HistoricChanges) GetUpdatedSince(since int64, flagSets []string) []FeatureView {
slices.Sort(flagSets)
h.mutex.RLock()
views := h.findNewerThan(since)
toRet := copyAndFilter(views, flagSets, since)
h.mutex.RUnlock()
return toRet
}

func (h *HistoricChanges) Update(toAdd []dtos.SplitDTO, toRemove []dtos.SplitDTO, newCN int64) {
h.mutex.Lock()
h.updateFrom(toAdd)
h.updateFrom(toRemove)
sort.Slice(h.data, func(i, j int) bool { return h.data[i].LastUpdated < h.data[j].LastUpdated })
h.mutex.Unlock()
}

// public interface ends here

func (h *HistoricChanges) updateFrom(source []dtos.SplitDTO) {
for idx := range source {
if current := h.findByName(source[idx].Name); current != nil {
current.updateFrom(&source[idx])
} else {
var toAdd FeatureView
toAdd.updateFrom(&source[idx])
h.data = append(h.data, toAdd)
}
}

}

func (h *HistoricChanges) findByName(name string) *FeatureView {
for idx := range h.data {
if h.data[idx].Name == name { // TODO(mredolatti): optimize!
return &h.data[idx]
}
}
return nil
}

func (h *HistoricChanges) findNewerThan(since int64) []FeatureView {
// precondition: h.data is sorted by CN
start := sort.Search(len(h.data), func(i int) bool { return h.data[i].LastUpdated > since })
if start == len(h.data) {
return nil
}
return h.data[start:]
}

type FeatureView struct {
Name string
Active bool
LastUpdated int64
TrafficTypeName string
FlagSets []FlagSetView
}

func (f *FeatureView) updateFrom(s *dtos.SplitDTO) {
f.Name = s.Name
f.Active = s.Status == "ACTIVE"
f.LastUpdated = s.ChangeNumber
f.TrafficTypeName = s.TrafficTypeName
f.updateFlagsets(s.Sets, s.ChangeNumber)
}

func (f *FeatureView) updateFlagsets(incoming []string, lastUpdated int64) {
// TODO(mredolatti): need a copy of incoming?
for idx := range f.FlagSets {
if itemIdx := slices.Index(incoming, f.FlagSets[idx].Name); itemIdx != -1 {
if !f.FlagSets[idx].Active { // Association changed from ARCHIVED to ACTIVE
f.FlagSets[idx].Active = true
f.FlagSets[idx].LastUpdated = lastUpdated

}

// "soft delete" the item so that it's not traversed later on
// (replaces the item with the last one, clears the latter and shrinks the slice by 1)
incoming[itemIdx] = incoming[len(incoming)-1]
incoming[len(incoming)-1] = ""
incoming = incoming[:len(incoming)-1]

} else { // Association changed from ARCHIVED to ACTIVE
f.FlagSets[idx].Active = false
f.FlagSets[idx].LastUpdated = lastUpdated
}
}

for idx := range incoming {
// the only leftover in `incoming` should be the items that were not
// present in the feature's previously associated flagsets, so they're new & active
f.FlagSets = append(f.FlagSets, FlagSetView{
Name: incoming[idx],
Active: true,
LastUpdated: lastUpdated,
})
}

sort.Slice(f.FlagSets, func(i, j int) bool { return f.FlagSets[i].Name < f.FlagSets[j].Name })
}

func (f *FeatureView) findFlagSetByName(name string) *FlagSetView {
// precondition: f.FlagSets is sorted by name
idx := sort.Search(len(f.FlagSets), func(i int) bool { return f.FlagSets[i].Name >= name })
if idx != len(f.FlagSets) && name == f.FlagSets[idx].Name {
return &f.FlagSets[idx]
}
return nil
}

func (f *FeatureView) clone() FeatureView {
toRet := FeatureView{
Name: f.Name,
Active: f.Active,
LastUpdated: f.LastUpdated,
TrafficTypeName: f.TrafficTypeName,
FlagSets: make([]FlagSetView, len(f.FlagSets)),
}
copy(toRet.FlagSets, f.FlagSets) // we need to deep clone to avoid race conditions
return toRet

}

func copyAndFilter(views []FeatureView, sets []string, since int64) []FeatureView {
// precondition: f.Flagsets is sorted by name
// precondition: sets is sorted
toRet := make([]FeatureView, 0, len(views))

// this code computes the intersection in o(views * )
for idx := range views {
if featureShouldBeReturned(&views[idx], since, sets) {
toRet = append(toRet, views[idx].clone())
}
}
return toRet
}

func featureShouldBeReturned(view *FeatureView, since int64, sets []string) bool {

// if fetching from sratch & the feature is not active,
// or it hasn't been updated since `since`, it shouldn't even be considered for being returned
if since == -1 && !view.Active || view.LastUpdated < since {
return false
}

// all updated features should be returned if no set filter is being used
if len(sets) == 0 {
return true
}

// compare the sets for intersection of user supplied sets with currently active ones.
// takes linear o(len(feature.sets) + len(sets)) time since both the incoming sets are sorted
viewFlagSetIndex, requestedSetIndex := 0, 0
for viewFlagSetIndex < len(view.FlagSets) {
switch strings.Compare(view.FlagSets[viewFlagSetIndex].Name, sets[requestedSetIndex]) {
case 0: // we got a match
fsinfo := view.FlagSets[viewFlagSetIndex]
// if an association is active, it's considered and the Feature is added to the result set.
// if an association is inactive and we're fetching from scratch (since=-1), it's not considered.
// if an association was already inactive at the time of the provided `since`, it's not considered.
// if an association was active on the provided `since` and now isn't, the feature IS added to the returned payload.
if fsinfo.Active || (since > -1 && since < fsinfo.LastUpdated) {
return true
}
viewFlagSetIndex++
incrUpTo(&requestedSetIndex, len(sets))
case -1:
viewFlagSetIndex++
case 1:
if incrUpTo(&requestedSetIndex, len(sets)); requestedSetIndex+1 == len(sets) {
viewFlagSetIndex++
}
}
}
return false
}

type FlagSetView struct {
Name string
Active bool
LastUpdated int64
}

func incrUpTo(toIncr *int, limit int) {
if *toIncr+1 >= limit {
return
}
*toIncr++
}
Loading