Skip to content

Commit

Permalink
disk/ad_slot: return better gRPC error message
Browse files Browse the repository at this point in the history
expected new message shown in Kubernetes events:
failed to reserve node i-xxxx for attach: still waiting for other disk(s) to finish attach/detach
  • Loading branch information
huww98 committed Nov 17, 2024
1 parent 1848fe3 commit e59678c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
24 changes: 21 additions & 3 deletions pkg/disk/attachdetach_slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package disk

import (
"context"
"errors"
"sync"
)

Expand Down Expand Up @@ -73,7 +74,7 @@ func (s serialAD_DetachSlot) Aquire(ctx context.Context) error {
case s.slot <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
return maybeWaitingAD(ctx.Err())
}
}

Expand All @@ -85,7 +86,7 @@ func (s serialAD_AttachSlot) Aquire(ctx context.Context) error {
case s.slot <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
return maybeWaitingAD(ctx.Err())
}
}

Expand Down Expand Up @@ -139,7 +140,7 @@ func (s serialSlot) Aquire(ctx context.Context) error {
case s.slot <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
return maybeWaitingAD(ctx.Err())
}
}

Expand Down Expand Up @@ -180,3 +181,20 @@ func NewSlots(serialDetach, serialAttach bool) AttachDetachSlots {
}
return NewPerNodeSlots(makeSlot)
}

// waitingAD is the error that maybeWaitingAD substitutes for an error
// matching context.DeadlineExceeded. Its message states that other disks
// are still being attached or detached.
type waitingAD struct{}

// Error implements the error interface with a fixed, human-readable message.
func (waitingAD) Error() string {
	return "still waiting for other disk(s) to finish attach/detach"
}

// Is makes errors.Is(err, context.DeadlineExceeded) succeed for a waitingAD
// value. Every other target is reported as not matching.
func (waitingAD) Is(target error) bool {
	switch target {
	case context.DeadlineExceeded:
		return true
	default:
		return false
	}
}

// maybeWaitingAD returns waitingAD{} when err matches
// context.DeadlineExceeded (as decided by errors.Is). Any other error,
// including nil, is returned unchanged.
func maybeWaitingAD(err error) error {
	if !errors.Is(err, context.DeadlineExceeded) {
		return err
	}
	return waitingAD{}
}
14 changes: 14 additions & 0 deletions pkg/disk/attachdetach_slot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestDetachPriority(t *testing.T) {
Expand Down Expand Up @@ -144,3 +146,15 @@ func TestSerialDetach_NoRace(t *testing.T) {
t.Fatal("state not updated")
}
}

// TestWaitingADError acquires the detach slot of one node twice. The first
// Aquire is expected to succeed; the second, bounded by a 10ms timeout, is
// expected to fail with an error that matches both waitingAD{} and
// context.DeadlineExceeded.
func TestWaitingADError(t *testing.T) {
	detachSlot := NewSlots(true, false).GetSlotFor("node1").Detach()
	background := context.Background()

	// First acquisition: no deadline, must not report an error.
	assert.NoError(t, detachSlot.Aquire(background))

	// Second acquisition on the same slot, limited to a short timeout.
	timeoutCtx, cancel := context.WithTimeout(background, 10*time.Millisecond)
	defer cancel()

	acquireErr := detachSlot.Aquire(timeoutCtx)
	assert.ErrorIs(t, acquireErr, waitingAD{})
	assert.ErrorIs(t, acquireErr, context.DeadlineExceeded)
}
4 changes: 2 additions & 2 deletions pkg/disk/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func attachDisk(ctx context.Context, tenantUserUID, diskID, nodeID string, isSha

slot := GlobalConfigVar.AttachDetachSlots.GetSlotFor(nodeID).Attach()
if err := slot.Aquire(ctx); err != nil {
return "", status.Errorf(codes.Aborted, "AttachDisk: get ad-slot for disk %s failed: %v", diskID, err)
return "", fmt.Errorf("failed to reserve node %s for attach: %w", nodeID, err)
}
defer slot.Release()

Expand Down Expand Up @@ -440,7 +440,7 @@ func detachDisk(ctx context.Context, ecsClient *ecs.Client, diskID, nodeID strin
// NodeStageVolume/NodeUnstageVolume should be called by sequence
slot := GlobalConfigVar.AttachDetachSlots.GetSlotFor(nodeID).Detach()
if err := slot.Aquire(ctx); err != nil {
return status.Errorf(codes.Aborted, "DetachDisk: get ad-slot for disk %s failed: %v", diskID, err)
return fmt.Errorf("failed to reserve node %s for detach: %w", nodeID, err)
}
defer slot.Release()

Expand Down

0 comments on commit e59678c

Please sign in to comment.