-
Notifications
You must be signed in to change notification settings - Fork 2
/
cuckoo_filter_redis.go
366 lines (342 loc) · 13.7 KB
/
cuckoo_filter_redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
/*
Package gostatix provides data structures and methods for creating probabilistic filters.
This file provides the Redis backed implementation of the Cuckoo Filter.
*/
package gostatix
import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
"strconv"
"github.com/kwertop/gostatix/internal/util"
"github.com/redis/go-redis/v9"
)
// CuckooFilterRedis is the Redis backed implementation of BaseCuckooFilter
// _buckets_ maps the Redis key of every bucket to its BucketRedis handle
// _key_ holds the Redis key to the list which has the Redis keys of all buckets
// _metadataKey_ is the Redis key of the hash used to store the additional information
// (size, bucketSize, fingerPrintLength, retries, key and length) about CuckooFilterRedis
// for retrieving the filter by the Redis key
type CuckooFilterRedis struct {
	// buckets is keyed by the strings produced by getIndexKey.
	buckets map[string]*BucketRedis
	// key is the Redis key of the list of bucket keys; it is also part of every bucket key.
	key string
	// metadataKey is the Redis key of the metadata hash.
	metadataKey string
	// AbstractCuckooFilter supplies size, bucketSize, fingerPrintLength and retries.
	*AbstractCuckooFilter
}
// NewCuckooFilterRedis creates a new CuckooFilterRedis using a default of 500 retries.
// _size_ is the number of BucketRedis buckets held by the filter
// _bucketSize_ is the size of the individual buckets
// _fingerPrintLength_ is the length of the fingerprint hash of the input to be inserted/removed/lookup
func NewCuckooFilterRedis(size, bucketSize, fingerPrintLength uint64) (*CuckooFilterRedis, error) {
	// Number of eviction attempts used when the caller does not specify one.
	const defaultRetries uint64 = 500
	return NewCuckooFilterRedisWithRetries(size, bucketSize, fingerPrintLength, defaultRetries)
}
// NewCuckooFilterRedisWithRetries creates new CuckooFilterRedis with specified _retries_
// _size_ is the number of BucketRedis buckets held by the filter
// _bucketSize_ is the size of the individual buckets
// _fingerPrintLength_ is the length of the fingerprint hash of the input to be inserted/removed/lookup
// _retries_ is the number of retries that the Cuckoo filter makes if the first two indices obtained
// after hashing the input is already occupied in the filter
//
// Two random 16-character Redis keys are generated (one for the bucket list, one for the
// metadata hash), the metadata hash is written with length 0 and the buckets are initialised.
// Any Redis failure is returned wrapped, so callers can inspect it with errors.Is/errors.As.
func NewCuckooFilterRedisWithRetries(size, bucketSize, fingerPrintLength, retries uint64) (*CuckooFilterRedis, error) {
	filterKey := util.GenerateRandomString(16)
	baseFilter := makeAbstractCuckooFilter(size, bucketSize, fingerPrintLength, retries)
	metadataKey := util.GenerateRandomString(16)
	filter := &CuckooFilterRedis{
		buckets:              make(map[string]*BucketRedis, size),
		key:                  filterKey,
		metadataKey:          metadataKey,
		AbstractCuckooFilter: baseFilter,
	}
	if err := filter.setMetadata(0); err != nil {
		return nil, fmt.Errorf("gostatix: error while creating cuckoo filter redis. error: %w", err)
	}
	if err := filter.initBuckets(); err != nil {
		return nil, fmt.Errorf("gostatix: error while creating cuckoo filter redis: %w", err)
	}
	return filter, nil
}
// NewCuckooFilterRedisWithErrorRate creates a CuckooFilterRedis with a specified false positive
// rate : _errorRate_
// _size_ is used, together with _errorRate_, to calculate the fingerprint length; the number of
// buckets actually created is ceil(_size_ * 0.955 / _bucketSize_)
// _bucketSize_ is the size of the individual buckets
// _retries_ is the number of retries that the Cuckoo filter makes if the first two indices obtained
// after hashing the input are already occupied in the filter
// _errorRate_ is the desired false positive rate of the filter. fingerPrintLength is calculated
// according to this error rate.
func NewCuckooFilterRedisWithErrorRate(size, bucketSize, retries uint64, errorRate float64) (*CuckooFilterRedis, error) {
	fpLength := util.CalculateFingerPrintLength(size, errorRate)
	// Scale the requested size by 0.955 and spread it over buckets of bucketSize slots.
	scaled := float64(size) * 0.955
	bucketCount := uint64(math.Ceil(scaled / float64(bucketSize)))
	return NewCuckooFilterRedisWithRetries(bucketCount, bucketSize, fpLength, retries)
}
// NewCuckooFilterRedisFromKey is used to create a new Redis backed Cuckoo Filter from the
// _metadataKey_ (the Redis key used to store the metadata about the cuckoo filter) passed
// For this to work, the metadata hash should be present in Redis at _metadataKey_
//
// An error is returned when the hash cannot be fetched, when it is empty (HGETALL yields an
// empty map for a missing key), or when any of the fields size, bucketSize, fingerPrintLength,
// retries or key is missing or not a valid unsigned integer. Only the local bucket handles are
// rebuilt; nothing is written to Redis.
func NewCuckooFilterRedisFromKey(metadataKey string) (*CuckooFilterRedis, error) {
	values, err := getRedisClient().HGetAll(context.Background(), metadataKey).Result()
	if err != nil {
		return nil, fmt.Errorf("gostatix: error while fetching hash from redis, error: %w", err)
	}
	if len(values) == 0 {
		return nil, fmt.Errorf("gostatix: no cuckoo filter metadata found at redis key %q", metadataKey)
	}
	// parseField reads one numeric metadata field, rejecting absent or malformed values
	// instead of silently treating them as zero.
	parseField := func(name string) (uint64, error) {
		raw, ok := values[name]
		if !ok {
			return 0, fmt.Errorf("gostatix: cuckoo filter metadata at %q is missing field %q", metadataKey, name)
		}
		parsed, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			return 0, fmt.Errorf("gostatix: invalid value %q for metadata field %q: %w", raw, name, err)
		}
		return parsed, nil
	}
	size, err := parseField("size")
	if err != nil {
		return nil, err
	}
	bucketSize, err := parseField("bucketSize")
	if err != nil {
		return nil, err
	}
	fingerPrintLength, err := parseField("fingerPrintLength")
	if err != nil {
		return nil, err
	}
	retries, err := parseField("retries")
	if err != nil {
		return nil, err
	}
	filterKey, ok := values["key"]
	if !ok {
		return nil, fmt.Errorf("gostatix: cuckoo filter metadata at %q is missing field %q", metadataKey, "key")
	}
	cuckooFilter := &CuckooFilterRedis{
		buckets:              make(map[string]*BucketRedis),
		key:                  filterKey,
		metadataKey:          metadataKey,
		AbstractCuckooFilter: makeAbstractCuckooFilter(size, bucketSize, fingerPrintLength, retries),
	}
	cuckooFilter.localInitBuckets()
	return cuckooFilter, nil
}
// Key returns the Redis key of the list in which the keys of all buckets are stored.
func (f CuckooFilterRedis) Key() string {
	listKey := f.key
	return listKey
}
// MetadataKey returns the Redis key of the hash holding the filter's metadata.
func (f CuckooFilterRedis) MetadataKey() string {
	hashKey := f.metadataKey
	return hashKey
}
// Length returns the current length of the Cuckoo Filter or the current number of entries
// present in the Cuckoo Filter. In CuckooFilterRedis, length is tracked in the "length" field
// of the metadata hash in Redis and modified using the HINCRBY Redis command.
// If the field cannot be read or parsed, the error is discarded and 0 is returned.
func (cuckooFilter *CuckooFilterRedis) Length() uint64 {
	lengthCmd := getRedisClient().HGet(context.Background(), cuckooFilter.metadataKey, "length")
	// The signature has no error return, so a failed read falls back to the zero value.
	stored, _ := lengthCmd.Int64()
	return uint64(stored)
}
// Insert writes the _data_ in the Cuckoo Filter for future lookup
// _destructive_ parameter is used to specify if the previous ordering of the
// present entries is to be preserved after the retries (if that case arises)
//
// The fingerprint is added to the first candidate bucket that has a free slot. When both
// candidate buckets are full, one of them is picked at random and up to _retries_ evictions
// are performed: a random resident fingerprint is replaced by the fingerprint being placed,
// and the evicted fingerprint is then carried over to its alternate bucket, where the process
// repeats. Insert returns true once a fingerprint lands in a free slot.
//
// If no free slot is found, every eviction is undone in reverse order unless _destructive_
// is true, and Insert panics with "cannot insert element, cuckoofilter is full".
func (cuckooFilter *CuckooFilterRedis) Insert(data []byte, destructive bool) bool {
	// NOTE(review): the fourth value returned by getPositions is discarded, as before;
	// presumably an error, which this signature has no way to report — confirm.
	fingerPrint, firstBucketIndex, secondBucketIndex, _ := cuckooFilter.getPositions(data)
	fIndex := cuckooFilter.getIndexKey(firstBucketIndex)
	sIndex := cuckooFilter.getIndexKey(secondBucketIndex)
	if cuckooFilter.buckets[fIndex].isFree() {
		cuckooFilter.buckets[fIndex].add(fingerPrint)
		cuckooFilter.incrLength()
		return true
	}
	if cuckooFilter.buckets[sIndex].isFree() {
		cuckooFilter.buckets[sIndex].add(fingerPrint)
		cuckooFilter.incrLength()
		return true
	}

	// Both candidate buckets are full: start evicting from one of them, chosen at random.
	var index uint64
	if rand.Float32() < 0.5 {
		index = firstBucketIndex
	} else {
		index = secondBucketIndex
	}
	currFingerPrint := fingerPrint
	var items []entry
	for i := uint64(0); i < cuckooFilter.retries; i++ {
		bucket := cuckooFilter.buckets[cuckooFilter.getIndexKey(index)]
		length := bucket.getLength()
		if length == 0 {
			// Nothing to evict from an empty bucket; treat the filter as full.
			break
		}
		// Uniform choice over every slot, slot 0 included.
		randIndex := uint64(rand.Int63n(int64(length)))
		prevFingerPrint, _ := bucket.at(randIndex)
		// Remember the eviction so it can be rolled back.
		items = append(items, entry{prevFingerPrint, index, randIndex})
		bucket.set(randIndex, currFingerPrint)

		// The evicted fingerprint is now the one looking for a home, and the search
		// moves on to its alternate bucket.
		currFingerPrint = prevFingerPrint
		hash := getHash([]byte(currFingerPrint))
		index = (index ^ hash) % uint64(len(cuckooFilter.buckets))
		nextBucket := cuckooFilter.buckets[cuckooFilter.getIndexKey(index)]
		if nextBucket.isFree() {
			nextBucket.add(currFingerPrint)
			cuckooFilter.incrLength()
			return true
		}
	}
	if !destructive {
		// Undo the evictions newest-first so every slot gets its original fingerprint back.
		for i := len(items) - 1; i >= 0; i-- {
			item := items[i]
			cuckooFilter.buckets[cuckooFilter.getIndexKey(item.firstIndex)].set(item.secondIndex, item.fingerPrint)
		}
	}
	panic("cannot insert element, cuckoofilter is full")
}
// Lookup returns true if the _data_ is present in the Cuckoo Filter, else false.
// The first candidate bucket is checked before the second; the second is only queried when
// the fingerprint is not found in the first. A failing bucket lookup is returned as an error.
func (cuckooFilter *CuckooFilterRedis) Lookup(data []byte) (bool, error) {
	// NOTE(review): the fourth value returned by getPositions is discarded — confirm whether
	// it is an error that should be surfaced.
	fingerPrint, firstBucketIndex, secondBucketIndex, _ := cuckooFilter.getPositions(data)
	candidates := [2]string{
		cuckooFilter.getIndexKey(firstBucketIndex),
		cuckooFilter.getIndexKey(secondBucketIndex),
	}
	for _, bucketKey := range candidates {
		found, err := cuckooFilter.buckets[bucketKey].lookup(fingerPrint)
		if err != nil {
			return false, fmt.Errorf("gostatix: error while lookup of data: %v", err)
		}
		if found {
			return true, nil
		}
	}
	return false, nil
}
// Remove deletes the _data_ from the Cuckoo Filter.
// The two candidate buckets are searched in order; the fingerprint is removed from the first
// bucket that contains it, the stored length is decremented and true is returned. false is
// returned when neither bucket holds the fingerprint. A failing bucket lookup is returned
// as an error.
func (cuckooFilter *CuckooFilterRedis) Remove(data []byte) (bool, error) {
	// NOTE(review): the fourth value returned by getPositions is discarded — confirm whether
	// it is an error that should be surfaced.
	fingerPrint, firstBucketIndex, secondBucketIndex, _ := cuckooFilter.getPositions(data)
	for _, bucketIndex := range [2]uint64{firstBucketIndex, secondBucketIndex} {
		target := cuckooFilter.buckets[cuckooFilter.getIndexKey(bucketIndex)]
		found, err := target.lookup(fingerPrint)
		if err != nil {
			return false, fmt.Errorf("gostatix: error while removing the data, error: %v", err)
		}
		if !found {
			continue
		}
		target.remove(fingerPrint)
		cuckooFilter.decrLength()
		return true, nil
	}
	return false, nil
}
// bucketRedisJSON is internal struct used to json marshal/unmarshal redis backed buckets.
// Field order matters: Export builds values of this type with a positional literal.
type bucketRedisJSON struct {
	// Size is the value reported by the bucket's Size method at export time.
	Size uint64 `json:"s"`
	// Length is the value reported by the bucket's getLength method at export time.
	Length uint64 `json:"l"`
	// Elements holds the bucket's elements as returned by getElements.
	Elements []string `json:"e"`
	// Key is the Redis key of the bucket.
	Key string `json:"k"`
}
// cuckooFilterRedisJSON is internal struct used to json marshal/unmarshal redis backed cuckoo filter.
// Field order matters: Export builds values of this type with a positional literal.
type cuckooFilterRedisJSON struct {
	// Size is the number of buckets in the filter.
	Size uint64 `json:"s"`
	// BucketSize is the size of each individual bucket.
	BucketSize uint64 `json:"bs"`
	// FingerPrintLength is the filter's fingerPrintLength setting.
	FingerPrintLength uint64 `json:"fpl"`
	// Length is the value returned by the filter's Length method at export time.
	Length uint64 `json:"l"`
	// Retries is the filter's retries setting.
	Retries uint64 `json:"r"`
	// Buckets holds one entry per bucket, ordered by bucket index.
	Buckets []bucketRedisJSON `json:"b"`
	// Key is the Redis key of the list of bucket keys.
	Key string `json:"k"`
	// MetadataKey is the Redis key of the metadata hash.
	MetadataKey string `json:"mk"`
}
// Export JSON marshals the CuckooFilterRedis and returns a byte slice containing the data.
// Buckets are exported in index order, each with its size, length, elements and Redis key,
// followed by the filter's own settings, current length and Redis keys.
func (filter *CuckooFilterRedis) Export() ([]byte, error) {
	exported := make([]bucketRedisJSON, 0, filter.size)
	for idx := uint64(0); idx < filter.size; idx++ {
		redisKey := filter.getIndexKey(idx)
		current := filter.buckets[redisKey]
		// NOTE(review): the second value returned by getElements is discarded — confirm
		// whether it is an error that should abort the export.
		members, _ := current.getElements()
		exported = append(exported, bucketRedisJSON{
			Size:     current.Size(),
			Length:   current.getLength(),
			Elements: members,
			Key:      redisKey,
		})
	}
	payload := cuckooFilterRedisJSON{
		Size:              filter.size,
		BucketSize:        filter.bucketSize,
		FingerPrintLength: filter.fingerPrintLength,
		Length:            filter.Length(),
		Retries:           filter.retries,
		Buckets:           exported,
		Key:               filter.key,
		MetadataKey:       filter.metadataKey,
	}
	return json.Marshal(payload)
}
// Import JSON unmarshals the _data_ into the CuckooFilterRedis
// _withNewRedisKey_ selects whether fresh random Redis keys are generated for the filter
// (true) or the keys recorded in _data_ are reused (false).
//
// The filter's settings are overwritten with the imported ones, the metadata hash is written
// through setMetadata (which is handed the exported length), the bucket list is
// re-initialised in Redis and every exported element is added to its bucket. An error is
// returned if the JSON cannot be decoded or if writing the metadata or initialising the
// buckets fails; in the latter cases the receiver's settings have already been overwritten.
// Import may be called on a zero-value CuckooFilterRedis.
func (filter *CuckooFilterRedis) Import(data []byte, withNewRedisKey bool) error {
	var f cuckooFilterRedisJSON
	if err := json.Unmarshal(data, &f); err != nil {
		return fmt.Errorf("gostatix: error importing data, error %w", err)
	}
	// A zero-value receiver has no embedded base filter; create one so the field
	// assignments below do not dereference a nil pointer.
	if filter.AbstractCuckooFilter == nil {
		filter.AbstractCuckooFilter = makeAbstractCuckooFilter(f.Size, f.BucketSize, f.FingerPrintLength, f.Retries)
	}
	filter.size = f.Size
	filter.bucketSize = f.BucketSize
	filter.fingerPrintLength = f.FingerPrintLength
	filter.retries = f.Retries
	if withNewRedisKey {
		filter.key = util.GenerateRandomString(16)
		filter.metadataKey = util.GenerateRandomString(16)
	} else {
		filter.key = f.Key
		filter.metadataKey = f.MetadataKey
	}
	if err := filter.setMetadata(f.Length); err != nil {
		return fmt.Errorf("gostatix: error importing data, error %w", err)
	}
	// initBuckets writes into filter.buckets, which must not be a nil map.
	if filter.buckets == nil {
		filter.buckets = make(map[string]*BucketRedis, f.Size)
	}
	if err := filter.initBuckets(); err != nil {
		return fmt.Errorf("gostatix: error importing data, error %w", err)
	}
	filters := make(map[string]*BucketRedis, f.Size)
	for i := range f.Buckets {
		bucketJSON := f.Buckets[i]
		// Bucket keys are derived from the (possibly new) filter key, not from the export.
		bucketKey := filter.getIndexKey(uint64(i))
		bucket := newBucketRedis(bucketKey, f.BucketSize)
		for j := range bucketJSON.Elements {
			bucket.add(bucketJSON.Elements[j])
		}
		filters[bucketKey] = bucket
	}
	filter.buckets = filters
	return nil
}
// Equals reports whether _aFilter_ and _bFilter_ hold the same number of buckets and every
// pair of buckets at the same index compares equal according to BucketRedis.equals.
// Filters with a different number of buckets, or with a bucket missing at some index, are
// reported as not equal. An error from a bucket comparison is returned as is.
func (aFilter CuckooFilterRedis) Equals(bFilter CuckooFilterRedis) (bool, error) {
	if len(aFilter.buckets) != len(bFilter.buckets) {
		return false, nil
	}
	for i := 0; i < len(aFilter.buckets); i++ {
		aBucket := aFilter.buckets[aFilter.getIndexKey(uint64(i))]
		bBucket := bFilter.buckets[bFilter.getIndexKey(uint64(i))]
		// A missing map entry yields a nil bucket; calling equals on it would panic.
		if aBucket == nil || bBucket == nil {
			return false, nil
		}
		ok, err := bBucket.equals(aBucket)
		if err != nil {
			return false, err
		}
		if !ok {
			return false, nil
		}
	}
	return true, nil
}
// incrLength adds 1 to the "length" field of the metadata hash using HINCRBY and returns
// the error reported by the command, if any.
func (cuckooFilter *CuckooFilterRedis) incrLength() error {
	ctx := context.Background()
	cmd := getRedisClient().HIncrBy(ctx, cuckooFilter.metadataKey, "length", 1)
	return cmd.Err()
}
// decrLength subtracts 1 from the "length" field of the metadata hash using HINCRBY and
// returns the error reported by the command, if any.
func (cuckooFilter *CuckooFilterRedis) decrLength() error {
	ctx := context.Background()
	cmd := getRedisClient().HIncrBy(ctx, cuckooFilter.metadataKey, "length", -1)
	return cmd.Err()
}
// setMetadata writes the filter's settings to the Redis hash stored at metadataKey.
// The hash holds the fields size, bucketSize, fingerPrintLength, retries, key and length;
// _length_ is the number of entries to record for the filter (0 for a freshly created
// filter, the exported length when importing).
// It returns the error reported by the HSET command, if any.
func (cuckooFilter *CuckooFilterRedis) setMetadata(length uint64) error {
	metadata := map[string]interface{}{
		"size":              cuckooFilter.size,
		"bucketSize":        cuckooFilter.bucketSize,
		"fingerPrintLength": cuckooFilter.fingerPrintLength,
		"retries":           cuckooFilter.retries,
		"key":               cuckooFilter.key,
		// Store the caller-supplied length rather than a hard-coded 0.
		"length": length,
	}
	return getRedisClient().HSet(context.Background(), cuckooFilter.metadataKey, metadata).Err()
}
// initBuckets resets the Redis list stored at the filter's key and fills it with the keys of
// all buckets by running a Lua script, then creates the local BucketRedis handle for every
// bucket key. The local handles are only created when the script succeeds.
func (filter *CuckooFilterRedis) initBuckets() error {
	// scriptKeys[0] is the list key; scriptKeys[1:] are the bucket keys in index order.
	scriptKeys := []string{filter.key}
	for idx := uint64(0); idx < filter.size; idx++ {
		scriptKeys = append(scriptKeys, filter.getIndexKey(idx))
	}
	script := redis.NewScript(`
		local key = KEYS[1]
		local size = ARGV[1]
		local bucketSize = ARGV[2]
		redis.call("DEL", key)
		for i=2, tonumber(size)+1 do
			redis.call("LPUSH", key, KEYS[i])
		end
		return true
	`)
	cmd := script.Run(
		context.Background(),
		getRedisClient(),
		scriptKeys,
		filter.size,
		filter.bucketSize,
	)
	if _, err := cmd.Bool(); err != nil {
		return fmt.Errorf("error while init buckets in redis, error: %v", err)
	}
	for _, bucketKey := range scriptKeys[1:] {
		filter.buckets[bucketKey] = newBucketRedis(bucketKey, filter.bucketSize)
	}
	return nil
}
// localInitBuckets creates the local BucketRedis handle for each of the filter's bucket keys
// and stores it in the buckets map. It issues no Redis command itself.
func (filter *CuckooFilterRedis) localInitBuckets() {
	for idx := uint64(0); idx < filter.size; idx++ {
		bucketKey := filter.getIndexKey(idx)
		filter.buckets[bucketKey] = newBucketRedis(bucketKey, filter.bucketSize)
	}
}
// getIndexKey returns the Redis key of the bucket at _index_, in the form
// "cuckoo_<filter key>_bucket_<index>".
func (cuckooFilter *CuckooFilterRedis) getIndexKey(index uint64) string {
	suffix := strconv.FormatUint(index, 10)
	prefix := "cuckoo_" + cuckooFilter.key
	return prefix + "_bucket_" + suffix
}