[dbnode] Account for Neg/Pos Offsets when building per field roaring bitmap posting lists (#2213)

* Account for pos/neg offsets when building per field roaring bitmap posting lists.
notbdu authored Mar 16, 2020
1 parent 5289a5f commit 548b247
Showing 4 changed files with 232 additions and 43 deletions.
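
For context on what the diff below does: when segments are merged into a single builder, each document's postings ID is remapped from the segment's local ID space into the combined space by adding the segment's positive base offset and subtracting a negative offset for every earlier local document that was skipped as a duplicate. The sketch below illustrates that remapping in isolation and under stated assumptions; the names (segmentMeta, remapPostingsIDs) are hypothetical and not the builder's actual types, which work on roaring-bitmap postings lists rather than plain slices.

package main

import (
	"fmt"
	"sort"
)

// segmentMeta stands in for the builder's per-segment metadata: the segment's
// base offset in the combined postings ID space and the ascending list of
// local IDs that were dropped as duplicates when the segment was added.
// (Hypothetical type for illustration only.)
type segmentMeta struct {
	offset        uint64
	duplicatesAsc []uint64
}

// remapPostingsIDs maps segment-local postings IDs to combined IDs, skipping
// duplicate documents and compensating for them with a negative offset.
func remapPostingsIDs(local []uint64, meta segmentMeta) []uint64 {
	sort.Slice(local, func(a, b int) bool { return local[a] < local[b] })
	var (
		out            []uint64
		dups           = meta.duplicatesAsc
		negativeOffset uint64
	)
	for _, curr := range local {
		// Advance past duplicates that sort before the current ID.
		for len(dups) > 0 && curr > dups[0] {
			dups = dups[1:]
			negativeOffset++
		}
		if len(dups) > 0 && curr == dups[0] {
			// The current document is itself a duplicate; skip it entirely.
			dups = dups[1:]
			negativeOffset++
			continue
		}
		out = append(out, curr+meta.offset-negativeOffset)
	}
	return out
}

func main() {
	// Segment whose documents start at combined ID 10; local doc 2 was a duplicate.
	meta := segmentMeta{offset: 10, duplicatesAsc: []uint64{2}}
	fmt.Println(remapPostingsIDs([]uint64{0, 2, 3}, meta)) // prints [10 12]
}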
@@ -46,8 +46,35 @@ func newFieldPostingsListIterFromSegments(
continue
}

multiIter.add(iter, seg.segment)
multiIter.add(&fieldsKeyIter{
iter: iter,
segment: seg,
})
}

return multiIter, nil
}

// fieldsKeyIter needs to be a keyIterator and contains a fields iterator
var _ keyIterator = &fieldsKeyIter{}

type fieldsKeyIter struct {
iter segment.FieldsIterator
segment segmentMetadata
}

func (i *fieldsKeyIter) Next() bool {
return i.iter.Next()
}

func (i *fieldsKeyIter) Current() []byte {
return i.iter.Current()
}

func (i *fieldsKeyIter) Err() error {
return i.iter.Err()
}

func (i *fieldsKeyIter) Close() error {
return i.iter.Close()
}
@@ -0,0 +1,139 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package builder

import (
"bytes"
"testing"

"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index/segment"

"github.com/stretchr/testify/require"
)

func TestFieldPostingsListIterFromSegments(t *testing.T) {
segments := []segment.Segment{
newTestSegmentWithDocs(t, []doc.Document{
{
ID: []byte("bux_0"),
Fields: []doc.Field{
{Name: []byte("fruit"), Value: []byte("apple")},
{Name: []byte("vegetable"), Value: []byte("carrot")},
{Name: []byte("infrequent"), Value: []byte("val0")},
},
},
{
ID: []byte("bar_0"),
Fields: []doc.Field{
{Name: []byte("cat"), Value: []byte("rhymes")},
{Name: []byte("hat"), Value: []byte("with")},
{Name: []byte("bat"), Value: []byte("pat")},
},
},
}),
newTestSegmentWithDocs(t, []doc.Document{
{
ID: []byte("foo_0"),
Fields: []doc.Field{
{Name: []byte("fruit"), Value: []byte("apple")},
{Name: []byte("vegetable"), Value: []byte("carrot")},
{Name: []byte("infrequent"), Value: []byte("val0")},
},
},
{
ID: []byte("bux_1"),
Fields: []doc.Field{
{Name: []byte("delta"), Value: []byte("22")},
{Name: []byte("gamma"), Value: []byte("33")},
{Name: []byte("theta"), Value: []byte("44")},
},
},
}),
newTestSegmentWithDocs(t, []doc.Document{
{
ID: []byte("bar_1"),
Fields: []doc.Field{
{Name: []byte("cat"), Value: []byte("rhymes")},
{Name: []byte("hat"), Value: []byte("with")},
{Name: []byte("bat"), Value: []byte("pat")},
},
},
{
ID: []byte("foo_1"),
Fields: []doc.Field{
{Name: []byte("fruit"), Value: []byte("apple")},
{Name: []byte("vegetable"), Value: []byte("carrot")},
{Name: []byte("infrequent"), Value: []byte("val1")},
},
},
{
ID: []byte("baz_0"),
Fields: []doc.Field{
{Name: []byte("fruit"), Value: []byte("watermelon")},
{Name: []byte("color"), Value: []byte("green")},
{Name: []byte("alpha"), Value: []byte("0.5")},
},
},
{
ID: []byte("bux_2"),
Fields: []doc.Field{
{Name: []byte("delta"), Value: []byte("22")},
{Name: []byte("gamma"), Value: []byte("33")},
{Name: []byte("theta"), Value: []byte("44")},
},
},
}),
}
builder := NewBuilderFromSegments(testOptions)
builder.Reset(0)

b, ok := builder.(*builderFromSegments)
require.True(t, ok)
require.NoError(t, builder.AddSegments(segments))
iter, err := b.FieldsPostingsList()
require.NoError(t, err)
// For each field postings list, verify a document's postings ID is contained iff the document has that field.
for iter.Next() {
field, pl := iter.Current()
docIter, err := b.AllDocs()
require.NoError(t, err)
for docIter.Next() {
doc := docIter.Current()
pID := docIter.PostingsID()
found := checkIfFieldExistsInDoc(field, doc)
require.Equal(t, found, pl.Contains(pID))
}
}
}

func checkIfFieldExistsInDoc(
field []byte,
doc doc.Document,
) bool {
found := false
for _, f := range doc.Fields {
if bytes.Equal(field, f.Name) {
found = true
}
}
return found
}
@@ -34,21 +34,21 @@ import (
var _ segment.FieldsPostingsListIterator = &multiKeyPostingsListIterator{}

type multiKeyPostingsListIterator struct {
err error
firstNext bool
closeIters []keyIterator
iters []keyIterator
currIters []keyIterator
currReaders []index.Reader
segments []segment.Segment
currFieldPostingsList postings.MutableList
currFieldPostingsLists []postings.List
err error
firstNext bool
closeIters []keyIterator
iters []keyIterator
currIters []keyIterator
currReaders []index.Reader
currFieldPostingsList postings.MutableList
bitmapIter *bitmap.Iterator
}

func newMultiKeyPostingsListIterator() *multiKeyPostingsListIterator {
b := bitmap.NewBitmapWithDefaultPooling(defaultBitmapContainerPooling)
i := &multiKeyPostingsListIterator{
currFieldPostingsList: roaring.NewPostingsListFromBitmap(b),
bitmapIter: &bitmap.Iterator{},
}
i.reset()
return i
@@ -72,17 +72,11 @@ func (i *multiKeyPostingsListIterator) reset() {
i.currIters[j] = nil
}
i.currIters = i.currIters[:0]

for j := range i.segments {
i.segments[j] = nil
}
i.segments = i.segments[:0]
}

func (i *multiKeyPostingsListIterator) add(iter keyIterator, segment segment.Segment) {
func (i *multiKeyPostingsListIterator) add(iter keyIterator) {
i.closeIters = append(i.closeIters, iter)
i.iters = append(i.iters, iter)
i.segments = append(i.segments, segment)
i.tryAddCurr(iter)
}

@@ -123,46 +117,72 @@ func (i *multiKeyPostingsListIterator) Next() bool {
return false
}

prevField := i.currIters[0].Current()

// Re-evaluate current value
i.currEvaluate()

// NB(bodu): Build the postings list for this field if the field has changed.
defer func() {
for idx := range i.currFieldPostingsLists {
i.currFieldPostingsLists[idx] = nil
}
i.currFieldPostingsLists = i.currFieldPostingsLists[:0]
for idx := range i.currReaders {
for idx, reader := range i.currReaders {
if err := reader.Close(); err != nil {
i.err = err
}
i.currReaders[idx] = nil
}
i.currReaders = i.currReaders[:0]
}()

i.currFieldPostingsList.Reset()
currField := i.currIters[0].Current()

if !bytes.Equal(prevField, currField) {
i.currFieldPostingsList.Reset()
for _, segment := range i.segments {
reader, err := segment.Reader()
if err != nil {
i.err = err
return false
}
pl, err := reader.MatchField(currField)
if err != nil {
i.err = err
return false
}
for _, iter := range i.currIters {
fieldsKeyIter := iter.(*fieldsKeyIter)
reader, err := fieldsKeyIter.segment.segment.Reader()
if err != nil {
i.err = err
return false
}
i.currReaders = append(i.currReaders, reader)

i.currFieldPostingsLists = append(i.currFieldPostingsLists, pl)
i.currReaders = append(i.currReaders, reader)
pl, err := reader.MatchField(currField)
if err != nil {
i.err = err
return false
}

i.currFieldPostingsList.UnionMany(i.currFieldPostingsLists)
if fieldsKeyIter.segment.offset == 0 {
// No offset, which means this is the first segment we are
// combining from, so we can union directly.
i.currFieldPostingsList.Union(pl)
continue
}

for _, reader := range i.currReaders {
if err := reader.Close(); err != nil {
// We have to take the offset and duplicates into account
var (
iter = i.bitmapIter
duplicates = fieldsKeyIter.segment.duplicatesAsc
negativeOffset postings.ID
)
bitmap, ok := roaring.BitmapFromPostingsList(pl)
if !ok {
i.err = errPostingsListNotRoaring
return false
}

iter.Reset(bitmap)
for v, eof := iter.Next(); !eof; v, eof = iter.Next() {
curr := postings.ID(v)
for len(duplicates) > 0 && curr > duplicates[0] {
duplicates = duplicates[1:]
negativeOffset++
}
if len(duplicates) > 0 && curr == duplicates[0] {
duplicates = duplicates[1:]
negativeOffset++
// Also skip this value, as it is itself a duplicate
continue
}
value := curr + fieldsKeyIter.segment.offset - negativeOffset
if err := i.currFieldPostingsList.Insert(value); err != nil {
i.err = err
return false
}
@@ -174,7 +174,10 @@ func (i *termsIterFromSegments) Next() bool {
continue
}
value := curr + termsKeyIter.segment.offset - negativeOffset
_ = i.currPostingsList.Insert(value)
if err := i.currPostingsList.Insert(value); err != nil {
i.err = err
return false
}
}
}
