[coordinator] Only honor default aggregation policies if not matched by mapping rule (#2203)

* [coordinator] Only honor default aggregation policies if not matched by mapping rule
robskillington authored Mar 16, 2020
1 parent cbb8109 commit 8ddd8ea
Showing 3 changed files with 246 additions and 15 deletions.
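The idea behind the change, as a minimal standalone sketch: a default storage policy is only honored when no mapping rule already aggregates the metric at an equivalent policy, so the two never double-write the same series. The types below are hypothetical stand-ins, not the downsampler's real plumbing.

package main

import "fmt"

// StoragePolicy is a hypothetical stand-in for policy.StoragePolicy.
type StoragePolicy struct {
	Resolution string // e.g. "2s"
	Retention  string // e.g. "24h"
}

// Equivalent mirrors the new policy.StoragePolicy.Equivalent method:
// equal resolution window and retention.
func (p StoragePolicy) Equivalent(other StoragePolicy) bool {
	return p.Resolution == other.Resolution && p.Retention == other.Retention
}

// effectivePolicies keeps every mapping-rule policy and only those
// default policies not already matched by a mapping rule.
func effectivePolicies(defaults, matched []StoragePolicy) []StoragePolicy {
	out := append([]StoragePolicy(nil), matched...)
	for _, def := range defaults {
		skip := false
		for _, m := range matched {
			if def.Equivalent(m) {
				skip = true // the mapping rule overrides this default
				break
			}
		}
		if !skip {
			out = append(out, def)
		}
	}
	return out
}

func main() {
	defaults := []StoragePolicy{{"2s", "24h"}, {"4s", "48h"}}
	matched := []StoragePolicy{{"2s", "24h"}} // from the "app:nginx*" mapping rule
	fmt.Println(effectivePolicies(defaults, matched))
	// Prints [{2s 24h} {4s 48h}]: the 2s:24h aggregation now comes from
	// the mapping rule (Max), while the 4s:48h default (Sum) still applies.
}

This mirrors the first test case below, where the "app:nginx*" mapping rule claims 2s:24h with a Max aggregation and only the 4s:48h default (Sum) survives.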
147 changes: 141 additions & 6 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
@@ -176,6 +176,132 @@ func TestDownsamplerAggregationWithRulesConfigMappingRules(t *testing.T) {
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesConfigMappingRulesPartialReplaceAutoMappingRule(t *testing.T) {
gaugeMetric := testGaugeMetric{
tags: map[string]string{
nameTag: "foo_metric",
"app": "nginx_edge",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 15}, {value: 10}, {value: 30}, {value: 5}, {value: 0},
},
}
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
autoMappingRules: []AutoMappingRule{
{
Aggregations: []aggregation.Type{aggregation.Sum},
Policies: policy.StoragePolicies{
policy.MustParseStoragePolicy("2s:24h"),
policy.MustParseStoragePolicy("4s:48h"),
},
},
},
rulesConfig: &RulesConfiguration{
MappingRules: []MappingRuleConfiguration{
{
Filter: "app:nginx*",
Aggregations: []aggregation.Type{aggregation.Max},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
},
},
},
},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
// Expect the max to be used and override the default auto
// mapping rule for the storage policy 2s:24h.
{
tags: gaugeMetric.tags,
value: 30,
attributes: &storage.Attributes{
MetricsType: storage.AggregatedMetricsType,
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
},
},
// Expect the sum to still be used for the storage
// policy 4s:48h.
{
tags: gaugeMetric.tags,
value: 60,
attributes: &storage.Attributes{
MetricsType: storage.AggregatedMetricsType,
Resolution: 4 * time.Second,
Retention: 48 * time.Hour,
},
},
},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesConfigMappingRulesReplaceAutoMappingRule(t *testing.T) {
gaugeMetric := testGaugeMetric{
tags: map[string]string{
nameTag: "foo_metric",
"app": "nginx_edge",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 15}, {value: 10}, {value: 30}, {value: 5}, {value: 0},
},
}
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
autoMappingRules: []AutoMappingRule{
{
Aggregations: []aggregation.Type{aggregation.Sum},
Policies: policy.StoragePolicies{
policy.MustParseStoragePolicy("2s:24h"),
},
},
},
rulesConfig: &RulesConfiguration{
MappingRules: []MappingRuleConfiguration{
{
Filter: "app:nginx*",
Aggregations: []aggregation.Type{aggregation.Max},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
},
},
},
},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
// Expect the max to be used and override the default auto
// mapping rule for the storage policy 2s:24h.
{
tags: gaugeMetric.tags,
value: 30,
attributes: &storage.Attributes{
MetricsType: storage.AggregatedMetricsType,
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
},
},
},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesConfigRollupRules(t *testing.T) {
gaugeMetric := testGaugeMetric{
tags: map[string]string{
@@ -197,7 +323,6 @@ func TestDownsamplerAggregationWithRulesConfigRollupRules(t *testing.T) {
res := 5 * time.Second
ret := 30 * 24 * time.Hour
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
instrumentOpts: instrument.NewTestOptions(t),
rulesConfig: &RulesConfiguration{
RollupRules: []RollupRuleConfiguration{
{
@@ -472,7 +597,8 @@ CheckAllWritesArrivedLoop:

for _, expectedWrite := range expectedWrites {
name := expectedWrite.tags[nameTag]
if _, ok := findWrite(t, writes, name); !ok {
_, ok := findWrite(t, writes, name, expectedWrite.attributes)
if !ok {
time.Sleep(100 * time.Millisecond)
continue CheckAllWritesArrivedLoop
}
@@ -497,7 +623,7 @@ CheckAllWritesArrivedLoop:
name := expectedWrite.tags[nameTag]
value := expectedWrite.value

write, found := findWrite(t, writes, name)
write, found := findWrite(t, writes, name, expectedWrite.attributes)
require.True(t, found)
assert.Equal(t, expectedWrite.tags, tagsToStringMap(write.Tags))
require.Equal(t, 1, len(write.Datapoints))
@@ -724,7 +850,8 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
clockOpts = opts.clockOpts
}

instrumentOpts := instrument.NewOptions()
// Use test instrument options so that debug logging is enabled by default.
instrumentOpts := instrument.NewTestOptions(t)
if opts.instrumentOpts != nil {
instrumentOpts = opts.instrumentOpts
}
@@ -827,12 +954,20 @@ func findWrite(
t *testing.T,
writes []*storage.WriteQuery,
name string,
optionalMatchAttrs *storage.Attributes,
) (*storage.WriteQuery, bool) {
for _, w := range writes {
if t, ok := w.Tags.Get([]byte(nameTag)); ok {
if bytes.Equal(t, []byte(name)) {
return w, true
if !bytes.Equal(t, []byte(name)) {
// Does not match name.
continue
}
if optionalMatchAttrs != nil && w.Attributes != *optionalMatchAttrs {
// Attribute matching was requested but the attributes differ.
continue
}
// Matches the name and any requested attributes.
return w, true
}
}
return nil, false
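For context, a hedged sketch of how a test would call the updated findWrite helper (signature as in this diff): the new optionalMatchAttrs argument lets a test distinguish writes for the same metric name that land under different storage policies, while passing nil keeps the old name-only matching.

// Inside a test, match the write for foo_metric aggregated at 2s:24h
// specifically, rather than whichever foo_metric write the aggregator
// happened to flush first.
attrs := &storage.Attributes{
	MetricsType: storage.AggregatedMetricsType,
	Resolution:  2 * time.Second,
	Retention:   24 * time.Hour,
}
write, found := findWrite(t, writes, "foo_metric", attrs)
require.True(t, found)
assert.Equal(t, gaugeMetric.tags, tagsToStringMap(write.Tags))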
106 changes: 97 additions & 9 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
@@ -32,6 +32,7 @@ import (
"github.com/m3db/m3/src/metrics/generated/proto/metricpb"
"github.com/m3db/m3/src/metrics/matcher"
"github.com/m3db/m3/src/metrics/metadata"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/serialize"

@@ -57,6 +58,8 @@ type metricsAppenderOptions struct {
matcher matcher.Matcher
metricTagsIteratorPool serialize.MetricTagsIteratorPool

mappingRuleStoragePolicies []policy.StoragePolicy

clockOpts clock.Options
debugLogging bool
logger *zap.Logger
@@ -120,11 +123,30 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
})
}
} else {
// Always aggregate any default staged metadatas
for _, stagedMetadatas := range a.defaultStagedMetadatas {
a.debugLogMatch("downsampler applying default mapping rule",
// NB(r): First apply the mapping rules to see which storage policies
// have been applied. Any storage policy applied by a mapping rule
// that exactly matches a default storage policy will be skipped
// when applying the default rules, so as to avoid storing the same
// metric in the same namespace with the same metric name and tags
// (i.e. overwriting each other).
a.mappingRuleStoragePolicies = a.mappingRuleStoragePolicies[:0]

stagedMetadatas := matchResult.ForExistingIDAt(nowNanos)
if !stagedMetadatas.IsDefault() && len(stagedMetadatas) != 0 {
a.debugLogMatch("downsampler applying matched mapping rule",
debugLogMatchOptions{Meta: stagedMetadatas})

// Collect all storage policies of the current active mapping rules
for _, stagedMetadata := range stagedMetadatas {
for _, pipe := range stagedMetadata.Pipelines {
for _, sp := range pipe.StoragePolicies {
a.mappingRuleStoragePolicies =
append(a.mappingRuleStoragePolicies, sp)
}
}
}

// Only sample if going to actually aggregate
a.multiSamplesAppender.addSamplesAppender(samplesAppender{
agg: a.agg,
clientRemote: a.clientRemote,
@@ -133,12 +155,74 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
})
}

stagedMetadatas := matchResult.ForExistingIDAt(nowNanos)
if !stagedMetadatas.IsDefault() && len(stagedMetadatas) != 0 {
a.debugLogMatch("downsampler applying matched mapping rule",
// Always aggregate any default staged metadatas, unless a
// mapping rule has provided an override for a storage policy,
// in which case skip aggregating for that storage policy.
for _, stagedMetadatas := range a.defaultStagedMetadatas {
a.debugLogMatch("downsampler applying default mapping rule",
debugLogMatchOptions{Meta: stagedMetadatas})

// Only sample if going to actually aggregate
stagedMetadataBeforeFilter := stagedMetadatas[:]
if len(a.mappingRuleStoragePolicies) != 0 {
// If mapping rules have applied aggregations for
// storage policies then de-dupe so we don't have two
// active aggregations for the same storage policy.
stagedMetadatasAfterFilter := stagedMetadatas[:0]
for _, stagedMetadata := range stagedMetadatas {
pipesAfterFilter := stagedMetadata.Pipelines[:0]
for _, pipe := range stagedMetadata.Pipelines {
storagePoliciesAfterFilter := pipe.StoragePolicies[:0]
for _, sp := range pipe.StoragePolicies {
// Check whether the aggregation for this storage policy was
// already set by a mapping rule.
matchedByMappingRule := false
for _, existing := range a.mappingRuleStoragePolicies {
if sp.Equivalent(existing) {
matchedByMappingRule = true
a.debugLogMatch("downsampler skipping default mapping rule storage policy",
debugLogMatchOptions{Meta: stagedMetadataBeforeFilter})
break
}
}
if !matchedByMappingRule {
// Keep storage policy if not matched by mapping rule.
storagePoliciesAfterFilter =
append(storagePoliciesAfterFilter, sp)
}
}

// Update storage policies slice after filtering.
pipe.StoragePolicies = storagePoliciesAfterFilter

if len(pipe.StoragePolicies) != 0 {
// Keep the pipeline if it still has storage policies.
pipesAfterFilter = append(pipesAfterFilter, pipe)
}
}

// Update pipelines after filtering.
stagedMetadata.Pipelines = pipesAfterFilter

if len(stagedMetadata.Pipelines) != 0 {
// Keep the staged metadata if it still has pipelines.
stagedMetadatasAfterFilter =
append(stagedMetadatasAfterFilter, stagedMetadata)
}
}

// Finally, set the staged metadatas to those kept after filtering.
stagedMetadatas = stagedMetadatasAfterFilter
}

// Skip appending entirely if no staged metadatas remain after filtering.
if len(stagedMetadatas) == 0 {
a.debugLogMatch("downsampler skipping default mapping rule completely",
debugLogMatchOptions{Meta: stagedMetadataBeforeFilter})
continue
}

a.multiSamplesAppender.addSamplesAppender(samplesAppender{
agg: a.agg,
clientRemote: a.clientRemote,
@@ -167,8 +251,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
}

type debugLogMatchOptions struct {
Meta metadata.StagedMetadatas
RollupID []byte
Meta metadata.StagedMetadatas
StoragePolicy policy.StoragePolicy
RollupID []byte
}

func (a *metricsAppender) debugLogMatch(str string, opts debugLogMatchOptions) {
@@ -184,6 +269,9 @@ func (a *metricsAppender) debugLogMatch(str string, opts debugLogMatchOptions) {
if v := opts.Meta; v != nil {
fields = append(fields, stagedMetadatasLogField(v))
}
if v := opts.StoragePolicy; v != policy.EmptyStoragePolicy {
fields = append(fields, zap.Stringer("storagePolicy", v))
}
a.logger.Debug(str, fields...)
}

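The filtering above reuses each slice's backing array via the s[:0] idiom at all three nesting levels (staged metadatas, pipelines, storage policies). A minimal self-contained illustration of the idiom, independent of the M3 types:

package main

import "fmt"

// keepEven filters in place: filtered shares nums' backing array, so
// no allocation occurs, at the cost of overwriting nums' contents as
// the filter advances.
func keepEven(nums []int) []int {
	filtered := nums[:0]
	for _, n := range nums {
		if n%2 == 0 {
			filtered = append(filtered, n)
		}
	}
	return filtered
}

func main() {
	fmt.Println(keepEven([]int{1, 2, 3, 4, 5, 6})) // [2 4 6]
}

Overwriting in place is safe within a single pass because the write position never overtakes the read position.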
8 changes: 8 additions & 0 deletions src/metrics/policy/storage_policy.go
@@ -73,6 +73,14 @@ func NewStoragePolicyFromProto(pb *policypb.StoragePolicy) (StoragePolicy, error
return sp, nil
}

// Equivalent returns whether two storage policies are equal by their
// resolution window and retention. The resolution precision is ignored
// for equivalence (hence why the method is not named Equal).
func (p StoragePolicy) Equivalent(other StoragePolicy) bool {
return p.resolution.Window == other.resolution.Window &&
p.retention == other.retention
}

// String is the string representation of a storage policy.
func (p StoragePolicy) String() string {
return fmt.Sprintf("%s%s%s", p.resolution.String(), resolutionRetentionSeparator, p.retention.String())
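To illustrate the distinction Equivalent draws, a hedged fragment: the "window@precision:retention" policy-string form (e.g. "10s@1s:6h") is an assumption about the parser, not something shown in this diff.

// Two policies differing only in resolution precision: Equivalent
// treats them as the same policy, while strict comparison does not.
p1 := policy.MustParseStoragePolicy("10s@1s:6h")
p2 := policy.MustParseStoragePolicy("10s@1ms:6h")
fmt.Println(p1.Equivalent(p2)) // true: same 10s window and 6h retention
fmt.Println(p1 == p2)          // false: precision differs (1s vs 1ms)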
