Commit

dsort: add is-compressed; refactor dsort-mem
* part nine, prev. commit 5427156

Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
alex-aizman committed Aug 9, 2023
1 parent 27b3c26 commit 6ff1e65
Showing 5 changed files with 99 additions and 74 deletions.
2 changes: 1 addition & 1 deletion ais/test/dsort_test.go
@@ -349,7 +349,7 @@ func (df *dsortFramework) checkOutputShards(zeros int) {
}

_, err := api.GetObject(baseParams, bucket, shardName, &getArgs)
if err != nil && df.extension == archive.ExtZip && i > df.outputShardCnt/2 {
if err != nil && archive.IsCompressed(df.extension) && i > df.outputShardCnt/2 {
// We estimated too many output shards to be produced - zip compression
// was so good that we could fit more files inside the shard.
//
11 changes: 10 additions & 1 deletion cmn/archive/mime.go
@@ -13,10 +13,15 @@ import (
"strings"

"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/memsys"
)

// supported archive types (file extensions); see also archExts in cmd/cli/cli/const.go
// NOTE: when adding/removing update:
// - FileExtensions
// - IsCompressed
// - allMagics
const (
ExtTar = ".tar"
ExtTgz = ".tgz"
Expand All @@ -41,9 +46,13 @@ type detect struct {
offset int
}

// when adding/removing update `allMagics` below
var FileExtensions = []string{ExtTar, ExtTgz, ExtTarGz, ExtZip, ExtTarLz4}

func IsCompressed(mime string) bool {
debug.Assert(cos.StringInSlice(mime, FileExtensions), mime)
return mime != ExtTar
}

// standard file signatures
var (
magicTar = detect{offset: 257, sig: []byte("ustar"), mime: ExtTar}
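
For reference, a minimal usage sketch of the new helper - a standalone program that assumes only the cmn/archive API shown above (FileExtensions and IsCompressed):

    package main

    import (
        "fmt"

        "github.com/NVIDIA/aistore/cmn/archive"
    )

    func main() {
        for _, ext := range archive.FileExtensions {
            // IsCompressed expects one of the supported extensions;
            // anything else trips the debug-build assertion.
            fmt.Printf("%-8s compressed: %v\n", ext, archive.IsCompressed(ext))
        }
    }

With the constants above, only ExtTar reports false; .tgz, .tar.gz, .zip, and .tar.lz4 all report true.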
4 changes: 2 additions & 2 deletions ext/dsort/dsort.go
@@ -115,8 +115,8 @@ func (m *Manager) start() (err error) {
if curTargetIsFinal {
// assuming uniform distribution, estimate avg. output shard size
ratio := m.compressionRatio()
debug.Assertf(m.pars.InputExtension != archive.ExtTar || ratio == 1,
"tar ratio=%f, ext=%q", ratio, m.pars.InputExtension)
debug.Assertf(archive.IsCompressed(m.pars.InputExtension) || ratio == 1, "tar ratio=%f, ext=%q",
ratio, m.pars.InputExtension)

shardSize := int64(float64(m.pars.OutputShardSize) / ratio)

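
The estimate above is plain arithmetic over the requested output shard size and the measured compression ratio; a standalone sketch of the same calculation, with illustrative names rather than dsort's actual fields and methods:

    package main

    import "fmt"

    // estimateShardSize mirrors the arithmetic above: for plain .tar input the
    // ratio is expected to be exactly 1, so the estimate degenerates to the
    // requested size; for compressed input the requested size is scaled by the
    // observed ratio.
    func estimateShardSize(outputShardSize int64, ratio float64) int64 {
        return int64(float64(outputShardSize) / ratio)
    }

    func main() {
        fmt.Println(estimateShardSize(1<<30, 1.0)) // uncompressed: 1 GiB
        fmt.Println(estimateShardSize(1<<30, 2.0)) // 2:1 compression: 512 MiB
    }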
4 changes: 2 additions & 2 deletions ext/dsort/dsort_general.go
@@ -231,14 +231,14 @@ func (ds *dsorterGeneral) createShardsLocally() (err error) {

group, ctx := errgroup.WithContext(context.Background())

CreateAllShards:
outer:
for _, s := range phaseInfo.metadata.Shards {
select {
case <-ds.m.listenAborted():
_ = group.Wait()
return newDSortAbortedError(ds.m.ManagerUUID)
case <-ctx.Done():
break CreateAllShards // context was canceled, therefore we have an error
break outer // context was canceled, therefore we have an error
default:
}

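
The rename from CreateAllShards to outer is cosmetic, but the label itself is load-bearing: inside a select, a bare break exits only the select statement, so leaving the surrounding for loop requires the labeled form. A minimal, self-contained illustration, unrelated to dsort's types:

    package main

    import "fmt"

    func main() {
        done := make(chan struct{})
        close(done)

    outer:
        for i := 0; ; i++ {
            select {
            case <-done:
                break outer // a bare break here would exit only the select
            default:
            }
            fmt.Println("iteration", i)
        }
        fmt.Println("loop exited")
    }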
152 changes: 84 additions & 68 deletions ext/dsort/dsort_mem.go
@@ -316,14 +316,9 @@ func (ds *dsorterMem) createShardsLocally() error {

ds.creationPhase.adjuster.read.start()
ds.creationPhase.adjuster.write.start()
defer func() {
ds.creationPhase.adjuster.write.stop()
ds.creationPhase.adjuster.read.stop()
}()

metrics := ds.m.Metrics.Creation
metrics.begin()
defer metrics.finish()
metrics.mu.Lock()
metrics.ToCreate = int64(len(phaseInfo.metadata.Shards))
metrics.mu.Unlock()
Expand All @@ -335,7 +330,14 @@ func (ds *dsorterMem) createShardsLocally() error {
stopCh = &cos.StopCh{}
)
stopCh.Init()
defer stopCh.Close()

// cleanup
defer func(metrics *ShardCreation, stopCh *cos.StopCh) {
stopCh.Close()
metrics.finish()
ds.creationPhase.adjuster.write.stop()
ds.creationPhase.adjuster.read.stop()
}(metrics, stopCh)

if err := mem.Get(); err != nil {
return err
Expand All @@ -346,73 +348,15 @@ func (ds *dsorterMem) createShardsLocally() error {
// read
wg.Add(1)
go func() {
defer wg.Done()
group, ctx := errgroup.WithContext(context.Background())
SendAllShards:
for {
// If that was last shard to send we need to break and we will
// be waiting for the result.
if len(phaseInfo.metadata.SendOrder) == 0 {
break SendAllShards
}

select {
case shardName := <-ds.creationPhase.requestedShards:
shard, ok := phaseInfo.metadata.SendOrder[shardName]
if !ok {
break
}

ds.creationPhase.adjuster.read.acquireGoroutineSema()
es := &dsmExtractShard{ds, shard}
group.Go(es.do)

delete(phaseInfo.metadata.SendOrder, shardName)
case <-ds.m.listenAborted():
stopCh.Close()
group.Wait()
errCh <- newDSortAbortedError(ds.m.ManagerUUID)
return
case <-ctx.Done(): // context was canceled, therefore we have an error
stopCh.Close()
break SendAllShards
case <-stopCh.Listen(): // writing side stopped we need to do the same
break SendAllShards
}
}

errCh <- group.Wait()
ds.localRead(stopCh, errCh)
wg.Done()
}()

// write
wg.Add(1)
go func() {
defer wg.Done()
group, ctx := errgroup.WithContext(context.Background())
CreateAllShards:
for _, s := range phaseInfo.metadata.Shards {
select {
case <-ds.m.listenAborted():
stopCh.Close()
group.Wait()
errCh <- newDSortAbortedError(ds.m.ManagerUUID)
return
case <-ctx.Done(): // context was canceled, therefore we have an error
stopCh.Close()
break CreateAllShards
case <-stopCh.Listen():
break CreateAllShards // reading side stopped we need to do the same
default:
}

sa.alloc(uint64(s.Size))

ds.creationPhase.adjuster.write.acquireGoroutineSema()
cs := &dsmCreateShard{ds, s, sa}
group.Go(cs.do)
}

errCh <- group.Wait()
ds.localWrite(sa, stopCh, errCh)
wg.Done()
}()

wg.Wait()
Expand All @@ -426,6 +370,78 @@ func (ds *dsorterMem) createShardsLocally() error {
return nil
}

func (ds *dsorterMem) localRead(stopCh *cos.StopCh, errCh chan error) {
var (
phaseInfo = &ds.m.creationPhase
group, ctx = errgroup.WithContext(context.Background())
)
outer:
for {
// If that was the last shard to send we need to break and we will
// be waiting for the result.
if len(phaseInfo.metadata.SendOrder) == 0 {
break outer
}

select {
case shardName := <-ds.creationPhase.requestedShards:
shard, ok := phaseInfo.metadata.SendOrder[shardName]
if !ok {
break
}

ds.creationPhase.adjuster.read.acquireGoroutineSema()
es := &dsmExtractShard{ds, shard}
group.Go(es.do)

delete(phaseInfo.metadata.SendOrder, shardName)
case <-ds.m.listenAborted():
stopCh.Close()
group.Wait()
errCh <- newDSortAbortedError(ds.m.ManagerUUID)
return
case <-ctx.Done(): // context was canceled, therefore we have an error
stopCh.Close()
break outer
case <-stopCh.Listen(): // writing side stopped we need to do the same
break outer
}
}

errCh <- group.Wait()
}

func (ds *dsorterMem) localWrite(sa *inmemShardAllocator, stopCh *cos.StopCh, errCh chan error) {
var (
phaseInfo = &ds.m.creationPhase
group, ctx = errgroup.WithContext(context.Background())
)
outer:
for _, s := range phaseInfo.metadata.Shards {
select {
case <-ds.m.listenAborted():
stopCh.Close()
group.Wait()
errCh <- newDSortAbortedError(ds.m.ManagerUUID)
return
case <-ctx.Done(): // context was canceled, therefore we have an error
stopCh.Close()
break outer
case <-stopCh.Listen():
break outer // reading side stopped we need to do the same
default:
}

sa.alloc(uint64(s.Size))

ds.creationPhase.adjuster.write.acquireGoroutineSema()
cs := &dsmCreateShard{ds, s, sa}
group.Go(cs.do)
}

errCh <- group.Wait()
}

func (ds *dsorterMem) sendRecordObj(rec *extract.Record, obj *extract.RecordObj, toNode *meta.Snode) (err error) {
var (
local = toNode.ID() == ds.m.ctx.node.ID()
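
Both extracted helpers, localRead and localWrite, follow the same coordination pattern: an errgroup fans out per-shard work, a shared stop channel lets either side (or an abort) halt the other, and the group's final error is reported over errCh. A stripped-down sketch of that pattern with placeholder names, not dsort's actual types:

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    // produce fans out one goroutine per work item; it stops early if a worker
    // fails (ctx canceled) or the peer signals via stopCh, and always reports
    // the group's error on errCh.
    func produce(work []int, stopCh <-chan struct{}, errCh chan<- error) {
        group, ctx := errgroup.WithContext(context.Background())
    outer:
        for _, w := range work {
            select {
            case <-ctx.Done(): // a worker returned an error
                break outer
            case <-stopCh: // the other side stopped; do the same
                break outer
            default:
            }
            w := w
            group.Go(func() error {
                fmt.Println("processing", w)
                return nil
            })
        }
        errCh <- group.Wait()
    }

    func main() {
        stopCh := make(chan struct{})
        errCh := make(chan error, 1)
        go produce([]int{1, 2, 3}, stopCh, errCh)
        fmt.Println("err:", <-errCh)
    }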
