diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index e25ad7aec..45432c99e 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -110,6 +110,9 @@ func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series, if err != nil { return nil, nil, nil, err } + if len(ss) == 0 { + return nil, nil, nil, nil + } sl, fields, tss, err = convertIndexSeriesToSeriesList(ss, len(projection) > 0) if err != nil { return nil, nil, nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(ss)) diff --git a/banyand/measure/query.go b/banyand/measure/query.go index e0d074944..6519848ba 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -46,6 +46,8 @@ const ( checkDoneEvery = 128 ) +var nilResult = model.MeasureQueryResult(nil) + // Query allow to retrieve measure data points. type Query interface { LoadGroup(name string) (resourceSchema.Group, bool) @@ -91,7 +93,7 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr tsdb := db.(storage.TSDB[*tsTable, option]) segments := tsdb.SelectSegments(*mqo.TimeRange) if len(segments) < 1 { - return nil, nil + return nilResult, nil } if s.schema.IndexMode { @@ -106,7 +108,7 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr for i := range segments { segments[i].DecRef() } - return nil, nil + return nilResult, nil } result := queryResult{ ctx: ctx, @@ -256,7 +258,7 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Series, mqo model.MeasureQueryOptions, segments []storage.Segment[*tsTable, option], -) (*indexSortResult, error) { +) (model.MeasureQueryResult, error) { defer func() { for i := range segments { segments[i].DecRef() @@ -300,7 +302,7 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Seri PreloadSize: preloadSize, Projection: indexProjection, } - + seriesFilter := roaring.NewPostingList() for i := range segments { if mqo.TimeRange.Include(segments[i].GetTimeRange()) { opts.TimeRange = nil @@ -312,8 +314,22 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Seri if err != nil { return nil, err } + for j := 0; j < len(sr.sll); j++ { + if seriesFilter.Contains(uint64(sr.sll[j].ID)) { + sr.remove(j) + j-- + continue + } + seriesFilter.Insert(uint64(sr.sll[j].ID)) + } + if len(sr.sll) < 1 { + continue + } r.segResults = append(r.segResults, sr) } + if len(r.segResults) < 1 { + return nilResult, nil + } heap.Init(&r.segResults) return r, nil } @@ -804,10 +820,24 @@ type segResult struct { i int } +func (sr *segResult) remove(i int) { + sr.sll = append(sr.sll[:i], sr.sll[i+1:]...) + if sr.frl != nil { + sr.frl = append(sr.frl[:i], sr.frl[i+1:]...) + } + sr.timestamps = append(sr.timestamps[:i], sr.timestamps[i+1:]...) + if sr.sortedValues != nil { + sr.sortedValues = append(sr.sortedValues[:i], sr.sortedValues[i+1:]...) + } +} + type segResultHeap []*segResult func (h segResultHeap) Len() int { return len(h) } func (h segResultHeap) Less(i, j int) bool { + if h[i].sortedValues == nil { + return h[i].sll[h[i].i].ID < h[j].sll[h[j].i].ID + } return bytes.Compare(h[i].sortedValues[h[i].i], h[j].sortedValues[h[j].i]) < 0 } func (h segResultHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } diff --git a/banyand/stream/query.go b/banyand/stream/query.go index d8e14b286..e375ae165 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -44,6 +44,8 @@ import ( const checkDoneEvery = 128 +var nilResult = model.StreamQueryResult(nil) + func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr model.StreamQueryResult, err error) { if sqo.TimeRange == nil || len(sqo.Entities) < 1 { return nil, errors.New("invalid query options: timeRange and series are required") @@ -53,7 +55,7 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m } db := s.databaseSupplier.SupplyTSDB() if db == nil { - return sqr, nil + return nilResult, nil } var result queryResult tsdb := db.(storage.TSDB[*tsTable, option]) diff --git a/test/cases/init.go b/test/cases/init.go index 7f32b753a..64b60faed 100644 --- a/test/cases/init.go +++ b/test/cases/init.go @@ -41,6 +41,7 @@ func Initialize(addr string, now time.Time) { casesstreamdata.Write(conn, "duplicated", now, 0) // // measure interval = time.Minute + casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data_old.json", now.AddDate(0, 0, -1), interval) casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval) casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval) casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval) diff --git a/test/cases/measure/data/input/index_mode_none.yaml b/test/cases/measure/data/input/index_mode_none.yaml new file mode 100644 index 000000000..0b5e5761a --- /dev/null +++ b/test/cases/measure/data/input/index_mode_none.yaml @@ -0,0 +1,30 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "service_traffic" +groups: [ "sw_metric" ] +tagProjection: + tagFamilies: + - name: "default" + tags: [ "id", "service_id", "name", "short_name", "service_group", "layer" ] +criteria: + condition: + name: "layer" + op: "BINARY_OP_EQ" + value: + int: + value: "-1" diff --git a/test/cases/measure/data/testdata/service_traffic_data_old.json b/test/cases/measure/data/testdata/service_traffic_data_old.json new file mode 100644 index 000000000..58246fdd2 --- /dev/null +++ b/test/cases/measure/data/testdata/service_traffic_data_old.json @@ -0,0 +1,154 @@ +[ + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "1" + } + }, + { + "str": { + "value": "service_1_expired" + } + }, + { + "str": { + "value": "service_name_1_expired" + } + }, + { + "str": { + "value": "service_short_name_1_expired" + } + }, + { + "str": { + "value": "group1_expired" + } + }, + { + "int": { + "value": 1 + } + } + ] + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "2" + } + }, + { + "str": { + "value": "service_2_expired" + } + }, + { + "str": { + "value": "service_name_2_expired" + } + }, + { + "str": { + "value": "service_short_name_2_expired" + } + }, + { + "str": { + "value": "group1" + } + }, + { + "int": { + "value": 2 + } + } + ] + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "3" + } + }, + { + "str": { + "value": "service_3_expired" + } + }, + { + "str": { + "value": "service_name_3_expired" + } + }, + { + "str": { + "value": "service_short_name_3_expired" + } + }, + { + "str": { + "value": "group1_expired" + } + }, + { + "int": { + "value": 1 + } + } + ] + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "4" + } + }, + { + "str": { + "value": "service_4" + } + }, + { + "str": { + "value": "service_name_4" + } + }, + { + "str": { + "value": "service_short_name_4" + } + }, + { + "str": { + "value": "group4" + } + }, + { + "int": { + "value": 3 + } + } + ] + } + ] + } +] \ No newline at end of file diff --git a/test/cases/measure/data/want/index_mode_all_segs.yaml b/test/cases/measure/data/want/index_mode_all_segs.yaml new file mode 100644 index 000000000..a10946c64 --- /dev/null +++ b/test/cases/measure/data/want/index_mode_all_segs.yaml @@ -0,0 +1,137 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +dataPoints: +- sid: "15142466043926325685" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "1" + - key: service_id + value: + str: + value: service_1 + - key: name + value: + str: + value: service_name_1 + - key: short_name + value: + str: + value: service_short_name_1 + - key: service_group + value: + str: + value: group1 + - key: layer + value: + int: + value: "1" + timestamp: "2024-11-16T11:17:00Z" + version: "1" +- sid: "3906119849472468294" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "2" + - key: service_id + value: + str: + value: service_2 + - key: name + value: + str: + value: service_name_2 + - key: short_name + value: + str: + value: service_short_name_2 + - key: service_group + value: + str: + value: group1 + - key: layer + value: + int: + value: "2" + timestamp: "2024-11-16T11:18:00Z" + version: "1" +- sid: "12370392692163567533" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "3" + - key: service_id + value: + str: + value: service_3 + - key: name + value: + str: + value: service_name_3 + - key: short_name + value: + str: + value: service_short_name_3 + - key: service_group + value: + str: + value: group1 + - key: layer + value: + int: + value: "1" + timestamp: "2024-11-16T11:19:00Z" + version: "1" +- sid: "16450204962168035869" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: "4" + - key: service_id + value: + str: + value: service_4 + - key: name + value: + str: + value: service_name_4 + - key: short_name + value: + str: + value: service_short_name_4 + - key: service_group + value: + str: + value: group4 + - key: layer + value: + int: + value: "3" + timestamp: "2024-11-15T11:19:00Z" + version: "1" diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go index bc31747d6..39bd2bf3d 100644 --- a/test/cases/measure/measure.go +++ b/test/cases/measure/measure.go @@ -73,6 +73,8 @@ var _ = g.DescribeTable("Scanning Measures", verify, g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("match a tag belongs to the entity", helpers.Args{Input: "entity_match", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("all of index mode", helpers.Args{Input: "index_mode_all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("all in all segments of index mode", helpers.Args{Input: "index_mode_all", Want: "index_mode_all_segs", Duration: 72 * time.Hour, Offset: -48 * time.Hour}), g.Entry("order by desc of index mode", helpers.Args{Input: "index_mode_order_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("none of index mode", helpers.Args{Input: "index_mode_none", WantEmpty: true, Duration: 25 * time.Minute, Offset: -20 * time.Minute}), )