From 045e499c8f1e2bf68287c9708c6359f493dd9b70 Mon Sep 17 00:00:00 2001
From: jholdstock
Date: Tue, 5 Nov 2024 09:53:58 +0000
Subject: [PATCH] bolt: Improve delete inactive orders/matches perf.

Running these functions on a resource-constrained machine with a large
database was incredibly slow due to overuse of the newestBuckets func.
Every key was accessed and sorted every time a batch of orders/matches
was processed.

Rather than repeatedly performing this work, the full set of keys is now
accessed only once. The resulting code runs ~200x faster.

There is a slight change in behaviour: database transactions are now
committed with a *maximum* of 1000 deletions rather than *exactly* 1000.
This is not an issue because the exact size of each transaction doesn't
matter; the important point is to cap the size of each transaction to
prevent excess memory consumption.

This performance gain removes the need to log the progress of each
individual batch, so only a summary is now logged after the entire
deletion process completes.

A simplified sketch of the new batching pattern follows the diff below.
---
 client/db/bolt/db.go | 158 +++++++++++++++++++++++++++----------------
 1 file changed, 100 insertions(+), 58 deletions(-)

diff --git a/client/db/bolt/db.go b/client/db/bolt/db.go
index 5e0b13c6b9..65b60f8f12 100644
--- a/client/db/bolt/db.go
+++ b/client/db/bolt/db.go
@@ -2123,10 +2123,7 @@ func (idx *timeIndexNewest) add(t uint64, k []byte, b *bbolt.Bucket) {
 func (db *BoltDB) DeleteInactiveOrders(ctx context.Context, olderThan *time.Time,
     perOrderFn func(ords *dexdb.MetaOrder) error) (int, error) {
     const batchSize = 1000
-    var (
-        finished   bool
-        olderThanB []byte
-    )
+    var olderThanB []byte
     if olderThan != nil && !olderThan.IsZero() {
         olderThanB = uint64Bytes(uint64(olderThan.UnixMilli()))
     } else {
@@ -2156,51 +2153,73 @@ func (db *BoltDB) DeleteInactiveOrders(ctx context.Context, olderThan *time.Time
         return 0, err
     }
 
+    // Get the keys of every archived order.
+    var keys [][]byte
+    if err := db.View(func(tx *bbolt.Tx) error {
+        archivedOB := tx.Bucket(archivedOrdersBucket)
+        if archivedOB == nil {
+            return fmt.Errorf("failed to open %s bucket", string(archivedOrdersBucket))
+        }
+        archivedOB.ForEach(func(k, _ []byte) error {
+            keys = append(keys, bytes.Clone(k))
+            return nil
+        })
+        return nil
+    }); err != nil {
+        return 0, fmt.Errorf("unable to get archived order keys: %v", err)
+    }
+
     nDeletedOrders := 0
-    for !finished {
+    start := time.Now()
+    // Check orders in batches to prevent any single db transaction from
+    // becoming too large.
+    for i := 0; i < len(keys); i += batchSize {
         if err := ctx.Err(); err != nil {
             return 0, err
         }
-        start := time.Now()
+
+        // Check if this is the last batch.
+        end := i + batchSize
+        if end > len(keys) {
+            end = len(keys)
+        }
+
         nDeletedBatch := 0
         err := db.Update(func(tx *bbolt.Tx) error {
-            // Run through the archived order bucket storing id's keys for
-            // returning the order data and deletion later.
             archivedOB := tx.Bucket(archivedOrdersBucket)
             if archivedOB == nil {
                 return fmt.Errorf("failed to open %s bucket", string(archivedOrdersBucket))
             }
-            // Retrieve one more than we will delete in order to gauge if
-            // this is the last order to delete.
-            oneOverSize := batchSize + 1
-            filter := func(k []byte, oBkt *bbolt.Bucket) bool {
+            for j := i; j < end; j++ {
+                key := keys[j]
+                oBkt := archivedOB.Bucket(key)
+
                 var oid order.OrderID
-                copy(oid[:], k)
+                copy(oid[:], key)
+
+                // Don't delete this order if it is still active.
                 if order.OrderStatus(intCoder.Uint16(oBkt.Get(statusKey))).IsActive() {
                     db.log.Warnf("active order %v found in inactive bucket", oid)
-                    return false
+                    continue
                 }
+
+                // Don't delete this order if it still has active matches.
                 if _, has := activeMatchOrders[oid]; has {
-                    return false
+                    continue
                 }
+
+                // Don't delete this order if it is too new.
                 timeB := oBkt.Get(updateTimeKey)
-                return bytes.Compare(timeB, olderThanB) <= 0
-            }
-            trios := newestBuckets([]*bbolt.Bucket{archivedOB}, oneOverSize, updateTimeKey, filter)
-
-            // Ignore the last order if it exists. Otherwise there are no
-            // more orders to delete.
-            if len(trios) == oneOverSize {
-                trios = trios[:batchSize]
-            } else {
-                finished = true
-            }
-            for _, trio := range trios {
-                o, err := decodeOrderBucket(trio.k, trio.b)
+                if bytes.Compare(timeB, olderThanB) > 0 {
+                    continue
+                }
+
+                // Proceed with deletion.
+                o, err := decodeOrderBucket(key, oBkt)
                 if err != nil {
                     return fmt.Errorf("failed to decode order bucket: %v", err)
                 }
-                if err := archivedOB.DeleteBucket(trio.k); err != nil {
+                if err := archivedOB.DeleteBucket(key); err != nil {
                     return fmt.Errorf("failed to delete order bucket: %v", err)
                 }
                 if perOrderFn != nil {
@@ -2211,8 +2230,6 @@ func (db *BoltDB) DeleteInactiveOrders(ctx context.Context, olderThan *time.Time
                 nDeletedBatch++
             }
             nDeletedOrders += nDeletedBatch
-            db.log.Infof("Deleted %d orders (%d total) from the database in %v.",
-                nDeletedBatch, nDeletedOrders, time.Since(start))
             return nil
         })
         if err != nil {
@@ -2222,6 +2239,10 @@ func (db *BoltDB) DeleteInactiveOrders(ctx context.Context, olderThan *time.Time
             return 0, fmt.Errorf("unable to delete orders: %v", err)
         }
     }
+
+    db.log.Infof("Deleted %d archived orders from the database in %v",
+        nDeletedOrders, time.Since(start))
+
     return nDeletedOrders, nil
 }
 
@@ -2266,10 +2287,7 @@ func orderSide(tx *bbolt.Tx, oid order.OrderID) (sell bool, err error) {
 func (db *BoltDB) DeleteInactiveMatches(ctx context.Context, olderThan *time.Time,
     perMatchFn func(mtch *dexdb.MetaMatch, isSell bool) error) (int, error) {
     const batchSize = 1000
-    var (
-        finished   bool
-        olderThanB []byte
-    )
+    var olderThanB []byte
     if olderThan != nil && !olderThan.IsZero() {
         olderThanB = uint64Bytes(uint64(olderThan.UnixMilli()))
     } else {
@@ -2295,46 +2313,68 @@ func (db *BoltDB) DeleteInactiveMatches(ctx context.Context, olderThan *time.Tim
         return 0, err
     }
 
+    // Get the keys of every archived match.
+    var keys [][]byte
+    if err := db.View(func(tx *bbolt.Tx) error {
+        archivedMB := tx.Bucket(archivedMatchesBucket)
+        if archivedMB == nil {
+            return fmt.Errorf("failed to open %s bucket", string(archivedMatchesBucket))
+        }
+        archivedMB.ForEach(func(k, _ []byte) error {
+            keys = append(keys, bytes.Clone(k))
+            return nil
+        })
+        return nil
+    }); err != nil {
+        return 0, fmt.Errorf("unable to get archived match keys: %v", err)
+    }
+
     nDeletedMatches := 0
-    for !finished {
+    start := time.Now()
+    // Check matches in batches to prevent any single db transaction from
+    // becoming too large.
+    for i := 0; i < len(keys); i += batchSize {
         if err := ctx.Err(); err != nil {
             return 0, err
         }
-        start := time.Now()
+
+        // Check if this is the last batch.
+        end := i + batchSize
+        if end > len(keys) {
+            end = len(keys)
+        }
+
         nDeletedBatch := 0
         if err := db.Update(func(tx *bbolt.Tx) error {
             archivedMB := tx.Bucket(archivedMatchesBucket)
             if archivedMB == nil {
                 return fmt.Errorf("failed to open %s bucket", string(archivedMatchesBucket))
             }
-            // Retrieve one more than we will delete in order to gauge if
-            // this is the last match to delete.
-            oneOverSize := batchSize + 1
-            filter := func(k []byte, mBkt *bbolt.Bucket) bool {
+            for j := i; j < end; j++ {
+                key := keys[j]
+                mBkt := archivedMB.Bucket(key)
+
                 oidB := mBkt.Get(orderIDKey)
                 var oid order.OrderID
                 copy(oid[:], oidB)
+
+                // Don't delete this match if it still has active orders.
                 if _, has := activeOrders[oid]; has {
-                    return false
+                    continue
                 }
+
+                // Don't delete this match if it is too new.
                 timeB := mBkt.Get(stampKey)
-                return bytes.Compare(timeB, olderThanB) <= 0
-            }
-            trios := newestBuckets([]*bbolt.Bucket{archivedMB}, oneOverSize, stampKey, filter)
-
-            // Ignore the last order if it exists. Otherwise there are no
-            // more orders to delete.
-            if len(trios) == oneOverSize {
-                trios = trios[:batchSize]
-            } else {
-                finished = true
-            }
-            for _, trio := range trios {
-                m, err := loadMatchBucket(trio.b, false)
+                if bytes.Compare(timeB, olderThanB) > 0 {
+                    continue
+                }
+
+                // Proceed with deletion.
+                m, err := loadMatchBucket(mBkt, false)
                 if err != nil {
                     return fmt.Errorf("failed to load match bucket: %v", err)
                 }
-                if err := archivedMB.DeleteBucket(trio.k); err != nil {
+                if err := archivedMB.DeleteBucket(key); err != nil {
                     return fmt.Errorf("failed to delete match bucket: %v", err)
                 }
                 if perMatchFn != nil {
@@ -2349,8 +2389,6 @@ func (db *BoltDB) DeleteInactiveMatches(ctx context.Context, olderThan *time.Tim
                 nDeletedBatch++
             }
             nDeletedMatches += nDeletedBatch
-            db.log.Infof("Deleted %d matches (%d total) from the database in %v.",
-                nDeletedBatch, nDeletedMatches, time.Since(start))
 
             return nil
         }); err != nil {
@@ -2360,6 +2398,10 @@ func (db *BoltDB) DeleteInactiveMatches(ctx context.Context, olderThan *time.Tim
             return 0, fmt.Errorf("unable to delete matches: %v", err)
         }
     }
+
+    db.log.Infof("Deleted %d archived matches from the database in %v",
+        nDeletedMatches, time.Since(start))
+
     return nDeletedMatches, nil
 }
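
The following is a minimal, self-contained sketch (not part of the patch) of the snapshot-then-batch pattern the commit adopts: collect every key in one read transaction, then delete in write transactions capped at batchSize entries each. It is written against go.etcd.io/bbolt. The "orders" bucket name, the flat key/value layout, and the shouldDelete predicate are illustrative assumptions only; the real dcrdex code stores each order/match as a nested bucket and filters on status, active matches, and timestamp before calling DeleteBucket.

package main

import (
    "bytes"
    "fmt"
    "log"

    bolt "go.etcd.io/bbolt"
)

// deleteInBatches removes every key in bucketName that matches shouldDelete,
// committing at most batchSize deletions per write transaction. The key set
// is snapshotted once up front, mirroring the approach in this patch, instead
// of re-scanning and re-sorting the bucket for every batch.
func deleteInBatches(db *bolt.DB, bucketName []byte, batchSize int,
    shouldDelete func(k, v []byte) bool) (int, error) {

    // Pass 1: a single read transaction collects every key.
    var keys [][]byte
    if err := db.View(func(tx *bolt.Tx) error {
        b := tx.Bucket(bucketName)
        if b == nil {
            return fmt.Errorf("bucket %s not found", bucketName)
        }
        return b.ForEach(func(k, _ []byte) error {
            keys = append(keys, bytes.Clone(k))
            return nil
        })
    }); err != nil {
        return 0, err
    }

    // Pass 2: walk the snapshot in fixed-size windows so that no single
    // write transaction performs more than batchSize deletions.
    deleted := 0
    for i := 0; i < len(keys); i += batchSize {
        end := i + batchSize
        if end > len(keys) {
            end = len(keys)
        }
        if err := db.Update(func(tx *bolt.Tx) error {
            b := tx.Bucket(bucketName)
            if b == nil {
                return fmt.Errorf("bucket %s not found", bucketName)
            }
            for _, k := range keys[i:end] {
                v := b.Get(k)
                if v == nil || !shouldDelete(k, v) {
                    continue // key vanished or no longer qualifies
                }
                if err := b.Delete(k); err != nil {
                    return err
                }
                deleted++
            }
            return nil
        }); err != nil {
            return 0, err
        }
    }
    return deleted, nil
}

func main() {
    db, err := bolt.Open("example.db", 0600, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Example usage: delete every entry in the hypothetical "orders" bucket
    // whose value is the literal marker "archived".
    n, err := deleteInBatches(db, []byte("orders"), 1000, func(_, v []byte) bool {
        return bytes.Equal(v, []byte("archived"))
    })
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("deleted %d entries", n)
}

Because each window may skip entries that no longer qualify, a transaction commits at most batchSize deletions rather than exactly batchSize, which is the behaviour change noted in the commit message: the cap bounds transaction memory, and the exact count per commit does not matter.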