Skip to content

Commit

Permalink
Check the context in write and query (#547)
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
  • Loading branch information
hanahmily authored Oct 1, 2024
1 parent 40ea208 commit 35306d2
Show file tree
Hide file tree
Showing 35 changed files with 195 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Release Notes.
- Fix panic when removing an expired segment.
- Fix panic when reading a disordered block of a measure, i.e. a block whose versions are not sorted in descending order.
- Fix the bug that the etcd client doesn't reconnect when it encounters a context timeout during the startup phase.
- Fix the bug that a long-running query doesn't stop when the context is canceled.

### Documentation

Expand Down
3 changes: 1 addition & 2 deletions banyand/dquery/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type measureQueryProcessor struct {
*queryService
}

func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
n := time.Now()
now := n.UnixNano()
Expand Down Expand Up @@ -82,7 +82,6 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if e := ml.Debug(); e.Enabled() {
e.Str("plan", plan.String()).Msg("query plan")
}
ctx := context.Background()
var tracer *query.Tracer
var span *query.Span
if queryCriteria.Trace {
Expand Down
3 changes: 1 addition & 2 deletions banyand/dquery/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type streamQueryProcessor struct {
*queryService
}

func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
func (p *streamQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
n := time.Now()
now := n.UnixNano()
queryCriteria, ok := message.Data().(*streamv1.QueryRequest)
Expand Down Expand Up @@ -78,7 +78,6 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if p.log.Debug().Enabled() {
p.log.Debug().Str("plan", plan.String()).Msg("query plan")
}
ctx := context.Background()
if queryCriteria.Trace {
var tracer *query.Tracer
var span *query.Span
Expand Down
5 changes: 3 additions & 2 deletions banyand/dquery/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type topNQueryProcessor struct {
*queryService
}

func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
request, ok := message.Data().(*measurev1.TopNRequest)
if !ok {
t.log.Warn().Msg("invalid event data type")
Expand All @@ -64,7 +64,8 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
e.RawJSON("req", logger.Proto(request)).Msg("received a topN query event")
}
if request.Trace {
tracer, ctx := pkgquery.NewTracer(context.TODO(), n.Format(time.RFC3339Nano))
var tracer *pkgquery.Tracer
tracer, ctx = pkgquery.NewTracer(ctx, n.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "distributed-client")
span.Tag("request", convert.BytesToString(logger.Proto(request)))
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
if err != nil {
return nil, nil, err
}
iter, err := s.store.Iterator(fieldKey, rangeOpts,
iter, err := s.store.Iterator(ctx, fieldKey, rangeOpts,
opts.Order.Sort, opts.PreloadSize, query, opts.Projection)
if err != nil {
return nil, nil, err
Expand Down
14 changes: 6 additions & 8 deletions banyand/liaison/grpc/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
if status != modelv1.Status_STATUS_SUCCEED {
ms.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group, "measure", "write")
}
ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, "measure", "write")
ms.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "measure", "write")
if errResp := measure.Send(&measurev1.WriteResponse{Metadata: metadata, Status: status, MessageId: messageId}); errResp != nil {
logger.Debug().Err(errResp).Msg("failed to send measure write response")
ms.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "measure", "write")
Expand Down Expand Up @@ -149,7 +149,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
continue
}
message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
_, errWritePub := publisher.Publish(data.TopicMeasureWrite, message)
_, errWritePub := publisher.Publish(ctx, data.TopicMeasureWrite, message)
if errWritePub != nil {
ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Str("nodeID", nodeID).Msg("failed to send a message")
reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled)
Expand All @@ -161,7 +161,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er

// emptyMeasureQueryResponse is a package-level measure query response whose
// DataPoints field is an empty, non-nil slice.
var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints: make([]*measurev1.DataPoint, 0)}

func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
for _, g := range req.Groups {
ms.metrics.totalStarted.Inc(1, g, "measure", "query")
}
Expand All @@ -180,7 +180,6 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest)
}
now := time.Now()
if req.Trace {
ctx := context.TODO()
tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "measure-grpc")
span.Tag("request", convert.BytesToString(logger.Proto(req)))
Expand All @@ -194,7 +193,7 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest)
span.Stop()
}()
}
feat, err := ms.broadcaster.Publish(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(now.UnixNano()), req))
feat, err := ms.broadcaster.Publish(ctx, data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(now.UnixNano()), req))
if err != nil {
return nil, err
}
Expand All @@ -215,13 +214,12 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest)
return nil, nil
}

