dsort: duplicated records (full coverage & stress); fixes
* part twelve, prev. commit: 3789c4d

Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
alex-aizman committed Aug 11, 2023
1 parent: 3789c4d · commit: abdfd7b
Showing 9 changed files with 44 additions and 35 deletions.
2 changes: 1 addition & 1 deletion ais/rebmeta.go
@@ -218,7 +218,7 @@ func (m *rmdModifier) log(nl nl.Listener) {
 	var (
 		err  = nl.Err()
 		abrt = nl.Aborted()
-		name = "rebalance[" + nl.UUID() + "] "
+		name = "rebalance[" + nl.UUID() + "]"
 	)
 	switch {
 	case err == nil && !abrt:
5 changes: 2 additions & 3 deletions ais/test/dsort_test.go
@@ -282,7 +282,7 @@ func (df *dsortFramework) start() {
 func (df *dsortFramework) createInputShards() {
 	const tmpDir = "/tmp"
 	var (
-		wg    = cos.NewLimitedWaitGroup(40, 0)
+		wg    = cos.NewLimitedWaitGroup(sys.NumCPU(), 0)
 		errCh = make(chan error, df.shardCnt)

 		mu = &sync.Mutex{} // to collect inputShards (obj names)
@@ -1830,10 +1830,9 @@ func TestDsortMissingShards(t *testing.T) {
 	}
 }

-// TODO -- FIXME: fails archive.ExtTarLz4 & archive.ExtTarGz
 func TestDsortDuplications(t *testing.T) {
 	tools.CheckSkip(t, tools.SkipTestArgs{Long: true})
-	for _, ext := range []string{archive.ExtTar, archive.ExtZip} {
+	for _, ext := range []string{archive.ExtTar, archive.ExtTarLz4, archive.ExtTarGz, archive.ExtZip} { // all supported formats
 		t.Run(ext, func(t *testing.T) {
 			runDSortTest(
 				t, dsortTestSpec{
13 changes: 8 additions & 5 deletions cmn/archive/read.go
@@ -135,14 +135,17 @@ func (tgr *tgzReader) init(fh io.Reader) (err error) {
 // - when the method returns non-nil reader the responsibility to close the latter goes to the caller (via reader.Close)
 // - otherwise, gzip.Reader is closed upon return
 // currently, non-nil reader is returned iff filename != "" (indicating extraction of a single named file)
-func (tgr *tgzReader) Range(filename string, rcb ReadCB) (reader cos.ReadCloseSizer, err error) {
-	reader, err = tgr.tr.Range(filename, rcb)
-	if err == nil && reader != nil {
+func (tgr *tgzReader) Range(filename string, rcb ReadCB) (cos.ReadCloseSizer, error) {
+	reader, err := tgr.tr.Range(filename, rcb)
+	if err != nil {
+		tgr.gzr.Close()
+		return reader, err
+	}
+	if reader != nil {
 		csc := &cslClose{gzr: tgr.gzr /*to close*/, R: reader /*to read from*/, N: reader.Size()}
 		return csc, err
 	}
-	err = tgr.gzr.Close()
-	return
+	return nil, tgr.gzr.Close()
 }

 // zipReader
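The ownership comment above is the crux of this change: on error the enclosing gzip.Reader is closed before returning, on success the returned reader (wrapped in cslClose) carries the close responsibility to the caller, and when nothing is returned the gzip.Reader's own Close error becomes the result. Below is a minimal standalone sketch of that contract using only the standard library; closeBoth and openEntry are hypothetical names, not AIStore's cslClose.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// closeBoth stands in for cslClose: reads go to the inner entry reader,
// while Close also releases the enclosing gzip stream.
type closeBoth struct {
	io.Reader
	gzr io.Closer
}

func (c *closeBoth) Close() error { return c.gzr.Close() }

// openEntry transfers ownership of gzr to the caller only when it returns a
// non-nil reader; on error, or when there is nothing to return, it closes gzr itself.
func openEntry(gzr *gzip.Reader, entry io.Reader, err error) (io.ReadCloser, error) {
	if err != nil {
		gzr.Close() // error path: close here, caller gets nothing to close
		return nil, err
	}
	if entry != nil {
		return &closeBoth{Reader: entry, gzr: gzr}, nil // caller must Close
	}
	return nil, gzr.Close() // nothing extracted: report the Close error, if any
}

func main() {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write([]byte("hello"))
	zw.Close()

	gzr, _ := gzip.NewReader(&buf)
	rc, err := openEntry(gzr, gzr, nil)
	if err == nil && rc != nil {
		b, _ := io.ReadAll(rc)
		fmt.Println(string(b)) // "hello"
		rc.Close()
	}
}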
7 changes: 4 additions & 3 deletions cmn/cos/sync.go
@@ -244,9 +244,10 @@ func (s *DynSemaphore) Release(cnts ...int) {
 // LimitedWaitGroup //
 //////////////////////

-// usage: no more than `limit` goroutines in parallel
-func NewLimitedWaitGroup(limit, have int) WG {
-	if have == 0 || have > limit {
+// usage: no more than `limit` (e.g., sys.NumCPU()) goroutines in parallel
+func NewLimitedWaitGroup(limit, want int) WG {
+	debug.Assert(limit > 0 || want > 0)
+	if want == 0 || want > limit {
 		return &LimitedWaitGroup{wg: &sync.WaitGroup{}, sema: NewDynSemaphore(limit)}
 	}
 	return &sync.WaitGroup{}
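For context, the function caps parallelism at `limit` and falls back to a plain sync.WaitGroup when the expected goroutine count (`want`) already fits under the limit. The following is a minimal, self-contained sketch of the same bounded-concurrency idea, with a buffered channel standing in for the semaphore; the names are illustrative, not AIStore's actual LimitedWaitGroup/DynSemaphore implementation.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// limitedWG caps the number of goroutines running at once.
type limitedWG struct {
	wg   sync.WaitGroup
	sema chan struct{} // buffered channel used as a counting semaphore
}

func newLimitedWG(limit int) *limitedWG {
	return &limitedWG{sema: make(chan struct{}, limit)}
}

func (l *limitedWG) Add(n int) {
	for i := 0; i < n; i++ {
		l.sema <- struct{}{} // blocks once `limit` goroutines are in flight
	}
	l.wg.Add(n)
}

func (l *limitedWG) Done() {
	<-l.sema
	l.wg.Done()
}

func (l *limitedWG) Wait() { l.wg.Wait() }

func main() {
	wg := newLimitedWG(runtime.NumCPU()) // cf. sys.NumCPU() in the test change above
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = i * i // simulate per-shard work
		}(i)
	}
	wg.Wait()
	fmt.Println("done")
}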
15 changes: 8 additions & 7 deletions ext/dsort/extract/rcb.go
@@ -79,14 +79,15 @@ func (c *rcbCtx) xtar(_ string, reader cos.ReadCloseSizer, hdr any) (bool /*stop
 	}

 	size, err := c.extractor.RecordWithBuffer(args)
-	if err == nil {
-		c.extractedSize += size
-		c.extractedCount++
-		// .tar format pads all block to 512 bytes
-		c.offset += cos.CeilAlignInt64(header.Size, archive.TarBlockSize)
-	}
-	reader.Close()
-	return err != nil /*stop*/, err
+	if err != nil {
+		return true /*stop*/, err
+	}
+	debug.Assert(size > 0)
+	c.extractedSize += size
+	c.extractedCount++
+	c.offset += cos.CeilAlignInt64(header.Size, archive.TarBlockSize) // .tar padding
+	return false, nil
 }

 // handles .zip
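The offset bookkeeping relies on the fact that tar stores each entry's payload padded to 512-byte blocks, so the extractor advances by the header size rounded up to the block boundary. A standalone sketch of that rounding follows; ceilAlign is a hypothetical helper mirroring what cos.CeilAlignInt64 is used for here, and the real implementation may differ.

package main

import "fmt"

const tarBlockSize = 512 // .tar pads each entry's payload to 512-byte blocks

// ceilAlign rounds v up to the nearest multiple of align.
func ceilAlign(v, align int64) int64 {
	if rem := v % align; rem != 0 {
		return v + (align - rem)
	}
	return v
}

func main() {
	for _, size := range []int64{0, 1, 511, 512, 513, 10_000} {
		fmt.Printf("payload %5d B -> occupies %5d B in the archive\n",
			size, ceilAlign(size, tarBlockSize))
	}
	// e.g. a 513-byte payload occupies 1024 bytes of tar data
}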
22 changes: 10 additions & 12 deletions ext/dsort/extract/recm.go
@@ -102,26 +102,24 @@ func NewRecordManager(t cluster.Target, bck cmn.Bck, extractCreator Creator,

 func (recm *RecordManager) RecordWithBuffer(args extractRecordArgs) (size int64, err error) {
 	var (
-		storeType       string
-		contentPath     string
-		fullContentPath string
-		mdSize          int64
-
+		storeType        string
+		contentPath      string
+		fullContentPath  string
+		mdSize           int64
 		ext              = cosExt(args.recordName)
 		recordUniqueName = genRecordUname(args.shardName, args.recordName)
 	)

-	// If the content already exists we should skip it but raise an error
-	// (caller must handle it properly).
+	// handle record duplications (see m.react)
 	if recm.Records.Exists(recordUniqueName, ext) {
 		msg := fmt.Sprintf("record %q is duplicated", args.recordName)
 		recm.Records.DeleteDup(recordUniqueName, ext)

-		// NOTE: no need to remove anything from `recm.extractionPaths`
-		// or `recm.contents` since it'll be removed anyway during subsequent cleanup.
-		// The assumption is that there will be not too many duplicates and we can live
-		// with a few extra files/memory.
-		return 0, recm.onDuplicatedRecords(msg)
+		err = recm.onDuplicatedRecords(msg)
+		if err != nil {
+			return 0, err // react: abort
+		}
+		// react: ignore or warn
 	}

 	debug.Assert(!args.extractMethod.Has(ExtractToWriter) || args.w != nil)
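The rewritten branch defers the policy decision to onDuplicatedRecords: dsort's configured reaction to duplicated records determines whether the duplicate aborts the job (an error is returned) or is merely warned about or ignored, in which case extraction continues. A rough standalone sketch of such a dispatch, assuming the usual ignore/warn/abort reactions; the names and wiring are illustrative, not the actual dsort manager code.

package main

import (
	"errors"
	"fmt"
	"log"
)

// reaction values mirror dsort's duplicated-records policy (illustrative).
const (
	reactIgnore = "ignore"
	reactWarn   = "warn"
	reactAbort  = "abort"
)

// onDuplicatedRecords sketches the dispatch the RecordManager relies on:
// only the "abort" reaction produces an error that propagates to the caller.
func onDuplicatedRecords(react, msg string) error {
	switch react {
	case reactIgnore:
		return nil
	case reactWarn:
		log.Printf("warning: %s", msg)
		return nil
	case reactAbort:
		return errors.New(msg)
	default:
		return fmt.Errorf("invalid reaction %q", react)
	}
}

func main() {
	msg := `record "shard-001/obj-42" is duplicated`
	fmt.Println(onDuplicatedRecords(reactWarn, msg))  // <nil>, job continues
	fmt.Println(onDuplicatedRecords(reactAbort, msg)) // error, job aborts
}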
7 changes: 5 additions & 2 deletions ext/dsort/extract/targz.go
@@ -49,9 +49,12 @@ func (t *targzRW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor Record
 	c.buf = buf

 	_, err = ar.Range("", c.xtar)
-
 	slab.Free(buf)
-	c.tw.Close()
+	if err == nil {
+		cos.Close(c.tw)
+	} else {
+		_ = c.tw.Close()
+	}
 	cos.Close(wfh)
 	return c.extractedSize, c.extractedCount, err
 }
6 changes: 5 additions & 1 deletion ext/dsort/extract/tarlz4.go
@@ -50,7 +50,11 @@ func (t *tarlz4RW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor Recor
 	_, err = ar.Range("", c.xtar)

 	slab.Free(buf)
-	c.tw.Close()
+	if err == nil {
+		cos.Close(c.tw)
+	} else {
+		_ = c.tw.Close()
+	}
 	cos.Close(wfh)

 	return c.extractedSize, c.extractedCount, err
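Both the tar.gz and tar.lz4 extractors now apply the same rule: on the happy path a failure to close the tar writer should be surfaced (presumably what cos.Close does), while on the error path the original extraction error takes precedence and the Close result is deliberately discarded. A small stdlib-only sketch of this pattern; closeStrict and finish are hypothetical names.

package main

import (
	"fmt"
	"io"
	"log"
	"os"
)

// closeStrict is a stand-in for cos.Close: on the happy path a Close failure
// should be surfaced (logged here), not silently dropped.
func closeStrict(c io.Closer) {
	if err := c.Close(); err != nil {
		log.Printf("close failed: %v", err)
	}
}

// finish mirrors the pattern from Extract above: if the main operation already
// failed, the writer is closed best-effort and the original error is kept.
func finish(w io.Closer, opErr error) error {
	if opErr == nil {
		closeStrict(w)
		return nil
	}
	_ = w.Close() // best effort; opErr takes precedence
	return opErr
}

func main() {
	f, err := os.CreateTemp("", "example-*")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())
	fmt.Println(finish(f, nil)) // <nil>
}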
2 changes: 1 addition & 1 deletion tools/tassert/tassert.go
@@ -28,7 +28,7 @@ func CheckFatal(tb testing.TB, err error) {
 	mu.Lock()
 	if _, ok := fatalities[tb.Name()]; ok {
 		mu.Unlock()
-		fmt.Printf("--- %s: duplicate CheckFatal\n", tb.Name()) // see #1057
+		fmt.Printf("--- %s: duplicate CheckFatal: %v\n", tb.Name(), err) // see #1057
 		runtime.Goexit()
 	} else {
 		fatalities[tb.Name()] = struct{}{}
