Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disk: block for 1s when conflicting #1190

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 109 additions & 54 deletions pkg/disk/attachdetach_slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package disk
import (
"context"
"sync"
"sync/atomic"
"time"
)

type AttachDetachSlots interface {
Expand Down Expand Up @@ -41,15 +43,76 @@ func (a *PerNodeSlots) GetSlotFor(node string) adSlot {
}

type adSlot interface {
Attach() slot
Detach() slot
Attach() blockableSlot
Detach() blockableSlot
}

type slot interface {
Aquire(ctx context.Context) error
Release()
}

type blockableSlot interface {
slot
Block(until time.Time)
}

type blockable struct {
until atomic.Pointer[time.Time]
}

func (s *blockable) Block(until time.Time) {
for {
old := s.until.Load()
if old != nil && old.After(until) {
return
}
swapped := s.until.CompareAndSwap(old, &until)
if swapped {
return
}
}
}

func (s *blockable) Aquire(ctx context.Context) error {
for {
until := s.until.Load()
if until == nil {
return nil
}
select {
case <-time.After(-time.Since(*until)):
swapped := s.until.CompareAndSwap(until, nil)
if swapped {
return nil
} // if not, the block time is extended while we are waiting. Continue waiting.
case <-ctx.Done():
return ctx.Err()
}
}
}

type independentBlockableSlot[TSlot slot] struct {
slot TSlot
blockable
}

func (s *independentBlockableSlot[TSlot]) Aquire(ctx context.Context) error {
err := s.slot.Aquire(ctx)
if err != nil {
return err
}
return s.blockable.Aquire(ctx)
}

func (s *independentBlockableSlot[TSlot]) Release() {
s.slot.Release()
}

func newBlockable[TSlot slot](slot TSlot) *independentBlockableSlot[TSlot] {
return &independentBlockableSlot[TSlot]{slot: slot}
}

type serialADSlot struct {
// slot is a buffered channel with size 1.
// The buffer is filled if and only if an attach or detach is in progress.
Expand All @@ -58,6 +121,8 @@ type serialADSlot struct {
// highPriorityChannel is a channel without buffer.
// detach requests are fulfilled from this channel first.
highPriorityChannel chan struct{}

blockable
}

type serialAD_DetachSlot struct{ *serialADSlot }
Expand All @@ -69,12 +134,11 @@ func (s serialAD_DetachSlot) Aquire(ctx context.Context) error {
}
select {
case s.highPriorityChannel <- struct{}{}:
return nil
case s.slot <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
return s.blockable.Aquire(ctx)
}

func (s serialAD_AttachSlot) Aquire(ctx context.Context) error {
Expand All @@ -83,10 +147,10 @@ func (s serialAD_AttachSlot) Aquire(ctx context.Context) error {
}
select {
case s.slot <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
return s.blockable.Aquire(ctx)
}

func (s *serialADSlot) Release() {
Expand All @@ -99,84 +163,75 @@ func (s *serialADSlot) Release() {
}
}

func (s *serialADSlot) Detach() slot { return serialAD_DetachSlot{s} }
func (s *serialADSlot) Attach() slot { return serialAD_AttachSlot{s} }

type parallelSlot struct{}
func (s *serialADSlot) Detach() blockableSlot { return serialAD_DetachSlot{s} }
func (s *serialADSlot) Attach() blockableSlot { return serialAD_AttachSlot{s} }

func (s parallelSlot) GetSlotFor(node string) adSlot { return s }

func (parallelSlot) Detach() slot { return noOpSlot{} }
func (parallelSlot) Attach() slot { return noOpSlot{} }

type noOpSlot struct{}

func (noOpSlot) Aquire(ctx context.Context) error { return ctx.Err() }
func (noOpSlot) Release() {}

type serialOneDirSlot struct {
parallelSlot
serial serialSlot
type maxConcurrentSlot struct {
slots chan struct{}
}

type serialDetachSlot serialOneDirSlot
type serialAttachSlot serialOneDirSlot

func (s serialDetachSlot) Detach() slot { return s.serial }
func (s serialAttachSlot) Attach() slot { return s.serial }

type serialSlot struct {
// slot is a buffered channel with size 1.
// The buffer is filled if and only if a detach is in progress.
slot chan struct{}
func newMaxConcurrentSlot(maxConcurrency int) maxConcurrentSlot {
return maxConcurrentSlot{
slots: make(chan struct{}, maxConcurrency),
}
}

func (s serialSlot) Aquire(ctx context.Context) error {
func (s maxConcurrentSlot) Aquire(ctx context.Context) error {
if ctx.Err() != nil {
return ctx.Err()
}
select {
case s.slot <- struct{}{}:
case s.slots <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (s serialSlot) Release() {
<-s.slot
func (s maxConcurrentSlot) Release() {
<-s.slots
}

func makeOneSideSlot() serialOneDirSlot {
return serialOneDirSlot{
serial: serialSlot{
slot: make(chan struct{}, 1),
},
}
type noOpSlot struct{}

func (noOpSlot) Aquire(ctx context.Context) error { return ctx.Err() }
func (noOpSlot) Release() {}

type independentSlot struct {
attach blockableSlot
detach blockableSlot
}

func NewSlots(serialDetach, serialAttach bool) AttachDetachSlots {
if !serialAttach && !serialDetach {
return parallelSlot{}
func (s *independentSlot) Detach() blockableSlot { return s.detach }
func (s *independentSlot) Attach() blockableSlot { return s.attach }

func makeOneSide(concurrency int) blockableSlot {
if concurrency == 0 {
return newBlockable(noOpSlot{})
}
return newBlockable(newMaxConcurrentSlot(concurrency))
}

// NewSlots returns a new AttachDetachSlots that allows up to
// detachConcurrency detach operations and attachConcurrency attach operations in parallel.
// 0 means no limitation on concurrency.
// As a special case, if both values are 1, only one of them can be in progress at a time.
func NewSlots(detachConcurrency, attachConcurrency int) AttachDetachSlots {
var makeSlot func() adSlot
if serialDetach && serialAttach {
if detachConcurrency == 1 && attachConcurrency == 1 {
makeSlot = func() adSlot {
return &serialADSlot{
highPriorityChannel: make(chan struct{}),
slot: make(chan struct{}, 1),
}
}
} else if serialDetach {
makeSlot = func() adSlot {
return serialDetachSlot(makeOneSideSlot())
}
} else if serialAttach {
} else {
makeSlot = func() adSlot {
return serialAttachSlot(makeOneSideSlot())
return &independentSlot{
attach: makeOneSide(attachConcurrency),
detach: makeOneSide(detachConcurrency),
}
}
} else {
panic("unreachable")
}
return NewPerNodeSlots(makeSlot)
}
Loading