Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.0]client/db: enable archive pruning #3010

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions client/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ import (
)

const (
defaultRPCCertFile = "rpc.cert"
defaultRPCKeyFile = "rpc.key"
defaultMainnetHost = "127.0.0.1"
defaultTestnetHost = "127.0.0.2"
defaultSimnetHost = "127.0.0.3"
walletPairOneHost = "127.0.0.6"
walletPairTwoHost = "127.0.0.7"
defaultRPCPort = "5757"
defaultWebPort = "5758"
defaultLogLevel = "debug"
configFilename = "dexc.conf"
defaultRPCCertFile = "rpc.cert"
defaultRPCKeyFile = "rpc.key"
defaultMainnetHost = "127.0.0.1"
defaultTestnetHost = "127.0.0.2"
defaultSimnetHost = "127.0.0.3"
walletPairOneHost = "127.0.0.6"
walletPairTwoHost = "127.0.0.7"
defaultRPCPort = "5757"
defaultWebPort = "5758"
defaultLogLevel = "debug"
configFilename = "dexc.conf"
defaultArchiveSizeLimit = 1000
)

var (
Expand Down Expand Up @@ -108,6 +109,8 @@ type CoreConfig struct {
UnlockCoinsOnLogin bool `long:"release-wallet-coins" description:"On login or wallet creation, instruct the wallet to release any coins that it may have locked."`

ExtensionModeFile string `long:"extension-mode-file" description:"path to a file that specifies options for running core as an extension."`

PruneArchive uint64 `long:"prunearchive" description:"prune that order archive to the specified number of most recent orders. zero means no pruning."`
}

// WebConfig encapsulates the configuration needed for the web server.
Expand Down Expand Up @@ -216,6 +219,7 @@ func (cfg *Config) Core(log dex.Logger) *core.Config {
NoAutoDBBackup: cfg.NoAutoDBBackup,
ExtensionModeFile: cfg.ExtensionModeFile,
TheOneHost: cfg.TheOneHost,
PruneArchive: cfg.PruneArchive,
}
}

Expand Down
6 changes: 5 additions & 1 deletion client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,8 +1460,11 @@ type Config struct {
// for running core in extension mode, which gives the caller options for
// e.g. limiting the ability to configure wallets.
ExtensionModeFile string

// TheOneHost will run core with only the specified server.
TheOneHost string
// PruneArchive will prune the order archive to the specified number of
// orders.
PruneArchive uint64
}

