Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lexi: add new database package #3033

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/asset/eth/txdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,9 @@ func (db *badgerTxDB) getTxs(n int, refID *common.Hash, past bool, tokenID *uint
// getPendingTxs returns a map of nonce to extendedWalletTx for all
// pending transactions.
func (db *badgerTxDB) getPendingTxs() ([]*extendedWalletTx, error) {
// We will be iterating backwards from the most recent nonce.
// If we find numConfirmedTxsToCheck consecutive confirmed transactions,
// we can stop iterating.
// We will be iterating backwards from the most recent nonce. If we find
// numConfirmedTxsToCheck consecutive confirmed transactions, we can stop
// iterating.
const numConfirmedTxsToCheck = 20

txs := make([]*extendedWalletTx, 0, 4)
Expand Down
91 changes: 91 additions & 0 deletions dex/lexi/datum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.

package lexi

import (
"bytes"
"fmt"

"github.com/decred/dcrd/wire"
)

// datum is a value in the key-value database, along with information about
// its index entries.
type datum struct {
version byte
indexes [][]byte
v []byte
}

func (d *datum) bytes() ([]byte, error) {
if d.version != 0 {
return nil, fmt.Errorf("unknown datum version %d", d.version)
}

// encoded datum length is 1 byte for version, 1 varint to say how many
// indexes there are then for each index, a varint to specify the size of
// the index entry followed by the entry itself, then a varint to specify
// the size of the value blob followed by the value blob itself.
bLen := 1 + len(d.v) + wire.VarIntSerializeSize(uint64(len(d.v))) + wire.VarIntSerializeSize(uint64(len(d.indexes)))
for _, ib := range d.indexes {
bLen += len(ib) + wire.VarIntSerializeSize(uint64(len(ib)))
}
b := bytes.NewBuffer(make([]byte, 0, bLen))
Comment on lines +30 to +34
Copy link
Member Author

@buck54321 buck54321 Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. I forgot to doc this encoding stuff. It's pretty much like BuildyBytes, but I'm pre-allocating the buffer and allowing any blob size. And I'm leveraging wire for the var int stuff.

I guess its a lot of work just to pre-allocate the buffer though. derp.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When reading the doc I noticed this:

// MarshalJSON satisfies the json.Unmarshaler interface, returns a quoted copy

"...satisfies the json.Unmarshaler interface..."

I think it is the json.Marshaller interface. I could be wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests all PASS on my machine.

if err := b.WriteByte(d.version); err != nil {
return nil, fmt.Errorf("error writing version: %w", err)
}
if err := wire.WriteVarInt(b, 0, uint64(len(d.indexes))); err != nil {
return nil, fmt.Errorf("error writing index count var int: %w", err)
}
for _, ib := range d.indexes {
if err := wire.WriteVarInt(b, 0, uint64(len(ib))); err != nil {
return nil, fmt.Errorf("error writing index var int: %w", err)
}
if _, err := b.Write(ib); err != nil {
return nil, fmt.Errorf("error writing index value: %w", err)
}
}
if err := wire.WriteVarInt(b, 0, uint64(len(d.v))); err != nil {
return nil, fmt.Errorf("error writing value var int: %w", err)
}
if _, err := b.Write(d.v); err != nil {
return nil, fmt.Errorf("error writing value: %w", err)
}
return b.Bytes(), nil
}

func decodeDatum(blob []byte) (*datum, error) {
if len(blob) < 4 {
return nil, fmt.Errorf("datum blob length cannot be < 4. got %d", len(blob))
}
d := &datum{version: blob[0]}
if d.version != 0 {
return nil, fmt.Errorf("unknown datum blob version %d", d.version)
}
b := bytes.NewBuffer(blob[1:])
nIndexes, err := wire.ReadVarInt(b, 0)
if err != nil {
return nil, fmt.Errorf("error reading number of indexes: %w", err)
}
d.indexes = make([][]byte, nIndexes)
for i := 0; i < int(nIndexes); i++ {
indexLen, err := wire.ReadVarInt(b, 0)
if err != nil {
return nil, fmt.Errorf("error reading index length: %w", err)
}
d.indexes[i] = make([]byte, indexLen)
if _, err := b.Read(d.indexes[i]); err != nil {
return nil, fmt.Errorf("error reading index: %w", err)
}
}
valueLen, err := wire.ReadVarInt(b, 0)
if err != nil {
return nil, fmt.Errorf("erro reading value var int: %w", err)
}
d.v = make([]byte, valueLen)
if _, err := b.Read(d.v); err != nil {
return nil, fmt.Errorf("error reading value: %w", err)
}
return d, nil
}
216 changes: 216 additions & 0 deletions dex/lexi/db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package lexi

import (
"bytes"
"encoding"
"os"
"path/filepath"
"strings"
"testing"

"decred.org/dcrdex/dex"
"decred.org/dcrdex/dex/encode"
)

func newTestDB(t *testing.T) (*DB, func()) {
tmpDir, err := os.MkdirTemp("", "")
if err != nil {
t.Fatalf("error making temp dir: %v", err)
}
db, err := New(&Config{
Path: filepath.Join(tmpDir, "test.db"),
Log: dex.StdOutLogger("T", dex.LevelInfo),
})
if err != nil {
t.Fatalf("error constructing db: %v", err)
}
return db, func() { os.RemoveAll(tmpDir) }
}

func TestPrefixes(t *testing.T) {
db, shutdown := newTestDB(t)
defer shutdown()

pfix, err := db.prefixForName("1")
if err != nil {
t.Fatalf("error getting prefix 1: %v", err)
}
if pfix != firstAvailablePrefix {
t.Fatalf("expected prefix %s, got %s", firstAvailablePrefix, pfix)
}

pfix, err = db.prefixForName("2")
if err != nil {
t.Fatalf("error getting prefix 2: %v", err)
}
if secondPfix := incrementPrefix(firstAvailablePrefix); pfix != secondPfix {
t.Fatalf("expected prefix %s, got %s", secondPfix, pfix)
}

// Make sure requests for the same table name return the already-registered
// prefix.
pfix, err = db.prefixForName("1")
if err != nil {
t.Fatalf("error getting prefix 1 again: %v", err)
}
if pfix != firstAvailablePrefix {
t.Fatalf("expected prefix %s, got %s", firstAvailablePrefix, pfix)
}
}

type tValue struct {
k, v, idx []byte
}

func (v *tValue) MarshalBinary() ([]byte, error) {
return v.v, nil
}

func valueIndex(k, v encoding.BinaryMarshaler) ([]byte, error) {
return v.(*tValue).idx, nil
}

func TestIndex(t *testing.T) {
db, shutdown := newTestDB(t)
defer shutdown()

tbl, err := db.Table("T")
if err != nil {
t.Fatalf("Error creating table: %v", err)
}

idx, err := tbl.AddIndex("I", valueIndex)
if err != nil {
t.Fatalf("Error adding index: %v", err)
}

const nVs = 100
vs := make([]*tValue, nVs)
for i := 0; i < nVs; i++ {
k := append(encode.RandomBytes(5), byte(i))
v := &tValue{k: []byte{byte(i)}, v: encode.RandomBytes(10), idx: []byte{byte(i)}}
vs[i] = v
if err := tbl.Set(B(k), v); err != nil {
t.Fatalf("Error setting table entry: %v", err)
}
}

// Iterate forwards.
var i int
idx.Iterate(nil, func(it *Iter) error {
v := vs[i]
it.V(func(vB []byte) error {
if !bytes.Equal(vB, v.v) {
t.Fatalf("Wrong bytes for forward iteration index %d", i)
}
return nil
})
i++
return nil
})
if i != nVs {
t.Fatalf("Expected to iterate %d items but only did %d", nVs, i)
}

// Iterate backwards
i = nVs
idx.Iterate(nil, func(it *Iter) error {
i--
v := vs[i]
return it.V(func(vB []byte) error {
if !bytes.Equal(vB, v.v) {
t.Fatalf("Wrong bytes for reverse iteration index %d", i)
}
return nil
})
}, WithReverse())
if i != 0 {
t.Fatalf("Expected to iterate back to zero but only got to %d", i)
}

// Iterate forward and delete the first half.
i = 0
if err := idx.Iterate(nil, func(it *Iter) error {
if i < 50 {
i++
return it.Delete()
}
return ErrEndIteration
}, WithUpdate()); err != nil {
t.Fatalf("Error iterating forward to delete entries: %v", err)
}
if i != 50 {
t.Fatalf("Expected to iterate forward to 50, but only got to %d", i)
}

idx.Iterate(nil, func(it *Iter) error {
return it.V(func(vB []byte) error {
if !bytes.Equal(vB, vs[50].v) {
t.Fatal("Wrong first iteration item after deletion")
}
return ErrEndIteration
})
})

// Seek a specific item.
i = 75
idx.Iterate(nil, func(it *Iter) error {
if i == 75 {
i--
return it.V(func(vB []byte) error {
if !bytes.Equal(vB, vs[75].v) {
t.Fatal("first item wasn't 25")
}
return nil
})
} else if i == 74 {
return ErrEndIteration
}
t.Fatal("reached an unexpected value")
return nil
}, WithSeek(vs[75].idx), WithReverse())
if i != 74 {
t.Fatal("never reached 74")
}
}

func TestDatum(t *testing.T) {
testEncodeDecode := func(tag string, d *datum) {
t.Helper()
b, err := d.bytes()
if err != nil {
t.Fatalf("%s: error encoding simple datum: %v", tag, err)
}
reD, err := decodeDatum(b)
if err != nil {
t.Fatalf("%s: error decoding simple datum: %v", tag, err)
}
if !bytes.Equal(reD.v, d.v) {
t.Fatalf("%s: decoding datum value incorrect. %x != %x", tag, reD.v, d.v)
}
if d.version != 0 {
t.Fatalf("%s: wrong datum version. expected %d, got %d", tag, d.version, reD.version)
}
if len(d.indexes) != len(reD.indexes) {
t.Fatalf("%s: wrong number of indexes. wanted %d, got %d", tag, len(d.indexes), reD.indexes)
}
for i, idx := range d.indexes {
if !bytes.Equal(idx, reD.indexes[i]) {
t.Fatalf("%s: Wrong index # %d", tag, i)
}
}
}

d := &datum{version: 1, v: []byte{0x01}}
if _, err := d.bytes(); err == nil || !strings.Contains(err.Error(), "unknown datum version") {
t.Fatalf("Wrong error for unknown datum version: %v", err)
}
d.version = 0

testEncodeDecode("simple", d)

d = &datum{v: encode.RandomBytes(300)}
d.indexes = append(d.indexes, encode.RandomBytes(5))
d.indexes = append(d.indexes, encode.RandomBytes(300))
testEncodeDecode("complex", d)
}
39 changes: 39 additions & 0 deletions dex/lexi/dbid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.

package lexi

import (
"encoding"
"encoding/hex"
)

// DBIDSize is the size of the DBID. It is 8 bytes to match the size of a
// byte-encoded uint64.
const DBIDSize = 8

// DBID is a unique ID mapped to a datum's key. Keys can be any length, but to
// prevent long keys from being echoed in all the indexes, every key is
// translated to a DBID for internal use.
type DBID [DBIDSize]byte

var (
_ encoding.BinaryMarshaler = DBID{}

lastDBID = DBID{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
)

// MarshalBinary satisfies encoding.BinaryMarshaler for the DBID.
func (dbID DBID) MarshalBinary() ([]byte, error) {
return dbID[:], nil
}

// String encodes the DBID as a 16-character hexadecimal string.
func (dbID DBID) String() string {
return hex.EncodeToString(dbID[:])
}

func newDBIDFromBytes(b []byte) (dbID DBID) {
copy(dbID[:], b)
return dbID
}
Loading
Loading