diff --git a/README.md b/README.md
index e4ed5fab83a..d78d0d1910b 100644
--- a/README.md
+++ b/README.md
@@ -94,6 +94,9 @@ It stores spans as json and has been designed for larger scale.
Note: This store requires a [spark job](https://github.com/openzipkin/zipkin-dependencies) to aggregate dependency links.
+### Throttling
+As part of a [Collector surge and error handling](https://cwiki.apache.org/confluence/display/ZIPKIN/Collector+surge+and+error+handling) discussion that took place a throttling mechanism was added to allow more fine-grained control over how Zipkin interacts with the various `StorageComponents`. In particular, for those installations which use a push-based Collector (such as the HTTP rest API), enabling the throttle can allow Zipkin to buffer some messages in order to avoid aggressively dropping them. See [zipkin-server](zipkin-server#throttled-storage) for configuration information.
+
### Disabling search
The following API endpoints provide search features, and are enabled by
default. Search primarily allows the trace list screen of the UI operate.
diff --git a/pom.xml b/pom.xml
index 856ac6a5fc5..987ffa3a767 100755
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@
0.84.027.0.1-jre
+ 0.2.03.0.0-alpha01
diff --git a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
index aabcbcb40bc..56f988c2ea1 100644
--- a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
+++ b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
@@ -31,6 +31,7 @@
import zipkin2.storage.StorageComponent;
import static java.util.Arrays.asList;
+import java.util.concurrent.RejectedExecutionException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -186,6 +187,16 @@ public void storeSpansCallback_onErrorWithMessage() {
}
@Test
+ public void errorAcceptingSpans_onErrorRejectedExecution() {
+ RuntimeException error = new RejectedExecutionException("slow down");
+ collector.handleStorageError(TRACE, error, callback);
+
+ verify(callback).onError(error);
+ assertThat(messages)
+ .containsOnly("Cannot store spans [1, 1, 2, ...] due to RejectedExecutionException(slow down)");
+ verify(metrics).incrementSpansDropped(4);
+ }
+
public void handleStorageError_onErrorWithNullMessage() {
RuntimeException error = new RuntimeException();
collector.handleStorageError(TRACE, error, callback);
diff --git a/zipkin-server/README.md b/zipkin-server/README.md
index 7e64055cdae..bd03490f12a 100644
--- a/zipkin-server/README.md
+++ b/zipkin-server/README.md
@@ -157,6 +157,14 @@ Defaults to true
* `AUTOCOMPLETE_KEYS`: list of span tag keys which will be returned by the `/api/v2/autocompleteTags` endpoint
* `AUTOCOMPLETE_TTL`: How long in milliseconds to suppress calls to write the same autocomplete key/value pair. Default 3600000 (1 hr)
+### Throttled Storage
+These settings can be used to help tune the rate at which Zipkin flushes data to another, underlying `StorageComponent` (such as Elasticsearch):
+
+ * `STORAGE_THROTTLE_ENABLED`: Enables throttling
+ * `STORAGE_THROTTLE_MIN_CONCURRENCY`: Minimum number of Threads to use for writing to storage.
+ * `STORAGE_THROTTLE_MAX_CONCURRENCY`: Maximum number of Threads to use for writing to storage. In order to avoid configuration drift, this value may override other, storage-specific values such as Elasticsearch's `ES_MAX_REQUESTS`.
+ * `STORAGE_THROTTLE_MAX_QUEUE_SIZE`: How many messages to buffer while all Threads are writing data before abandoning a message (0 = no buffering).
+
### Cassandra Storage
Zipkin's [Cassandra storage component](../zipkin-storage/cassandra)
supports version 3.11+ and applies when `STORAGE_TYPE` is set to `cassandra3`:
diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml
index f17dd8954ab..c53f629d273 100644
--- a/zipkin-server/pom.xml
+++ b/zipkin-server/pom.xml
@@ -250,6 +250,17 @@
${micrometer.version}
+
+ com.netflix.concurrency-limits
+ concurrency-limits-core
+ ${netflix.concurrency.limits.version}
+
+
+ io.micrometer
+ micrometer-core
+ ${micrometer.version}
+
+
io.zipkin.brave
@@ -299,6 +310,11 @@
2.4.0test
+
+ org.mockito
+ mockito-core
+ test
+
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java
new file mode 100644
index 00000000000..0d7cb4e1845
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
+import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+@Conditional(ConditionalOnThrottledStorage.ThrottledStorageCondition.class)
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD})
+@interface ConditionalOnThrottledStorage {
+ class ThrottledStorageCondition extends SpringBootCondition {
+ @Override
+ public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
+ String throttleEnabled = context.getEnvironment()
+ .getProperty("zipkin.storage.throttle.enabled");
+
+ if (!Boolean.valueOf(throttleEnabled)) {
+ return ConditionOutcome.noMatch("zipkin.storage.throttle.enabled isn't true");
+ }
+
+ return ConditionOutcome.match();
+ }
+ }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java
index cb381a7f338..b3d4b90ae3a 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java
@@ -26,6 +26,9 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;
import java.util.List;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
@@ -33,20 +36,24 @@
import org.springframework.boot.actuate.health.HealthAggregator;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
import org.springframework.core.annotation.Order;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.web.servlet.config.annotation.ViewControllerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+import zipkin2.server.internal.throttle.ZipkinStorageThrottleProperties;
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.CollectorSampler;
import zipkin2.server.internal.brave.TracingStorageComponent;
import zipkin2.storage.InMemoryStorage;
import zipkin2.storage.StorageComponent;
+import zipkin2.server.internal.throttle.ThrottledStorageComponent;
@Configuration
@ImportAutoConfiguration(ArmeriaSpringActuatorAutoConfiguration.class)
@@ -157,10 +164,47 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
}
}
+ @Configuration
+ @EnableConfigurationProperties(ZipkinStorageThrottleProperties.class)
+ @ConditionalOnThrottledStorage
+ static class ThrottledStorageComponentEnhancer implements BeanPostProcessor, BeanFactoryAware {
+
+ /**
+ * Need this to resolve cyclic instantiation issue with spring. Mostly, this is for MeterRegistry as really
+ * bad things happen if you try to Autowire it (loss of JVM metrics) but also using it for properties just to make
+ * sure no cycles exist at all as a result of turning throttling on.
+ *
+ *
+ */
+ private BeanFactory beanFactory;
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) {
+ if (bean instanceof StorageComponent) {
+ ZipkinStorageThrottleProperties throttleProperties = beanFactory.getBean(ZipkinStorageThrottleProperties.class);
+ return new ThrottledStorageComponent((StorageComponent) bean,
+ beanFactory.getBean(MeterRegistry.class),
+ throttleProperties.getMinConcurrency(),
+ throttleProperties.getMaxConcurrency(),
+ throttleProperties.getMaxQueueSize());
+ }
+ return bean;
+ }
+
+ @Override
+ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
+ this.beanFactory = beanFactory;
+ }
+ }
+
/**
* This is a special-case configuration if there's no StorageComponent of any kind. In-Mem can
* supply both read apis, so we add two beans here.
+ *
+ *
Note: this needs to be {@link Lazy} to avoid circular dependency issues when using with
+ * {@link ThrottledStorageComponentEnhancer}.
*/
+ @Lazy
@Configuration
@Conditional(StorageTypeMemAbsentOrEmpty.class)
@ConditionalOnMissingBean(StorageComponent.class)
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
index 8ba574f355b..bd034f17c92 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
@@ -23,6 +23,7 @@
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import zipkin2.elasticsearch.ElasticsearchStorage;
@@ -40,8 +41,10 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
private String index = "zipkin";
/** The date separator used to create the index name. Default to -. */
private String dateSeparator = "-";
- /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 */
+ /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 (overriden by throttle settings) */
private int maxRequests = 64;
+ /** Overrides maximum in-flight requests to match throttling settings if throttling is enabled. */
+ private Integer throttleMaxConcurrency;
/** Number of shards (horizontal scaling factor) per index. Defaults to 5. */
private int indexShards = 5;
/** Number of replicas (redundancy factor) per index. Defaults to 1.` */
@@ -61,6 +64,13 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
*/
private int timeout = 10_000;
+ public ZipkinElasticsearchStorageProperties(@Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
+ @Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
+ if (throttleEnabled) {
+ this.throttleMaxConcurrency = throttleMaxConcurrency;
+ }
+ }
+
public String getPipeline() {
return pipeline;
}
@@ -180,7 +190,7 @@ public ElasticsearchStorage.Builder toBuilder(OkHttpClient client) {
.index(index)
.dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0))
.pipeline(pipeline)
- .maxRequests(maxRequests)
+ .maxRequests(throttleMaxConcurrency == null ? maxRequests : throttleMaxConcurrency)
.indexShards(indexShards)
.indexReplicas(indexReplicas);
}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java
new file mode 100644
index 00000000000..acedc2dadaa
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import com.netflix.concurrency.limits.Limiter;
+import com.netflix.concurrency.limits.limiter.AbstractLimiter;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+final class ActuateThrottleMetrics {
+ private final MeterRegistry registryInstance;
+
+ public ActuateThrottleMetrics(MeterRegistry registryInstance) {
+ this.registryInstance = registryInstance;
+ }
+
+ public void bind(ExecutorService executor) {
+ if(!(executor instanceof ThreadPoolExecutor)){
+ return;
+ }
+
+ ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
+ Gauge.builder("zipkin_storage.throttle.concurrency", pool::getCorePoolSize)
+ .description("number of threads running storage requests")
+ .register(registryInstance);
+ Gauge.builder("zipkin_storage.throttle.queue_size", pool.getQueue()::size)
+ .description("number of items queued waiting for access to storage")
+ .register(registryInstance);
+ }
+
+ public void bind(Limiter limiter) {
+ if(!(limiter instanceof AbstractLimiter)){
+ return;
+ }
+
+ AbstractLimiter abstractLimiter = (AbstractLimiter) limiter;
+
+ // This value should parallel (zipkin_storage.throttle.queue_size + zipkin_storage.throttle.concurrency)
+ // It is tracked to make sure it doesn't perpetually increase. If it does then we're not resolving LimitListeners.
+ Gauge.builder("zipkin_storage.throttle.in_flight_requests", abstractLimiter::getInflight)
+ .description("number of requests the limiter thinks are active")
+ .register(registryInstance);
+ }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
new file mode 100644
index 00000000000..14fcd5f43a7
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import com.netflix.concurrency.limits.Limiter;
+import com.netflix.concurrency.limits.Limiter.Listener;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+import zipkin2.Call;
+import zipkin2.Callback;
+import zipkin2.storage.InMemoryStorage;
+
+/**
+ * {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService serves two
+ * purposes:
+ *
+ *
Limits the number of requests that can run in parallel.
+ *
Depending on configuration, can queue up requests to make sure we don't aggressively drop requests that would
+ * otherwise succeed if given a moment. Bounded queues are safest for this as unbounded ones can lead to heap
+ * exhaustion and {@link OutOfMemoryError OOM errors}.
+ *
+ *
+ * @see ThrottledStorageComponent
+ */
+final class ThrottledCall extends Call {
+ final ExecutorService executor;
+ final Limiter limiter;
+ final Listener limitListener;
+ /**
+ * Delegate call needs to be supplied later to avoid having it take action when it is created (like
+ * {@link InMemoryStorage} and thus avoid being throttled.
+ */
+ final Supplier> delegate;
+ Call call;
+ volatile boolean canceled;
+
+ public ThrottledCall(ExecutorService executor, Limiter limiter, Supplier> delegate) {
+ this.executor = executor;
+ this.limiter = limiter;
+ this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
+ this.delegate = delegate;
+ }
+
+ private ThrottledCall(ThrottledCall other) {
+ this(other.executor, other.limiter, other.call == null ? other.delegate : () -> other.call.clone());
+ }
+
+ @Override
+ public V execute() throws IOException {
+ try {
+ call = delegate.get();
+
+ // Make sure we throttle
+ Future future = executor.submit(() -> {
+ String oldName = setCurrentThreadName(call.toString());
+ try {
+ return call.execute();
+ } finally {
+ setCurrentThreadName(oldName);
+ }
+ });
+ V result = future.get(); // Still block for the response
+
+ limitListener.onSuccess();
+ return result;
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RejectedExecutionException) {
+ // Storage rejected us, throttle back
+ limitListener.onDropped();
+ } else {
+ limitListener.onIgnore();
+ }
+
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ throw new RuntimeException("Issue while executing on a throttled call", cause);
+ }
+ } catch (InterruptedException e) {
+ limitListener.onIgnore();
+ throw new RuntimeException("Interrupted while blocking on a throttled call", e);
+ } catch (Exception e) {
+ // Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be
+ // write bound, but a drop in concurrency won't necessarily help.
+ limitListener.onIgnore();
+ throw e;
+ }
+ }
+
+ @Override
+ public void enqueue(Callback callback) {
+ try {
+ executor.execute(new QueuedCall(callback));
+ } catch (Exception e) {
+ // Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be
+ // write bound, but a drop in concurrency won't necessarily help.
+ limitListener.onIgnore();
+ throw e;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ canceled = true;
+ if (call != null) {
+ call.cancel();
+ }
+ }
+
+ @Override
+ public boolean isCanceled() {
+ return canceled || (call != null && call.isCanceled());
+ }
+
+ @Override
+ public Call clone() {
+ return new ThrottledCall<>(this);
+ }
+
+ /**
+ * @param name New name for the current Thread
+ * @return Previous name of the current Thread
+ */
+ static String setCurrentThreadName(String name) {
+ Thread thread = Thread.currentThread();
+ String originalName = thread.getName();
+ try {
+ thread.setName(name);
+ return originalName;
+ } catch (SecurityException e) {
+ return originalName;
+ }
+ }
+
+ final class QueuedCall implements Runnable {
+ final Callback callback;
+
+ public QueuedCall(Callback callback) {
+ this.callback = callback;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (canceled) {
+ return;
+ }
+
+ call = ThrottledCall.this.delegate.get();
+
+ String oldName = setCurrentThreadName(call.toString());
+ try {
+ enqueueAndWait();
+ } finally {
+ setCurrentThreadName(oldName);
+ }
+ } catch (Exception e) {
+ limitListener.onIgnore();
+ callback.onError(e);
+ }
+ }
+
+ void enqueueAndWait() {
+ ThrottledCallback throttleCallback = new ThrottledCallback<>(callback, limitListener);
+ call.enqueue(throttleCallback);
+
+ // Need to wait here since the callback call will run asynchronously also.
+ // This ensures we don't exceed our throttle/queue limits.
+ throttleCallback.await();
+ }
+ }
+
+ static final class ThrottledCallback implements Callback {
+ final Callback delegate;
+ final Listener limitListener;
+ final CountDownLatch latch;
+
+ public ThrottledCallback(Callback delegate, Listener limitListener) {
+ this.delegate = delegate;
+ this.limitListener = limitListener;
+ this.latch = new CountDownLatch(1);
+ }
+
+ public void await() {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ limitListener.onIgnore();
+ throw new RuntimeException("Interrupted while blocking on a throttled call", e);
+ }
+ }
+
+ @Override
+ public void onSuccess(V value) {
+ try {
+ limitListener.onSuccess();
+ delegate.onSuccess(value);
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ try {
+ if (t instanceof RejectedExecutionException) {
+ limitListener.onDropped();
+ } else {
+ limitListener.onIgnore();
+ }
+
+ delegate.onError(t);
+ } finally {
+ latch.countDown();
+ }
+ }
+ }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledSpanConsumer.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledSpanConsumer.java
new file mode 100644
index 00000000000..ac6f6f3930c
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledSpanConsumer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import com.netflix.concurrency.limits.Limiter;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import zipkin2.Call;
+import zipkin2.Span;
+import zipkin2.storage.SpanConsumer;
+
+/**
+ * Delegating implementation that wraps another {@link SpanConsumer} and ensures that only so many requests
+ * can get through to it at a given time.
+ *
+ * @see ThrottledCall
+ */
+final class ThrottledSpanConsumer implements SpanConsumer {
+ final SpanConsumer delegate;
+ final Limiter limiter;
+ final ExecutorService executor;
+
+ ThrottledSpanConsumer(SpanConsumer delegate, Limiter limiter, ExecutorService executor) {
+ this.delegate = delegate;
+ this.limiter = limiter;
+ this.executor = executor;
+ }
+
+ @Override
+ public Call accept(List spans) {
+ return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans));
+ }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
new file mode 100644
index 00000000000..e87156e903e
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import com.netflix.concurrency.limits.Limit;
+import com.netflix.concurrency.limits.limit.Gradient2Limit;
+import com.netflix.concurrency.limits.limiter.AbstractLimiter;
+import io.micrometer.core.instrument.MeterRegistry;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import zipkin2.storage.SpanConsumer;
+import zipkin2.storage.SpanStore;
+import zipkin2.storage.StorageComponent;
+
+/**
+ * Delegating implementation that limits requests to the {@link #spanConsumer()} of another
+ * {@link StorageComponent}. The theory here is that this class can be used to:
+ *
+ *
Prevent spamming the storage engine with excessive, spike requests when they come in; thus preserving it's life.
+ *
Optionally act as a buffer so that a fixed number requests can be queued for execution when the throttle allows
+ * for it. This optional queue must be bounded in order to avoid running out of memory from infinitely queueing.
+ *
+ *
+ * @see ThrottledSpanConsumer
+ */
+public final class ThrottledStorageComponent extends StorageComponent {
+ final StorageComponent delegate;
+ final AbstractLimiter limiter;
+ final ThreadPoolExecutor executor;
+
+ public ThrottledStorageComponent(StorageComponent delegate,
+ MeterRegistry registry,
+ int minConcurrency,
+ int maxConcurrency,
+ int maxQueueSize) {
+ this.delegate = Objects.requireNonNull(delegate);
+
+ Limit limit = Gradient2Limit.newBuilder()
+ .minLimit(minConcurrency)
+ .initialLimit(minConcurrency) // Limiter will trend towards min until otherwise necessary so may as well start there
+ .maxConcurrency(maxConcurrency)
+ .queueSize(0)
+ .build();
+ this.limiter = new Builder().limit(limit).build();
+
+ this.executor = new ThreadPoolExecutor(limit.getLimit(),
+ limit.getLimit(),
+ 0,
+ TimeUnit.DAYS,
+ createQueue(maxQueueSize),
+ new ThottledThreadFactory(),
+ new ThreadPoolExecutor.AbortPolicy());
+ limit.notifyOnChange(new ThreadPoolExecutorResizer(executor));
+
+ if (registry != null) {
+ ActuateThrottleMetrics metrics = new ActuateThrottleMetrics(registry);
+ metrics.bind(executor);
+ metrics.bind(limiter);
+ }
+ }
+
+ @Override
+ public SpanStore spanStore() {
+ return delegate.spanStore();
+ }
+
+ @Override
+ public SpanConsumer spanConsumer() {
+ return new ThrottledSpanConsumer(delegate.spanConsumer(), limiter, executor);
+ }
+
+ @Override
+ public void close() throws IOException {
+ executor.shutdownNow();
+ delegate.close();
+ }
+
+ private static BlockingQueue createQueue(int maxSize) {
+ if (maxSize < 0) {
+ throw new IllegalArgumentException("Invalid max queue size; must be >= 0 but was: " + maxSize);
+ }
+
+ if (maxSize == 0) {
+ // 0 means we should be bounded but we can't create a queue with that size so use 1 instead.
+ maxSize = 1;
+ }
+
+ return new LinkedBlockingQueue<>(maxSize);
+ }
+
+ static final class ThottledThreadFactory implements ThreadFactory {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("throttle-pool-" + thread.getId());
+ return thread;
+ }
+ }
+
+ static final class ThreadPoolExecutorResizer implements Consumer {
+ final ThreadPoolExecutor executor;
+
+ public ThreadPoolExecutorResizer(ThreadPoolExecutor executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * This is {@code synchronized} to ensure that we don't let the core/max pool sizes get out of sync; even
+ * for an instant. The two need to be tightly coupled together to ensure that when our queue fills up we don't spin
+ * up extra Threads beyond our calculated limit.
+ *
+ *
There is also an unfortunate aspect where the {@code max} has to always be greater than {@code core} or an
+ * exception will be thrown. So they have to be adjust appropriately relative to the direction the size is going.
+ */
+ @Override
+ public synchronized void accept(Integer newValue) {
+ int previousValue = executor.getCorePoolSize();
+
+ int newValueInt = newValue;
+ if (previousValue < newValueInt) {
+ executor.setMaximumPoolSize(newValueInt);
+ executor.setCorePoolSize(newValueInt);
+ } else if (previousValue > newValueInt) {
+ executor.setCorePoolSize(newValueInt);
+ executor.setMaximumPoolSize(newValueInt);
+ }
+ // Note: no case for equals. Why modify something that doesn't need modified?
+ }
+ }
+
+ static final class Builder extends AbstractLimiter.Builder {
+ public NonLimitingLimiter build() {
+ return new NonLimitingLimiter(this);
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+ }
+
+ /**
+ * Unlike a normal Limiter, this will actually not prevent the creation of a {@link Listener} in
+ * {@link #acquire(java.lang.Void)}. The point of this is to ensure that we can always derive an appropriate
+ * {@link Limit#getLimit() Limit} while the {@link #executor} handles actually limiting running requests.
+ */
+ static final class NonLimitingLimiter extends AbstractLimiter {
+ public NonLimitingLimiter(AbstractLimiter.Builder> builder) {
+ super(builder);
+ }
+
+ @Override
+ public Optional acquire(Void context) {
+ return Optional.of(createListener());
+ }
+ }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java
new file mode 100644
index 00000000000..d144c7391b9
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("zipkin.storage.throttle")
+public final class ZipkinStorageThrottleProperties {
+ /** Should we throttle at all? */
+ private boolean enabled;
+ /** Minimum number of storage requests to allow through at a given time. */
+ private int minConcurrency;
+ /**
+ * Maximum number of storage requests to allow through at a given time. Should be tuned to (bulk_index_pool_size / num_servers_in_cluster).
+ * e.g. 200 (default pool size in Elasticsearch) / 2 (number of load balanced zipkin-server instances) = 100.
+ */
+ private int maxConcurrency;
+ /**
+ * Maximum number of storage requests to buffer while waiting for open Thread.
+ * 0 = no buffering.
+ */
+ private int maxQueueSize;
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public int getMinConcurrency() {
+ return minConcurrency;
+ }
+
+ public void setMinConcurrency(int minConcurrency) {
+ this.minConcurrency = minConcurrency;
+ }
+
+ public int getMaxConcurrency() {
+ return maxConcurrency;
+ }
+
+ public void setMaxConcurrency(int maxConcurrency) {
+ this.maxConcurrency = maxConcurrency;
+ }
+
+ public int getMaxQueueSize() {
+ return maxQueueSize;
+ }
+
+ public void setMaxQueueSize(int maxQueueSize) {
+ this.maxQueueSize = maxQueueSize;
+ }
+}
diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml
index 72b0fe8122f..895360c69b8 100644
--- a/zipkin-server/src/main/resources/zipkin-server-shared.yml
+++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml
@@ -53,6 +53,11 @@ zipkin:
autocomplete-ttl: ${AUTOCOMPLETE_TTL:3600000}
autocomplete-cardinality: 20000
type: ${STORAGE_TYPE:mem}
+ throttle:
+ enabled: ${STORAGE_THROTTLE_ENABLED:false}
+ minConcurrency: ${STORAGE_THROTTLE_MIN_CONCURRENCY:10}
+ maxConcurrency: ${STORAGE_THROTTLE_MAX_CONCURRENCY:200}
+ maxQueueSize: ${STORAGE_THROTTLE_MAX_QUEUE_SIZE:1000}
mem:
# Maximum number of spans to keep in memory. When exceeded, oldest traces (and their spans) will be purged.
# A safe estimate is 1K of memory per span (each span with 2 annotations + 1 binary annotation), plus
diff --git a/zipkin-server/src/test/java/zipkin2/server/internal/throttle/FakeCall.java b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/FakeCall.java
new file mode 100644
index 00000000000..6e6d720b5f5
--- /dev/null
+++ b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/FakeCall.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
+import zipkin2.Call;
+import zipkin2.Callback;
+
+class FakeCall extends Call {
+ boolean overCapacity = false;
+
+ public void setOverCapacity(boolean isOverCapacity) {
+ this.overCapacity = isOverCapacity;
+ }
+
+ @Override
+ public Void execute() throws IOException {
+ if (overCapacity) {
+ throw new RejectedExecutionException();
+ }
+
+ return null;
+ }
+
+ @Override
+ public void enqueue(Callback callback) {
+ if (overCapacity) {
+ callback.onError(new RejectedExecutionException());
+ } else {
+ callback.onSuccess(null);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ @Override
+ public boolean isCanceled() {
+ return false;
+ }
+
+ @Override
+ public Call clone() {
+ return null;
+ }
+}
diff --git a/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledCallTest.java b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledCallTest.java
new file mode 100644
index 00000000000..de8d29b5efe
--- /dev/null
+++ b/zipkin-server/src/test/java/zipkin2/server/internal/throttle/ThrottledCallTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import com.netflix.concurrency.limits.Limiter;
+import com.netflix.concurrency.limits.Limiter.Listener;
+import com.netflix.concurrency.limits.limit.SettableLimit;
+import com.netflix.concurrency.limits.limiter.SimpleLimiter;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import zipkin2.Call;
+import zipkin2.Callback;
+
+public class ThrottledCallTest {
+ SettableLimit limit;
+ Limiter limiter;
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Before
+ public void setup() {
+ this.limit = SettableLimit.startingAt(0);
+ this.limiter = SimpleLimiter.newBuilder().limit(limit).build();
+ }
+
+ @Test
+ public void callCreation_isDeferred() throws IOException {
+ boolean[] created = new boolean[] {false};
+ Supplier> delegate = () -> {
+ created[0] = true;
+ return Call.create(null);
+ };
+
+ ThrottledCall throttle = createThrottle(delegate);
+
+ assertFalse(created[0]);
+ throttle.execute();
+ assertTrue(created[0]);
+ }
+
+ @Test
+ public void execute_isThrottled() throws Throwable {
+ int numThreads = 1;
+ int queueSize = 1;
+ int totalTasks = numThreads + queueSize;
+
+ Semaphore startLock = new Semaphore(numThreads);
+ Semaphore waitLock = new Semaphore(totalTasks);
+ Semaphore failLock = new Semaphore(1);
+ Supplier> delegate = () -> new LockedCall(startLock, waitLock);
+ ThrottledCall throttle = createThrottle(numThreads, queueSize, delegate);
+
+ // Step 1: drain appropriate locks
+ startLock.drainPermits();
+ waitLock.drainPermits();
+ failLock.drainPermits();
+
+ // Step 2: saturate threads and fill queue
+ ExecutorService backgroundPool = Executors.newCachedThreadPool();
+ for (int i = 0; i < totalTasks; i++) {
+ backgroundPool.submit(throttle::execute);
+ }
+
+ try {
+ // Step 3: make sure the threads actually started
+ startLock.acquire(numThreads);
+
+ // Step 4: submit something beyond our limits
+ Future