// locale is data associated with the currently selected language.
Expand Down Expand Up @@ -1544,6 +1547,7 @@ func New(cfg *Config) (*Core, error) {
}
dbOpts := bolt.Opts{
BackupOnShutdown: !cfg.NoAutoDBBackup,
PruneArchive: cfg.PruneArchive,
}
boltDB, err := bolt.NewDB(cfg.DBPath, cfg.Logger.SubLogger("DB"), dbOpts)
if err != nil {
Expand Down
131 changes: 131 additions & 0 deletions client/db/bolt/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ var (
// Opts is a set of options for the DB.
type Opts struct {
BackupOnShutdown bool // default is true
PruneArchive uint64
}

var defaultOpts = Opts{
Expand Down Expand Up @@ -217,6 +218,11 @@ func NewDB(dbPath string, logger dex.Logger, opts ...Opts) (dexdb.DB, error) {
return nil, err
}

if bdb.opts.PruneArchive > 0 {
bdb.log.Info("Pruning the order archive")
bdb.pruneArchivedOrders(bdb.opts.PruneArchive)
}

bdb.log.Infof("Started database (version = %d, file = %s)", DBVersion, dbPath)

return bdb, nil
Expand Down Expand Up @@ -407,6 +413,131 @@ func (db *BoltDB) SetPrimaryCredentials(creds *dexdb.PrimaryCredentials) error {
})
}

func (db *BoltDB) pruneArchivedOrders(prunedSize uint64) error {

return db.Update(func(tx *bbolt.Tx) error {
archivedOB := tx.Bucket(archivedOrdersBucket)
if archivedOB == nil {
return fmt.Errorf("failed to open %s bucket", string(archivedOrdersBucket))
}

nOrds := uint64(archivedOB.Stats().BucketN - 1 /* BucketN includes top bucket */)
if nOrds <= prunedSize {
return nil
}

// We won't delete any orders with active matches.
activeMatches := tx.Bucket(activeMatchesBucket)
if activeMatches == nil {
return fmt.Errorf("failed to open %s bucket", string(activeMatchesBucket))
}
oidsWithActiveMatches := make(map[order.OrderID]struct{}, 0)
if err := activeMatches.ForEach(func(k, _ []byte) error {
mBkt := activeMatches.Bucket(k)
if mBkt == nil {
return fmt.Errorf("error getting match bucket %x", k)
}
var oid order.OrderID
copy(oid[:], mBkt.Get(orderIDKey))
oidsWithActiveMatches[oid] = struct{}{}
return nil
}); err != nil {
return fmt.Errorf("error building active match order ID index: %w", err)
}

toClear := int(nOrds - prunedSize)

type orderStamp struct {
oid []byte
stamp uint64
}
deletes := make([]*orderStamp, 0, toClear)
sortDeletes := func() {
sort.Slice(deletes, func(i, j int) bool {
return deletes[i].stamp < deletes[j].stamp
})
}
var sortedAtCapacity bool
if err := archivedOB.ForEach(func(oidB, v []byte) error {
var oid order.OrderID
copy(oid[:], oidB)
if _, found := oidsWithActiveMatches[oid]; found {
return nil
}
oBkt := archivedOB.Bucket(oidB)
if oBkt == nil {
return fmt.Errorf("no order bucket iterated order %x", oidB)
}
stampB := oBkt.Get(updateTimeKey)
if stampB == nil {
// Highly improbable.
stampB = make([]byte, 8)
}
stamp := intCoder.Uint64(stampB)
if len(deletes) < toClear {
deletes = append(deletes, &orderStamp{
stamp: stamp,
oid: oidB,
})
return nil
}
if !sortedAtCapacity {
// Make sure the last element is the newest one once we hit
// capacity.
sortDeletes()
sortedAtCapacity = true
}
if stamp > deletes[len(deletes)-1].stamp {
return nil
}
deletes[len(deletes)-1] = &orderStamp{
stamp: stamp,
oid: oidB,
}
sortDeletes()
return nil
}); err != nil {
return fmt.Errorf("archive iteration error: %v", err)
}

deletedOrders := make(map[order.OrderID]struct{})
for _, del := range deletes {
var oid order.OrderID
copy(oid[:], del.oid)
deletedOrders[oid] = struct{}{}
if err := archivedOB.DeleteBucket(del.oid); err != nil {
return fmt.Errorf("error deleting archived order %q: %v", del.oid, err)
}
}

matchesToDelete := make([][]byte, 0, prunedSize /* just avoid some allocs if we can */)
archivedMatches := tx.Bucket(archivedMatchesBucket)
if archivedMatches == nil {
return errors.New("no archived match bucket")
}
if err := archivedMatches.ForEach(func(k, _ []byte) error {
matchBkt := archivedMatches.Bucket(k)
if matchBkt == nil {
return fmt.Errorf("no bucket found for %x during iteration", k)
}
var oid order.OrderID
copy(oid[:], matchBkt.Get(orderIDKey))
if _, found := deletedOrders[oid]; found {
matchesToDelete = append(matchesToDelete, k)
}
return nil
}); err != nil {
return fmt.Errorf("error finding matches to prune: %w", err)
}
for i := range matchesToDelete {
if err := archivedMatches.DeleteBucket(matchesToDelete[i]); err != nil {
return fmt.Errorf("error deleting pruned match %x: %w", matchesToDelete[i], err)
}
}
return nil
})
}

// validateCreds checks that the PrimaryCredentials fields are properly
// populated.
func validateCreds(creds *dexdb.PrimaryCredentials) error {
Expand Down
137 changes: 134 additions & 3 deletions client/db/bolt/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"math/rand"
"os"
Expand All @@ -21,13 +22,14 @@ import (
)

var (
tLogger = dex.StdOutLogger("db_TEST", dex.LevelTrace)
tLogger = dex.StdOutLogger("db_TEST", dex.LevelTrace)
withLongTests bool
)

func newTestDB(t *testing.T) (*BoltDB, func()) {
func newTestDB(t *testing.T, opts ...Opts) (*BoltDB, func()) {
t.Helper()
dbPath := filepath.Join(t.TempDir(), "db.db")
dbi, err := NewDB(dbPath, tLogger)
dbi, err := NewDB(dbPath, tLogger, opts...)
if err != nil {
t.Fatalf("error creating dB: %v", err)
}
Expand All @@ -50,6 +52,9 @@ func newTestDB(t *testing.T) (*BoltDB, func()) {
}

func TestMain(m *testing.M) {
flag.BoolVar(&withLongTests, "withlongtests", false, "include tests that take a long time to run")
flag.Parse()

defer os.Stdout.Sync()
os.Exit(m.Run())
}
Expand Down Expand Up @@ -1162,6 +1167,10 @@ func testCredentialsUpdate(t *testing.T, boltdb *BoltDB, tester func([]byte, str
}

func TestDeleteInactiveMatches(t *testing.T) {
// TODO: This test takes way too long to run. Why?
if !withLongTests {
return
}
boltdb, shutdown := newTestDB(t)
defer shutdown()

Expand Down Expand Up @@ -1340,6 +1349,10 @@ func TestDeleteInactiveMatches(t *testing.T) {
}

func TestDeleteInactiveOrders(t *testing.T) {
// TODO: This test takes way too long to run. Why?
if !withLongTests {
return
}
boltdb, shutdown := newTestDB(t)
defer shutdown()

Expand Down Expand Up @@ -1613,3 +1626,121 @@ func TestPokes(t *testing.T) {
t.Fatal("Result from second LoadPokes wasn't empty")
}
}

func TestPruneArchivedOrders(t *testing.T) {
const host = "blah"
const prunedSize = 5
boltdb, shutdown := newTestDB(t)
defer shutdown()

archivedOrdersN := func() (n int) {
boltdb.View(func(tx *bbolt.Tx) error {
n = tx.Bucket(archivedOrdersBucket).Stats().BucketN - 1 /* BucketN includes top bucket */
return nil
})
return n
}

var ordStampI uint64
addOrder := func(stamp uint64) order.OrderID {
ord, _ := ordertest.RandomLimitOrder()
if stamp == 0 {
stamp = ordStampI
ordStampI++
}
boltdb.UpdateOrder(&db.MetaOrder{
MetaData: &db.OrderMetaData{
Status: order.OrderStatusExecuted,
Host: host,
Proof: db.OrderProof{DEXSig: []byte{0xa}},
},
Order: ord,
})
oid := ord.ID()
boltdb.ordersUpdate(func(ob, archivedOB *bbolt.Bucket) error {
archivedOB.Bucket(oid[:]).Put(updateTimeKey, uint64Bytes(stamp))
return nil
})
return oid
}
for i := 0; i < prunedSize*2; i++ {
addOrder(0)
}

if n := archivedOrdersN(); n != prunedSize*2 {
t.Fatalf("Expected %d archived orders after intitialization, saw %d", prunedSize*2, n)
}

if err := boltdb.pruneArchivedOrders(prunedSize); err != nil {
t.Fatalf("pruneArchivedOrders error: %v", err)
}

if n := archivedOrdersN(); n != prunedSize {
t.Fatalf("Expected %d archived orders after first pruning, saw %d", prunedSize, n)
}

// Make sure we pruned the first 5.
if err := boltdb.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(archivedOrdersBucket)
return bkt.ForEach(func(oidB, _ []byte) error {
if stamp := intCoder.Uint64(bkt.Bucket(oidB).Get(updateTimeKey)); stamp < prunedSize {
return fmt.Errorf("order stamp %d should have been pruned", stamp)
}
return nil
})
}); err != nil {
t.Fatal(err)
}

// Add an order with an early stamp and an active match
oid := addOrder(1)
m := &db.MetaMatch{
MetaData: &db.MatchMetaData{
DEX: host,
Base: 1,
},
UserMatch: ordertest.RandomUserMatch(),
}
m.OrderID = oid
m.Status = order.NewlyMatched
if err := boltdb.UpdateMatch(m); err != nil {
t.Fatal(err)
}

if err := boltdb.pruneArchivedOrders(prunedSize); err != nil {
t.Fatalf("Error pruning orders when one has an active match: %v", err)
}

if n := archivedOrdersN(); n != prunedSize {
t.Fatalf("Expected %d archived orders after pruning with active match order in place, saw %d", prunedSize, n)
}

// Our active match order should still be available
if _, err := boltdb.Order(oid); err != nil {
t.Fatalf("Error retrieving unpruned active match order: %v", err)
}

// Retire the active match order
m.Status = order.MatchComplete
if err := boltdb.UpdateMatch(m); err != nil {
t.Fatal(err)
}
// Add an order to push the now retirable older order out
addOrder(0)
if err := boltdb.pruneArchivedOrders(prunedSize); err != nil {
t.Fatalf("Error pruning orders after retiring match: %v", err)
}
if n := archivedOrdersN(); n != prunedSize {
t.Fatalf("Expected %d archived orders after pruning with retired match, saw %d", prunedSize, n)
}
// Match should not be archived any longer.
metaID := m.MatchOrderUniqueID()
if err := boltdb.matchesView(func(mb, archivedMB *bbolt.Bucket) error {
if mb.Bucket(metaID) != nil || archivedMB.Bucket(metaID) != nil {
return errors.New("still found bucket for retired match of pruned order")
}
return nil
}); err != nil {
t.Fatal(err)
}
}
Loading