diff --git a/ais/rebmeta.go b/ais/rebmeta.go index fd42063eb9..88f7905343 100644 --- a/ais/rebmeta.go +++ b/ais/rebmeta.go @@ -218,7 +218,7 @@ func (m *rmdModifier) log(nl nl.Listener) { var ( err = nl.Err() abrt = nl.Aborted() - name = "rebalance[" + nl.UUID() + "] " + name = "rebalance[" + nl.UUID() + "]" ) switch { case err == nil && !abrt: diff --git a/ais/test/dsort_test.go b/ais/test/dsort_test.go index c02bf46990..a3b7dbbb6c 100644 --- a/ais/test/dsort_test.go +++ b/ais/test/dsort_test.go @@ -282,7 +282,7 @@ func (df *dsortFramework) start() { func (df *dsortFramework) createInputShards() { const tmpDir = "/tmp" var ( - wg = cos.NewLimitedWaitGroup(40, 0) + wg = cos.NewLimitedWaitGroup(sys.NumCPU(), 0) errCh = make(chan error, df.shardCnt) mu = &sync.Mutex{} // to collect inputShards (obj names) @@ -1830,10 +1830,9 @@ func TestDsortMissingShards(t *testing.T) { } } -// TODO -- FIXME: fails archive.ExtTarLz4 & archive.ExtTarGz func TestDsortDuplications(t *testing.T) { tools.CheckSkip(t, tools.SkipTestArgs{Long: true}) - for _, ext := range []string{archive.ExtTar, archive.ExtZip} { + for _, ext := range []string{archive.ExtTar, archive.ExtTarLz4, archive.ExtTarGz, archive.ExtZip} { // all supported formats t.Run(ext, func(t *testing.T) { runDSortTest( t, dsortTestSpec{ diff --git a/cmn/archive/read.go b/cmn/archive/read.go index e22f557f78..2fc73cbc35 100644 --- a/cmn/archive/read.go +++ b/cmn/archive/read.go @@ -135,14 +135,17 @@ func (tgr *tgzReader) init(fh io.Reader) (err error) { // - when the method returns non-nil reader the responsibility to close the latter goes to the caller (via reader.Close) // - otherwise, gzip.Reader is closed upon return // currently, non-nil reader is returned iff filename != "" (indicating extraction of a single named file) -func (tgr *tgzReader) Range(filename string, rcb ReadCB) (reader cos.ReadCloseSizer, err error) { - reader, err = tgr.tr.Range(filename, rcb) - if err == nil && reader != nil { +func (tgr *tgzReader) Range(filename string, rcb ReadCB) (cos.ReadCloseSizer, error) { + reader, err := tgr.tr.Range(filename, rcb) + if err != nil { + tgr.gzr.Close() + return reader, err + } + if reader != nil { csc := &cslClose{gzr: tgr.gzr /*to close*/, R: reader /*to read from*/, N: reader.Size()} return csc, err } - err = tgr.gzr.Close() - return + return nil, tgr.gzr.Close() } // zipReader diff --git a/cmn/cos/sync.go b/cmn/cos/sync.go index 35b8864e19..dbee479a47 100644 --- a/cmn/cos/sync.go +++ b/cmn/cos/sync.go @@ -244,9 +244,10 @@ func (s *DynSemaphore) Release(cnts ...int) { // LimitedWaitGroup // ////////////////////// -// usage: no more than `limit` goroutines in parallel -func NewLimitedWaitGroup(limit, have int) WG { - if have == 0 || have > limit { +// usage: no more than `limit` (e.g., sys.NumCPU()) goroutines in parallel +func NewLimitedWaitGroup(limit, want int) WG { + debug.Assert(limit > 0 || want > 0) + if want == 0 || want > limit { return &LimitedWaitGroup{wg: &sync.WaitGroup{}, sema: NewDynSemaphore(limit)} } return &sync.WaitGroup{} diff --git a/ext/dsort/extract/rcb.go b/ext/dsort/extract/rcb.go index ea67ef3146..03a278ddad 100644 --- a/ext/dsort/extract/rcb.go +++ b/ext/dsort/extract/rcb.go @@ -79,14 +79,15 @@ func (c *rcbCtx) xtar(_ string, reader cos.ReadCloseSizer, hdr any) (bool /*stop } size, err := c.extractor.RecordWithBuffer(args) - if err == nil { - c.extractedSize += size - c.extractedCount++ - // .tar format pads all block to 512 bytes - c.offset += cos.CeilAlignInt64(header.Size, archive.TarBlockSize) - } reader.Close() - return err != nil /*stop*/, err + if err != nil { + return true /*stop*/, err + } + debug.Assert(size > 0) + c.extractedSize += size + c.extractedCount++ + c.offset += cos.CeilAlignInt64(header.Size, archive.TarBlockSize) // .tar padding + return false, nil } // handles .zip diff --git a/ext/dsort/extract/recm.go b/ext/dsort/extract/recm.go index 1e6d627130..d72ce5b422 100644 --- a/ext/dsort/extract/recm.go +++ b/ext/dsort/extract/recm.go @@ -102,26 +102,24 @@ func NewRecordManager(t cluster.Target, bck cmn.Bck, extractCreator Creator, func (recm *RecordManager) RecordWithBuffer(args extractRecordArgs) (size int64, err error) { var ( - storeType string - contentPath string - fullContentPath string - mdSize int64 - + storeType string + contentPath string + fullContentPath string + mdSize int64 ext = cosExt(args.recordName) recordUniqueName = genRecordUname(args.shardName, args.recordName) ) - // If the content already exists we should skip it but raise an error - // (caller must handle it properly). + // handle record duplications (see m.react) if recm.Records.Exists(recordUniqueName, ext) { msg := fmt.Sprintf("record %q is duplicated", args.recordName) recm.Records.DeleteDup(recordUniqueName, ext) - // NOTE: no need to remove anything from `recm.extractionPaths` - // or `recm.contents` since it'll be removed anyway during subsequent cleanup. - // The assumption is that there will be not too many duplicates and we can live - // with a few extra files/memory. - return 0, recm.onDuplicatedRecords(msg) + err = recm.onDuplicatedRecords(msg) + if err != nil { + return 0, err // react: abort + } + // react: ignore or warn } debug.Assert(!args.extractMethod.Has(ExtractToWriter) || args.w != nil) diff --git a/ext/dsort/extract/targz.go b/ext/dsort/extract/targz.go index ec33aacb3a..c2c75e7f30 100644 --- a/ext/dsort/extract/targz.go +++ b/ext/dsort/extract/targz.go @@ -49,9 +49,12 @@ func (t *targzRW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor Record c.buf = buf _, err = ar.Range("", c.xtar) - slab.Free(buf) - c.tw.Close() + if err == nil { + cos.Close(c.tw) + } else { + _ = c.tw.Close() + } cos.Close(wfh) return c.extractedSize, c.extractedCount, err } diff --git a/ext/dsort/extract/tarlz4.go b/ext/dsort/extract/tarlz4.go index 652108439d..642d80d228 100644 --- a/ext/dsort/extract/tarlz4.go +++ b/ext/dsort/extract/tarlz4.go @@ -50,7 +50,11 @@ func (t *tarlz4RW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor Recor _, err = ar.Range("", c.xtar) slab.Free(buf) - c.tw.Close() + if err == nil { + cos.Close(c.tw) + } else { + _ = c.tw.Close() + } cos.Close(wfh) return c.extractedSize, c.extractedCount, err diff --git a/tools/tassert/tassert.go b/tools/tassert/tassert.go index ad2b9c17fc..f5effa4202 100644 --- a/tools/tassert/tassert.go +++ b/tools/tassert/tassert.go @@ -28,7 +28,7 @@ func CheckFatal(tb testing.TB, err error) { mu.Lock() if _, ok := fatalities[tb.Name()]; ok { mu.Unlock() - fmt.Printf("--- %s: duplicate CheckFatal\n", tb.Name()) // see #1057 + fmt.Printf("--- %s: duplicate CheckFatal: %v\n", tb.Name(), err) // see #1057 runtime.Goexit() } else { fatalities[tb.Name()] = struct{}{}