Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix OSS sealunwrapper adding extra get + put request to all storage get requests #29050

Merged
merged 2 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/29050.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:bug
core: fix bug in seal unwrapper that caused high storage latency in Vault CE. For every storage read request, the
seal unwrapper was performing the read twice, and would also issue an unnecessary storage write.
```
7 changes: 7 additions & 0 deletions sdk/physical/inmem/inmem_ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ func (i *InmemHABackend) HAEnabled() bool {
return true
}

func (i *InmemHABackend) Underlying() *InmemBackend {
if txBackend, ok := i.Backend.(*TransactionalInmemBackend); ok {
return &txBackend.InmemBackend
}
return i.Backend.(*InmemBackend)
}

// InmemLock is an in-memory Lock implementation for the HABackend
type InmemLock struct {
in *InmemHABackend
Expand Down
70 changes: 38 additions & 32 deletions vault/sealunwrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ import (
// NewSealUnwrapper creates a new seal unwrapper
func NewSealUnwrapper(underlying physical.Backend, logger log.Logger) physical.Backend {
ret := &sealUnwrapper{
underlying: underlying,
logger: logger,
locks: locksutil.CreateLocks(),
allowUnwraps: new(uint32),
underlying: underlying,
logger: logger,
locks: locksutil.CreateLocks(),
}

if underTxn, ok := underlying.(physical.Transactional); ok {
Expand All @@ -43,7 +42,7 @@ type sealUnwrapper struct {
underlying physical.Backend
logger log.Logger
locks []*locksutil.LockEntry
allowUnwraps *uint32
allowUnwraps atomic.Bool
}

// transactionalSealUnwrapper is a seal unwrapper that wraps a physical that is transactional
Expand All @@ -63,63 +62,70 @@ func (d *sealUnwrapper) Put(ctx context.Context, entry *physical.Entry) error {
return d.underlying.Put(ctx, entry)
}

// unwrap gets an entry from underlying storage and tries to unwrap it. If the entry was not wrapped, return
// value unwrappedEntry will be nil. If the entry is wrapped and encrypted, an error is returned.
func (d *sealUnwrapper) unwrap(ctx context.Context, key string) (entry, unwrappedEntry *physical.Entry, err error) {
entry, err = d.underlying.Get(ctx, key)
// unwrap gets an entry from underlying storage and tries to unwrap it.
// - If the entry is not wrapped: the entry will be returned unchanged and wasWrapped will be false
// - If the entry is wrapped and encrypted: an error is returned.
// - If the entry is wrapped but not encrypted: the entry will be unwrapped and returned. wasWrapped will be true.
func (d *sealUnwrapper) unwrap(ctx context.Context, key string) (unwrappedEntry *physical.Entry, wasWrapped bool, err error) {
entry, err := d.underlying.Get(ctx, key)
if err != nil {
return nil, nil, err
return nil, false, err
}
if entry == nil {
return nil, nil, err
return nil, false, nil
}

wrappedEntryValue, unmarshaled := UnmarshalSealWrappedValueWithCanary(entry.Value)
switch {
case !unmarshaled:
unwrappedEntry = entry
// Entry is not wrapped
return entry, false, nil
case wrappedEntryValue.isEncrypted():
return nil, nil, fmt.Errorf("cannot decode sealwrapped storage entry %q", entry.Key)
// Entry is wrapped and encrypted
return nil, true, fmt.Errorf("cannot decode sealwrapped storage entry %q", entry.Key)
default:
// Entry is wrapped and not encrypted
pt, err := wrappedEntryValue.getPlaintextValue()
if err != nil {
return nil, nil, err
return nil, true, err
}
unwrappedEntry = &physical.Entry{
return &physical.Entry{
Key: entry.Key,
Value: pt,
}
}, true, nil
}

return entry, unwrappedEntry, nil
}

func (d *sealUnwrapper) Get(ctx context.Context, key string) (*physical.Entry, error) {
entry, unwrappedEntry, err := d.unwrap(ctx, key)
entry, wasWrapped, err := d.unwrap(ctx, key)
switch {
case err != nil:
case err != nil: // Failed to get entry
return nil, err
case entry == nil:
case entry == nil: // Entry doesn't exist
return nil, nil
case atomic.LoadUint32(d.allowUnwraps) != 1:
return unwrappedEntry, nil
case !wasWrapped || !d.allowUnwraps.Load(): // Entry was not wrapped or unwrapping not allowed
return entry, nil
}

// Entry was wrapped, we need to replace it with the unwrapped value

// Grab locks because we are performing a write
locksutil.LockForKey(d.locks, key).Lock()
defer locksutil.LockForKey(d.locks, key).Unlock()

// At this point we need to re-read and re-check
entry, unwrappedEntry, err = d.unwrap(ctx, key)
// Read entry again in case it was changed while we were waiting for the lock
entry, wasWrapped, err = d.unwrap(ctx, key)
switch {
case err != nil:
case err != nil: // Failed to get entry
return nil, err
case entry == nil:
case entry == nil: // Entry doesn't exist
return nil, nil
case atomic.LoadUint32(d.allowUnwraps) != 1:
return unwrappedEntry, nil
case !wasWrapped || !d.allowUnwraps.Load(): // Entry was not wrapped or unwrapping not allowed
return entry, nil
}

return unwrappedEntry, d.underlying.Put(ctx, unwrappedEntry)
// Write out the unwrapped value
return entry, d.underlying.Put(ctx, entry)
}

func (d *sealUnwrapper) Delete(ctx context.Context, key string) error {
Expand Down Expand Up @@ -155,12 +161,12 @@ func (d *transactionalSealUnwrapper) Transaction(ctx context.Context, txns []*ph
// This should only run during preSeal which ensures that it can't be run
// concurrently and that it will be run only by the active node
func (d *sealUnwrapper) stopUnwraps() {
atomic.StoreUint32(d.allowUnwraps, 0)
d.allowUnwraps.Store(false)
}

func (d *sealUnwrapper) runUnwraps() {
// Allow key unwraps on key gets. This gets set only when running on the
// active node to prevent standbys from changing data underneath the
// primary
atomic.StoreUint32(d.allowUnwraps, 1)
d.allowUnwraps.Store(true)
}
53 changes: 36 additions & 17 deletions vault/sealunwrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,38 @@ import (
func TestSealUnwrapper(t *testing.T) {
logger := corehelpers.NewTestLogger(t)

// Test without transactions
phys, err := inmem.NewInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
}
performTestSealUnwrapper(t, phys, logger)
// Test with both cache enabled and disabled
for _, disableCache := range []bool{true, false} {
// Test without transactions
phys, err := inmem.NewInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
}
performTestSealUnwrapper(t, phys, logger, disableCache)

// Test with transactions
tPhys, err := inmem.NewTransactionalInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
// Test with transactions
tPhys, err := inmem.NewTransactionalInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
}
performTestSealUnwrapper(t, tPhys, logger, disableCache)
}
performTestSealUnwrapper(t, tPhys, logger)
}

func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Logger) {
func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Logger, disableCache bool) {
ctx := context.Background()
base := &CoreConfig{
Physical: phys,
Physical: phys,
DisableCache: disableCache,
}
cluster := NewTestCluster(t, base, &TestClusterOptions{
Logger: logger,
})
cluster.Start()
defer cluster.Cleanup()

physImem := phys.(interface{ Underlying() *inmem.InmemBackend }).Underlying()

// Read a value and then save it back in a proto message
entry, err := phys.Get(ctx, "core/master")
if err != nil {
Expand Down Expand Up @@ -78,7 +84,15 @@ func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Lo
// successfully decode it, but be able to unmarshal it when read back from
// the underlying physical store. When we read from active, it should both
// successfully decode it and persist it back.
checkValue := func(core *Core, wrapped bool) {
checkValue := func(core *Core, wrapped bool, ro bool) {
if ro {
physImem.FailPut(true)
physImem.FailDelete(true)
defer func() {
physImem.FailPut(false)
physImem.FailDelete(false)
}()
}
entry, err := core.physical.Get(ctx, "core/master")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -106,7 +120,12 @@ func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Lo
}

TestWaitActive(t, cluster.Cores[0].Core)
checkValue(cluster.Cores[2].Core, true)
checkValue(cluster.Cores[1].Core, true)
checkValue(cluster.Cores[0].Core, false)
checkValue(cluster.Cores[2].Core, true, true)
checkValue(cluster.Cores[1].Core, true, true)
checkValue(cluster.Cores[0].Core, false, false)

// The storage entry should now be unwrapped, so there should be no more writes to storage when we read it
checkValue(cluster.Cores[2].Core, false, true)
checkValue(cluster.Cores[1].Core, false, true)
checkValue(cluster.Cores[0].Core, false, true)
}
Loading