Skip to content

Commit

Permalink
Introduce more query tracing and node selector
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
  • Loading branch information
hanahmily committed Jul 15, 2024
1 parent 0ecac9e commit c98c8a7
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ Release Notes.
- Add the measure query trace.
- Assign a separate lookup table to each group in the maglev selector.
- Convert the async local pipeline to a sync pipeline.
- Add the stream query trace.
- Add the topN query trace.
- Introduce the round-robin selector to Liaison Node.

### Bugs

Expand Down
3 changes: 2 additions & 1 deletion pkg/cmdsetup/liaison.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
}
pipeline := pub.New(metaSvc)
localPipeline := queue.Local()
nodeSel := node.NewMaglevSelector()
nodeSel := node.NewRoundRobinSelector()
nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, nodeRegistry)
profSvc := observability.NewProfService()
Expand Down Expand Up @@ -77,6 +77,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
Version: version.Build(),
Short: "Run as the liaison server",
RunE: func(_ *cobra.Command, _ []string) (err error) {
defer nodeSel.Close()
node, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort())
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions pkg/node/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Selector interface {
AddNode(node *databasev1.Node)
RemoveNode(node *databasev1.Node)
Pick(group, name string, shardID uint32) (string, error)
Close()
}

// NewPickFirstSelector returns a simple selector that always returns the first node if exists.
Expand All @@ -55,6 +56,8 @@ type pickFirstSelector struct {
mu sync.RWMutex
}

// Close is a no-op for pickFirstSelector; the method body is empty.
func (p *pickFirstSelector) Close() {}