func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) {
func (ms *measureService) TopN(ctx context.Context, topNRequest *measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) {
if err = timestamp.CheckTimeRange(topNRequest.GetTimeRange()); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", topNRequest.GetTimeRange(), err)
}
now := time.Now()
if topNRequest.Trace {
ctx := context.TODO()
tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "topn-grpc")
span.Tag("request", convert.BytesToString(logger.Proto(topNRequest)))
Expand All @@ -236,7 +234,7 @@ func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNReq
}()
}
message := bus.NewMessage(bus.MessageID(now.UnixNano()), topNRequest)
feat, errQuery := ms.broadcaster.Publish(data.TopicTopNQuery, message)
feat, errQuery := ms.broadcaster.Publish(ctx, data.TopicTopNQuery, message)
if errQuery != nil {
return nil, errQuery
}
Expand Down
7 changes: 3 additions & 4 deletions banyand/liaison/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
continue
}
message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
_, errWritePub := publisher.Publish(data.TopicStreamWrite, message)
_, errWritePub := publisher.Publish(ctx, data.TopicStreamWrite, message)
if errWritePub != nil {
s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Str("nodeID", nodeID).Msg("failed to send a message")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled)
Expand All @@ -155,7 +155,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {

// emptyStreamQueryResponse is a package-level stream query response whose
// Elements field is an empty, non-nil slice. Query returns it when publishing
// the query fails with io.EOF.
var emptyStreamQueryResponse = &streamv1.QueryResponse{Elements: make([]*streamv1.Element, 0)}

func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (resp *streamv1.QueryResponse, err error) {
func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) (resp *streamv1.QueryResponse, err error) {
for _, g := range req.Groups {
s.metrics.totalStarted.Inc(1, g, "stream", "query")
}
Expand All @@ -178,7 +178,6 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (re
}
now := time.Now()
if req.Trace {
ctx := context.TODO()
tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "stream-grpc")
span.Tag("request", convert.BytesToString(logger.Proto(req)))
Expand All @@ -193,7 +192,7 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (re
}()
}
message := bus.NewMessage(bus.MessageID(now.UnixNano()), req)
feat, errQuery := s.broadcaster.Publish(data.TopicStreamQuery, message)
feat, errQuery := s.broadcaster.Publish(ctx, data.TopicStreamQuery, message)
if errQuery != nil {
if errors.Is(errQuery, io.EOF) {
return emptyStreamQueryResponse, nil
Expand Down
19 changes: 16 additions & 3 deletions banyand/measure/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
}
}
var result queryResult
result.ctx = ctx
tsdb := db.(storage.TSDB[*tsTable, option])
result.segments = tsdb.SelectSegments(*mqo.TimeRange)
if len(result.segments) < 1 {
Expand Down Expand Up @@ -252,6 +253,11 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids []
return fmt.Errorf("cannot init tstIter: %w", tstIter.Error())
}
for tstIter.nextBlock() {
select {
case <-ctx.Done():
return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(result.data), len(tstIter.piHeap), len(tstIter.piPool))
default:
}
bc := generateBlockCursor()
p := tstIter.piHeap[0]
bc.init(p.p, p.curBlock, qo)
Expand Down Expand Up @@ -419,6 +425,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue {
}

type queryResult struct {
ctx context.Context
sidToIndex map[common.SeriesID]int
storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue
tagProjection []model.TagProjection
Expand Down Expand Up @@ -454,9 +461,15 @@ func (qr *queryResult) Pull() *model.MeasureResult {

blankCursorList := []int{}
for completed := 0; completed < len(qr.data); completed++ {
result := <-cursorChan
if result != -1 {
blankCursorList = append(blankCursorList, result)
select {
case <-qr.ctx.Done():
return &model.MeasureResult{
Error: errors.WithMessagef(qr.ctx.Err(), "interrupt: loaded %d/%d cursors", completed, len(qr.data)),
}
case result := <-cursorChan:
if result != -1 {
blankCursorList = append(blankCursorList, result)
}
}
}
sort.Slice(blankCursorList, func(i, j int) bool {
Expand Down
2 changes: 2 additions & 0 deletions banyand/measure/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package measure

import (
"context"
"errors"
"sort"
"testing"
Expand Down Expand Up @@ -1251,6 +1252,7 @@ func TestQueryResult(t *testing.T) {
ti.init(bma, pp, sids, tt.minTimestamp, tt.maxTimestamp)

var result queryResult
result.ctx = context.TODO()
// Query all tags
result.tagProjection = allTagProjections
for ti.nextBlock() {
Expand Down
2 changes: 1 addition & 1 deletion banyand/measure/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (t *topNStreamingProcessor) writeData(publisher queue.BatchPublisher, event
ShardId: uint32(shardID),
}
message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "local", iwr)
_, errWritePub := publisher.Publish(apiData.TopicMeasureWrite, message)
_, errWritePub := publisher.Publish(context.TODO(), apiData.TopicMeasureWrite, message)
return errWritePub
}

Expand Down
9 changes: 8 additions & 1 deletion banyand/measure/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package measure

import (
"bytes"
"context"
"fmt"
"time"

Expand Down Expand Up @@ -237,7 +238,7 @@ func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPoi
return dpt, nil
}

func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
events, ok := message.Data().([]any)
if !ok {
w.l.Warn().Msg("invalid event data type")
Expand All @@ -249,6 +250,12 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
}
groups := make(map[string]*dataPointsInGroup)
for i := range events {
select {
case <-ctx.Done():
w.l.Warn().Msgf("context is done, handled %d events", i)
break
default:
}
var writeEvent *measurev1.InternalWriteRequest
switch e := events[i].(type) {
case *measurev1.InternalWriteRequest:
Expand Down
4 changes: 3 additions & 1 deletion banyand/metadata/schema/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,9 @@ func (e *etcdSchemaRegistry) revokeLease(lease *clientv3.LeaseGrantResponse) {
defer cancel()
_, err := e.client.Lease.Revoke(ctx, lease.ID)
if err != nil {
e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID)
if !errors.Is(err, context.DeadlineExceeded) {
e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID)
}
}
}

Expand Down
10 changes: 4 additions & 6 deletions banyand/query/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type streamQueryProcessor struct {
*queryService
}

func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
func (p *streamQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
n := time.Now()
now := n.UnixNano()
queryCriteria, ok := message.Data().(*streamv1.QueryRequest)
Expand Down Expand Up @@ -91,7 +91,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
return
}

plan, err := logical_stream.Analyze(context.TODO(), queryCriteria, meta, s)
plan, err := logical_stream.Analyze(ctx, queryCriteria, meta, s)
if err != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for stream %s: %v", meta.GetName(), err))
return
Expand All @@ -100,7 +100,6 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if p.log.Debug().Enabled() {
p.log.Debug().Str("plan", plan.String()).Msg("query plan")
}
ctx := context.Background()
var tracer *query.Tracer
var span *query.Span
if queryCriteria.Trace {
Expand Down Expand Up @@ -146,7 +145,7 @@ type measureQueryProcessor struct {
*queryService
}

func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
n := time.Now()
now := n.UnixNano()
Expand Down Expand Up @@ -185,12 +184,11 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
return
}

plan, err := logical_measure.Analyze(context.TODO(), queryCriteria, meta, s)
plan, err := logical_measure.Analyze(ctx, queryCriteria, meta, s)
if err != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for measure %s: %v", meta.GetName(), err))
return
}
ctx := context.Background()
var tracer *query.Tracer
var span *query.Span
if queryCriteria.Trace {
Expand Down
7 changes: 3 additions & 4 deletions banyand/query/processor_topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type topNQueryProcessor struct {
*queryService
}

func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
request, ok := message.Data().(*measurev1.TopNRequest)
n := time.Now()
now := n.UnixNano()
Expand All @@ -74,7 +74,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
Name: request.Name,
Group: request.Groups[0],
}
topNSchema, err := t.metaService.TopNAggregationRegistry().GetTopNAggregation(context.TODO(), topNMetadata)
topNSchema, err := t.metaService.TopNAggregationRegistry().GetTopNAggregation(ctx, topNMetadata)
if err != nil {
t.log.Error().Err(err).
Str("topN", topNMetadata.GetName()).
Expand Down Expand Up @@ -108,7 +108,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
Str("topN", topNMetadata.GetName()).
Msg("fail to build schema")
}
plan, err := logical_measure.TopNAnalyze(context.TODO(), request, topNResultMeasure.GetSchema(), sourceMeasure.GetSchema(), s)
plan, err := logical_measure.TopNAnalyze(ctx, request, topNResultMeasure.GetSchema(), sourceMeasure.GetSchema(), s)
if err != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for topn %s: %v", topNMetadata.GetName(), err))
return
Expand All @@ -117,7 +117,6 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if e := ml.Debug(); e.Enabled() {
e.Str("plan", plan.String()).Msg("topn plan")
}
ctx := context.Background()
var tracer *query.Tracer
var span *query.Span
if request.Trace {
Expand Down
Loading

0 comments on commit 35306d2

Please sign in to comment.