Skip to content

Commit

Permalink
xactions: when checking inactivity ("is idle")
Browse files Browse the repository at this point in the history
* part two: revise/simplify on-demand base - remove mutex, etc.
* revert etl test

Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
  • Loading branch information
alex-aizman committed Aug 15, 2023
1 parent 0551460 commit 47343cc
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 59 deletions.
15 changes: 3 additions & 12 deletions ais/test/etl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ func TestETLAnyToAnyBucket(t *testing.T) {
}
tests = []testObjConfig{
{transformer: tetl.Echo, comm: etl.Hpull, onlyLong: true},
{transformer: tetl.MD5, comm: etl.Hrev, onlyLong: true},
{transformer: tetl.MD5, comm: etl.Hpush},
{transformer: tetl.MD5, comm: etl.Hrev},
{transformer: tetl.MD5, comm: etl.Hpush, onlyLong: true},
}
)

Expand Down Expand Up @@ -544,17 +544,8 @@ func testETLBucket(t *testing.T, bp api.BaseParams, etlName string, m *ioContext
err = tetl.WaitForFinished(bp, xid, kind, timeout)
tassert.CheckFatal(t, err)

list, err := api.ListObjects(bp, bckTo, &apc.LsoMsg{Props: apc.GetPropsName}, api.ListArgs{})
list, err := api.ListObjects(bp, bckTo, nil, api.ListArgs{})
tassert.CheckFatal(t, err)

// TODO -- FIXME: remove
if len(list.Entries) < m.num {
tlog.Logf("Warning: ETL[%s]: list-objects %d < %d expected - retrying...\n", etlName, len(list.Entries), m.num)
time.Sleep(13 * time.Second)
list, err = api.ListObjects(bp, bckTo, &apc.LsoMsg{Props: apc.GetPropsName}, api.ListArgs{})
tassert.CheckFatal(t, err)
}

tassert.Errorf(t, len(list.Entries) == m.num, "expected %d objects, got %d", m.num, len(list.Entries))

checkETLStats(t, xid, m.num, m.fileSize*uint64(m.num), skipByteStats)
Expand Down
72 changes: 25 additions & 47 deletions xact/demand.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package xact

import (
"sync"
"time"

"github.com/NVIDIA/aistore/cluster"
Expand Down Expand Up @@ -37,14 +36,12 @@ type (
idle struct {
ticks cos.StopCh
d time.Duration // hk idle
last int64 // mono.NanoTime
last atomic.Int64 // mono.NanoTime
}

Base

pending int64
active int64
mu sync.RWMutex
pending atomic.Int64
hkReg atomic.Bool // mono.NanoTime
}
)
Expand All @@ -54,64 +51,54 @@ type (
////////////////

// NOTE: override `Base.IsIdle`
func (r *DemandBase) IsIdle() bool { return r.likelyIdle() }
func (r *DemandBase) IsIdle() bool {
last := r.idle.last.Load()
return last != 0 && mono.Since(last) >= cos.MaxDuration(cmn.Timeout.MaxKeepalive(), 2*time.Second)
}

func (r *DemandBase) Init(uuid, kind string, bck *meta.Bck, idle time.Duration) (xdb *DemandBase) {
func (r *DemandBase) Init(uuid, kind string, bck *meta.Bck, idle time.Duration) {
r.hkName = kind + "/" + uuid
r.idle.d = idleDefault
if idle > 0 {
r.idle.d = idle
}
r.idle.ticks.Init()
r.InitBase(uuid, kind, bck)
r._initIdle()
return
}

func (r *DemandBase) _initIdle() {
r.active++
r.idle.last = mono.NanoTime()
r.idle.last.Store(mono.NanoTime())
r.hkReg.Store(true)
hk.Reg(r.hkName+hk.NameSuffix, r.hkcb, 0 /*time.Duration*/)
}

func (r *DemandBase) hkcb() time.Duration {
r.mu.Lock()
if r.active == 0 {
r.idle.ticks.Close() // signals the parent to finish and exit
last := r.idle.last.Load()
if last != 0 && mono.Since(last) >= r.idle.d {
// signal parent xaction via IdleTimer() chan
// to finish and exit
r.idle.ticks.Close()
}
r.active = 0
r.mu.Unlock()
return r.idle.d
}

func (r *DemandBase) IdleTimer() <-chan struct{} { return r.idle.ticks.Listen() }

func (r *DemandBase) Pending() (cnt int64) {
r.mu.RLock()
cnt = r.pending
r.mu.RUnlock()
return
}
func (r *DemandBase) DecPending() { r.SubPending(1) }
func (r *DemandBase) Pending() (cnt int64) { return r.pending.Load() }
func (r *DemandBase) DecPending() { r.SubPending(1) }

func (r *DemandBase) IncPending() {
debug.Assert(r.hkReg.Load())
r.mu.Lock()
r.pending++
r.idle.last = 0
r.active++
r.mu.Unlock()
r.pending.Inc()
r.idle.last.Store(0)
}

func (r *DemandBase) SubPending(n int) {
r.mu.Lock()
r.pending -= int64(n)
debug.Assert(r.pending >= 0)
if r.pending == 0 {
r.idle.last = mono.NanoTime()
if n == 0 {
return
}
pending := r.pending.Sub(int64(n))
debug.Assert(pending >= 0)
if pending == 0 {
r.idle.last.Store(mono.NanoTime())
}
r.mu.Unlock()
}

func (r *DemandBase) Stop() {
Expand All @@ -120,20 +107,11 @@ func (r *DemandBase) Stop() {
}

func (r *DemandBase) Abort(err error) (ok bool) {
if err == nil && !r.likelyIdle() {
if err == nil && !r.IsIdle() {
err = cmn.NewErrAborted(r.Name(), "aborting non-idle", nil)
}
if ok = r.Base.Abort(err); ok {
r.Finish()
}
return
}

// private: on-demand quiescence

func (r *DemandBase) likelyIdle() bool {
r.mu.RLock()
last := r.idle.last
r.mu.RUnlock()
return mono.Since(last) >= cos.MaxDuration(cmn.Timeout.MaxKeepalive(), 2*time.Second)
}

0 comments on commit 47343cc

Please sign in to comment.