Skip to content

Commit

Permalink
Export client request latencies as histograms
Browse files Browse the repository at this point in the history
This adds support for histogram-style metrics instead of using
summaries. It means we can sum on a cluster level and present the user's
experienced latency instead of looking at it on a per-node level.

The current version limits the range of histogram buckets between 0.1ms
and 60s, to avoid exporting huge amounts of buckets that are likely
empty. Further patches could limit this further, for example by going
for a 1.44x increment instead of the 1.2x increment, or by specifying
the ranges in the configuration.

Even with the limits in place, this exports 76 metrics to 3 metric
families per histogram. The original summaries-based code only exports 8
metrics (in 3 families), though in theory those are no longer needed and
could be disabled with a flag.
  • Loading branch information
TvdW committed Sep 3, 2020
1 parent 2767af9 commit a22cde2
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public Iterable<Interval> getIntervals() {
new Interval(Interval.Quantile.P_99_9, (float) timer.get999thPercentile() * durationFactor)
);
}

@Override
public long[] getValues() {
return timer.values();
}
};
}

Expand All @@ -108,6 +113,11 @@ public Iterable<Interval> getIntervals() {
new Interval(Interval.Quantile.P_99_9, (float) histogram.get999thPercentile())
);
}

@Override
public long[] getValues() {
return histogram.values();
}
};
}

Expand All @@ -125,6 +135,11 @@ public Iterable<Interval> getIntervals() {

return Interval.asIntervals(Interval.Quantile.STANDARD_PERCENTILES, q -> (float) snapshot.getValue(q.value));
}

@Override
public long[] getValues() {
return metric.getSnapshot().getValues();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.cassandra.metrics.CassandraMetricsRegistry.JmxMeterMBean;
import org.apache.cassandra.utils.EstimatedHistogram;

import java.util.ArrayList;
import java.util.stream.Stream;

public final class CollectorFunctions {
Expand Down Expand Up @@ -176,4 +177,58 @@ protected static CollectorFunction<SamplingCounting> samplingAndCountingAsSummar
public static CollectorFunction<SamplingCounting> samplingAndCountingAsSummary() {
return samplingAndCountingAsSummary(FloatFloatFunction.identity());
}

/**
* Collect a {@link SamplingCounting} as a Prometheus histogram.
*/
protected static CollectorFunction<SamplingCounting> samplingAndCountingAsHistogram(final FloatFloatFunction bucketScaleFunction) {
// Set some limits on the range so we don't export all 170 buckets
float bucketMin = 0.0001f; // 0.1ms
float bucketMax = 60.0f; // 60sec

// Avoid recomputing the buckets frequently. Cassandra uses ~170 buckets
float[] cachedBuckets = newBucketOffsets(200, bucketScaleFunction);

return group -> {
final Stream<HistogramMetricFamily.Histogram> histogramStream = group.labeledObjects().entrySet().stream()
.map(e -> {
long[] values = e.getValue().getValues();
float[] buckets = values.length <= cachedBuckets.length
? cachedBuckets
: newBucketOffsets(values.length, bucketScaleFunction);

float sum = 0;
long count = 0;
ArrayList<Interval> intervals = new ArrayList<>();
assert values[values.length-1] == 0;

for (int i = 0; i < values.length; i++) {
if (values[i] != 0) {
sum += buckets[i] * values[i];
count += values[i];
}
if (buckets[i] >= bucketMin && buckets[i] <= bucketMax) {
intervals.add(new Interval(new Interval.Quantile(buckets[i]), count));
}
}

return new HistogramMetricFamily.Histogram(e.getKey(), sum, count, intervals);
});

return Stream.of(new HistogramMetricFamily(group.name(), group.help(), histogramStream));
};
}

public static CollectorFunction<SamplingCounting> samplingAndCountingAsHistogram() {
return samplingAndCountingAsHistogram(FloatFloatFunction.identity());
}

private static float[] newBucketOffsets(int size, final FloatFloatFunction bucketScaleFunction) {
long[] rawOffsets = EstimatedHistogram.newOffsets(size, false);
float[] adjustedOffsets = new float[size];
for (int i = 0; i < size; i++) {
adjustedOffsets[i] = bucketScaleFunction.apply(rawOffsets[i]);
}
return adjustedOffsets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,15 @@ private static FactoryBuilder.CollectorConstructor histogramAsSummaryCollectorCo
};
}

private static FactoryBuilder.CollectorConstructor histogramAsHistogramCollectorConstructor() {
return (name, help, labels, mBean) -> {
final NamedObject<SamplingCounting> samplingCountingNamedObject = CassandraMetricsUtilities.jmxHistogramAsSamplingCounting(mBean);

return new FunctionalMetricFamilyCollector<>(name, help, ImmutableMap.of(labels, samplingCountingNamedObject),
samplingAndCountingAsHistogram(MetricValueConversionFunctions::nanosecondsToSeconds));
};
}

private static <T> FactoryBuilder.CollectorConstructor functionalCollectorConstructor(final FunctionalMetricFamilyCollector.CollectorFunction<T> function) {
return (final String name, final String help, final Labels labels, final NamedObject<?> mBean) ->
new FunctionalMetricFamilyCollector<>(name, help, ImmutableMap.of(labels, mBean.<T>cast()), function);
Expand Down Expand Up @@ -592,6 +601,8 @@ public List<Factory> get() {

builder.add(clientRequestMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "Latency", "latency_seconds", "Request latency."));
builder.add(clientRequestMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "TotalLatency", "latency_seconds", "Total request duration."));

builder.add(clientRequestMetricFactory(histogramAsHistogramCollectorConstructor(), "Latency", "latency_hist_seconds", "Request latency."));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ public interface SamplingCounting {
long getCount();

Iterable<Interval> getIntervals();

long[] getValues();
}

0 comments on commit a22cde2

Please sign in to comment.