Skip to content

Commit

Permalink
fix(session): expired session write request will delete other session…
Browse files Browse the repository at this point in the history
… shadow key (#534)

### Motivation

The expired session write request will delete other session shadow keys.

When processing a write request with an invalid session id, we are
deleting the shadow key even though the write request is going to be
rejected immediately after (when we check for a valid session id).


### Modification

Separate the logic of writing with a session and writing without a
session.

---------

Co-authored-by: Matteo Merli <mmerli@apache.org>
  • Loading branch information
mattisonchao and merlimat authored Sep 25, 2024
1 parent b3c9540 commit 938b7a5
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 22 deletions.
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ package main
import (
"context"
"fmt"
"github.com/streamnative/oxia/cmd/wal"
"os"

"github.com/streamnative/oxia/cmd/wal"

"github.com/spf13/cobra"
"go.uber.org/automaxprocs/maxprocs"

Expand Down
1 change: 1 addition & 0 deletions cmd/wal/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package wal

import (
"github.com/spf13/cobra"

"github.com/streamnative/oxia/cmd/wal/common"
"github.com/streamnative/oxia/cmd/wal/truncate"
)
Expand Down
6 changes: 4 additions & 2 deletions cmd/wal/truncate/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package truncate

import (
"log/slog"
"math"

"github.com/spf13/cobra"

"github.com/streamnative/oxia/cmd/wal/common"
"github.com/streamnative/oxia/server/wal"
"log/slog"
"math"
)

type truncateOptions struct {
Expand Down
76 changes: 76 additions & 0 deletions server/leader_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1289,3 +1289,79 @@ func TestLeaderController_NotificationsDisabled(t *testing.T) {
assert.NoError(t, kvFactory.Close())
assert.NoError(t, walFactory.Close())
}

func TestLeaderController_DuplicateNewTerm_WithSession(t *testing.T) {
var shard int64 = 2

kvFactory, err := kv.NewPebbleKVFactory(testKVOptions)
assert.NoError(t, err)
walFactory := newTestWalFactory(t)

lc, err := NewLeaderController(Config{}, common.DefaultNamespace, shard, newMockRpcClient(), walFactory, kvFactory)
assert.NoError(t, err)

_, err = lc.NewTerm(&proto.NewTermRequest{Shard: shard, Term: 1})
assert.NoError(t, err)

_, err = lc.BecomeLeader(context.Background(), &proto.BecomeLeaderRequest{
Shard: shard,
Term: 1,
ReplicationFactor: 1,
FollowerMaps: nil,
})
assert.NoError(t, err)

csResult, err := lc.CreateSession(&proto.CreateSessionRequest{
Shard: shard,
SessionTimeoutMs: 5_000,
ClientIdentity: "my-identity",
})
assert.NoError(t, err)

sessionId := csResult.SessionId
invalidSessionId := int64(5)

key := "/namespace/sn/system/0xe0000000_0xf0000000"

// Write entry
_, err = lc.Write(context.Background(), &proto.WriteRequest{
Shard: &shard,
Puts: []*proto.PutRequest{{
Key: key,
Value: []byte("value-a"),
SessionId: &invalidSessionId,
}},
})
assert.NoError(t, err)

// Start a new term on the same leader
_, err = lc.NewTerm(&proto.NewTermRequest{Shard: shard, Term: 2})
assert.NoError(t, err)

_, err = lc.BecomeLeader(context.Background(), &proto.BecomeLeaderRequest{
Shard: shard,
Term: 2,
ReplicationFactor: 1,
FollowerMaps: nil,
})
assert.NoError(t, err)

_, err = lc.CloseSession(&proto.CloseSessionRequest{
Shard: shard,
SessionId: sessionId,
})
assert.NoError(t, err)

// Read entry
r := <-lc.Read(context.Background(), &proto.ReadRequest{
Shard: &shard,
Gets: []*proto.GetRequest{{Key: key}},
})

assert.NoError(t, r.Err)
assert.Equal(t, proto.Status_KEY_NOT_FOUND, r.Response.Status)

assert.NoError(t, lc.Close())
assert.NoError(t, kvFactory.Close())
assert.NoError(t, walFactory.Close())
}
29 changes: 17 additions & 12 deletions server/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ type updateCallback struct{}

var SessionUpdateOperationCallback kv.UpdateOperationCallback = &updateCallback{}

func (*updateCallback) OnPutWithinSession(batch kv.WriteBatch, request *proto.PutRequest, _ *proto.StorageEntry) (proto.Status, error) {
func (*updateCallback) OnPutWithinSession(batch kv.WriteBatch, request *proto.PutRequest, existingEntry *proto.StorageEntry) (proto.Status, error) {
var _, closer, err = batch.Get(SessionKey(SessionId(*request.SessionId)))
if err != nil {
if errors.Is(err, kv.ErrKeyNotFound) {
Expand All @@ -320,6 +320,10 @@ func (*updateCallback) OnPutWithinSession(batch kv.WriteBatch, request *proto.Pu
if err = closer.Close(); err != nil {
return proto.Status_SESSION_DOES_NOT_EXIST, err
}
// delete existing session shadow
if status, err := deleteShadow(batch, request.Key, existingEntry); err != nil {
return status, err
}
// Create the session shadow entry
err = batch.Put(ShadowKey(SessionId(*request.SessionId), request.Key), []byte{})
if err != nil {
Expand All @@ -330,26 +334,27 @@ func (*updateCallback) OnPutWithinSession(batch kv.WriteBatch, request *proto.Pu
}

func (c *updateCallback) OnPut(batch kv.WriteBatch, request *proto.PutRequest, existingEntry *proto.StorageEntry) (proto.Status, error) {
if existingEntry != nil && existingEntry.SessionId != nil {
// We are overwriting an ephemeral value, let's delete its shadow
switch {
// override by normal operation
case request.SessionId == nil:
if status, err := deleteShadow(batch, request.Key, existingEntry); err != nil {
return status, err
}
}

sessionId := request.SessionId
if sessionId != nil {
// We are adding an ephemeral value, let's check if the session exists
// override by session operation
case request.SessionId != nil:
return c.OnPutWithinSession(batch, request, existingEntry)
}
return proto.Status_OK, nil
}

func deleteShadow(batch kv.WriteBatch, key string, existingEntry *proto.StorageEntry) (proto.Status, error) {
existingSessionId := SessionId(*existingEntry.SessionId)
err := batch.Delete(ShadowKey(existingSessionId, key))
if err != nil && !errors.Is(err, kv.ErrKeyNotFound) {
return proto.Status_SESSION_DOES_NOT_EXIST, err
// We are overwriting an ephemeral value, let's delete its shadow
if existingEntry != nil && existingEntry.SessionId != nil {
existingSessionId := SessionId(*existingEntry.SessionId)
err := batch.Delete(ShadowKey(existingSessionId, key))
if err != nil && !errors.Is(err, kv.ErrKeyNotFound) {
return proto.Status_SESSION_DOES_NOT_EXIST, err
}
}
return proto.Status_OK, nil
}
Expand Down
76 changes: 69 additions & 7 deletions server/session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) {

writeBatch = mockWriteBatch{
"a/b/c": []byte{},
SessionKey(SessionId(sessionId-1)) + "a%2Fb%2Fc": []byte{},
ShadowKey(SessionId(sessionId-1), "a/b/c"): []byte{},
}

se := &proto.StorageEntry{
Expand All @@ -160,9 +160,9 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) {

writeBatch = mockWriteBatch{
"a/b/c": []byte{},
SessionKey(SessionId(sessionId-1)) + "a%2Fb%2Fc": []byte{},
SessionKey(SessionId(sessionId - 1)): []byte{},
SessionKey(SessionId(sessionId)): []byte{},
ShadowKey(SessionId(sessionId-1), "a/b/c"): []byte{},
SessionKey(SessionId(sessionId - 1)): []byte{},
SessionKey(SessionId(sessionId)): []byte{},
}

status, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, se)
Expand All @@ -178,6 +178,35 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, proto.Status_SESSION_DOES_NOT_EXIST, status)

// session (sessionID -1) entry
tmpSessionId := sessionId - 1
se = &proto.StorageEntry{
Value: []byte("value"),
VersionId: 0,
CreationTimestamp: 0,
ModificationTimestamp: 0,
SessionId: &tmpSessionId,
}
// sessionID has expired
writeBatch = mockWriteBatch{
"a/b/c": []byte{}, // real data
ShadowKey(SessionId(sessionId-1), "a/b/c"): []byte{}, // shadow key
SessionKey(SessionId(sessionId - 1)): []byte{}, // session
}
// try to use current session override the (sessionID -1)
sessionPutRequest = &proto.PutRequest{
Key: "a/b/c",
Value: []byte("b"),
SessionId: &sessionId,
}

status, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, se)
assert.NoError(t, err)
assert.Equal(t, proto.Status_SESSION_DOES_NOT_EXIST, status)
_, closer, err := writeBatch.Get(ShadowKey(SessionId(sessionId-1), "a/b/c"))
assert.NoError(t, err)
closer.Close()

expectedErr := errors.New("error coming from the DB on read")
writeBatch = mockWriteBatch{
SessionKey(SessionId(sessionId)): expectedErr,
Expand All @@ -191,14 +220,14 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) {
status, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, nil)
assert.NoError(t, err)
assert.Equal(t, proto.Status_OK, status)
sessionKey := SessionKey(SessionId(sessionId)) + "/a%2Fb%2Fc"
_, found := writeBatch[sessionKey]
sessionShadowKey := ShadowKey(SessionId(sessionId), "a/b/c")
_, found := writeBatch[sessionShadowKey]
assert.True(t, found)

expectedErr = errors.New("error coming from the DB on write")
writeBatch = mockWriteBatch{
SessionKey(SessionId(sessionId)): []byte{},
sessionKey: expectedErr,
sessionShadowKey: expectedErr,
}
_, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, nil)
assert.ErrorIs(t, err, expectedErr)
Expand Down Expand Up @@ -578,3 +607,36 @@ func reopenLeaderController(t *testing.T, kvFactory kv.Factory, walFactory wal.F

return lc.(*leaderController)
}

func TestSession_PutWithExpiredSession(t *testing.T) {
var oldSessionId int64 = 100
var newSessionId int64 = 101

se := &proto.StorageEntry{
Value: []byte("value"),
VersionId: 0,
CreationTimestamp: 0,
ModificationTimestamp: 0,
SessionId: &oldSessionId,
}
// sessionID has expired
writeBatch := mockWriteBatch{
"a/b/c": []byte{}, // real data
ShadowKey(SessionId(oldSessionId), "a/b/c"): []byte{}, // shadow key
SessionKey(SessionId(oldSessionId)): []byte{}, // session
}
// try to use current session override the (sessionID -1)
sessionPutRequest := &proto.PutRequest{
Key: "a/b/c",
Value: []byte("b"),
SessionId: &newSessionId,
}

status, err := SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, se)
assert.NoError(t, err)
assert.Equal(t, proto.Status_SESSION_DOES_NOT_EXIST, status)

_, closer, err := writeBatch.Get(ShadowKey(SessionId(oldSessionId), "a/b/c"))
assert.NoError(t, err)
assert.NoError(t, closer.Close())
}

0 comments on commit 938b7a5

Please sign in to comment.