func (p *pickFirstSelector) AddNode(node *databasev1.Node) {
nodeID := node.GetMetadata().GetName()
p.mu.RLock()
Expand Down
2 changes: 2 additions & 0 deletions pkg/node/maglev.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type maglevSelector struct {
mutex sync.RWMutex
}

// Close is a no-op for maglevSelector; the method body is empty.
func (m *maglevSelector) Close() {}

func (m *maglevSelector) AddNode(node *databasev1.Node) {
m.mutex.Lock()
defer m.mutex.Unlock()
Expand Down
177 changes: 177 additions & 0 deletions pkg/node/round_robin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package node

import (
"fmt"
"slices"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pkg/errors"

databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

const (
	// expiredKeyCleanupInterval is the period of the ticker that triggers
	// cleanupExpiredEntries.
	expiredKeyCleanupInterval = 1 * time.Hour
	// keyTTL is how long a lookup-table entry may stay unaccessed before
	// cleanupExpiredEntries deletes it.
	keyTTL = 24 * time.Hour
)

// roundRobinSelector is a Selector that gives every (group, shardID) key an
// index in a lookup table and resolves that index, modulo the node count,
// against the list of node names.
type roundRobinSelector struct {
	// clock supplies the current time and the cleanup ticker.
	clock timestamp.Clock
	// closeCh is closed by Close; the cleanup goroutine returns when it is closed.
	closeCh chan struct{}
	// lookupTable maps key to *tableEntry.
	lookupTable sync.Map
	// nodes holds the names of the registered nodes; AddNode sorts it after every insert.
	nodes []string
	// mu guards nodes.
	mu sync.RWMutex
	// once makes sure startCleanupTicker runs a single time.
	once sync.Once
	// tMu serializes the lookup-table rebuild in Pick and cleanupExpiredEntries.
	tMu sync.Mutex
}

// Close signals the cleanup goroutine to stop by closing closeCh.
//
// It is safe to call on a selector whose closeCh was never initialized and
// safe to call more than once: closing a nil or already-closed channel would
// panic, so both cases are turned into no-ops.
func (r *roundRobinSelector) Close() {
	// tMu makes the "already closed?" check and the close itself atomic with
	// respect to concurrent Close calls.
	r.tMu.Lock()
	defer r.tMu.Unlock()
	if r.closeCh == nil {
		return
	}
	select {
	case <-r.closeCh:
		// Already closed.
	default:
		close(r.closeCh)
	}
}

// NewRoundRobinSelector returns a Selector backed by an empty node list, a
// fresh clock and an open close channel.
func NewRoundRobinSelector() Selector {
	return &roundRobinSelector{
		closeCh: make(chan struct{}),
		clock:   timestamp.NewClock(),
		nodes:   make([]string, 0),
	}
}

// AddNode registers the node's name and keeps the node list sorted.
//
// Adding a name that is already registered is a no-op. A duplicate would be
// counted twice by the modulo in selectNode and would outlive a single
// RemoveNode call. The name is read through the nil-safe getters, so a node
// without metadata does not cause a nil dereference.
func (r *roundRobinSelector) AddNode(node *databasev1.Node) {
	name := node.GetMetadata().GetName()
	r.mu.Lock()
	defer r.mu.Unlock()
	// nodes is sorted after every insert, so a binary search finds duplicates.
	if i := sort.SearchStrings(r.nodes, name); i < len(r.nodes) && r.nodes[i] == name {
		return
	}
	r.nodes = append(r.nodes, name)
	sort.Strings(r.nodes)
}

// RemoveNode unregisters the node's name.
//
// Every occurrence of the name is removed, so a node that was added more than
// once does not linger in the list. The relative order of the remaining names
// is preserved. Removing an unknown name is a no-op. The name is read through
// the nil-safe getters, so a node without metadata does not cause a nil
// dereference.
func (r *roundRobinSelector) RemoveNode(node *databasev1.Node) {
	name := node.GetMetadata().GetName()
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nodes = slices.DeleteFunc(r.nodes, func(n string) bool {
		return n == name
	})
}

// Pick returns the name of the node that serves the given group and shard.
//
// Every distinct (group, shardID) pair owns an entry in the lookup table.
// Entries are numbered by the sorted order of all known pairs (group first,
// then shardID), and an entry's number modulo the node count selects the node.
// The second argument is ignored. An error is returned when no node is
// registered.
func (r *roundRobinSelector) Pick(group, _ string, shardID uint32) (string, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if len(r.nodes) == 0 {
		return "", errors.New("no nodes available")
	}
	k := key{group: group, shardID: shardID}
	// Fast path: the key already has an entry.
	if entry, ok := r.lookupTable.Load(k); ok {
		return r.selectNode(entry), nil
	}
	r.tMu.Lock()
	defer r.tMu.Unlock()
	// Re-check: another caller may have registered the key while this one was
	// waiting for tMu.
	if entry, ok := r.lookupTable.Load(k); ok {
		return r.selectNode(entry), nil
	}

	// Slow path: collect every known key plus the new one and renumber them
	// so that indexes follow the sorted key order.
	keys := []key{k}
	r.lookupTable.Range(func(existing, _ any) bool {
		keys = append(keys, existing.(key))
		return true
	})
	slices.SortFunc(keys, func(a, b key) int {
		if n := strings.Compare(a.group, b.group); n != 0 {
			return n
		}
		// Compare explicitly instead of subtracting: the difference of two
		// converted uint32 values can overflow where int is 32 bits wide.
		switch {
		case a.shardID < b.shardID:
			return -1
		case a.shardID > b.shardID:
			return 1
		default:
			return 0
		}
	})
	for i := range keys {
		loaded, ok := r.lookupTable.Load(keys[i])
		if !ok {
			r.lookupTable.Store(keys[i], r.newTableEntry(i))
			continue
		}
		// Fast-path callers read index without holding tMu, so a published
		// entry must never be mutated. Publish a replacement that shares the
		// lastAccess pointer, which keeps access times intact.
		prev := loaded.(*tableEntry)
		if prev.index != i {
			r.lookupTable.Store(keys[i], &tableEntry{index: i, lastAccess: prev.lastAccess})
		}
	}
	r.once.Do(r.startCleanupTicker)
	if entry, ok := r.lookupTable.Load(k); ok {
		return r.selectNode(entry), nil
	}
	// Unreachable unless the invariant "k was stored above" is broken.
	panic(fmt.Sprintf("key %v not found", k))
}

// selectNode records the current time as the entry's last access and returns
// the node name found at the entry's index, wrapped around the node count.
// The argument must hold a *tableEntry.
func (r *roundRobinSelector) selectNode(entry any) string {
	slot := entry.(*tableEntry)
	accessedAt := r.clock.Now()
	slot.lastAccess.Store(&accessedAt)
	position := slot.index % len(r.nodes)
	return r.nodes[position]
}

// key identifies a lookup-table entry by group name and shard ID.
type key struct {
	group string
	shardID uint32
}

// tableEntry is the value stored in the lookup table for one key.
type tableEntry struct {
	// lastAccess holds the time the entry was created or last used by
	// selectNode; cleanupExpiredEntries compares it against keyTTL.
	lastAccess *atomic.Pointer[time.Time]
	// index is the entry's position among the sorted keys; selectNode takes
	// it modulo the node count.
	index int
}

// newTableEntry builds an entry for the given index whose last-access time is
// initialized to the selector clock's current time.
func (r *roundRobinSelector) newTableEntry(index int) *tableEntry {
	entry := &tableEntry{
		lastAccess: new(atomic.Pointer[time.Time]),
		index:      index,
	}
	createdAt := r.clock.Now()
	entry.lastAccess.Store(&createdAt)
	return entry
}

// cleanupExpiredEntries deletes every lookup-table entry whose last access is
// more than keyTTL before the time sampled at the start of the call. It holds
// tMu for the whole sweep.
func (r *roundRobinSelector) cleanupExpiredEntries() {
	sweepTime := r.clock.Now()
	r.tMu.Lock()
	defer r.tMu.Unlock()

	r.lookupTable.Range(func(k, v any) bool {
		idle := sweepTime.Sub(*v.(*tableEntry).lastAccess.Load())
		if idle <= keyTTL {
			return true
		}
		r.lookupTable.Delete(k)
		return true
	})
}

// startCleanupTicker launches a goroutine that calls cleanupExpiredEntries on
// every tick of a ticker with period expiredKeyCleanupInterval, until closeCh
// is closed.
//
// The select sits inside a loop so the goroutine keeps serving ticks; without
// it the goroutine would return after the first tick and later expirations
// would never be swept. The ticker is stopped on every exit path.
func (r *roundRobinSelector) startCleanupTicker() {
	ticker := r.clock.Ticker(expiredKeyCleanupInterval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-r.closeCh:
				return
			case <-ticker.C:
				r.cleanupExpiredEntries()
			}
		}
	}()
}
112 changes: 112 additions & 0 deletions pkg/node/round_robin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package node

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

// TestPickEmptySelector verifies that Pick fails, and returns no node name,
// when no node has been added.
func TestPickEmptySelector(t *testing.T) {
	selector := NewRoundRobinSelector()
	t.Cleanup(selector.Close)
	node, err := selector.Pick("group1", "", 0)
	assert.Error(t, err)
	assert.Empty(t, node)
}

// TestPickSingleSelection verifies that the only registered node is picked.
func TestPickSingleSelection(t *testing.T) {
	selector := NewRoundRobinSelector()
	// A successful Pick starts the cleanup goroutine; close the selector so
	// the goroutine does not outlive the test.
	t.Cleanup(selector.Close)
	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
	node, err := selector.Pick("group1", "", 0)
	assert.NoError(t, err)
	assert.Equal(t, "node1", node)
}

// TestPickMultipleSelections verifies that two shards of one group are mapped
// to different nodes when two nodes are registered.
func TestPickMultipleSelections(t *testing.T) {
	selector := NewRoundRobinSelector()
	// A successful Pick starts the cleanup goroutine; close the selector so
	// the goroutine does not outlive the test.
	t.Cleanup(selector.Close)
	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
	// Register both keys first so their indexes are settled before asserting.
	_, err := selector.Pick("group1", "", 0)
	assert.NoError(t, err)
	_, err = selector.Pick("group1", "", 1)
	assert.NoError(t, err)
	node1, err := selector.Pick("group1", "", 0)
	assert.NoError(t, err)
	node2, err := selector.Pick("group1", "", 1)
	assert.NoError(t, err)
	assert.NotEqual(t, node1, node2, "Different shardIDs in the same group should not result in the same node")
}

// TestPickNodeRemoval verifies that a removed node is no longer picked.
func TestPickNodeRemoval(t *testing.T) {
	selector := NewRoundRobinSelector()
	// A successful Pick starts the cleanup goroutine; close the selector so
	// the goroutine does not outlive the test.
	t.Cleanup(selector.Close)
	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
	selector.RemoveNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
	node, err := selector.Pick("group1", "", 0)
	assert.NoError(t, err)
	assert.Equal(t, "node2", node)
}

// TestPickConsistentSelectionAfterRemoval verifies that a key keeps its index
// after a node is removed, so the pick moves to the node that now occupies
// that position.
func TestPickConsistentSelectionAfterRemoval(t *testing.T) {
	selector := NewRoundRobinSelector()
	// A successful Pick starts the cleanup goroutine; close the selector so
	// the goroutine does not outlive the test.
	t.Cleanup(selector.Close)
	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node3"}})
	_, err := selector.Pick("group1", "", 0)
	assert.NoError(t, err)
	_, err = selector.Pick("group1", "", 1)
	assert.NoError(t, err)
	node, err := selector.Pick("group1", "", 1)
	assert.NoError(t, err)
	assert.Equal(t, "node2", node)
	selector.RemoveNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
	node, err = selector.Pick("group1", "", 1)
	assert.NoError(t, err)
	assert.Equal(t, "node3", node)
}

// TestCleanupExpiredEntries verifies that cleanupExpiredEntries deletes an
// entry that has not been accessed for longer than keyTTL and keeps one that
// was accessed recently.
func TestCleanupExpiredEntries(t *testing.T) {
	mc := timestamp.NewMockClock()
	mc.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.Local))
	selector := &roundRobinSelector{
		nodes: make([]string, 0),
		clock: mc,
		// Without a channel the cleanup goroutine started by Pick could never
		// be told to stop.
		closeCh: make(chan struct{}),
	}
	t.Cleanup(selector.Close)
	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
	_, err := selector.Pick("group1", "", 0)
	assert.NoError(t, err)
	_, ok := selector.lookupTable.Load(key{group: "group1", shardID: 0})
	assert.True(t, ok)
	// Move past keyTTL so the first entry expires; the second entry is
	// created after the jump and therefore stays fresh.
	mc.Add(25 * time.Hour)
	_, err = selector.Pick("group1", "", 1)
	assert.NoError(t, err)
	_, ok = selector.lookupTable.Load(key{group: "group1", shardID: 1})
	assert.True(t, ok)
	selector.cleanupExpiredEntries()
	_, ok = selector.lookupTable.Load(key{group: "group1", shardID: 0})
	assert.False(t, ok)
	_, ok = selector.lookupTable.Load(key{group: "group1", shardID: 1})
	assert.True(t, ok)
}
29 changes: 24 additions & 5 deletions pkg/query/logical/stream/stream_plan_distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/iter/sort"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
Expand Down Expand Up @@ -129,14 +131,28 @@ type distributedPlan struct {

func (t *distributedPlan) Close() {}

func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, error) {
func (t *distributedPlan) Execute(ctx context.Context) (ee []*streamv1.Element, err error) {
dctx := executor.FromDistributedExecutionContext(ctx)
query := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
query.TimeRange = dctx.TimeRange()
queryRequest := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
queryRequest.TimeRange = dctx.TimeRange()
if t.maxElementSize > 0 {
query.Limit = t.maxElementSize
queryRequest.Limit = t.maxElementSize
}
ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
tracer := query.GetTracer(ctx)
var span *query.Span
if tracer != nil {
span, _ = tracer.StartSpan(ctx, "distributed-client")
queryRequest.Trace = true
span.Tag("request", convert.BytesToString(logger.Proto(queryRequest)))
defer func() {
if err != nil {
span.Error(err)
} else {
span.Stop()
}
}()
}
ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), queryRequest))
if err != nil {
return nil, err
}
Expand All @@ -151,6 +167,9 @@ func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, err
continue
}
resp := d.(*streamv1.QueryResponse)
if span != nil {
span.AddSubTrace(resp.Trace)
}
see = append(see,
newSortableElements(resp.Elements, t.sortByTime, t.sortTagSpec))
}
Expand Down

0 comments on commit c98c8a7

Please sign in to comment.