Skip to content

Commit

Permalink
Fix data race in FST Segment reads (#938)
Browse files Browse the repository at this point in the history
  • Loading branch information
prateek authored Sep 25, 2018
1 parent afe161b commit eef049a
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 18 deletions.
13 changes: 5 additions & 8 deletions src/m3ninx/index/segment/fst/encoding/docs/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,29 +81,26 @@ func (w *DataWriter) Reset(wr io.Writer) {
// DataReader is a reader for the data file for documents.
type DataReader struct {
data []byte
dec *encoding.Decoder
}

// NewDataReader returns a new DataReader.
func NewDataReader(data []byte) *DataReader {
return &DataReader{
data: data,
dec: encoding.NewDecoder(nil),
}
}

func (r *DataReader) Read(offset uint64) (doc.Document, error) {
if offset >= uint64(len(r.data)) {
return doc.Document{}, fmt.Errorf("invalid offset: %v is past the end of the data file", offset)
}
r.dec.Reset(r.data[int(offset):])

id, err := r.dec.Bytes()
dec := encoding.NewDecoder(r.data[int(offset):])
id, err := dec.Bytes()
if err != nil {
return doc.Document{}, err
}

x, err := r.dec.Uvarint()
x, err := dec.Uvarint()
if err != nil {
return doc.Document{}, err
}
Expand All @@ -115,11 +112,11 @@ func (r *DataReader) Read(offset uint64) (doc.Document, error) {
}

for i := 0; i < n; i++ {
name, err := r.dec.Bytes()
name, err := dec.Bytes()
if err != nil {
return doc.Document{}, err
}
val, err := r.dec.Bytes()
val, err := dec.Bytes()
if err != nil {
return doc.Document{}, err
}
Expand Down
9 changes: 4 additions & 5 deletions src/m3ninx/index/segment/fst/encoding/docs/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (w *IndexWriter) Reset(wr io.Writer) {
// IndexReader is a reader for the index file for documents.
type IndexReader struct {
data []byte
dec *encoding.Decoder
base postings.ID
limit postings.ID
len int
Expand All @@ -127,10 +126,10 @@ func NewIndexReader(data []byte) (*IndexReader, error) {

r := &IndexReader{
data: data,
dec: encoding.NewDecoder(data[:8]),
}

base, err := r.dec.Uint64()
dec := encoding.NewDecoder(data[:8])
base, err := dec.Uint64()
if err != nil {
return nil, fmt.Errorf("could not read base postings ID: %v", err)
}
Expand All @@ -146,8 +145,8 @@ func (r *IndexReader) Read(id postings.ID) (uint64, error) {
}

idx := r.index(id)
r.dec.Reset(r.data[idx:])
offset, err := r.dec.Uint64()
dec := encoding.NewDecoder(r.data[idx:])
offset, err := dec.Uint64()
if err != nil {
return 0, err
}
Expand Down
27 changes: 27 additions & 0 deletions src/m3ninx/search/proptest/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package proptest

import "github.com/m3db/m3/src/m3ninx/util"

// lotsTestDocuments holds the documents loaded from the node_exporter test
// data file; it is shared by the tests in this package.
var lotsTestDocuments = util.MustReadDocs("../../util/testdata/node_exporter.json", 2000)
102 changes: 102 additions & 0 deletions src/m3ninx/search/proptest/concurrent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package proptest

import (
"math/rand"
"os"
"sync"
"testing"
"time"

"github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/index/segment/fst"
"github.com/m3db/m3/src/m3ninx/search"
"github.com/m3db/m3/src/m3ninx/search/executor"

"github.com/leanovate/gopter"
"github.com/leanovate/gopter/prop"
"github.com/stretchr/testify/require"
)

// TestConcurrentQueries is a property test: for every generated query it
// executes the query once against an in-memory segment to obtain the expected
// documents, then executes the same query from several goroutines at once
// against an FST segment built from that in-memory segment, and verifies that
// each concurrent result matches the expected documents.
//
// On failure the initial RNG seed is reported.
func TestConcurrentQueries(t *testing.T) {
	// numConcurrentQueries is the number of goroutines that execute each
	// generated query against the FST segment at the same time.
	const numConcurrentQueries = 2

	parameters := gopter.DefaultTestParameters()
	seed := time.Now().UnixNano()
	parameters.MinSuccessfulTests = 100
	parameters.MaxSize = 20
	parameters.Rng = rand.New(rand.NewSource(seed))
	properties := gopter.NewProperties(parameters)

	simpleSeg := newTestMemSegment(t, lotsTestDocuments)
	simpleReader, err := simpleSeg.Reader()
	require.NoError(t, err)
	simpleExec := executor.NewExecutor([]index.Reader{simpleReader})

	fstSeg := fst.ToTestSegment(t, simpleSeg, fstOptions)
	fstReader, err := fstSeg.Reader()
	require.NoError(t, err)
	fstExec := executor.NewExecutor([]index.Reader{fstReader})

	properties.Property("Any concurrent queries segments does not affect fst segments", prop.ForAll(
		func(q search.Query) (bool, error) {
			// Expected result: the documents matched by the in-memory segment.
			dOrg, err := simpleExec.Execute(q)
			require.NoError(t, err)
			matchedDocs, err := collectDocs(dOrg)
			require.NoError(t, err)
			docMatcher, err := newDocumentIteratorMatcher(matchedDocs...)
			require.NoError(t, err)

			var (
				wg       sync.WaitGroup
				errLock  sync.Mutex
				firstErr error
			)
			// recordErr keeps the first error reported by any query goroutine.
			recordErr := func(err error) {
				errLock.Lock()
				defer errLock.Unlock()
				if firstErr == nil {
					firstErr = err
				}
			}

			wg.Add(numConcurrentQueries)
			for i := 0; i < numConcurrentQueries; i++ {
				go func() {
					defer wg.Done()
					fstDocs, err := fstExec.Execute(q)
					if err != nil {
						// Do not use require here: it calls t.FailNow, which
						// must only be called from the goroutine running the
						// test. Hand the error back to the property instead.
						recordErr(err)
						return
					}
					if err := docMatcher.Matches(fstDocs); err != nil {
						recordErr(err)
					}
				}()
			}
			// wg.Wait orders every goroutine's writes before the read below,
			// so firstErr can be read without holding errLock.
			wg.Wait()

			if firstErr != nil {
				return false, firstErr
			}
			return true, nil
		},
		genQuery(lotsTestDocuments),
	))

	reporter := gopter.NewFormatedReporter(true, 160, os.Stdout)
	if !properties.Run(reporter) {
		t.Errorf("failed with initial seed: %d", seed)
	}
}
5 changes: 0 additions & 5 deletions src/m3ninx/search/proptest/prop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,12 @@ import (
"github.com/m3db/m3/src/m3ninx/index/segment/fst"
"github.com/m3db/m3/src/m3ninx/search"
"github.com/m3db/m3/src/m3ninx/search/executor"
"github.com/m3db/m3/src/m3ninx/util"

"github.com/leanovate/gopter"
"github.com/leanovate/gopter/prop"
"github.com/stretchr/testify/require"
)

var (
lotsTestDocuments = util.MustReadDocs("../../util/testdata/node_exporter.json", 2000)
)

func TestSegmentDistributionDoesNotAffectQuery(t *testing.T) {
parameters := gopter.DefaultTestParameters()
seed := time.Now().UnixNano()
Expand Down

0 comments on commit eef049a

Please sign in to comment.