Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bolt: Improve delete inactive orders/matches perf. #3059

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 100 additions & 58 deletions client/db/bolt/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2123,10 +2123,7 @@ func (idx *timeIndexNewest) add(t uint64, k []byte, b *bbolt.Bucket) {
func (db *BoltDB) DeleteInactiveOrders(ctx context.Context, olderThan *time.Time,
perOrderFn func(ords *dexdb.MetaOrder) error) (int, error) {
const batchSize = 1000
var (
finished bool
olderThanB []byte
)
var olderThanB []byte
if olderThan != nil && !olderThan.IsZero() {
olderThanB = uint64Bytes(uint64(olderThan.UnixMilli()))
} else {
Expand Down Expand Up @@ -2156,51 +2153,73 @@ func (db *BoltDB) DeleteInactiveOrders(ctx context.Context, olderThan *time.Time
return 0, err
}

// Get the keys of every archived order.
var keys [][]byte
if err := db.View(func(tx *bbolt.Tx) error {
archivedOB := tx.Bucket(archivedOrdersBucket)
if archivedOB == nil {
return fmt.Errorf("failed to open %s bucket", string(archivedOrdersBucket))
}
archivedOB.ForEach(func(k, _ []byte) error {
keys = append(keys, bytes.Clone(k))
return nil
})
return nil
}); err != nil {
return 0, fmt.Errorf("unable to get archived order keys: %v", err)
}

nDeletedOrders := 0
for !finished {
start := time.Now()
// Check orders in batches to prevent any single db transaction from
// becoming too large.
for i := 0; i < len(keys); i += batchSize {
if err := ctx.Err(); err != nil {
return 0, err
}
start := time.Now()

// Check if this is the last batch.
end := i + batchSize
if end > len(keys) {
end = len(keys)
}

nDeletedBatch := 0
err := db.Update(func(tx *bbolt.Tx) error {
// Run through the archived order bucket storing id's keys for
// returning the order data and deletion later.
archivedOB := tx.Bucket(archivedOrdersBucket)
if archivedOB == nil {
return fmt.Errorf("failed to open %s bucket", string(archivedOrdersBucket))
}
// Retrieve one more than we will delete in order to gauge if
// this is the last order to delete.
oneOverSize := batchSize + 1
filter := func(k []byte, oBkt *bbolt.Bucket) bool {
for j := i; j < end; j++ {
key := keys[j]
oBkt := archivedOB.Bucket(key)

var oid order.OrderID
copy(oid[:], k)
copy(oid[:], key)

// Don't delete this order if it is still active.
if order.OrderStatus(intCoder.Uint16(oBkt.Get(statusKey))).IsActive() {
db.log.Warnf("active order %v found in inactive bucket", oid)
return false
continue
}

// Don't delete this order if it still has active matches.
if _, has := activeMatchOrders[oid]; has {
return false
continue
}

// Don't delete this order if it is too new.
timeB := oBkt.Get(updateTimeKey)
return bytes.Compare(timeB, olderThanB) <= 0
}
trios := newestBuckets([]*bbolt.Bucket{archivedOB}, oneOverSize, updateTimeKey, filter)

// Ignore the last order if it exists. Otherwise there are no
// more orders to delete.
if len(trios) == oneOverSize {
trios = trios[:batchSize]
} else {
finished = true
}
for _, trio := range trios {
o, err := decodeOrderBucket(trio.k, trio.b)
if bytes.Compare(timeB, olderThanB) > 0 {
continue
}

// Proceed with deletion.
o, err := decodeOrderBucket(key, oBkt)
if err != nil {
return fmt.Errorf("failed to decode order bucket: %v", err)
}
if err := archivedOB.DeleteBucket(trio.k); err != nil {
if err := archivedOB.DeleteBucket(key); err != nil {
return fmt.Errorf("failed to delete order bucket: %v", err)
}
if perOrderFn != nil {
Expand All @@ -2211,8 +2230,6 @@ func (db *BoltDB) DeleteInactiveOrders(ctx context.Context, olderThan *time.Time
nDeletedBatch++
}
nDeletedOrders += nDeletedBatch
db.log.Infof("Deleted %d orders (%d total) from the database in %v.",
nDeletedBatch, nDeletedOrders, time.Since(start))
return nil
})
if err != nil {
Expand All @@ -2222,6 +2239,10 @@ func (db *BoltDB) DeleteInactiveOrders(ctx context.Context, olderThan *time.Time
return 0, fmt.Errorf("unable to delete orders: %v", err)
}
}

db.log.Infof("Deleted %d archived orders from the database in %v",
nDeletedOrders, time.Since(start))

return nDeletedOrders, nil
}

Expand Down Expand Up @@ -2266,10 +2287,7 @@ func orderSide(tx *bbolt.Tx, oid order.OrderID) (sell bool, err error) {
func (db *BoltDB) DeleteInactiveMatches(ctx context.Context, olderThan *time.Time,
perMatchFn func(mtch *dexdb.MetaMatch, isSell bool) error) (int, error) {
const batchSize = 1000
var (
finished bool
olderThanB []byte
)
var olderThanB []byte
if olderThan != nil && !olderThan.IsZero() {
olderThanB = uint64Bytes(uint64(olderThan.UnixMilli()))
} else {
Expand All @@ -2295,46 +2313,68 @@ func (db *BoltDB) DeleteInactiveMatches(ctx context.Context, olderThan *time.Tim
return 0, err
}

// Get the keys of every archived match.
var keys [][]byte
if err := db.View(func(tx *bbolt.Tx) error {
archivedMB := tx.Bucket(archivedMatchesBucket)
if archivedMB == nil {
return fmt.Errorf("failed to open %s bucket", string(archivedMatchesBucket))
}
archivedMB.ForEach(func(k, _ []byte) error {
keys = append(keys, bytes.Clone(k))
return nil
})
return nil
}); err != nil {
return 0, fmt.Errorf("unable to get archived match keys: %v", err)
}

nDeletedMatches := 0
for !finished {
start := time.Now()
// Check matches in batches to prevent any single db transaction from
// becoming too large.
for i := 0; i < len(keys); i += batchSize {
if err := ctx.Err(); err != nil {
return 0, err
}
start := time.Now()

// Check if this is the last batch.
end := i + batchSize
if end > len(keys) {
end = len(keys)
}

nDeletedBatch := 0
if err := db.Update(func(tx *bbolt.Tx) error {
archivedMB := tx.Bucket(archivedMatchesBucket)
if archivedMB == nil {
return fmt.Errorf("failed to open %s bucket", string(archivedMatchesBucket))
}
// Retrieve one more than we will delete in order to gauge if
// this is the last match to delete.
oneOverSize := batchSize + 1
filter := func(k []byte, mBkt *bbolt.Bucket) bool {
for j := i; j < end; j++ {
key := keys[j]
mBkt := archivedMB.Bucket(key)

oidB := mBkt.Get(orderIDKey)
var oid order.OrderID
copy(oid[:], oidB)

// Don't delete this match if it still has active orders.
if _, has := activeOrders[oid]; has {
return false
continue
}

// Don't delete this match if it is too new.
timeB := mBkt.Get(stampKey)
return bytes.Compare(timeB, olderThanB) <= 0
}
trios := newestBuckets([]*bbolt.Bucket{archivedMB}, oneOverSize, stampKey, filter)

// Ignore the last order if it exists. Otherwise there are no
// more orders to delete.
if len(trios) == oneOverSize {
trios = trios[:batchSize]
} else {
finished = true
}
for _, trio := range trios {
m, err := loadMatchBucket(trio.b, false)
if bytes.Compare(timeB, olderThanB) > 0 {
continue
}

// Proceed with deletion.
m, err := loadMatchBucket(mBkt, false)
if err != nil {
return fmt.Errorf("failed to load match bucket: %v", err)
}
if err := archivedMB.DeleteBucket(trio.k); err != nil {
if err := archivedMB.DeleteBucket(key); err != nil {
return fmt.Errorf("failed to delete match bucket: %v", err)
}
if perMatchFn != nil {
Expand All @@ -2349,8 +2389,6 @@ func (db *BoltDB) DeleteInactiveMatches(ctx context.Context, olderThan *time.Tim
nDeletedBatch++
}
nDeletedMatches += nDeletedBatch
db.log.Infof("Deleted %d matches (%d total) from the database in %v.",
nDeletedBatch, nDeletedMatches, time.Since(start))

return nil
}); err != nil {
Expand All @@ -2360,6 +2398,10 @@ func (db *BoltDB) DeleteInactiveMatches(ctx context.Context, olderThan *time.Tim
return 0, fmt.Errorf("unable to delete matches: %v", err)
}
}

db.log.Infof("Deleted %d archived matches from the database in %v",
nDeletedMatches, time.Since(start))

return nDeletedMatches, nil
}

Expand Down
Loading