diff --git a/client/asset/eth/txdb.go b/client/asset/eth/txdb.go index a9bc6d1f73..60c06d0345 100644 --- a/client/asset/eth/txdb.go +++ b/client/asset/eth/txdb.go @@ -404,9 +404,9 @@ func (db *badgerTxDB) getTxs(n int, refID *common.Hash, past bool, tokenID *uint // getPendingTxs returns a map of nonce to extendedWalletTx for all // pending transactions. func (db *badgerTxDB) getPendingTxs() ([]*extendedWalletTx, error) { - // We will be iterating backwards from the most recent nonce. - // If we find numConfirmedTxsToCheck consecutive confirmed transactions, - // we can stop iterating. + // We will be iterating backwards from the most recent nonce. If we find + // numConfirmedTxsToCheck consecutive confirmed transactions, we can stop + // iterating. const numConfirmedTxsToCheck = 20 txs := make([]*extendedWalletTx, 0, 4) diff --git a/dex/lexi/datum.go b/dex/lexi/datum.go new file mode 100644 index 0000000000..54cbfdf388 --- /dev/null +++ b/dex/lexi/datum.go @@ -0,0 +1,91 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package lexi + +import ( + "bytes" + "fmt" + + "github.com/decred/dcrd/wire" +) + +// datum is a value in the key-value database, along with information about +// its index entries. +type datum struct { + version byte + indexes [][]byte + v []byte +} + +func (d *datum) bytes() ([]byte, error) { + if d.version != 0 { + return nil, fmt.Errorf("unknown datum version %d", d.version) + } + + // encoded datum length is 1 byte for version, 1 varint to say how many + // indexes there are then for each index, a varint to specify the size of + // the index entry followed by the entry itself, then a varint to specify + // the size of the value blob followed by the value blob itself. 
+ bLen := 1 + len(d.v) + wire.VarIntSerializeSize(uint64(len(d.v))) + wire.VarIntSerializeSize(uint64(len(d.indexes))) + for _, ib := range d.indexes { + bLen += len(ib) + wire.VarIntSerializeSize(uint64(len(ib))) + } + b := bytes.NewBuffer(make([]byte, 0, bLen)) + if err := b.WriteByte(d.version); err != nil { + return nil, fmt.Errorf("error writing version: %w", err) + } + if err := wire.WriteVarInt(b, 0, uint64(len(d.indexes))); err != nil { + return nil, fmt.Errorf("error writing index count var int: %w", err) + } + for _, ib := range d.indexes { + if err := wire.WriteVarInt(b, 0, uint64(len(ib))); err != nil { + return nil, fmt.Errorf("error writing index var int: %w", err) + } + if _, err := b.Write(ib); err != nil { + return nil, fmt.Errorf("error writing index value: %w", err) + } + } + if err := wire.WriteVarInt(b, 0, uint64(len(d.v))); err != nil { + return nil, fmt.Errorf("error writing value var int: %w", err) + } + if _, err := b.Write(d.v); err != nil { + return nil, fmt.Errorf("error writing value: %w", err) + } + return b.Bytes(), nil +} + +func decodeDatum(blob []byte) (*datum, error) { + if len(blob) < 4 { + return nil, fmt.Errorf("datum blob length cannot be < 4. 
got %d", len(blob)) + } + d := &datum{version: blob[0]} + if d.version != 0 { + return nil, fmt.Errorf("unknown datum blob version %d", d.version) + } + b := bytes.NewBuffer(blob[1:]) + nIndexes, err := wire.ReadVarInt(b, 0) + if err != nil { + return nil, fmt.Errorf("error reading number of indexes: %w", err) + } + d.indexes = make([][]byte, nIndexes) + for i := 0; i < int(nIndexes); i++ { + indexLen, err := wire.ReadVarInt(b, 0) + if err != nil { + return nil, fmt.Errorf("error reading index length: %w", err) + } + d.indexes[i] = make([]byte, indexLen) + if _, err := b.Read(d.indexes[i]); err != nil { + return nil, fmt.Errorf("error reading index: %w", err) + } + } + valueLen, err := wire.ReadVarInt(b, 0) + if err != nil { + return nil, fmt.Errorf("erro reading value var int: %w", err) + } + d.v = make([]byte, valueLen) + if _, err := b.Read(d.v); err != nil { + return nil, fmt.Errorf("error reading value: %w", err) + } + return d, nil +} diff --git a/dex/lexi/db_test.go b/dex/lexi/db_test.go new file mode 100644 index 0000000000..f2614d17f1 --- /dev/null +++ b/dex/lexi/db_test.go @@ -0,0 +1,216 @@ +package lexi + +import ( + "bytes" + "encoding" + "os" + "path/filepath" + "strings" + "testing" + + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/encode" +) + +func newTestDB(t *testing.T) (*DB, func()) { + tmpDir, err := os.MkdirTemp("", "") + if err != nil { + t.Fatalf("error making temp dir: %v", err) + } + db, err := New(&Config{ + Path: filepath.Join(tmpDir, "test.db"), + Log: dex.StdOutLogger("T", dex.LevelInfo), + }) + if err != nil { + t.Fatalf("error constructing db: %v", err) + } + return db, func() { os.RemoveAll(tmpDir) } +} + +func TestPrefixes(t *testing.T) { + db, shutdown := newTestDB(t) + defer shutdown() + + pfix, err := db.prefixForName("1") + if err != nil { + t.Fatalf("error getting prefix 1: %v", err) + } + if pfix != firstAvailablePrefix { + t.Fatalf("expected prefix %s, got %s", firstAvailablePrefix, pfix) + } + + pfix, err = 
db.prefixForName("2") + if err != nil { + t.Fatalf("error getting prefix 2: %v", err) + } + if secondPfix := incrementPrefix(firstAvailablePrefix); pfix != secondPfix { + t.Fatalf("expected prefix %s, got %s", secondPfix, pfix) + } + + // Make sure requests for the same table name return the already-registered + // prefix. + pfix, err = db.prefixForName("1") + if err != nil { + t.Fatalf("error getting prefix 1 again: %v", err) + } + if pfix != firstAvailablePrefix { + t.Fatalf("expected prefix %s, got %s", firstAvailablePrefix, pfix) + } +} + +type tValue struct { + k, v, idx []byte +} + +func (v *tValue) MarshalBinary() ([]byte, error) { + return v.v, nil +} + +func valueIndex(k, v encoding.BinaryMarshaler) ([]byte, error) { + return v.(*tValue).idx, nil +} + +func TestIndex(t *testing.T) { + db, shutdown := newTestDB(t) + defer shutdown() + + tbl, err := db.Table("T") + if err != nil { + t.Fatalf("Error creating table: %v", err) + } + + idx, err := tbl.AddIndex("I", valueIndex) + if err != nil { + t.Fatalf("Error adding index: %v", err) + } + + const nVs = 100 + vs := make([]*tValue, nVs) + for i := 0; i < nVs; i++ { + k := append(encode.RandomBytes(5), byte(i)) + v := &tValue{k: []byte{byte(i)}, v: encode.RandomBytes(10), idx: []byte{byte(i)}} + vs[i] = v + if err := tbl.Set(B(k), v); err != nil { + t.Fatalf("Error setting table entry: %v", err) + } + } + + // Iterate forwards. 
+ var i int + idx.Iterate(nil, func(it *Iter) error { + v := vs[i] + it.V(func(vB []byte) error { + if !bytes.Equal(vB, v.v) { + t.Fatalf("Wrong bytes for forward iteration index %d", i) + } + return nil + }) + i++ + return nil + }) + if i != nVs { + t.Fatalf("Expected to iterate %d items but only did %d", nVs, i) + } + + // Iterate backwards + i = nVs + idx.Iterate(nil, func(it *Iter) error { + i-- + v := vs[i] + return it.V(func(vB []byte) error { + if !bytes.Equal(vB, v.v) { + t.Fatalf("Wrong bytes for reverse iteration index %d", i) + } + return nil + }) + }, WithReverse()) + if i != 0 { + t.Fatalf("Expected to iterate back to zero but only got to %d", i) + } + + // Iterate forward and delete the first half. + i = 0 + if err := idx.Iterate(nil, func(it *Iter) error { + if i < 50 { + i++ + return it.Delete() + } + return ErrEndIteration + }, WithUpdate()); err != nil { + t.Fatalf("Error iterating forward to delete entries: %v", err) + } + if i != 50 { + t.Fatalf("Expected to iterate forward to 50, but only got to %d", i) + } + + idx.Iterate(nil, func(it *Iter) error { + return it.V(func(vB []byte) error { + if !bytes.Equal(vB, vs[50].v) { + t.Fatal("Wrong first iteration item after deletion") + } + return ErrEndIteration + }) + }) + + // Seek a specific item. 
+	i = 75
+	idx.Iterate(nil, func(it *Iter) error {
+		if i == 75 {
+			i--
+			return it.V(func(vB []byte) error {
+				if !bytes.Equal(vB, vs[75].v) {
+					t.Fatal("first item wasn't 75")
+				}
+				return nil
+			})
+		} else if i == 74 {
+			return ErrEndIteration
+		}
+		t.Fatal("reached an unexpected value")
+		return nil
+	}, WithSeek(vs[75].idx), WithReverse())
+	if i != 74 {
+		t.Fatal("never reached 74")
+	}
+}
+
+func TestDatum(t *testing.T) {
+	testEncodeDecode := func(tag string, d *datum) {
+		t.Helper()
+		b, err := d.bytes()
+		if err != nil {
+			t.Fatalf("%s: error encoding simple datum: %v", tag, err)
+		}
+		reD, err := decodeDatum(b)
+		if err != nil {
+			t.Fatalf("%s: error decoding simple datum: %v", tag, err)
+		}
+		if !bytes.Equal(reD.v, d.v) {
+			t.Fatalf("%s: decoding datum value incorrect. %x != %x", tag, reD.v, d.v)
+		}
+		if reD.version != d.version {
+			t.Fatalf("%s: wrong datum version. expected %d, got %d", tag, d.version, reD.version)
+		}
+		if len(d.indexes) != len(reD.indexes) {
+			t.Fatalf("%s: wrong number of indexes. wanted %d, got %d", tag, len(d.indexes), len(reD.indexes))
+		}
+		for i, idx := range d.indexes {
+			if !bytes.Equal(idx, reD.indexes[i]) {
+				t.Fatalf("%s: Wrong index # %d", tag, i)
+			}
+		}
+	}
+
+	d := &datum{version: 1, v: []byte{0x01}}
+	if _, err := d.bytes(); err == nil || !strings.Contains(err.Error(), "unknown datum version") {
+		t.Fatalf("Wrong error for unknown datum version: %v", err)
+	}
+	d.version = 0
+
+	testEncodeDecode("simple", d)
+
+	d = &datum{v: encode.RandomBytes(300)}
+	d.indexes = append(d.indexes, encode.RandomBytes(5))
+	d.indexes = append(d.indexes, encode.RandomBytes(300))
+	testEncodeDecode("complex", d)
+}
diff --git a/dex/lexi/dbid.go b/dex/lexi/dbid.go
new file mode 100644
index 0000000000..a99d012dfa
--- /dev/null
+++ b/dex/lexi/dbid.go
@@ -0,0 +1,39 @@
+// This code is available on the terms of the project LICENSE.md file,
+// also available online at https://blueoakcouncil.org/license/1.0.0.
+ +package lexi + +import ( + "encoding" + "encoding/hex" +) + +// DBIDSize is the size of the DBID. It is 8 bytes to match the size of a +// byte-encoded uint64. +const DBIDSize = 8 + +// DBID is a unique ID mapped to a datum's key. Keys can be any length, but to +// prevent long keys from being echoed in all the indexes, every key is +// translated to a DBID for internal use. +type DBID [DBIDSize]byte + +var ( + _ encoding.BinaryMarshaler = DBID{} + + lastDBID = DBID{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} +) + +// MarshalBinary satisfies encoding.BinaryMarshaler for the DBID. +func (dbID DBID) MarshalBinary() ([]byte, error) { + return dbID[:], nil +} + +// String encodes the DBID as a 16-character hexadecimal string. +func (dbID DBID) String() string { + return hex.EncodeToString(dbID[:]) +} + +func newDBIDFromBytes(b []byte) (dbID DBID) { + copy(dbID[:], b) + return dbID +} diff --git a/dex/lexi/index.go b/dex/lexi/index.go new file mode 100644 index 0000000000..a99f99461a --- /dev/null +++ b/dex/lexi/index.go @@ -0,0 +1,296 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package lexi + +import ( + "encoding" + "encoding/binary" + "errors" + "fmt" + "time" + + "decred.org/dcrdex/dex" + "github.com/dgraph-io/badger" +) + +const ( + // ErrEndIteration can be returned from the function passed to Iterate + // to end iteration. No error will be returned from Iterate. + ErrEndIteration = dex.ErrorKind("end iteration") +) + +// Index is just a lexicographically-ordered list of byte slices. An Index is +// associated with a Table, and a datum inserted into a table can put entries +// into the Index. The Index can be iterated to view sorted data in the table. 
+type Index struct { + *DB + name string + table *Table + prefix keyPrefix + f func(k, v encoding.BinaryMarshaler) ([]byte, error) + defaultIterationOptions iteratorOpts +} + +// AddIndex adds an index to a Table. Once an Index is added, every datum +// Set in the Table will generate an entry in the Index too. +func (t *Table) AddIndex(name string, f func(k, v encoding.BinaryMarshaler) ([]byte, error)) (*Index, error) { + p, err := t.prefixForName(t.name + "__idx__" + name) + if err != nil { + return nil, err + } + idx := &Index{ + DB: t.DB, + name: name, + table: t, + prefix: p, + f: f, + } + t.indexes = append(t.indexes, idx) + return idx, nil +} + +func (idx *Index) add(txn *badger.Txn, k, v encoding.BinaryMarshaler, dbID DBID) ([]byte, error) { + idxB, err := idx.f(k, v) + if err != nil { + return nil, fmt.Errorf("error getting index value: %w", err) + } + b := prefixedKey(idx.prefix, append(idxB, dbID[:]...)) + if err := txn.Set(b, nil); err != nil { + return nil, fmt.Errorf("error writing index entry: %w", err) + } + return b, nil +} + +type iteratorOpts struct { + update bool + reverse bool + seek []byte +} + +// IterationOption is a knob to change how Iterate runs on an Index. +type IterationOption func(opts *iteratorOpts) + +// WithUpdate must be used if the caller intends to make modifications during +// iteration, such as deleting elements. +func WithUpdate() IterationOption { + return func(opts *iteratorOpts) { + opts.update = true + } +} + +// WithReverse sets the direction of iteration to reverse-lexicographical. +func WithReverse() IterationOption { + return func(opts *iteratorOpts) { + opts.reverse = true + } +} + +// WithForward sets the direction of iteration to lexicographical. +func WithForward() IterationOption { + return func(opts *iteratorOpts) { + opts.reverse = false + } +} + +// WithSeek starts iteration at the specified prefix. 
+func WithSeek(prefix []byte) IterationOption { + return func(opts *iteratorOpts) { + opts.seek = prefix + } +} + +// UseDefaultIterationOptions sets default options for Iterate. +func (idx *Index) UseDefaultIterationOptions(optss ...IterationOption) { + for i := range optss { + optss[i](&idx.defaultIterationOptions) + } +} + +// Iter is an entry in the Index. The caller can use Iter to access and delete +// data associated with the index entry and it's datum. +type Iter struct { + idx *Index + item *badger.Item + txn *badger.Txn + dbID DBID + d *datum +} + +// V gives access to the datum bytes. The byte slice passed to f is only valid +// for the duration of the function call. The caller should make a copy if they +// intend to use the bytes outside of the scope of f. +func (i *Iter) V(f func(vB []byte) error) error { + d, err := i.datum() + if err != nil { + return err + } + return f(d.v) +} + +// K is the key for the datum. +func (i *Iter) K() ([]byte, error) { + item, err := i.txn.Get(prefixedKey(idToKeyPrefix, i.dbID[:])) + if err != nil { + return nil, err + } + return item.ValueCopy(nil) +} + +// Entry is the actual index entry. These are the bytes returned by the +// generator passed to AddIndex. +func (i *Iter) Entry(f func(idxB []byte) error) error { + k := i.item.Key() + if len(k) < prefixSize+DBIDSize { + return fmt.Errorf("index entry too small. length = %d", len(k)) + } + return f(k[prefixSize : len(k)-DBIDSize]) +} + +func (i *Iter) datum() (_ *datum, err error) { + if i.d != nil { + return i.d, nil + } + k := i.item.Key() + if len(k) < prefixSize+DBIDSize { + return nil, fmt.Errorf("invalid index entry length %d", len(k)) + } + dbID := newDBIDFromBytes(k[len(k)-DBIDSize:]) + i.d, err = i.idx.table.get(i.txn, dbID) + return i.d, err +} + +// Delete deletes the indexed datum and any associated index entries. 
+func (i *Iter) Delete() error { + d, err := i.datum() + if err != nil { + return err + } + return i.idx.table.deleteDatum(i.txn, i.dbID, d) +} + +// IndexBucket is any one of a number of common types whose binary encoding is +// straight-forward. An IndexBucket restricts Iterate to the entries in the +// index that have the bytes decoded from the IndexBucket as the prefix. +type IndexBucket interface{} + +func parseIndexBucket(i IndexBucket) (b []byte, err error) { + switch it := i.(type) { + case []byte: + b = it + case uint32: + b = make([]byte, 4) + binary.BigEndian.PutUint32(b[:], it) + case time.Time: + b = make([]byte, 8) + binary.BigEndian.PutUint64(b[:], uint64(it.UnixMilli())) + case nil: + default: + err = fmt.Errorf("unknown IndexBucket type %T", it) + } + return +} + +// Iterate iterates the index, providing access to the index entry, datum, and +// datum key via the Iter. +func (idx *Index) Iterate(prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error { + prefix, err := parseIndexBucket(prefixI) + if err != nil { + return err + } + io := idx.defaultIterationOptions + for i := range iterOpts { + iterOpts[i](&io) + } + iterFunc := iteratePrefix + if io.reverse { + iterFunc = reverseIteratePrefix + } + viewUpdate := idx.View + if io.update { + viewUpdate = idx.Update + } + var seek []byte + if len(io.seek) > 0 { + seek = prefixedKey(idx.prefix, io.seek) + } + return viewUpdate(func(txn *badger.Txn) error { + return iterFunc(txn, prefixedKey(idx.prefix, prefix), seek, func(iter *badger.Iterator) error { + item := iter.Item() + k := item.Key() + if len(k) < prefixSize+DBIDSize { + return fmt.Errorf("invalid index entry length %d", len(k)) + } + return f(&Iter{ + idx: idx, + item: iter.Item(), + txn: txn, + dbID: newDBIDFromBytes(k[len(k)-DBIDSize:]), + }) + }) + }) +} + +type badgerIterationOption func(opts *badger.IteratorOptions) + +func withPrefetchSize(n int) badgerIterationOption { + return func(opts *badger.IteratorOptions) { + 
opts.PrefetchSize = n + } +} + +func iteratePrefix(txn *badger.Txn, prefix, seek []byte, f func(iter *badger.Iterator) error, iterOpts ...badgerIterationOption) error { + opts := badger.DefaultIteratorOptions + opts.Prefix = prefix + for i := range iterOpts { + iterOpts[i](&opts) + } + iter := txn.NewIterator(opts) + defer iter.Close() + + if len(seek) == 0 { + iter.Rewind() + } else { + iter.Seek(seek) + } + + for ; iter.Valid(); iter.Next() { + if err := f(iter); err != nil { + if errors.Is(err, ErrEndIteration) { + return nil + } + return err + } + } + return nil +} + +func reverseIteratePrefix(txn *badger.Txn, prefix, seek []byte, f func(iter *badger.Iterator) error, iterOpts ...badgerIterationOption) error { + opts := badger.DefaultIteratorOptions + opts.Prefix = prefix + opts.Reverse = true + for i := range iterOpts { + iterOpts[i](&opts) + } + iter := txn.NewIterator(opts) + defer iter.Close() + + if len(seek) == 0 { + var p keyPrefix + copy(p[:], prefix) + nextPrefix := incrementPrefix(p) + seek = nextPrefix[:] + } else { + seek = append(seek, lastDBID[:]...) + } + + for iter.Seek(seek); iter.ValidForPrefix(prefix); iter.Next() { + if err := f(iter); err != nil { + if errors.Is(err, ErrEndIteration) { + return nil + } + return err + } + } + return nil +} diff --git a/dex/lexi/keyprefix.go b/dex/lexi/keyprefix.go new file mode 100644 index 0000000000..ddfea06d0f --- /dev/null +++ b/dex/lexi/keyprefix.go @@ -0,0 +1,61 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package lexi + +import ( + "encoding/binary" + "encoding/hex" + + "github.com/dgraph-io/badger" +) + +const prefixSize = 2 + +// keyPrefix is a prefix for a key in the badger DB. Every table and index has +// a unique keyPrefix. This enables sorting and iteration of data. 
+type keyPrefix [prefixSize]byte + +func (p keyPrefix) String() string { + return hex.EncodeToString(p[:]) +} + +// NO RAW BADGER KEYS CAN BE LENGTH 2. IT CAN'T JUST BE A PREFIX, OR ELSE +// REVERSE ITERATION OF INDEXES FAILS + +var ( + // reserved prefixes + prefixToNamePrefix = keyPrefix{0x00, 0x00} + nameToPrefixPrefix = keyPrefix{0x00, 0x01} + primarySequencePrefix = keyPrefix{0x00, 0x02} + keyToIDPrefix = keyPrefix{0x00, 0x03} + idToKeyPrefix = keyPrefix{0x00, 0x04} + + firstAvailablePrefix = keyPrefix{0x01, 0x00} +) + +func incrementPrefix(prefix keyPrefix) (p keyPrefix) { + v := binary.BigEndian.Uint16(prefix[:]) + binary.BigEndian.PutUint16(p[:], v+1) + return p +} + +func bytesToPrefix(b []byte) (p keyPrefix) { + copy(p[:], b) + return +} + +func lastKeyForPrefix(txn *badger.Txn, p keyPrefix) (k []byte) { + reverseIteratePrefix(txn, p[:], nil, func(iter *badger.Iterator) error { + k = iter.Item().KeyCopy(nil)[prefixSize:] + return ErrEndIteration + }, withPrefetchSize(1)) + return +} + +func prefixedKey(p keyPrefix, k []byte) []byte { + pk := make([]byte, prefixSize+len(k)) + copy(pk, p[:]) + copy(pk[prefixSize:], k) + return pk +} diff --git a/dex/lexi/lexi.go b/dex/lexi/lexi.go new file mode 100644 index 0000000000..99a56d7cc3 --- /dev/null +++ b/dex/lexi/lexi.go @@ -0,0 +1,238 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package lexi + +import ( + "context" + "encoding" + "encoding/binary" + "errors" + "fmt" + "sync" + "time" + + "decred.org/dcrdex/dex" + "github.com/dgraph-io/badger" +) + +// ErrKeyNotFound is an alias for badger.ErrKeyNotFound so that the caller +// doesn't have to import badger to use the semantics. Either error will satisfy +// errors.Is the same. 
+var ErrKeyNotFound = badger.ErrKeyNotFound
+
+func convertError(err error) error {
+	switch {
+	case errors.Is(err, badger.ErrKeyNotFound):
+		return ErrKeyNotFound
+	}
+	return err
+}
+
+// DB is the Lexi DB. The Lexi DB wraps a badger key-value database and provides
+// the ability to add indexed data.
+type DB struct {
+	*badger.DB
+	log      dex.Logger
+	idSeq    *badger.Sequence
+	wg       sync.WaitGroup
+	updateWG sync.WaitGroup
+}
+
+// Config is the configuration settings for the Lexi DB.
+type Config struct {
+	Path string
+	Log  dex.Logger
+}
+
+// New constructs a new Lexi DB.
+func New(cfg *Config) (*DB, error) {
+	opts := badger.DefaultOptions(cfg.Path).WithLogger(&badgerLoggerWrapper{cfg.Log.SubLogger("BADG")})
+	var err error
+	bdb, err := badger.Open(opts)
+	if errors.Is(err, badger.ErrTruncateNeeded) {
+		// Probably a Windows thing.
+		// https://github.com/dgraph-io/badger/issues/744
+		cfg.Log.Warnf("Error opening badger db: %v", err)
+		// Try again with value log truncation enabled.
+		opts.Truncate = true
+		cfg.Log.Warnf("Attempting to reopen badger DB with the Truncate option set...")
+		bdb, err = badger.Open(opts)
+	}
+	if err != nil {
+		return nil, err
+	}
+	idSeq, err := bdb.GetSequence(prefixedKey(primarySequencePrefix, []byte{0x00}), 1000)
+	if err != nil {
+		return nil, fmt.Errorf("error constructing primary sequence: %w", err)
+	}
+
+	return &DB{
+		DB:    bdb,
+		log:   cfg.Log,
+		idSeq: idSeq,
+	}, nil
+}
+
+// Connect starts the DB, and creates goroutines to perform shutdown when the
+// context is canceled.
+func (db *DB) Connect(ctx context.Context) (*sync.WaitGroup, error) {
+	db.wg.Add(1)
+	go func() {
+		defer db.wg.Done()
+		<-ctx.Done()
+		if err := db.idSeq.Release(); err != nil {
+			db.log.Errorf("Error releasing sequence: %v", err)
+		}
+	}()
+
+	db.wg.Add(1)
+	go func() {
+		defer db.wg.Done()
+		defer db.Close()
+		defer db.updateWG.Wait()
+		ticker := time.NewTicker(5 * time.Minute)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				err := db.RunValueLogGC(0.5)
+				if err != nil && !errors.Is(err, badger.ErrNoRewrite) {
+					db.log.Errorf("garbage collection error: %v", err)
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return &db.wg, nil
+}
+
+// Update runs f in a read-write badger transaction. Badger uses optimistic
+// concurrency control, so an update that overlaps a concurrent read-write
+// transaction can fail with ErrConflict even when each transaction is
+// individually consistent. The remedy is to retry with exponential backoff.
+func (db *DB) Update(f func(txn *badger.Txn) error) (err error) {
+	db.updateWG.Add(1)
+	defer db.updateWG.Done()
+
+	const maxRetries = 10
+	sleepTime := 5 * time.Millisecond
+
+	for i := 0; i < maxRetries; i++ {
+		if err = db.DB.Update(f); err == nil || !errors.Is(err, badger.ErrConflict) {
+			return err
+		}
+		sleepTime *= 2
+		time.Sleep(sleepTime)
+	}
+
+	return err
+}
+
+// prefixForName returns a unique prefix for the provided name and logs the
+// relationship in the DB. Repeated calls to prefixForName with the same name
+// will return the same prefix, including through restarts.
+func (db *DB) prefixForName(name string) (prefix keyPrefix, _ error) { + nameKey := prefixedKey(nameToPrefixPrefix, []byte(name)) + return prefix, db.Update(func(txn *badger.Txn) error { + it, err := txn.Get(nameKey) + if err == nil { + return it.Value(func(b []byte) error { + prefix = bytesToPrefix(b) + return nil + }) + } + if !errors.Is(err, badger.ErrKeyNotFound) { + return fmt.Errorf("error getting name: %w", err) + } + lastPrefix := lastKeyForPrefix(txn, prefixToNamePrefix) + if len(lastPrefix) == 0 { + prefix = firstAvailablePrefix + } else { + prefix = incrementPrefix(bytesToPrefix(lastPrefix)) + } + if err := txn.Set(prefixedKey(nameToPrefixPrefix, []byte(name)), prefix[:]); err != nil { + return fmt.Errorf("error setting prefix for table name: %w", err) + } + if err := txn.Set(prefixedKey(prefixToNamePrefix, prefix[:]), []byte(name)); err != nil { + return fmt.Errorf("error setting table name for prefix: %w", err) + } + return nil + }) +} + +func (db *DB) nextID() (dbID DBID, _ error) { + i, err := db.idSeq.Next() + if err != nil { + return dbID, err + } + binary.BigEndian.PutUint64(dbID[:], i) + return +} + +// KeyID returns the DBID for the key. This is the same DBID that will be used +// internally for the key when datum is inserted into a Table with Set. This +// method is provided as a tool to keep database index entries short. 
+func (db *DB) KeyID(kB []byte) (dbID DBID, err error) { + err = db.View(func(txn *badger.Txn) error { + dbID, err = db.keyID(txn, kB) + return err + }) + return +} + +func (db *DB) keyID(txn *badger.Txn, kB []byte) (dbID DBID, err error) { + item, err := txn.Get(prefixedKey(keyToIDPrefix, kB)) + if err == nil { + err = item.Value(func(v []byte) error { + copy(dbID[:], v) + return nil + }) + return + } + if errors.Is(err, ErrKeyNotFound) { + if dbID, err = db.nextID(); err != nil { + return + } + if err = txn.Set(prefixedKey(keyToIDPrefix, kB), dbID[:]); err != nil { + err = fmt.Errorf("error mapping key to ID: %w", err) + } else if err = txn.Set(prefixedKey(idToKeyPrefix, dbID[:]), kB); err != nil { + err = fmt.Errorf("error mapping ID to key: %w", err) + } + } + return +} + +// deleteDBID deletes the id-to-key mapping and the key-to-id mapping for the +// DBID. +func (db *DB) deleteDBID(txn *badger.Txn, dbID DBID) error { + idK := prefixedKey(idToKeyPrefix, dbID[:]) + item, err := txn.Get(idK) + if err != nil { + return convertError(err) + } + if err := item.Value(func(kB []byte) error { + if err := txn.Delete(prefixedKey(keyToIDPrefix, kB)); err != nil { + return fmt.Errorf("error deleting key to ID mapping: %w", err) + } + return nil + }); err != nil { + return err + } + if err := txn.Delete(idK); err != nil { + return fmt.Errorf("error deleting ID to key mapping: %w", err) + } + return nil +} + +// B is a byte slice that implements encoding.BinaryMarshaler. +type B []byte + +var _ encoding.BinaryMarshaler = B{} + +// MarshalBinary implements encoding.BinaryMarshaler for the B. 
+func (b B) MarshalBinary() ([]byte, error) { + return b, nil +} diff --git a/dex/lexi/log.go b/dex/lexi/log.go new file mode 100644 index 0000000000..414aa25843 --- /dev/null +++ b/dex/lexi/log.go @@ -0,0 +1,42 @@ +package lexi + +import ( + "decred.org/dcrdex/dex" + "github.com/dgraph-io/badger" +) + +// badgerLoggerWrapper wraps dex.Logger and translates Warnf to Warningf to +// satisfy badger.Logger. It also lowers the log level of Infof to Debugf +// and Debugf to Tracef. +type badgerLoggerWrapper struct { + dex.Logger +} + +var _ badger.Logger = (*badgerLoggerWrapper)(nil) + +// Debugf -> dex.Logger.Tracef +func (log *badgerLoggerWrapper) Debugf(s string, a ...interface{}) { + log.Tracef(s, a...) +} + +func (log *badgerLoggerWrapper) Debug(a ...interface{}) { + log.Trace(a...) +} + +// Infof -> dex.Logger.Debugf +func (log *badgerLoggerWrapper) Infof(s string, a ...interface{}) { + log.Debugf(s, a...) +} + +func (log *badgerLoggerWrapper) Info(a ...interface{}) { + log.Debug(a...) +} + +// Warningf -> dex.Logger.Warnf +func (log *badgerLoggerWrapper) Warningf(s string, a ...interface{}) { + log.Warnf(s, a...) +} + +func (log *badgerLoggerWrapper) Warning(a ...interface{}) { + log.Warn(a...) +} diff --git a/dex/lexi/table.go b/dex/lexi/table.go new file mode 100644 index 0000000000..7f9d47df40 --- /dev/null +++ b/dex/lexi/table.go @@ -0,0 +1,191 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package lexi + +import ( + "encoding" + "errors" + "fmt" + + "github.com/dgraph-io/badger" +) + +// Table is a prefixed section of the k-v DB. A Table can have indexes, such +// that data inserted into the Table will generates index entries for use in +// lookup and iteration. +type Table struct { + *DB + name string + prefix keyPrefix + indexes []*Index + defaultSetOptions setOpts +} + +// Table constructs a new table in the DB. 
+func (db *DB) Table(name string) (*Table, error) { + p, err := db.prefixForName(name) + if err != nil { + return nil, err + } + return &Table{ + DB: db, + name: name, + prefix: p, + }, nil +} + +// Get retrieves a value from the Table. +func (t *Table) Get(k encoding.BinaryMarshaler, v encoding.BinaryUnmarshaler) error { + kB, err := k.MarshalBinary() + if err != nil { + return fmt.Errorf("error marshaling key: %w", err) + } + return t.View(func(txn *badger.Txn) error { + dbID, err := t.keyID(txn, kB) + if err != nil { + return convertError(err) + } + d, err := t.get(txn, dbID) + if err != nil { + return err + } + return v.UnmarshalBinary(d.v) + }) +} + +// func (t *Table) GetDBID(dbID DBID, v encoding.BinaryUnmarshaler) error { +// return t.View(func(txn *badger.Txn) error { +// d, err := t.get(txn, dbID) +// if err != nil { +// return err +// } +// return v.UnmarshalBinary(d.v) +// }) +// } + +func (t *Table) get(txn *badger.Txn, dbID DBID) (d *datum, err error) { + item, err := txn.Get(prefixedKey(t.prefix, dbID[:])) + if err != nil { + return nil, convertError(err) + } + err = item.Value(func(dB []byte) error { + d, err = decodeDatum(dB) + if err != nil { + return fmt.Errorf("error decoding datum: %w", err) + } + return nil + }) + return +} + +type setOpts struct { + replace bool +} + +// SetOptions is an knob to control how items are inserted into the table with +// Set. +type SetOption func(opts *setOpts) + +// WithReplace allows replacing pre-existing values when calling Set. +func WithReplace() SetOption { + return func(opts *setOpts) { + opts.replace = true + } +} + +// UseDefaultSetOptions sets default options for Set. +func (t *Table) UseDefaultSetOptions(setOpts ...SetOption) { + for i := range setOpts { + setOpts[i](&t.defaultSetOptions) + } +} + +// Set inserts a new value for the key, and creates index entries. 
+func (t *Table) Set(k, v encoding.BinaryMarshaler, setOpts ...SetOption) error { + kB, err := k.MarshalBinary() + if err != nil { + return fmt.Errorf("error marshaling key: %w", err) + } + // zero length keys are not allowed because it screws up the reverse + // iteration scheme. + if len(kB) == 0 { + return errors.New("no zero-length keys allowed") + } + vB, err := v.MarshalBinary() + if err != nil { + return fmt.Errorf("error marshaling value: %w", err) + } + opts := t.defaultSetOptions + for i := range setOpts { + setOpts[i](&opts) + } + d := &datum{v: vB, indexes: make([][]byte, len(t.indexes))} + return t.Update(func(txn *badger.Txn) error { + dbID, err := t.keyID(txn, kB) + if err != nil { + return convertError(err) + } + // See if an entry already exists + oldDatum, err := t.get(txn, dbID) + if !errors.Is(err, ErrKeyNotFound) { + if err != nil { + return fmt.Errorf("error looking for existing entry: %w", err) + } + // We found an old entry + if !opts.replace { + return errors.New("attempted to replace an entry without specifying WithReplace") + } + // Delete any old indexes + for _, k := range oldDatum.indexes { + if err := txn.Delete(k); err != nil { + return fmt.Errorf("error deleting replaced datum's index entry; %w", err) + } + } + } + for i, idx := range t.indexes { + if d.indexes[i], err = idx.add(txn, k, v, dbID); err != nil { + return fmt.Errorf("error adding entry to index %q: %w", idx.name, err) + } + } + dB, err := d.bytes() + if err != nil { + return fmt.Errorf("error encoding datum: %w", err) + } + return txn.Set(prefixedKey(t.prefix, dbID[:]), dB) + }) +} + +// Delete deletes the data associated with the key, including any index entries +// and the id<->key mappings. 
+func (t *Table) Delete(kB []byte) error {
+	return t.Update(func(txn *badger.Txn) error {
+		dbID, err := t.keyID(txn, kB)
+		if err != nil {
+			return convertError(err)
+		}
+		item, err := txn.Get(prefixedKey(t.prefix, dbID[:]))
+		if err != nil {
+			return convertError(err)
+		}
+		return item.Value(func(dB []byte) error {
+			d, err := decodeDatum(dB)
+			if err != nil {
+				return fmt.Errorf("error decoding datum: %w", err)
+			}
+			return t.deleteDatum(txn, dbID, d)
+		})
+	})
+}
+
+func (t *Table) deleteDatum(txn *badger.Txn, dbID DBID, d *datum) error {
+	for _, k := range d.indexes {
+		if err := txn.Delete(k); err != nil {
+			return fmt.Errorf("error deleting index entry; %w", err)
+		}
+	}
+	if err := txn.Delete(prefixedKey(t.prefix, dbID[:])); err != nil {
+		return fmt.Errorf("error deleting table entry: %w", err)
+	}
+	return t.deleteDBID(txn, dbID)
+}