diff --git a/CHANGES.md b/CHANGES.md index bc5f17de3..652d735f4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -46,6 +46,7 @@ Release Notes. - Fix panic when removing a expired segment. - Fix panic when reading a disorder block of measure. This block's versions are not sorted in descending order. - Fix the bug that the etcd client doesn't reconnect when facing the context timeout in the startup phase. +- Fix the bug that a long-running query doesn't stop when the context is canceled. ### Documentation diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go index 04be967a5..8a9be64e0 100644 --- a/banyand/dquery/measure.go +++ b/banyand/dquery/measure.go @@ -40,7 +40,7 @@ type measureQueryProcessor struct { *queryService } -func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { +func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { queryCriteria, ok := message.Data().(*measurev1.QueryRequest) n := time.Now() now := n.UnixNano() @@ -82,7 +82,6 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { if e := ml.Debug(); e.Enabled() { e.Str("plan", plan.String()).Msg("query plan") } - ctx := context.Background() var tracer *query.Tracer var span *query.Span if queryCriteria.Trace { diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go index 751a83312..397a77d4c 100644 --- a/banyand/dquery/stream.go +++ b/banyand/dquery/stream.go @@ -39,7 +39,7 @@ type streamQueryProcessor struct { *queryService } -func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { +func (p *streamQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { n := time.Now() now := n.UnixNano() queryCriteria, ok := message.Data().(*streamv1.QueryRequest) @@ -78,7 +78,6 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { if p.log.Debug().Enabled() { p.log.Debug().Str("plan", plan.String()).Msg("query plan") } - ctx := context.Background() if queryCriteria.Trace { var tracer *query.Tracer var span *query.Span diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go index a066b6ff1..ef9573893 100644 --- a/banyand/dquery/topn.go +++ b/banyand/dquery/topn.go @@ -44,7 +44,7 @@ type topNQueryProcessor struct { *queryService } -func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { +func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { request, ok := message.Data().(*measurev1.TopNRequest) if !ok { t.log.Warn().Msg("invalid event data type") @@ -64,7 +64,8 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { e.RawJSON("req", logger.Proto(request)).Msg("received a topN query event") } if request.Trace { - tracer, ctx := pkgquery.NewTracer(context.TODO(), n.Format(time.RFC3339Nano)) + var tracer *pkgquery.Tracer + tracer, ctx = pkgquery.NewTracer(ctx, n.Format(time.RFC3339Nano)) span, _ := tracer.StartSpan(ctx, "distributed-client") span.Tag("request", convert.BytesToString(logger.Proto(request))) defer func() { diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index 5a8e4900b..af6b6424c 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -245,7 +245,7 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In if err != nil { return nil, nil, err } - iter, err := s.store.Iterator(fieldKey, rangeOpts, + iter, err := s.store.Iterator(ctx, fieldKey, rangeOpts, 
opts.Order.Sort, opts.PreloadSize, query, opts.Projection) if err != nil { return nil, nil, err diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 00adeb36a..da21aa1e5 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -69,7 +69,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er if status != modelv1.Status_STATUS_SUCCEED { ms.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group, "measure", "write") } - ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, "measure", "write") + ms.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "measure", "write") if errResp := measure.Send(&measurev1.WriteResponse{Metadata: metadata, Status: status, MessageId: messageId}); errResp != nil { logger.Debug().Err(errResp).Msg("failed to send measure write response") ms.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "measure", "write") @@ -149,7 +149,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er continue } message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr) - _, errWritePub := publisher.Publish(data.TopicMeasureWrite, message) + _, errWritePub := publisher.Publish(ctx, data.TopicMeasureWrite, message) if errWritePub != nil { ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Str("nodeID", nodeID).Msg("failed to send a message") reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled) @@ -161,7 +161,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints: make([]*measurev1.DataPoint, 0)} -func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) { +func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) { for _, g := range req.Groups { ms.metrics.totalStarted.Inc(1, g, "measure", "query") } @@ -180,7 +180,6 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) } now := time.Now() if req.Trace { - ctx := context.TODO() tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano)) span, _ := tracer.StartSpan(ctx, "measure-grpc") span.Tag("request", convert.BytesToString(logger.Proto(req))) @@ -194,7 +193,7 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) span.Stop() }() } - feat, err := ms.broadcaster.Publish(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(now.UnixNano()), req)) + feat, err := ms.broadcaster.Publish(ctx, data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(now.UnixNano()), req)) if err != nil { return nil, err } @@ -215,13 +214,12 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) return nil, nil } -func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) { +func (ms *measureService) TopN(ctx context.Context, topNRequest *measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) { if err = timestamp.CheckTimeRange(topNRequest.GetTimeRange()); err != nil { return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", topNRequest.GetTimeRange(), err) } now := time.Now() if topNRequest.Trace { - ctx := context.TODO() tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano)) span, _ := 
tracer.StartSpan(ctx, "topn-grpc") span.Tag("request", convert.BytesToString(logger.Proto(topNRequest))) @@ -236,7 +234,7 @@ func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNReq }() } message := bus.NewMessage(bus.MessageID(now.UnixNano()), topNRequest) - feat, errQuery := ms.broadcaster.Publish(data.TopicTopNQuery, message) + feat, errQuery := ms.broadcaster.Publish(ctx, data.TopicTopNQuery, message) if errQuery != nil { return nil, errQuery } diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 64458a020..41dcfeda4 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -143,7 +143,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { continue } message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr) - _, errWritePub := publisher.Publish(data.TopicStreamWrite, message) + _, errWritePub := publisher.Publish(ctx, data.TopicStreamWrite, message) if errWritePub != nil { s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Str("nodeID", nodeID).Msg("failed to send a message") reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled) @@ -155,7 +155,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { var emptyStreamQueryResponse = &streamv1.QueryResponse{Elements: make([]*streamv1.Element, 0)} -func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (resp *streamv1.QueryResponse, err error) { +func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) (resp *streamv1.QueryResponse, err error) { for _, g := range req.Groups { s.metrics.totalStarted.Inc(1, g, "stream", "query") } @@ -178,7 +178,6 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (re } now := time.Now() if req.Trace { - ctx := context.TODO() tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano)) span, _ := tracer.StartSpan(ctx, "stream-grpc") span.Tag("request", convert.BytesToString(logger.Proto(req))) @@ -193,7 +192,7 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (re }() } message := bus.NewMessage(bus.MessageID(now.UnixNano()), req) - feat, errQuery := s.broadcaster.Publish(data.TopicStreamQuery, message) + feat, errQuery := s.broadcaster.Publish(ctx, data.TopicStreamQuery, message) if errQuery != nil { if errors.Is(errQuery, io.EOF) { return emptyStreamQueryResponse, nil diff --git a/banyand/measure/query.go b/banyand/measure/query.go index bf2fa1bf5..ac1ccfa0e 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -86,6 +86,7 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr } } var result queryResult + result.ctx = ctx tsdb := db.(storage.TSDB[*tsTable, option]) result.segments = tsdb.SelectSegments(*mqo.TimeRange) if len(result.segments) < 1 { @@ -252,6 +253,11 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids [] return fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) } for tstIter.nextBlock() { + select { + case <-ctx.Done(): + return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(result.data), len(tstIter.piHeap), len(tstIter.piPool)) + default: + } bc := generateBlockCursor() p := tstIter.piHeap[0] bc.init(p.p, p.curBlock, qo) @@ -419,6 +425,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue 
{ } type queryResult struct { + ctx context.Context sidToIndex map[common.SeriesID]int storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue tagProjection []model.TagProjection @@ -454,9 +461,15 @@ func (qr *queryResult) Pull() *model.MeasureResult { blankCursorList := []int{} for completed := 0; completed < len(qr.data); completed++ { - result := <-cursorChan - if result != -1 { - blankCursorList = append(blankCursorList, result) + select { + case <-qr.ctx.Done(): + return &model.MeasureResult{ + Error: errors.WithMessagef(qr.ctx.Err(), "interrupt: loaded %d/%d cursors", completed, len(qr.data)), + } + case result := <-cursorChan: + if result != -1 { + blankCursorList = append(blankCursorList, result) + } } } sort.Slice(blankCursorList, func(i, j int) bool { diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go index 3970259ed..0b515bfea 100644 --- a/banyand/measure/query_test.go +++ b/banyand/measure/query_test.go @@ -18,6 +18,7 @@ package measure import ( + "context" "errors" "sort" "testing" @@ -1251,6 +1252,7 @@ func TestQueryResult(t *testing.T) { ti.init(bma, pp, sids, tt.minTimestamp, tt.maxTimestamp) var result queryResult + result.ctx = context.TODO() // Query all tags result.tagProjection = allTagProjections for ti.nextBlock() { diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go index bf6182428..750b0f065 100644 --- a/banyand/measure/topn.go +++ b/banyand/measure/topn.go @@ -216,7 +216,7 @@ func (t *topNStreamingProcessor) writeData(publisher queue.BatchPublisher, event ShardId: uint32(shardID), } message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "local", iwr) - _, errWritePub := publisher.Publish(apiData.TopicMeasureWrite, message) + _, errWritePub := publisher.Publish(context.TODO(), apiData.TopicMeasureWrite, message) return errWritePub } diff --git a/banyand/measure/write.go b/banyand/measure/write.go index dadeb612f..da5ba404e 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -19,6 +19,7 @@ package measure import ( "bytes" + "context" "fmt" "time" @@ -237,7 +238,7 @@ func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPoi return dpt, nil } -func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { +func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { events, ok := message.Data().([]any) if !ok { w.l.Warn().Msg("invalid event data type") @@ -249,6 +250,12 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { } groups := make(map[string]*dataPointsInGroup) for i := range events { + select { + case <-ctx.Done(): + w.l.Warn().Msgf("context is done, handled %d events", i) + break + default: + } var writeEvent *measurev1.InternalWriteRequest switch e := events[i].(type) { case *measurev1.InternalWriteRequest: diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go index 00392f3d8..b9a7e6013 100644 --- a/banyand/metadata/schema/etcd.go +++ b/banyand/metadata/schema/etcd.go @@ -522,7 +522,9 @@ func (e *etcdSchemaRegistry) revokeLease(lease *clientv3.LeaseGrantResponse) { defer cancel() _, err := e.client.Lease.Revoke(ctx, lease.ID) if err != nil { - e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID) + if !errors.Is(err, context.DeadlineExceeded) { + e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID) + } } } diff --git a/banyand/query/processor.go b/banyand/query/processor.go index a6b47bbbb..13fbe623f 100644 --- a/banyand/query/processor.go +++ 
b/banyand/query/processor.go @@ -55,7 +55,7 @@ type streamQueryProcessor struct { *queryService } -func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { +func (p *streamQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { n := time.Now() now := n.UnixNano() queryCriteria, ok := message.Data().(*streamv1.QueryRequest) @@ -91,7 +91,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { return } - plan, err := logical_stream.Analyze(context.TODO(), queryCriteria, meta, s) + plan, err := logical_stream.Analyze(ctx, queryCriteria, meta, s) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for stream %s: %v", meta.GetName(), err)) return @@ -100,7 +100,6 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { if p.log.Debug().Enabled() { p.log.Debug().Str("plan", plan.String()).Msg("query plan") } - ctx := context.Background() var tracer *query.Tracer var span *query.Span if queryCriteria.Trace { @@ -146,7 +145,7 @@ type measureQueryProcessor struct { *queryService } -func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { +func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { queryCriteria, ok := message.Data().(*measurev1.QueryRequest) n := time.Now() now := n.UnixNano() @@ -185,12 +184,11 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { return } - plan, err := logical_measure.Analyze(context.TODO(), queryCriteria, meta, s) + plan, err := logical_measure.Analyze(ctx, queryCriteria, meta, s) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for measure %s: %v", meta.GetName(), err)) return } - ctx := context.Background() var tracer *query.Tracer var span *query.Span if queryCriteria.Trace { diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go index 865b2f449..e042df76c 100644 --- a/banyand/query/processor_topn.go +++ b/banyand/query/processor_topn.go @@ -47,7 +47,7 @@ type topNQueryProcessor struct { *queryService } -func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { +func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { request, ok := message.Data().(*measurev1.TopNRequest) n := time.Now() now := n.UnixNano() @@ -74,7 +74,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { Name: request.Name, Group: request.Groups[0], } - topNSchema, err := t.metaService.TopNAggregationRegistry().GetTopNAggregation(context.TODO(), topNMetadata) + topNSchema, err := t.metaService.TopNAggregationRegistry().GetTopNAggregation(ctx, topNMetadata) if err != nil { t.log.Error().Err(err). Str("topN", topNMetadata.GetName()). @@ -108,7 +108,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { Str("topN", topNMetadata.GetName()). 
Msg("fail to build schema") } - plan, err := logical_measure.TopNAnalyze(context.TODO(), request, topNResultMeasure.GetSchema(), sourceMeasure.GetSchema(), s) + plan, err := logical_measure.TopNAnalyze(ctx, request, topNResultMeasure.GetSchema(), sourceMeasure.GetSchema(), s) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for topn %s: %v", topNMetadata.GetName(), err)) return @@ -117,7 +117,6 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { if e := ml.Debug(); e.Enabled() { e.Str("plan", plan.String()).Msg("topn plan") } - ctx := context.Background() var tracer *query.Tracer var span *query.Span if request.Trace { diff --git a/banyand/queue/local.go b/banyand/queue/local.go index 75cfb47dc..a1268f526 100644 --- a/banyand/queue/local.go +++ b/banyand/queue/local.go @@ -19,6 +19,7 @@ package queue import ( + context "context" "time" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" @@ -61,12 +62,14 @@ func (l *local) Subscribe(topic bus.Topic, listener bus.MessageListener) error { return l.local.Subscribe(topic, listener) } -func (l *local) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, error) { - return l.local.Publish(topic, message...) +func (l *local) Publish(ctx context.Context, topic bus.Topic, message ...bus.Message) (bus.Future, error) { + return l.local.Publish(ctx, topic, message...) } -func (l *local) Broadcast(_ time.Duration, topic bus.Topic, message bus.Message) ([]bus.Future, error) { - f, err := l.Publish(topic, message) +func (l *local) Broadcast(timeout time.Duration, topic bus.Topic, message bus.Message) ([]bus.Future, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + f, err := l.Publish(ctx, topic, message) if err != nil { return nil, err } @@ -91,15 +94,19 @@ func (*local) Register(schema.EventHandler) { } type localBatchPublisher struct { + ctx context.Context local *bus.Bus topic *bus.Topic messages []any } -func (l *localBatchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) { +func (l *localBatchPublisher) Publish(ctx context.Context, topic bus.Topic, messages ...bus.Message) (bus.Future, error) { if l.topic == nil { l.topic = &topic } + if l.ctx == nil { + l.ctx = ctx + } for i := range messages { l.messages = append(l.messages, messages[i].Data()) } @@ -111,7 +118,7 @@ func (l *localBatchPublisher) Close() error { return nil } newMessage := bus.NewMessage(1, l.messages) - _, err := l.local.Publish(*l.topic, newMessage) + _, err := l.local.Publish(l.ctx, *l.topic, newMessage) if err != nil { return err } diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index e8b3b0f2c..2689b473f 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -170,8 +170,8 @@ func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages ...bus.Me return f, err } -func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) { - return p.publish(5*time.Second, topic, messages...) +func (p *pub) Publish(_ context.Context, _ bus.Topic, _ ...bus.Message) (bus.Future, error) { + panic("should not be called") } // NewBatchPublisher returns a new batch publisher. 
@@ -235,7 +235,7 @@ func (bp *batchPublisher) Close() (err error) { return err } -func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) { +func (bp *batchPublisher) Publish(ctx context.Context, topic bus.Topic, messages ...bus.Message) (bus.Future, error) { var err error for _, m := range messages { r, errM2R := messageToRequest(topic, m) @@ -252,6 +252,8 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus } }() select { + case <-ctx.Done(): + return false case <-stream.client.Context().Done(): return false default: @@ -269,6 +271,12 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus continue } + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + bp.pub.mu.RLock() client, ok := bp.pub.active[node] bp.pub.mu.RUnlock() @@ -276,17 +284,17 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus err = multierr.Append(err, fmt.Errorf("failed to get client for node %s", node)) continue } - ctx, cancel := context.WithTimeout(context.Background(), bp.timeout) + streamCtx, cancel := context.WithTimeout(ctx, bp.timeout) // this assignment is for getting around the go vet lint deferFn := cancel - stream, errCreateStream := client.client.Send(ctx) + stream, errCreateStream := client.client.Send(streamCtx) if errCreateStream != nil { err = multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream)) continue } bp.streams[node] = writeStream{ client: stream, - ctxDoneCh: ctx.Done(), + ctxDoneCh: streamCtx.Done(), } bp.f.events = append(bp.f.events, make(chan batchEvent)) _ = sendData() @@ -296,6 +304,11 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus deferFn() }() for { + select { + case <-ctx.Done(): + return + default: + } _, errRecv := s.Recv() if errRecv == nil { continue diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go index 0f1bb250a..057c3b628 100644 --- a/banyand/queue/pub/pub_test.go +++ b/banyand/queue/pub/pub_test.go @@ -18,6 +18,7 @@ package pub import ( + "context" "io" "time" @@ -62,8 +63,9 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() { bp := p.NewBatchPublisher(3 * time.Second) defer bp.Close() + ctx := context.TODO() for i := 0; i < 10; i++ { - _, err := bp.Publish(data.TopicStreamWrite, + _, err := bp.Publish(ctx, data.TopicStreamWrite, bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}), bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}), ) @@ -88,8 +90,9 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() { p.OnAddOrUpdate(node2) bp := p.NewBatchPublisher(3 * time.Second) + ctx := context.TODO() for i := 0; i < 10; i++ { - _, err := bp.Publish(data.TopicStreamWrite, + _, err := bp.Publish(ctx, data.TopicStreamWrite, bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}), bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}), ) @@ -131,8 +134,9 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() { p.OnAddOrUpdate(node2) bp := p.NewBatchPublisher(3 * time.Second) + ctx := context.TODO() for i := 0; i < 10; i++ { - _, err := bp.Publish(data.TopicStreamWrite, + _, err := bp.Publish(ctx, data.TopicStreamWrite, bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}), bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", 
&streamv1.InternalWriteRequest{}), ) diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go index 41736bcca..0b3f2ae9e 100644 --- a/banyand/queue/sub/sub.go +++ b/banyand/queue/sub/sub.go @@ -52,10 +52,12 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error { var topic *bus.Topic var m bus.Message var dataCollection []any - var start time.Time + start := time.Now() defer func() { - s.metrics.totalFinished.Inc(1, topic.String()) - s.metrics.totalLatency.Inc(time.Since(start).Seconds(), topic.String()) + if topic != nil { + s.metrics.totalFinished.Inc(1, topic.String()) + s.metrics.totalLatency.Inc(time.Since(start).Seconds(), topic.String()) + } }() for { select { @@ -73,7 +75,7 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error { reply(writeEntity, err, "no listener found") return nil } - _ = listener.Rev(bus.NewMessage(bus.MessageID(0), dataCollection)) + _ = listener.Rev(ctx, bus.NewMessage(bus.MessageID(0), dataCollection)) return nil } if err != nil { @@ -134,7 +136,7 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error { continue } - m = listener.Rev(m) + m = listener.Rev(ctx, m) if m.Data() == nil { if errSend := stream.Send(&clusterv1.SendResponse{ MessageId: writeEntity.MessageId, diff --git a/banyand/stream/index.go b/banyand/stream/index.go index 56e213276..128308cd1 100644 --- a/banyand/stream/index.go +++ b/banyand/stream/index.go @@ -21,6 +21,8 @@ import ( "context" "path" + "github.com/pkg/errors" + "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" @@ -53,10 +55,10 @@ func newElementIndex(ctx context.Context, root string, flushTimeoutSeconds int64 return ei, nil } -func (e *elementIndex) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order modelv1.Sort, +func (e *elementIndex) Sort(ctx context.Context, sids []common.SeriesID, fieldKey index.FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preloadSize int, ) (index.FieldIterator[*index.DocumentResult], error) { - iter, err := e.store.Sort(sids, fieldKey, order, timeRange, preloadSize) + iter, err := e.store.Sort(ctx, sids, fieldKey, order, timeRange, preloadSize) if err != nil { return nil, err } @@ -69,9 +71,14 @@ func (e *elementIndex) Write(docs index.Documents) error { }) } -func (e *elementIndex) Search(seriesList pbv1.SeriesList, filter index.Filter) (posting.List, error) { +func (e *elementIndex) Search(ctx context.Context, seriesList pbv1.SeriesList, filter index.Filter) (posting.List, error) { var result posting.List - for _, series := range seriesList { + for i, series := range seriesList { + select { + case <-ctx.Done(): + return nil, errors.WithMessagef(ctx.Err(), "search series %d/%d", i, len(seriesList)) + default: + } pl, err := filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) { return e.store, nil }, series.ID) diff --git a/banyand/stream/query.go b/banyand/stream/query.go index 7c7661360..ee335c3ca 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -20,10 +20,11 @@ package stream import ( "container/heap" "context" - "errors" "fmt" "sort" + "github.com/pkg/errors" + "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" @@ -100,7 +101,7 @@ func (s *stream) Query(ctx context.Context, 
sqo model.StreamQueryOptions) (sqr m result.qo.seriesToEntity[seriesList[i].ID] = seriesList[i].EntityValues result.qo.sortedSids = append(result.qo.sortedSids, seriesList[i].ID) } - if result.qo.elementFilter, err = indexSearch(sqo, result.tabs, seriesList); err != nil { + if result.qo.elementFilter, err = indexSearch(ctx, sqo, result.tabs, seriesList); err != nil { return nil, err } result.tagNameIndex = make(map[string]partition.TagLocator) @@ -121,7 +122,7 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m if sqo.Order.Index == nil { result.orderByTS = true - } else if result.sortingIter, err = s.indexSort(sqo, result.tabs, seriesList); err != nil { + } else if result.sortingIter, err = s.indexSort(ctx, sqo, result.tabs, seriesList); err != nil { return nil, err } if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { @@ -198,6 +199,11 @@ func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error { return fmt.Errorf("cannot init tstIter: %w", ti.Error()) } for ti.nextBlock() { + select { + case <-ctx.Done(): + return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), len(ti.piHeap), len(ti.piPool)) + default: + } bc := generateBlockCursor() p := ti.piHeap[0] bc.init(p.p, p.curBlock, qo) @@ -278,9 +284,15 @@ func (qr *queryResult) load(ctx context.Context, qo queryOptions) *model.StreamR blankCursorList := []int{} for completed := 0; completed < len(qr.data); completed++ { - result := <-cursorChan - if result != -1 { - blankCursorList = append(blankCursorList, result) + select { + case <-ctx.Done(): + return &model.StreamResult{ + Error: errors.WithMessagef(ctx.Err(), "interrupt: loaded %d/%d cursors", completed, len(qr.data)), + } + case result := <-cursorChan: + if result != -1 { + blankCursorList = append(blankCursorList, result) + } } } sort.Slice(blankCursorList, func(i, j int) bool { @@ -520,7 +532,7 @@ func (qr *queryResult) mergeByTimestamp() *model.StreamResult { return result } -func indexSearch(sqo model.StreamQueryOptions, +func indexSearch(ctx context.Context, sqo model.StreamQueryOptions, tabs []*tsTable, seriesList pbv1.SeriesList, ) (posting.List, error) { if sqo.Filter == nil || sqo.Filter == logicalstream.ENode { @@ -529,7 +541,7 @@ func indexSearch(sqo model.StreamQueryOptions, result := roaring.NewPostingList() for _, tw := range tabs { index := tw.Index() - pl, err := index.Search(seriesList, sqo.Filter) + pl, err := index.Search(ctx, seriesList, sqo.Filter) if err != nil { return nil, err } @@ -543,13 +555,13 @@ func indexSearch(sqo model.StreamQueryOptions, return result, nil } -func (s *stream) indexSort(sqo model.StreamQueryOptions, tabs []*tsTable, +func (s *stream) indexSort(ctx context.Context, sqo model.StreamQueryOptions, tabs []*tsTable, seriesList pbv1.SeriesList, ) (itersort.Iterator[*index.DocumentResult], error) { if sqo.Order == nil || sqo.Order.Index == nil { return nil, nil } - iters, err := s.buildItersByIndex(tabs, seriesList, sqo) + iters, err := s.buildItersByIndex(ctx, tabs, seriesList, sqo) if err != nil { return nil, err } @@ -557,7 +569,7 @@ func (s *stream) indexSort(sqo model.StreamQueryOptions, tabs []*tsTable, return itersort.NewItemIter[*index.DocumentResult](iters, desc), nil } -func (s *stream) buildItersByIndex(tables []*tsTable, +func (s *stream) buildItersByIndex(ctx context.Context, tables []*tsTable, seriesList pbv1.SeriesList, sqo model.StreamQueryOptions, ) (iters 
[]itersort.Iterator[*index.DocumentResult], err error) { indexRuleForSorting := sqo.Order.Index @@ -571,7 +583,7 @@ func (s *stream) buildItersByIndex(tables []*tsTable, IndexRuleID: indexRuleForSorting.GetMetadata().GetId(), Analyzer: indexRuleForSorting.GetAnalyzer(), } - iter, err = tw.Index().Sort(sids, fieldKey, sqo.Order.Sort, sqo.TimeRange, sqo.MaxElementSize) + iter, err = tw.Index().Sort(ctx, sids, fieldKey, sqo.Order.Sort, sqo.TimeRange, sqo.MaxElementSize) if err != nil { return nil, err } diff --git a/banyand/stream/write.go b/banyand/stream/write.go index 2ba742ba0..1349eecad 100644 --- a/banyand/stream/write.go +++ b/banyand/stream/write.go @@ -19,6 +19,7 @@ package stream import ( "bytes" + "context" "fmt" "strings" @@ -218,7 +219,7 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre return dst, nil } -func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { +func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { events, ok := message.Data().([]any) if !ok { w.l.Warn().Msg("invalid event data type") @@ -231,6 +232,12 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { groups := make(map[string]*elementsInGroup) var builder strings.Builder for i := range events { + select { + case <-ctx.Done(): + w.l.Warn().Msgf("context is done, handled %d events", i) + break + default: + } var writeEvent *streamv1.InternalWriteRequest switch e := events[i].(type) { case *streamv1.InternalWriteRequest: diff --git a/docs/menu.yml b/docs/menu.yml index 2b1fe0d9b..bc31ebd20 100644 --- a/docs/menu.yml +++ b/docs/menu.yml @@ -133,3 +133,5 @@ catalog: path: "/concept/clustering" - name: "TSDB" path: "/concept/tsdb" + - name: "Data Rotation" + path: "/concept/rotation" diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index b4bb5efda..7c875026f 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -20,6 +20,7 @@ package bus import ( + "context" "errors" "io" "sync" @@ -84,7 +85,7 @@ func NewMessageWithNode(id MessageID, node string, data interface{}) Message { // MessageListener is the signature of functions that can handle an EventMessage. type MessageListener interface { - Rev(message Message) Message + Rev(ctx context.Context, message Message) Message } // Subscriber allow subscribing a Topic's messages. @@ -94,7 +95,7 @@ type Subscriber interface { // Publisher allow sending Messages to a Topic. type Publisher interface { - Publish(topic Topic, message ...Message) (Future, error) + Publish(ctx context.Context, topic Topic, message ...Message) (Future, error) } // Broadcaster allow sending Messages to a Topic and receiving the responses. @@ -180,7 +181,7 @@ func (l *localFuture) GetAll() ([]Message, error) { } // Publish sends Messages to a Topic. 
-func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) { +func (b *Bus) Publish(ctx context.Context, topic Topic, message ...Message) (Future, error) { if topic.id == "" { return nil, errTopicEmpty } @@ -200,9 +201,9 @@ func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) { for _, ml := range mll { for _, m := range message { if f != nil { - f.messages = append(f.messages, ml.Rev(m)) + f.messages = append(f.messages, ml.Rev(ctx, m)) } else { - ml.Rev(m) + ml.Rev(ctx, m) } } } diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index f8bbfeba0..7d98ed69e 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -18,6 +18,7 @@ package bus import ( + "context" "errors" "reflect" "sort" @@ -175,7 +176,7 @@ func TestBus_PubAndSub(t *testing.T) { for _, id := range m.messageIDs { mm = append(mm, NewMessage(id, nil)) } - f, err := e.Publish(m.topic, mm...) + f, err := e.Publish(context.TODO(), m.topic, mm...) if err != nil && !m.wantErr { t.Errorf("Publish() error = %v, wantErr %v", err, m.wantErr) continue @@ -239,7 +240,7 @@ type mockListener struct { queue []MessageID } -func (m *mockListener) Rev(message Message) Message { +func (m *mockListener) Rev(_ context.Context, message Message) Message { m.queue = append(m.queue, message.id) sort.SliceStable(m.queue, func(i, j int) bool { return uint64(m.queue[i]) < uint64(m.queue[j]) diff --git a/pkg/grpchelper/client.go b/pkg/grpchelper/client.go index 516dd45ac..34ea546d3 100644 --- a/pkg/grpchelper/client.go +++ b/pkg/grpchelper/client.go @@ -72,11 +72,11 @@ func Request(ctx context.Context, rpcTimeout time.Duration, fn func(rpcCtx conte err := fn(rpcCtx) if err != nil { if stat, ok := status.FromError(err); ok && stat.Code() == codes.Unimplemented { - l.Info().Str("stat", stat.Message()).Msg("error: this server does not implement the service") + l.Debug().Str("stat", stat.Message()).Msg("error: this server does not implement the service") } else if stat, ok := status.FromError(err); ok && stat.Code() == codes.DeadlineExceeded { - l.Info().Dur("rpcTimeout", rpcTimeout).Msg("timeout: rpc did not complete within") + l.Debug().Dur("rpcTimeout", rpcTimeout).Msg("timeout: rpc did not complete within") } else { - l.Info().Err(err).Msg("error: rpc failed:") + l.Debug().Err(err).Msg("error: rpc failed:") } return err } diff --git a/pkg/index/index.go b/pkg/index/index.go index e209b36cc..644909916 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -174,8 +174,10 @@ type Writer interface { // FieldIterable allows building a FieldIterator. type FieldIterable interface { BuildQuery(seriesMatchers []SeriesMatcher, secondaryQuery Query) (Query, error) - Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int, query Query, fieldKeys []FieldKey) (iter FieldIterator[*DocumentResult], err error) - Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preLoadSize int) (FieldIterator[*DocumentResult], error) + Iterator(ctx context.Context, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, + preLoadSize int, query Query, fieldKeys []FieldKey) (iter FieldIterator[*DocumentResult], err error) + Sort(ctx context.Context, sids []common.SeriesID, fieldKey FieldKey, + order modelv1.Sort, timeRange *timestamp.TimeRange, preLoadSize int) (FieldIterator[*DocumentResult], error) } // Searcher allows searching a field either by its key or by its key and term. 
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index aa43d07c4..4987ddb5c 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -21,7 +21,6 @@ package inverted import ( "bytes" "context" - "errors" "io" "log" "math" @@ -32,6 +31,7 @@ import ( "github.com/blugelabs/bluge/analysis/analyzer" blugeIndex "github.com/blugelabs/bluge/index" "github.com/blugelabs/bluge/search" + "github.com/pkg/errors" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" @@ -177,7 +177,7 @@ func (s *store) Close() error { return s.writer.Close() } -func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, +func (s *store) Iterator(ctx context.Context, fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, preLoadSize int, indexQuery index.Query, fieldKeys []index.FieldKey, ) (iter index.FieldIterator[*index.DocumentResult], err error) { if termRange.Lower != nil && @@ -247,6 +247,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord sortedKey: sortedKey, size: preLoadSize, closer: s.closer, + ctx: ctx, } return result, nil } @@ -332,7 +333,7 @@ func getMatchOptions(analyzerOnIndexRule string, opts *modelv1.Condition_MatchOp } func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) { - iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC, defaultRangePreloadSize, nil, nil) + iter, err := s.Iterator(context.TODO(), fieldKey, opts, modelv1.Sort_SORT_ASC, defaultRangePreloadSize, nil, nil) if err != nil { return roaring.DummyPostingList, err } @@ -350,6 +351,7 @@ type blugeMatchIterator struct { closer io.Closer needToLoadFields []string current index.DocumentResult + hit int } func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer io.Closer, @@ -371,12 +373,14 @@ func (bmi *blugeMatchIterator) Next() bool { var match *search.DocumentMatch match, bmi.err = bmi.delegated.Next() if bmi.err != nil { + bmi.err = errors.WithMessagef(bmi.err, "failed to get next document, hit: %d", bmi.hit) return false } if match == nil { bmi.err = io.EOF return false } + bmi.hit++ for i := range bmi.current.Values { bmi.current.Values[i] = nil } @@ -409,7 +413,7 @@ func (bmi *blugeMatchIterator) Next() bool { } return true }) - bmi.err = multierr.Combine(bmi.err, err) + bmi.err = errors.WithMessagef(err, "visit stored fields, hit: %d", bmi.hit) return bmi.err == nil } @@ -428,5 +432,5 @@ func (bmi *blugeMatchIterator) Close() error { if errors.Is(bmi.err, io.EOF) { return err } - return errors.Join(bmi.err, bmi.closer.Close()) + return multierr.Combine(bmi.err, bmi.closer.Close()) } diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index 11ee28b3e..f9b700f4f 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -144,7 +144,7 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey return true }) if err != nil { - return nil, errors.WithMessage(err, "visit stored fields") + return nil, errors.WithMessagef(err, "visit stored fields, hit: %d", len(result)) } if doc.Key.ID > 0 { result = append(result, doc) @@ -152,7 +152,7 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey next, err = dmi.Next() } if err != nil { - return nil, errors.WithMessage(err, "iterate document match iterator") + return nil, errors.WithMessagef(err, "iterate document match iterator, hit: 
%d", len(result)) } return result, nil } diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go index 730b9e363..e7dfd310b 100644 --- a/pkg/index/inverted/sort.go +++ b/pkg/index/inverted/sort.go @@ -33,7 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/timestamp" ) -func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order modelv1.Sort, +func (s *store) Sort(ctx context.Context, sids []common.SeriesID, fieldKey index.FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preLoadSize int, ) (iter index.FieldIterator[*index.DocumentResult], err error) { reader, err := s.writer.Reader() @@ -73,6 +73,7 @@ func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order mode reader: reader, sortedKey: sortedKey, size: preLoadSize, + ctx: ctx, } return result, nil } @@ -80,6 +81,7 @@ func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order mode type sortIterator struct { query index.Query err error + ctx context.Context reader *bluge.Reader current *blugeMatchIterator closer *run.Closer @@ -115,7 +117,7 @@ func (si *sortIterator) loadCurrent() bool { topNSearch = topNSearch.SetFrom(si.skipped) } - documentMatchIterator, err := si.reader.Search(context.Background(), topNSearch) + documentMatchIterator, err := si.reader.Search(si.ctx, topNSearch) if err != nil { si.err = err return false diff --git a/pkg/index/inverted/sort_test.go b/pkg/index/inverted/sort_test.go index a03367b4c..9b78dc9f6 100644 --- a/pkg/index/inverted/sort_test.go +++ b/pkg/index/inverted/sort_test.go @@ -18,6 +18,7 @@ package inverted import ( + "context" "fmt" "sort" "testing" @@ -155,7 +156,7 @@ func TestStore_Sort(t *testing.T) { t.Run(tt.name, func(t *testing.T) { tester := assert.New(t) is := require.New(t) - iter, err := s.Sort(tt.args.sids, index.FieldKey{IndexRuleID: indexRuleID}, tt.args.orderType, &tr, tt.preloadSize) + iter, err := s.Sort(context.TODO(), tt.args.sids, index.FieldKey{IndexRuleID: indexRuleID}, tt.args.orderType, &tr, tt.preloadSize) is.NoError(err) if iter == nil { tester.Empty(tt.want) diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go index 45e3aef28..842901561 100644 --- a/pkg/index/testcases/duration.go +++ b/pkg/index/testcases/duration.go @@ -19,6 +19,7 @@ package testcases import ( + "context" "fmt" "sort" "testing" @@ -288,7 +289,7 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) { t.Run(tt.name, func(t *testing.T) { tester := assert.New(t) is := require.New(t) - iter, err := store.Iterator(tt.args.fieldKey, tt.args.termRange, tt.args.orderType, tt.preloadSize, nil, nil) + iter, err := store.Iterator(context.TODO(), tt.args.fieldKey, tt.args.termRange, tt.args.orderType, tt.preloadSize, nil, nil) is.NoError(err) if iter == nil { tester.Empty(tt.want) diff --git a/pkg/logger/setting.go b/pkg/logger/setting.go index 81b401d66..32d0e1534 100644 --- a/pkg/logger/setting.go +++ b/pkg/logger/setting.go @@ -95,6 +95,13 @@ func GetLogger(scope ...string) *Logger { // Init initializes a rs/zerolog logger from user config. 
func Init(cfg Logging) (err error) { + switch cfg.Env { + case "prob", "": + os.Setenv("GRPC_GO_LOG_SEVERITY_LEVEL", "ERROR") + os.Setenv("GRPC_GO_LOG_FORMATTER", "json") + case "dev": + os.Setenv("GRPC_GO_LOG_SEVERITY_LEVEL", "INFO") + } return root.set(cfg) } diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go index 2dd0dab7a..9eb0a5a34 100644 --- a/pkg/meter/native/collection.go +++ b/pkg/meter/native/collection.go @@ -19,6 +19,7 @@ package native import ( + "context" "fmt" "time" @@ -86,7 +87,7 @@ func (m *MetricCollection) FlushMetrics() { messages = append(messages, bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)) } } - _, err := publisher.Publish(data.TopicMeasureWrite, messages...) + _, err := publisher.Publish(context.TODO(), data.TopicMeasureWrite, messages...) if err != nil { log.Error().Err(err).Msg("Failed to publish messasges") } diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go index 1d74d375a..bf11e1115 100644 --- a/pkg/query/logical/measure/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go @@ -203,6 +203,7 @@ func indexScan(startTime, endTime time.Time, metadata *commonv1.Metadata, projec type resultMIterator struct { result model.MeasureQueryResult + err error current []*measurev1.DataPoint i int } @@ -220,6 +221,10 @@ func (ei *resultMIterator) Next() bool { if r == nil { return false } + if r.Error != nil { + ei.err = r.Error + return false + } ei.current = ei.current[:0] ei.i = 0 for i := range r.Timestamps { @@ -261,7 +266,7 @@ func (ei *resultMIterator) Close() error { if ei.result != nil { ei.result.Release() } - return nil + return ei.err } func (i *localIndexScan) startSpan(ctx context.Context, tracer *query.Tracer, orderType model.OrderByType, orderBy *model.OrderBy) (context.Context, func(error)) { diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go index 71054029a..022bcdbc4 100644 --- a/pkg/query/model/model.go +++ b/pkg/query/model/model.go @@ -84,6 +84,7 @@ type MeasureQueryOptions struct { // MeasureResult is the result of a query. type MeasureResult struct { + Error error Timestamps []int64 Versions []int64 TagFamilies []TagFamily