diff --git a/CHANGES.md b/CHANGES.md index 13a0e2b62..82ca6e08d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,7 @@ Release Notes. - Check unregistered nodes in background. - Improve sorting performance of stream. +- Add the measure query trace. ### Bugs diff --git a/api/proto/banyandb/common/v1/trace.proto b/api/proto/banyandb/common/v1/trace.proto index 73a4351d6..b2dfc8705 100644 --- a/api/proto/banyandb/common/v1/trace.proto +++ b/api/proto/banyandb/common/v1/trace.proto @@ -48,6 +48,8 @@ message Span { string message = 5; // children is a list of child spans of the span. repeated Span children = 6; + // duration is the duration of the span. + int64 duration = 7; } // Tag is the key-value pair of a span. diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go index c7e1454d3..d378febbe 100644 --- a/banyand/dquery/dquery.go +++ b/banyand/dquery/dquery.go @@ -20,9 +20,11 @@ package dquery import ( "context" + "errors" "go.uber.org/multierr" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/measure" @@ -42,13 +44,14 @@ const ( var _ run.Service = (*queryService)(nil) type queryService struct { - log *logger.Logger metaService metadata.Repo + pipeline queue.Server + log *logger.Logger sqp *streamQueryProcessor mqp *measureQueryProcessor tqp *topNQueryProcessor closer *run.Closer - pipeline queue.Server + nodeID string } // NewService return a new query service. @@ -78,7 +81,13 @@ func (q *queryService) Name() string { return moduleName } -func (q *queryService) PreRun(_ context.Context) error { +func (q *queryService) PreRun(ctx context.Context) error { + val := ctx.Value(common.ContextNodeKey) + if val == nil { + return errors.New("node id is empty") + } + node := val.(common.Node) + q.nodeID = node.NodeID q.log = logger.GetLogger(moduleName) q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log) q.mqp.measureService = measure.NewPortableRepository(q.metaService, q.log) diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go index 5f1c515a0..be6a81d53 100644 --- a/banyand/dquery/measure.go +++ b/banyand/dquery/measure.go @@ -19,6 +19,8 @@ package dquery import ( "context" + "errors" + "fmt" "time" "github.com/apache/skywalking-banyandb/api/common" @@ -27,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/query" "github.com/apache/skywalking-banyandb/pkg/query/executor" logical_measure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure" ) @@ -39,7 +42,8 @@ type measureQueryProcessor struct { func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { queryCriteria, ok := message.Data().(*measurev1.QueryRequest) - now := time.Now().UnixNano() + n := time.Now() + now := n.UnixNano() if !ok { resp = bus.NewMessage(bus.MessageID(now), common.NewError("invalid event data type")) return @@ -79,8 +83,28 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { if e := ml.Debug(); e.Enabled() { e.Str("plan", plan.String()).Msg("query plan") } - - mIterator, err := plan.(executor.MeasureExecutable).Execute(executor.WithDistributedExecutionContext(context.Background(), &distributedContext{ + ctx := context.Background() + var tracer *query.Tracer + var span *query.Span + if queryCriteria.Trace { + tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano)) + span, ctx = tracer.StartSpan(ctx, "distributed-%s", p.queryService.nodeID) + span.Tag("plan", plan.String()) + defer func() { + data := resp.Data() + switch d := data.(type) { + case *measurev1.QueryResponse: + d.Trace = tracer.ToProto() + case common.Error: + span.Error(errors.New(d.Msg())) + resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{Trace: tracer.ToProto()}) + default: + panic("unexpected data type") + } + span.Stop() + }() + } + mIterator, err := plan.(executor.MeasureExecutable).Execute(executor.WithDistributedExecutionContext(ctx, &distributedContext{ Broadcaster: p.broadcaster, timeRange: queryCriteria.TimeRange, })) @@ -92,18 +116,34 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { defer func() { if err = mIterator.Close(); err != nil { ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan") + if span != nil { + span.Error(fmt.Errorf("fail to close the query plan: %w", err)) + } } }() result := make([]*measurev1.DataPoint, 0) - for mIterator.Next() { - current := mIterator.Current() - if len(current) > 0 { - result = append(result, current[0]) + func() { + var r int + if tracer != nil { + iterSpan, _ := tracer.StartSpan(ctx, "iterator") + defer func() { + iterSpan.Tag("rounds", fmt.Sprintf("%d", r)) + iterSpan.Tag("size", fmt.Sprintf("%d", len(result))) + iterSpan.Stop() + }() } - } + for mIterator.Next() { + r++ + current := mIterator.Current() + if len(current) > 0 { + result = append(result, current[0]) + } + } + }() + qr := &measurev1.QueryResponse{DataPoints: result} if e := ml.Debug(); e.Enabled() { - e.RawJSON("ret", logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure") + e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure") } - resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{DataPoints: result}) + resp = bus.NewMessage(bus.MessageID(now), qr) return } diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index 4dcac9d5b..cb926e0e4 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -38,6 +38,8 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query" + "github.com/apache/skywalking-banyandb/pkg/query/logical" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -88,15 +90,26 @@ func (s *seriesIndex) Write(docs index.Documents) error { var rangeOpts = index.RangeOpts{} -func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) { +func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series) (sl pbv1.SeriesList, err error) { seriesMatchers := make([]index.SeriesMatcher, len(series)) for i := range series { - var err error seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i]) if err != nil { return nil, err } } + tracer := query.GetTracer(ctx) + var span *query.Span + if tracer != nil { + span, _ = tracer.StartSpan(ctx, "seriesIndex.searchPrimary") + span.Tagf("matchers", "%v", seriesMatchers) + defer func() { + if err != nil { + span.Error(err) + } + span.Stop() + }() + } ss, err := s.store.Search(ctx, seriesMatchers) if err != nil { return nil, err @@ -105,6 +118,9 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series) if err != nil { return nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(ss)) } + if span != nil { + span.Tagf("matched", "%d", len(result)) + } return result, nil } @@ -174,28 +190,54 @@ func convertIndexSeriesToSeriesList(indexSeries []index.Series) (pbv1.SeriesList return seriesList, nil } -func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter index.Filter, order *pbv1.OrderBy, preloadSize int) (pbv1.SeriesList, error) { +func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter index.Filter, order *pbv1.OrderBy, preloadSize int) (sl pbv1.SeriesList, err error) { + tracer := query.GetTracer(ctx) + if tracer != nil { + var span *query.Span + span, ctx = tracer.StartSpan(ctx, "seriesIndex.Search") + defer func() { + if err != nil { + span.Error(err) + } + span.Stop() + }() + } seriesList, err := s.searchPrimary(ctx, series) if err != nil { return nil, err } pl := seriesList.ToList() - if filter != nil { + if filter != nil && filter != logical.ENode { var plFilter posting.List // TODO: merge searchPrimary and filter - plFilter, err = filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) { - return s.store, nil - }, 0) + func() { + if tracer != nil { + span, _ := tracer.StartSpan(ctx, "filter") + span.Tag("exp", filter.String()) + defer func() { + if err != nil { + span.Error(err) + } else { + span.Tagf("matched", "%d", plFilter.Len()) + span.Tagf("total", "%d", pl.Len()) + } + span.Stop() + }() + } + if plFilter, err = filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) { + return s.store, nil + }, 0); err != nil { + return + } + if plFilter == nil { + return + } + err = pl.Intersect(plFilter) + }() if err != nil { return nil, err } - if plFilter == nil { - return pbv1.SeriesList{}, nil - } - if err = pl.Intersect(plFilter); err != nil { - return nil, err - } } if order == nil || order.Index == nil { @@ -205,6 +247,17 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter fieldKey := index.FieldKey{ IndexRuleID: order.Index.GetMetadata().Id, } + var span *query.Span + if tracer != nil { + span, _ = tracer.StartSpan(ctx, "sort") + span.Tagf("preload", "%d", preloadSize) + defer func() { + if err != nil { + span.Error(err) + } + span.Stop() + }() + } // TODO:// merge searchPrimary and sort iter, err := s.store.Iterator(fieldKey, rangeOpts, order.Sort, preloadSize) if err != nil { @@ -215,7 +268,9 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter }() var sortedSeriesList pbv1.SeriesList + var r int for iter.Next() { + r++ docID := iter.Val().DocID if !pl.Contains(docID) { continue @@ -225,6 +280,10 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter return nil, err } } + if span != nil { + span.Tagf("rounds", "%d", r) + span.Tagf("size", "%d", len(sortedSeriesList)) + } return sortedSeriesList, err } diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 6b4807383..175c687c9 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -34,8 +34,10 @@ import ( "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/accesslog" "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -145,21 +147,36 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints: make([]*measurev1.DataPoint, 0)} -func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) (*measurev1.QueryResponse, error) { - if err := timestamp.CheckTimeRange(req.GetTimeRange()); err != nil { +func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) { + if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil { return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err) } - message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req) - feat, errQuery := ms.broadcaster.Publish(data.TopicMeasureQuery, message) - if errQuery != nil { - return nil, errQuery + now := time.Now() + if req.Trace { + ctx := context.TODO() + tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano)) + span, _ := tracer.StartSpan(ctx, "measure-grpc") + span.Tag("request", convert.BytesToString(logger.Proto(req))) + defer func() { + if err != nil { + span.Error(err) + } else { + span.AddSubTrace(resp.Trace) + resp.Trace = tracer.ToProto() + } + span.Stop() + }() } - msg, errFeat := feat.Get() - if errFeat != nil { - if errors.Is(errFeat, io.EOF) { + feat, err := ms.broadcaster.Publish(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(now.UnixNano()), req)) + if err != nil { + return nil, err + } + msg, err := feat.Get() + if err != nil { + if errors.Is(err, io.EOF) { return emptyMeasureQueryResponse, nil } - return nil, errFeat + return nil, err } data := msg.Data() switch d := data.(type) { diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go index 8ba0118af..7fce11d1d 100644 --- a/banyand/measure/introducer.go +++ b/banyand/measure/introducer.go @@ -40,11 +40,12 @@ func generateIntroduction() *introduction { if v == nil { return &introduction{} } - return v.(*introduction) + i := v.(*introduction) + i.reset() + return i } func releaseIntroduction(i *introduction) { - i.reset() introductionPool.Put(i) } @@ -69,11 +70,12 @@ func generateFlusherIntroduction() *flusherIntroduction { flushed: make(map[uint64]*partWrapper), } } - return v.(*flusherIntroduction) + i := v.(*flusherIntroduction) + i.reset() + return i } func releaseFlusherIntroduction(i *flusherIntroduction) { - i.reset() flusherIntroductionPool.Put(i) } @@ -100,11 +102,12 @@ func generateMergerIntroduction() *mergerIntroduction { if v == nil { return &mergerIntroduction{} } - return v.(*mergerIntroduction) + i := v.(*mergerIntroduction) + i.reset() + return i } func releaseMergerIntroduction(i *mergerIntroduction) { - i.reset() mergerIntroductionPool.Put(i) } diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 168a56633..c153a301a 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -63,17 +63,16 @@ type queryOptions struct { maxTimestamp int64 } -func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1.MeasureQueryResult, error) { +func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (mqr pbv1.MeasureQueryResult, err error) { if mqo.TimeRange == nil || len(mqo.Entities) < 1 { return nil, errors.New("invalid query options: timeRange and series are required") } if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 { return nil, errors.New("invalid query options: tagProjection or fieldProjection is required") } - var result queryResult db := s.databaseSupplier.SupplyTSDB() if db == nil { - return &result, nil + return mqr, nil } tsdb := db.(storage.TSDB[*tsTable, option]) tabWrappers := tsdb.SelectTSTables(*mqo.TimeRange) @@ -95,7 +94,7 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1 return nil, err } if len(sl) < 1 { - return &result, nil + return mqr, nil } var sids []common.SeriesID for i := range sl { @@ -108,6 +107,7 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1 maxTimestamp: mqo.TimeRange.End.UnixNano(), } var n int + var result queryResult for i := range tabWrappers { s := tabWrappers[i].Table().currentSnapshot() if s == nil { @@ -120,48 +120,58 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1 } result.snapshots = append(result.snapshots, s) } - bma := generateBlockMetadataArray() - defer releaseBlockMetadataArray(bma) - // TODO: cache tstIter - var tstIter tstIter - defer tstIter.reset() - originalSids := make([]common.SeriesID, len(sids)) - copy(originalSids, sids) - sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) - tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) - if tstIter.Error() != nil { - return nil, fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) - } - projectedEntityOffsets, tagProjectionOnPart := s.parseTagProjection(qo, &result) - result.tagProjection = qo.TagProjection - qo.TagProjection = tagProjectionOnPart - for tstIter.nextBlock() { - bc := generateBlockCursor() - p := tstIter.piHeap[0] - - seriesID := p.curBlock.seriesID - if result.entityValues != nil && result.entityValues[seriesID] == nil { - for i := range sl { - if sl[i].ID == seriesID { - tag := make(map[string]*modelv1.TagValue) - for name, offset := range projectedEntityOffsets { - tag[name] = sl[i].EntityValues[offset] + + func() { + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) + defFn := startBlockScanSpan(ctx, len(sids), parts, &result) + defer defFn() + // TODO: cache tstIter + var tstIter tstIter + defer tstIter.reset() + originalSids := make([]common.SeriesID, len(sids)) + copy(originalSids, sids) + sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) + tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) + if tstIter.Error() != nil { + err = fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) + return + } + projectedEntityOffsets, tagProjectionOnPart := s.parseTagProjection(qo, &result) + result.tagProjection = qo.TagProjection + qo.TagProjection = tagProjectionOnPart + + for tstIter.nextBlock() { + bc := generateBlockCursor() + p := tstIter.piHeap[0] + + seriesID := p.curBlock.seriesID + if result.entityValues != nil && result.entityValues[seriesID] == nil { + for i := range sl { + if sl[i].ID == seriesID { + tag := make(map[string]*modelv1.TagValue) + for name, offset := range projectedEntityOffsets { + tag[name] = sl[i].EntityValues[offset] + } + result.entityValues[seriesID] = tag } - result.entityValues[seriesID] = tag } } + bc.init(p.p, p.curBlock, qo) + result.data = append(result.data, bc) } - bc.init(p.p, p.curBlock, qo) - result.data = append(result.data, bc) - } - if tstIter.Error() != nil { - return nil, fmt.Errorf("cannot iterate tstIter: %w", tstIter.Error()) + if tstIter.Error() != nil { + err = fmt.Errorf("cannot iterate tstIter: %w", tstIter.Error()) + } + result.sidToIndex = make(map[common.SeriesID]int) + for i, si := range originalSids { + result.sidToIndex[si] = i + } + }() + if err != nil { + return nil, err } - result.sidToIndex = make(map[common.SeriesID]int) - for i, si := range originalSids { - result.sidToIndex[si] = i - } if mqo.Order == nil { result.ascTS = true } else if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { diff --git a/banyand/measure/trace.go b/banyand/measure/trace.go new file mode 100644 index 000000000..ccf51f3a4 --- /dev/null +++ b/banyand/measure/trace.go @@ -0,0 +1,74 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "context" + "fmt" + "time" + + "github.com/dustin/go-humanize" + + "github.com/apache/skywalking-banyandb/pkg/query" +) + +const ( + partMetadataHeader = "MinTimestamp, MaxTimestamp, CompressionSize, UncompressedSize, TotalCount, BlocksCount" + blockHeader = "PartID, SeriesID, MinTimestamp, MaxTimestamp, Count, UncompressedSize" +) + +func (pm *partMetadata) String() string { + minTimestamp := time.Unix(0, pm.MinTimestamp).Format(time.Stamp) + maxTimestamp := time.Unix(0, pm.MaxTimestamp).Format(time.Stamp) + + return fmt.Sprintf("%s, %s, %s, %s, %s, %s", + minTimestamp, maxTimestamp, humanize.Bytes(pm.CompressedSizeBytes), + humanize.Bytes(pm.UncompressedSizeBytes), humanize.Comma(int64(pm.TotalCount)), + humanize.Comma(int64(pm.BlocksCount))) +} + +func (bc *blockCursor) String() string { + minTimestamp := time.Unix(0, bc.minTimestamp).Format(time.Stamp) + maxTimestamp := time.Unix(0, bc.maxTimestamp).Format(time.Stamp) + + return fmt.Sprintf("%d, %d, %s, %s, %d, %s", + bc.p.partMetadata.ID, bc.bm.seriesID, minTimestamp, maxTimestamp, bc.bm.count, humanize.Bytes(bc.bm.uncompressedSizeBytes)) +} + +func startBlockScanSpan(ctx context.Context, sids int, parts []*part, qr *queryResult) func() { + tracer := query.GetTracer(ctx) + if tracer == nil { + return func() {} + } + + span, _ := tracer.StartSpan(ctx, "scan-blocks") + span.Tag("series_num", fmt.Sprintf("%d", sids)) + span.Tag("part_header", partMetadataHeader) + for i := range parts { + span.Tag(fmt.Sprintf("part_%d_%s", parts[i].partMetadata.ID, parts[i].path), + parts[i].partMetadata.String()) + } + + return func() { + span.Tag("block_header", blockHeader) + for i := range qr.data { + span.Tag(fmt.Sprintf("block_%d", i), qr.data[i].String()) + } + span.Stop() + } +} diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index c60ef7d0c..61ba8164f 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -150,8 +150,10 @@ func (s *clientService) Serve() run.StopNotify { func (s *clientService) GracefulStop() { s.closer.Done() s.closer.CloseThenWait() - if err := s.schemaRegistry.Close(); err != nil { - logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to close schema registry") + if s.schemaRegistry != nil { + if err := s.schemaRegistry.Close(); err != nil { + logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to close schema registry") + } } } diff --git a/banyand/metadata/embeddedserver/server.go b/banyand/metadata/embeddedserver/server.go index 3fbe9f6c9..44bd48645 100644 --- a/banyand/metadata/embeddedserver/server.go +++ b/banyand/metadata/embeddedserver/server.go @@ -86,8 +86,10 @@ func (s *server) Serve() run.StopNotify { func (s *server) GracefulStop() { s.Service.GracefulStop() - s.metaServer.Close() - <-s.metaServer.StopNotify() + if s.metaServer != nil { + s.metaServer.Close() + <-s.metaServer.StopNotify() + } } // NewService returns a new metadata repository Service. diff --git a/banyand/metadata/metadata_test.go b/banyand/metadata/metadata_test.go index 09a5bbaf0..981ab416f 100644 --- a/banyand/metadata/metadata_test.go +++ b/banyand/metadata/metadata_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" @@ -39,29 +40,30 @@ func Test_service_RulesBySubject(t *testing.T) { subject *commonv1.Metadata } is := assert.New(t) + req := require.New(t) is.NoError(logger.Init(logger.Logging{ Env: "dev", Level: flags.LogLevel, })) ctx := context.TODO() s, _ := embeddedserver.NewService(ctx) - is.NotNil(s) + req.NotNil(s) rootDir, deferFn, err := testhelper.NewSpace() - is.NoError(err) + req.NoError(err) err = s.FlagSet().Parse([]string{"--metadata-root-path=" + rootDir}) - is.NoError(err) - is.NoError(s.Validate()) + req.NoError(err) + req.NoError(s.Validate()) ctx = context.WithValue(ctx, common.ContextNodeKey, common.Node{NodeID: "test"}) ctx = context.WithValue(ctx, common.ContextNodeRolesKey, []databasev1.Role{databasev1.Role_ROLE_META}) err = s.PreRun(ctx) - is.NoError(err) + req.NoError(err) defer func() { s.GracefulStop() deferFn() }() err = test.PreloadSchema(ctx, s.SchemaRegistry()) - is.NoError(err) + req.NoError(err) tests := []struct { name string diff --git a/banyand/query/processor.go b/banyand/query/processor.go index ccb039e90..5a3615259 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -19,6 +19,9 @@ package query import ( "context" + "errors" + "fmt" + "runtime/debug" "time" "go.uber.org/multierr" @@ -34,6 +37,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/stream" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/query" "github.com/apache/skywalking-banyandb/pkg/query/executor" logical_measure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure" logical_stream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" @@ -52,13 +56,13 @@ var ( ) type queryService struct { - log *logger.Logger - // TODO: remove the metaService once https://github.com/apache/skywalking/issues/10121 is fixed. metaService metadata.Repo pipeline queue.Server + log *logger.Logger sqp *streamQueryProcessor mqp *measureQueryProcessor tqp *topNQueryProcessor + nodeID string } type streamQueryProcessor struct { @@ -76,6 +80,12 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { if p.log.Debug().Enabled() { p.log.Debug().RawJSON("criteria", logger.Proto(queryCriteria)).Msg("received a query request") } + defer func() { + if err := recover(); err != nil { + p.log.Error().Interface("err", err).RawJSON("req", logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic") + resp = bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic")) + } + }() // TODO: support multiple groups if len(queryCriteria.Groups) > 1 { resp = bus.NewMessage(bus.MessageID(now), common.NewError("only support one group in the query request")) @@ -124,11 +134,18 @@ type measureQueryProcessor struct { func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { queryCriteria, ok := message.Data().(*measurev1.QueryRequest) - now := time.Now().UnixNano() + n := time.Now() + now := n.UnixNano() if !ok { resp = bus.NewMessage(bus.MessageID(now), common.NewError("invalid event data type")) return } + defer func() { + if err := recover(); err != nil { + p.log.Error().Interface("err", err).RawJSON("req", logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic") + resp = bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic")) + } + }() // TODO: support multiple groups if len(queryCriteria.Groups) > 1 { resp = bus.NewMessage(bus.MessageID(now), common.NewError("only support one group in the query request")) @@ -160,12 +177,33 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for measure %s: %v", meta.GetName(), err)) return } + ctx := context.Background() + var tracer *query.Tracer + var span *query.Span + if queryCriteria.Trace { + tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano)) + span, ctx = tracer.StartSpan(ctx, "data-%s", p.queryService.nodeID) + span.Tag("plan", plan.String()) + defer func() { + data := resp.Data() + switch d := data.(type) { + case *measurev1.QueryResponse: + d.Trace = tracer.ToProto() + case common.Error: + span.Error(errors.New(d.Msg())) + resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{Trace: tracer.ToProto()}) + default: + panic("unexpected data type") + } + span.Stop() + }() + } if e := ml.Debug(); e.Enabled() { e.Str("plan", plan.String()).Msg("query plan") } - mIterator, err := plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(context.Background(), ec)) + mIterator, err := plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(ctx, ec)) if err != nil { ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan") resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to execute the query plan for measure %s: %v", meta.GetName(), err)) @@ -174,19 +212,36 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { defer func() { if err = mIterator.Close(); err != nil { ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan") + if span != nil { + span.Error(fmt.Errorf("fail to close the query plan: %w", err)) + } } }() + result := make([]*measurev1.DataPoint, 0) - for mIterator.Next() { - current := mIterator.Current() - if len(current) > 0 { - result = append(result, current[0]) + func() { + var r int + if tracer != nil { + iterSpan, _ := tracer.StartSpan(ctx, "iterator") + defer func() { + iterSpan.Tag("rounds", fmt.Sprintf("%d", r)) + iterSpan.Tag("size", fmt.Sprintf("%d", len(result))) + iterSpan.Stop() + }() } - } + for mIterator.Next() { + r++ + current := mIterator.Current() + if len(current) > 0 { + result = append(result, current[0]) + } + } + }() + qr := &measurev1.QueryResponse{DataPoints: result} if e := ml.Debug(); e.Enabled() { - e.RawJSON("ret", logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure") + e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure") } - resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{DataPoints: result}) + resp = bus.NewMessage(bus.MessageID(now), qr) return } @@ -194,7 +249,13 @@ func (q *queryService) Name() string { return moduleName } -func (q *queryService) PreRun(_ context.Context) error { +func (q *queryService) PreRun(ctx context.Context) error { + val := ctx.Value(common.ContextNodeKey) + if val == nil { + return errors.New("node id is empty") + } + node := val.(common.Node) + q.nodeID = node.NodeID q.log = logger.GetLogger(moduleName) return multierr.Combine( q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp), diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go index cd215263d..76c6e659d 100644 --- a/banyand/stream/introducer.go +++ b/banyand/stream/introducer.go @@ -40,11 +40,12 @@ func generateIntroduction() *introduction { if v == nil { return &introduction{} } - return v.(*introduction) + intro := v.(*introduction) + intro.reset() + return intro } func releaseIntroduction(i *introduction) { - i.reset() introductionPool.Put(i) } @@ -69,11 +70,12 @@ func generateFlusherIntroduction() *flusherIntroduction { flushed: make(map[uint64]*partWrapper), } } - return v.(*flusherIntroduction) + fi := v.(*flusherIntroduction) + fi.reset() + return fi } func releaseFlusherIntroduction(i *flusherIntroduction) { - i.reset() flusherIntroductionPool.Put(i) } @@ -100,11 +102,12 @@ func generateMergerIntroduction() *mergerIntroduction { if v == nil { return &mergerIntroduction{} } - return v.(*mergerIntroduction) + mi := v.(*mergerIntroduction) + mi.reset() + return mi } func releaseMergerIntroduction(i *mergerIntroduction) { - i.reset() mergerIntroductionPool.Put(i) } diff --git a/docs/api-reference.md b/docs/api-reference.md index 2c2522f1b..c4107a80e 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -416,6 +416,7 @@ Span is the basic unit of a trace. | tags | [Tag](#banyandb-common-v1-Tag) | repeated | tags is a list of tags of the span. | | message | [string](#string) | | message is the message generated by the span. | | children | [Span](#banyandb-common-v1-Span) | repeated | children is a list of child spans of the span. | +| duration | [int64](#int64) | | duration is the duration of the span. | diff --git a/pkg/query/doc.go b/pkg/query/doc.go new file mode 100644 index 000000000..1c2d3bf7a --- /dev/null +++ b/pkg/query/doc.go @@ -0,0 +1,19 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package query provides the query common interfaces and utilities. +package query diff --git a/pkg/query/logical/index_filter.go b/pkg/query/logical/index_filter.go index ceafa6956..8450c94ed 100644 --- a/pkg/query/logical/index_filter.go +++ b/pkg/query/logical/index_filter.go @@ -141,28 +141,44 @@ func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, ex return newNot(indexRule, newEq(indexRule, expr)), [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_HAVING: bb := expr.Bytes() - and := newAnd(len(bb)) + l := len(bb) + if l < 1 { + return ENode, [][]*modelv1.TagValue{entity}, nil + } + and := newAnd(l) for _, b := range bb { and.append(newEq(indexRule, newBytesLiteral(b))) } return and, [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_NOT_HAVING: bb := expr.Bytes() - and := newAnd(len(bb)) + l := len(bb) + if l < 1 { + return ENode, [][]*modelv1.TagValue{entity}, nil + } + and := newAnd(l) for _, b := range bb { and.append(newEq(indexRule, newBytesLiteral(b))) } return newNot(indexRule, and), [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_IN: bb := expr.Bytes() - or := newOr(len(bb)) + l := len(bb) + if l < 1 { + return ENode, [][]*modelv1.TagValue{entity}, nil + } + or := newOr(l) for _, b := range bb { or.append(newEq(indexRule, newBytesLiteral(b))) } return or, [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_NOT_IN: bb := expr.Bytes() - or := newOr(len(bb)) + l := len(bb) + if l < 1 { + return ENode, [][]*modelv1.TagValue{entity}, nil + } + or := newOr(l) for _, b := range bb { or.append(newEq(indexRule, newBytesLiteral(b))) } diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go index 312269842..bd4377a39 100644 --- a/pkg/query/logical/measure/measure_plan_distributed.go +++ b/pkg/query/logical/measure/measure_plan_distributed.go @@ -33,7 +33,9 @@ import ( "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/iter/sort" + "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" ) @@ -155,29 +157,46 @@ type distributedPlan struct { maxDataPointsSize uint32 } -func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, error) { +func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, err error) { dctx := executor.FromDistributedExecutionContext(ctx) - query := proto.Clone(t.queryTemplate).(*measurev1.QueryRequest) - query.TimeRange = dctx.TimeRange() + queryRequest := proto.Clone(t.queryTemplate).(*measurev1.QueryRequest) + queryRequest.TimeRange = dctx.TimeRange() if t.maxDataPointsSize > 0 { - query.Limit = t.maxDataPointsSize + queryRequest.Limit = t.maxDataPointsSize } - var allErr error - ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query)) + tracer := query.GetTracer(ctx) + var span *query.Span + if tracer != nil { + span, _ = tracer.StartSpan(ctx, "distributed-client") + queryRequest.Trace = true + span.Tag("request", convert.BytesToString(logger.Proto(queryRequest))) + defer func() { + if err != nil { + span.Error(err) + } else { + span.Stop() + } + }() + } + ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), queryRequest)) if err != nil { return nil, err } var see []sort.Iterator[*comparableDataPoint] for _, f := range ff { if m, getErr := f.Get(); getErr != nil { - allErr = multierr.Append(allErr, getErr) + err = multierr.Append(err, getErr) } else { d := m.Data() if d == nil { continue } + resp := d.(*measurev1.QueryResponse) + if span != nil { + span.AddSubTrace(resp.Trace) + } see = append(see, - newSortableElements(d.(*measurev1.QueryResponse).DataPoints, + newSortableElements(resp.DataPoints, t.sortByTime, t.sortTagSpec)) } } @@ -185,7 +204,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, erro Iterator: sort.NewItemIter(see, t.desc), } smi.init() - return smi, allErr + return smi, err } func (t *distributedPlan) String() string { diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go index c5de0da23..2e8d7304a 100644 --- a/pkg/query/logical/measure/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go @@ -30,6 +30,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -153,6 +154,8 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, e orderByType = pbv1.OrderByTypeSeries } ec := executor.FromMeasureExecutionContext(ctx) + ctx, stop := i.startSpan(ctx, query.GetTracer(ctx), orderByType, orderBy) + defer stop(err) result, err := ec.Query(ctx, pbv1.MeasureQueryOptions{ Name: i.metadata.GetName(), TimeRange: &i.timeRange, @@ -209,6 +212,9 @@ type resultMIterator struct { } func (ei *resultMIterator) Next() bool { + if ei.result == nil { + return false + } ei.i++ if ei.i < len(ei.current) { return true @@ -256,6 +262,33 @@ func (ei *resultMIterator) Current() []*measurev1.DataPoint { } func (ei *resultMIterator) Close() error { - ei.result.Release() + if ei.result != nil { + ei.result.Release() + } return nil } + +func (i *localIndexScan) startSpan(ctx context.Context, tracer *query.Tracer, orderType pbv1.OrderByType, orderBy *pbv1.OrderBy) (context.Context, func(error)) { + if tracer == nil { + return ctx, func(error) {} + } + + span, ctx := tracer.StartSpan(ctx, "indexScan-%s", i.metadata) + sortName := modelv1.Sort_name[int32(orderBy.Sort)] + switch orderType { + case pbv1.OrderByTypeTime: + span.Tag("orderBy", "time "+sortName) + case pbv1.OrderByTypeIndex: + span.Tag("orderBy", fmt.Sprintf("indexRule:%s", orderBy.Index.Metadata.Name)) + case pbv1.OrderByTypeSeries: + span.Tag("orderBy", "series") + } + span.Tag("details", i.String()) + + return ctx, func(err error) { + if err != nil { + span.Error(err) + } + span.Stop() + } +} diff --git a/pkg/query/tracer.go b/pkg/query/tracer.go new file mode 100644 index 000000000..4d7612730 --- /dev/null +++ b/pkg/query/tracer.go @@ -0,0 +1,155 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package query + +import ( + "context" + "fmt" + + "google.golang.org/protobuf/types/known/timestamppb" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" +) + +var ( + spanKey = spanContextKey{} + tracerKey = tracerContextKey{} +) + +type ( + spanContextKey struct{} + tracerContextKey struct{} +) + +// Tracer is a simple tracer for query. +type Tracer struct { + data *commonv1.Trace +} + +// NewTracer creates a new tracer. +func NewTracer(ctx context.Context, id string) (*Tracer, context.Context) { + tracer := GetTracer(ctx) + if tracer != nil { + return tracer, ctx + } + t := &Tracer{ + data: &commonv1.Trace{ + TraceId: id, + }, + } + return t, context.WithValue(ctx, tracerKey, t) +} + +// GetTracer returns the tracer from the context. +func GetTracer(ctx context.Context) *Tracer { + tv := ctx.Value(tracerKey) + if tv == nil { + return nil + } + tracer, ok := ctx.Value(tracerKey).(*Tracer) + if ok { + return tracer + } + panic(fmt.Errorf("invalid tracer context value: %v", tv)) +} + +// StartSpan starts a new span. +func (t *Tracer) StartSpan(ctx context.Context, format string, args ...interface{}) (*Span, context.Context) { + s := &Span{ + data: &commonv1.Span{ + Message: fmt.Sprintf(format, args...), + StartTime: timestamppb.Now(), + }, + tracer: t, + } + sv := ctx.Value(spanKey) + if sv == nil { + t.data.Spans = append(t.data.Spans, s.data) + return s, context.WithValue(ctx, spanKey, s) + } + parentSpan, ok := ctx.Value(spanKey).(*Span) + if ok { + parentSpan.addChild(s.data) + } else { + t.data.Spans = append(t.data.Spans, s.data) + } + return s, context.WithValue(ctx, spanKey, s) +} + +// ToProto returns the proto representation of the tracer. +func (t *Tracer) ToProto() *commonv1.Trace { + return t.data +} + +// Span is a span of the tracer. +type Span struct { + data *commonv1.Span + tracer *Tracer +} + +func (s *Span) addChild(child *commonv1.Span) { + s.data.Children = append(s.data.Children, child) + if child.Error { + s.Error(fmt.Errorf("sub span error")) + } +} + +// AddSubTrace adds a sub trace to the span. +func (s *Span) AddSubTrace(trace *commonv1.Trace) { + if trace == nil { + return + } + for i := range trace.Spans { + s.addChild(trace.Spans[i]) + } +} + +// Tag adds a tag to the span. +func (s *Span) Tag(key, value string) *Span { + s.data.Tags = append(s.data.Tags, &commonv1.Tag{ + Key: key, + Value: value, + }) + return s +} + +// Tagf adds a formatted tag to the span. +func (s *Span) Tagf(key, format string, args ...any) *Span { + s.data.Tags = append(s.data.Tags, &commonv1.Tag{ + Key: key, + Value: fmt.Sprintf(format, args...), + }) + return s +} + +// Error marks the span as an error span. +func (s *Span) Error(err error) *Span { + if s.data.Error { + return s + } + s.data.Error = true + s.Tag("error_msg", err.Error()) + s.tracer.data.Error = true + return s +} + +// Stop stops the span. +func (s *Span) Stop() { + s.data.EndTime = timestamppb.Now() + s.data.Duration = s.data.EndTime.AsTime().Sub(s.data.StartTime.AsTime()).Milliseconds() +} diff --git a/pkg/query/tracer_test.go b/pkg/query/tracer_test.go new file mode 100644 index 000000000..cfa4c6668 --- /dev/null +++ b/pkg/query/tracer_test.go @@ -0,0 +1,131 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package query + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" +) + +func TestNewTracer(t *testing.T) { + ctx := context.Background() + tracer, newCtx := NewTracer(ctx, "test-trace-id") + assert.NotNil(t, tracer) + assert.NotEqual(t, ctx, newCtx, "context should be different after adding a tracer") +} + +func TestGetTracer(t *testing.T) { + ctx := context.Background() + var tracer *Tracer + tracer, ctx = NewTracer(ctx, "test-trace-id") + + retrievedTracer := GetTracer(ctx) + assert.Equal(t, tracer, retrievedTracer, "retrieved tracer should be the same as the original") +} + +func TestStartSpan(t *testing.T) { + ctx := context.Background() + var tracer *Tracer + tracer, ctx = NewTracer(ctx, "test-trace-id") + span, spanCtx := tracer.StartSpan(ctx, "test span %s", "1") + + assert.NotNil(t, span) + assert.NotEqual(t, ctx, spanCtx, "context should be different after starting a span") + assert.Equal(t, "test span 1", span.data.Message) + assert.NotNil(t, span.data.StartTime) +} + +func TestSpan_AddChild(t *testing.T) { + ctx := context.Background() + var tracer *Tracer + tracer, ctx = NewTracer(ctx, "test-trace-id") + var parentSpan *Span + parentSpan, ctx = tracer.StartSpan(ctx, "parent span") + childSpan, _ := tracer.StartSpan(ctx, "child span") + + assert.Contains(t, parentSpan.data.Children, childSpan.data, "parent span should contain the child span") +} + +func TestSpan_AddSubTrace(t *testing.T) { + ctx := context.Background() + var tracer *Tracer + tracer, ctx = NewTracer(ctx, "test-trace-id") + span, _ := tracer.StartSpan(ctx, "span") + + subTrace := &commonv1.Trace{ + Spans: []*commonv1.Span{ + {Message: "sub span 1"}, + {Message: "sub span 2"}, + }, + } + + span.AddSubTrace(subTrace) + + assert.Equal(t, 2, len(span.data.Children), "span should contain two children") + assert.Equal(t, "sub span 1", span.data.Children[0].Message) + assert.Equal(t, "sub span 2", span.data.Children[1].Message) +} + +func TestSpan_Tag(t *testing.T) { + ctx := context.Background() + var tracer *Tracer + tracer, ctx = NewTracer(ctx, "test-trace-id") + span, _ := tracer.StartSpan(ctx, "span") + + span.Tag("key", "value") + + assert.Equal(t, 1, len(span.data.Tags), "span should have one tag") + assert.Equal(t, "key", span.data.Tags[0].Key) + assert.Equal(t, "value", span.data.Tags[0].Value) + + span.Tagf("key", "value %s", "formatted") + assert.Equal(t, 2, len(span.data.Tags), "span should have two tags") + assert.Equal(t, "key", span.data.Tags[1].Key) + assert.Equal(t, "value formatted", span.data.Tags[1].Value) +} + +func TestSpan_Error(t *testing.T) { + ctx := context.Background() + var tracer *Tracer + tracer, ctx = NewTracer(ctx, "test-trace-id") + span, _ := tracer.StartSpan(ctx, "span") + + span.Error(fmt.Errorf("test error")) + + assert.True(t, span.data.Error, "span should be marked as error") + assert.True(t, tracer.data.Error, "tracer should be marked as error") + assert.Equal(t, "test error", span.data.Tags[0].Value, "error message should be added as a tag") +} + +func TestSpan_Stop(t *testing.T) { + ctx := context.Background() + tracer, _ := NewTracer(ctx, "test-trace-id") + span, _ := tracer.StartSpan(ctx, "span") + time.Sleep(10 * time.Millisecond) + + span.Stop() + + assert.NotNil(t, span.data.EndTime, "span end time should be set") + assert.Greater(t, span.data.Duration, int64(0), "span duration should be greater than 0") +} diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go index b796ef012..eb9d342b2 100644 --- a/pkg/test/stream/etcd.go +++ b/pkg/test/stream/etcd.go @@ -45,6 +45,9 @@ var ( // PreloadSchema loads schemas from files in the booting process. func PreloadSchema(ctx context.Context, e schema.Registry) error { + if e == nil { + return nil + } g := &commonv1.Group{} if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil { return err diff --git a/test/stress/trace/docker-compose-cluster.yaml b/test/stress/trace/docker-compose-cluster.yaml index 48b49e7ef..ddf142216 100644 --- a/test/stress/trace/docker-compose-cluster.yaml +++ b/test/stress/trace/docker-compose-cluster.yaml @@ -76,6 +76,10 @@ services: build: dockerfile: ./docker/Dockerfile context: ../../.. + ports: + - 17913:17913 + - 6060:6060 + - 2121:2121 deploy: resources: limits: @@ -89,7 +93,7 @@ services: file: ../../docker/base-compose.yml service: oap # TODO: use the main repo image once v0.6.0 is released and merged into the main repo - image: "ghcr.io/apache/skywalking/data-generator:${SW_OAP_COMMIT}" + image: "hanahmily/data-generator:${SW_OAP_COMMIT}" environment: SW_STORAGE: banyandb SW_STORAGE_BANYANDB_TARGETS: "liaison:17912"