From 0537f5da7d940ffe584d4b6e071ace438e99ec98 Mon Sep 17 00:00:00 2001 From: Romain Bioteau Date: Fri, 6 Sep 2024 16:11:28 +0200 Subject: [PATCH] feat(worker): use a SingleThreadPoolExecutor for BonitaExecutorService (#3135) Use a `SingleThreadPoolExecutorFactory` in community for `BonitaExecutorService`. Use a `ThreadPoolExecutorFactory` in subscription that supports ThreadPool sizing properties. Display a warning at startup when those properties are overridden with another value. Closes [BPM-239](https://bonitasoft.atlassian.net/browse/BPM-239) [BPM-239]: https://bonitasoft.atlassian.net/browse/BPM-239?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ --- .../service/impl/SpringBeanAccessor.java | 28 +- .../bonita-tenant-community.properties | 3 - .../engine/work/BonitaExecutorService.java | 6 +- .../work/BonitaThreadPoolExecutorFactory.java | 22 ++ ...java => DefaultBonitaExecutorService.java} | 82 +++-- .../DefaultBonitaExecutorServiceFactory.java | 89 ++---- .../work/SingleThreadPoolExecutorFactory.java | 81 +++++ .../engine/work/WorkDescriptor.java | 6 +- .../engine/work/WorkerThreadFactory.java | 4 + .../work/BonitaThreadPoolExecutorTest.java | 286 ------------------ ...faultBonitaExecutorServiceFactoryTest.java | 34 ++- .../DefaultBonitaExecutorServiceTest.java | 273 +++++++++++++++++ 12 files changed, 496 insertions(+), 418 deletions(-) create mode 100644 services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutorFactory.java rename services/bonita-work/src/main/java/org/bonitasoft/engine/work/{BonitaThreadPoolExecutor.java => DefaultBonitaExecutorService.java} (68%) create mode 100644 services/bonita-work/src/main/java/org/bonitasoft/engine/work/SingleThreadPoolExecutorFactory.java delete mode 100644 services/bonita-work/src/test/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutorTest.java create mode 100644 services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceTest.java diff --git a/bpm/bonita-core/bonita-process-engine/src/main/java/org/bonitasoft/engine/service/impl/SpringBeanAccessor.java b/bpm/bonita-core/bonita-process-engine/src/main/java/org/bonitasoft/engine/service/impl/SpringBeanAccessor.java index 20170fbc0ad..03f1cd58cfa 100644 --- a/bpm/bonita-core/bonita-process-engine/src/main/java/org/bonitasoft/engine/service/impl/SpringBeanAccessor.java +++ b/bpm/bonita-core/bonita-process-engine/src/main/java/org/bonitasoft/engine/service/impl/SpringBeanAccessor.java @@ -18,10 +18,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; +import java.util.*; import lombok.extern.slf4j.Slf4j; import org.bonitasoft.engine.exception.BonitaRuntimeException; @@ -44,6 +41,11 @@ public class SpringBeanAccessor { private static final String HAZELCAST_CONFIG_FILENAME = "hazelcast.xml"; static final BonitaHomeServer BONITA_HOME_SERVER = BonitaHomeServer.getInstance(); + + private static final String WORK_CORE_POOL_SIZE = "bonita.tenant.work.corePoolSize"; + private static final String WORK_MAX_POOL_SIZE = "bonita.tenant.work.maximumPoolSize"; + private static final String WORK_KEEP_ALIVE_IN_SECONDS = "bonita.tenant.work.keepAliveTimeSeconds"; + private BonitaSpringContext context; private boolean contextFinishedInitialized = false; @@ -108,6 +110,24 @@ private void configureContext(BonitaSpringContext context) throws IOException { propertySources.addAfter(StandardEnvironment.SYSTEM_ENVIRONMENT_PROPERTY_SOURCE_NAME, new PropertiesPropertySource("contextProperties", getProperties())); } + warnDeprecatedProperties(propertySources); + } + + protected void warnDeprecatedProperties(MutablePropertySources propertySources) { + warnIfPropertyIsDeprecated(propertySources, WORK_CORE_POOL_SIZE); + warnIfPropertyIsDeprecated(propertySources, WORK_MAX_POOL_SIZE); + warnIfPropertyIsDeprecated(propertySources, WORK_KEEP_ALIVE_IN_SECONDS); + } + + private void warnIfPropertyIsDeprecated(MutablePropertySources propertySources, String property) { + propertySources.stream() + .filter(ps -> ps.containsProperty(property)) + .map(ps -> ps.getProperty(property)) + .filter(Objects::nonNull) + .findFirst() + .ifPresent(value -> log.warn( + "{} property is not supported in community edition anymore. It will be ignored.", + property)); } protected BonitaSpringContext createContext() { diff --git a/bpm/bonita-core/bonita-process-engine/src/main/resources/bonita-tenant-community.properties b/bpm/bonita-core/bonita-process-engine/src/main/resources/bonita-tenant-community.properties index 7284cd099f0..2665bbd5b78 100644 --- a/bpm/bonita-core/bonita-process-engine/src/main/resources/bonita-tenant-community.properties +++ b/bpm/bonita-core/bonita-process-engine/src/main/resources/bonita-tenant-community.properties @@ -32,9 +32,6 @@ bonita.tenant.connector.warnWhenLongerThanMillis=10000 # Work service # Time to wait in seconds for all work to terminate when the service is paused or stopped bonita.tenant.work.terminationTimeout=30 -bonita.tenant.work.corePoolSize=10 -bonita.tenant.work.maximumPoolSize=10 -bonita.tenant.work.keepAliveTimeSeconds=60 bonita.tenant.work.queueCapacity=500000 # Add a delay on work when the transaction that registers the work has multiple XA Resources diff --git a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaExecutorService.java b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaExecutorService.java index bf11bef2552..e8799ead9fe 100644 --- a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaExecutorService.java +++ b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaExecutorService.java @@ -13,6 +13,8 @@ **/ package org.bonitasoft.engine.work; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -38,8 +40,10 @@ public interface BonitaExecutorService { * * @param work */ - void submit(WorkDescriptor work); + Future submit(WorkDescriptor work); boolean awaitTermination(long workTerminationTimeout, TimeUnit seconds) throws InterruptedException; + ThreadPoolExecutor getExecutor(); + } diff --git a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutorFactory.java b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutorFactory.java new file mode 100644 index 00000000000..e6bb373e13d --- /dev/null +++ b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutorFactory.java @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2024 Bonitasoft S.A. + * Bonitasoft, 32 rue Gustave Eiffel - 38000 Grenoble + * This library is free software; you can redistribute it and/or modify it under the terms + * of the GNU Lesser General Public License as published by the Free Software Foundation + * version 2.1 of the License. + * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU Lesser General Public License for more details. + * You should have received a copy of the GNU Lesser General Public License along with this + * program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth + * Floor, Boston, MA 02110-1301, USA. + **/ +package org.bonitasoft.engine.work; + +import java.util.concurrent.ThreadPoolExecutor; + +public interface BonitaThreadPoolExecutorFactory { + + ThreadPoolExecutor create(); + +} diff --git a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutor.java b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/DefaultBonitaExecutorService.java similarity index 68% rename from services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutor.java rename to services/bonita-work/src/main/java/org/bonitasoft/engine/work/DefaultBonitaExecutorService.java index b2d724b4eb3..7d95c6f791c 100644 --- a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutor.java +++ b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/DefaultBonitaExecutorService.java @@ -15,14 +15,7 @@ import java.util.Collection; import java.util.HashMap; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import io.micrometer.core.instrument.Counter; @@ -34,41 +27,34 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * @author Julien Reboul - * @author Baptiste Mesta - */ - -public class BonitaThreadPoolExecutor extends ThreadPoolExecutor implements BonitaExecutorService { +public class DefaultBonitaExecutorService implements BonitaExecutorService { - private Logger log = LoggerFactory.getLogger(BonitaThreadPoolExecutor.class); + private static final Logger log = LoggerFactory.getLogger(DefaultBonitaExecutorService.class); public static final String NUMBER_OF_WORKS_PENDING = "bonita.bpmengine.work.pending"; public static final String NUMBER_OF_WORKS_RUNNING = "bonita.bpmengine.work.running"; public static final String NUMBER_OF_WORKS_EXECUTED = "bonita.bpmengine.work.executed"; + public static final String WORKS_UNIT = "works"; - private final BlockingQueue workQueue; private final WorkFactory workFactory; private final EngineClock engineClock; private final WorkExecutionCallback workExecutionCallback; - private WorkExecutionAuditor workExecutionAuditor; - private MeterRegistry meterRegistry; + private final WorkExecutionAuditor workExecutionAuditor; + private final MeterRegistry meterRegistry; private final AtomicLong runningWorks = new AtomicLong(); private final Counter executedWorkCounter; private final Gauge numberOfWorksPending; private final Gauge numberOfWorksRunning; - - public BonitaThreadPoolExecutor(final int corePoolSize, - final int maximumPoolSize, - final long keepAliveTime, - final TimeUnit unit, - final BlockingQueue workQueue, - final ThreadFactory threadFactory, - final RejectedExecutionHandler handler, WorkFactory workFactory, EngineClock engineClock, - WorkExecutionCallback workExecutionCallback, - WorkExecutionAuditor workExecutionAuditor, MeterRegistry meterRegistry, long tenantId) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); - this.workQueue = workQueue; + private final ThreadPoolExecutor executor; + + public DefaultBonitaExecutorService(final ThreadPoolExecutor executor, + final WorkFactory workFactory, + final EngineClock engineClock, + final WorkExecutionCallback workExecutionCallback, + final WorkExecutionAuditor workExecutionAuditor, + final MeterRegistry meterRegistry, + final long tenantId) { + this.executor = executor; this.workFactory = workFactory; this.engineClock = engineClock; this.workExecutionCallback = workExecutionCallback; @@ -76,44 +62,39 @@ public BonitaThreadPoolExecutor(final int corePoolSize, this.meterRegistry = meterRegistry; Tags tags = Tags.of("tenant", String.valueOf(tenantId)); - numberOfWorksPending = Gauge.builder(NUMBER_OF_WORKS_PENDING, workQueue, Collection::size) - .tags(tags).baseUnit("works").description("Works pending in the execution queue") + numberOfWorksPending = Gauge.builder(NUMBER_OF_WORKS_PENDING, executor.getQueue(), Collection::size) + .tags(tags).baseUnit(WORKS_UNIT).description("Works pending in the execution queue") .register(meterRegistry); numberOfWorksRunning = Gauge.builder(NUMBER_OF_WORKS_RUNNING, runningWorks, AtomicLong::get) - .tags(tags).baseUnit("works").description("Works currently executing") + .tags(tags).baseUnit(WORKS_UNIT).description("Works currently executing") .register(meterRegistry); executedWorkCounter = Counter.builder(NUMBER_OF_WORKS_EXECUTED) - .tags(tags).baseUnit("works").description("total works executed since last server start") + .tags(tags).baseUnit(WORKS_UNIT).description("total works executed since last server start") .register(meterRegistry); } - @Override - public void clearAllQueues() { - workQueue.clear(); + public ThreadPoolExecutor getExecutor() { + return executor; } @Override - public Future submit(final Runnable task) { - // only submit if not shutdown - if (!isShutdown()) { - execute(task); - } - return null; + public void clearAllQueues() { + executor.getQueue().clear(); } @Override public void shutdownAndEmptyQueue() { - super.shutdown(); - log.info("Clearing queue of work, had {} elements", workQueue.size()); - workQueue.clear(); + executor.shutdown(); + log.info("Clearing queue of work, had {} elements", executor.getQueue().size()); + executor.getQueue().clear(); meterRegistry.remove(numberOfWorksPending); meterRegistry.remove(numberOfWorksRunning); meterRegistry.remove(executedWorkCounter); } @Override - public void submit(WorkDescriptor work) { - submit(() -> { + public Future submit(WorkDescriptor work) { + return executor.submit(() -> { if (isRequiringDelayedExecution(work)) { // Future implementation should use a real delay e.g. using a ScheduledThreadPoolExecutor // Will be executed later @@ -152,6 +133,11 @@ public void submit(WorkDescriptor work) { }); } + @Override + public boolean awaitTermination(long workTerminationTimeout, TimeUnit seconds) throws InterruptedException { + return executor.awaitTermination(workTerminationTimeout, seconds); + } + private boolean isRequiringDelayedExecution(WorkDescriptor work) { return work.getExecutionThreshold() != null && work.getExecutionThreshold().isAfter(engineClock.now()); } diff --git a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactory.java b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactory.java index 6b6f0fa693f..05ee66479a6 100644 --- a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactory.java +++ b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactory.java @@ -13,12 +13,7 @@ **/ package org.bonitasoft.engine.work; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import io.micrometer.core.instrument.MeterRegistry; import org.bonitasoft.engine.commons.time.EngineClock; @@ -45,83 +40,55 @@ @Component("bonitaExecutorServiceFactory") public class DefaultBonitaExecutorServiceFactory implements BonitaExecutorServiceFactory { - private Logger logger = LoggerFactory.getLogger(DefaultBonitaExecutorServiceFactory.class); - private final int corePoolSize; - private final int queueCapacity; - private final int maximumPoolSize; - private final long keepAliveTimeSeconds; - private final EngineClock engineClock; - private final WorkFactory workFactory; + private static final String BONITA_WORK_EXECUTOR = "bonita-work-executor"; + private final Logger logger = LoggerFactory.getLogger(DefaultBonitaExecutorServiceFactory.class); + private final long tenantId; - private final WorkExecutionAuditor workExecutionAuditor; private final MeterRegistry meterRegistry; private final ExecutorServiceMetricsProvider executorServiceMetricsProvider; + private final BonitaThreadPoolExecutorFactory bonitaThreadPoolExecutorFactory; + private final EngineClock engineClock; + private final WorkFactory workFactory; + private final WorkExecutionAuditor workExecutionAuditor; - public DefaultBonitaExecutorServiceFactory( - WorkFactory workFactory, - @Value("${tenantId}") long tenantId, - @Value("${bonita.tenant.work.corePoolSize}") int corePoolSize, - @Value("${bonita.tenant.work.queueCapacity}") int queueCapacity, - @Value("${bonita.tenant.work.maximumPoolSize}") int maximumPoolSize, - @Value("${bonita.tenant.work.keepAliveTimeSeconds}") long keepAliveTimeSeconds, + public DefaultBonitaExecutorServiceFactory(@Value("${tenantId}") long tenantId, + MeterRegistry meterRegistry, EngineClock engineClock, + WorkFactory workFactory, WorkExecutionAuditor workExecutionAuditor, - MeterRegistry meterRegistry, - ExecutorServiceMetricsProvider executorServiceMetricsProvider) { - this.workFactory = workFactory; + ExecutorServiceMetricsProvider executorServiceMetricsProvider, + BonitaThreadPoolExecutorFactory bonitaThreadPoolExecutorFactory) { this.tenantId = tenantId; - this.corePoolSize = corePoolSize; - this.queueCapacity = queueCapacity; - this.maximumPoolSize = maximumPoolSize; - this.keepAliveTimeSeconds = keepAliveTimeSeconds; - this.engineClock = engineClock; - this.workExecutionAuditor = workExecutionAuditor; this.meterRegistry = meterRegistry; + this.workFactory = workFactory; + this.workExecutionAuditor = workExecutionAuditor; + this.engineClock = engineClock; this.executorServiceMetricsProvider = executorServiceMetricsProvider; + this.bonitaThreadPoolExecutorFactory = bonitaThreadPoolExecutorFactory; } @Override public BonitaExecutorService createExecutorService(WorkExecutionCallback workExecutionCallback) { - final BlockingQueue workQueue = new ArrayBlockingQueue<>(queueCapacity); - final RejectedExecutionHandler handler = new QueueRejectedExecutionHandler(); - final WorkerThreadFactory threadFactory = new WorkerThreadFactory("Bonita-Worker", tenantId, maximumPoolSize); - - final BonitaThreadPoolExecutor bonitaThreadPoolExecutor = new BonitaThreadPoolExecutor(corePoolSize, - maximumPoolSize, keepAliveTimeSeconds, TimeUnit.SECONDS, - workQueue, threadFactory, handler, workFactory, engineClock, workExecutionCallback, - workExecutionAuditor, meterRegistry, tenantId); + final ThreadPoolExecutor bonitaThreadPoolExecutor = bonitaThreadPoolExecutorFactory.create(); + final BonitaExecutorService bonitaExecutorService = new DefaultBonitaExecutorService(bonitaThreadPoolExecutor, + workFactory, + engineClock, + workExecutionCallback, + workExecutionAuditor, + meterRegistry, + tenantId); logger.info( - "Creating a new Thread pool to handle works: " + bonitaThreadPoolExecutor); + "Creating a new Thread pool to handle works: {}", bonitaThreadPoolExecutor); //TODO this returns the timed executor service, this should be used instead of the BonitaExecutorService but we should change it everywhere executorServiceMetricsProvider - .bindMetricsOnly(meterRegistry, bonitaThreadPoolExecutor, "bonita-work-executor", tenantId); - return bonitaThreadPoolExecutor; + .bindMetricsOnly(meterRegistry, bonitaThreadPoolExecutor, BONITA_WORK_EXECUTOR, tenantId); + return bonitaExecutorService; } @Override public void unbind() { - executorServiceMetricsProvider.unbind(meterRegistry, "bonita-work-executor", tenantId); - } - - private final class QueueRejectedExecutionHandler implements RejectedExecutionHandler { - - public QueueRejectedExecutionHandler() { - } - - @Override - public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) { - if (executor.isShutdown()) { - logger.info("Tried to run work " + task - + " but the work service is shutdown. work will be restarted with the node"); - } else { - throw new RejectedExecutionException( - "Unable to run the task " - + task - + "\n your work queue is full you might consider changing your configuration to scale more. See parameter 'queueCapacity' in bonita.home configuration files."); - } - } - + executorServiceMetricsProvider.unbind(meterRegistry, BONITA_WORK_EXECUTOR, tenantId); } } diff --git a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/SingleThreadPoolExecutorFactory.java b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/SingleThreadPoolExecutorFactory.java new file mode 100644 index 00000000000..7a1a05b670b --- /dev/null +++ b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/SingleThreadPoolExecutorFactory.java @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2024 Bonitasoft S.A. + * Bonitasoft, 32 rue Gustave Eiffel - 38000 Grenoble + * This library is free software; you can redistribute it and/or modify it under the terms + * of the GNU Lesser General Public License as published by the Free Software Foundation + * version 2.1 of the License. + * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU Lesser General Public License for more details. + * You should have received a copy of the GNU Lesser General Public License along with this + * program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth + * Floor, Boston, MA 02110-1301, USA. + **/ +package org.bonitasoft.engine.work; + +import java.util.concurrent.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate; +import org.springframework.stereotype.Component; + +@Component +@ConditionalOnSingleCandidate(BonitaThreadPoolExecutorFactory.class) +public class SingleThreadPoolExecutorFactory implements BonitaThreadPoolExecutorFactory { + + private final int queueCapacity; + private final long tenantId; + + public SingleThreadPoolExecutorFactory(@Value("${tenantId}") long tenantId, + @Value("${bonita.tenant.work.queueCapacity}") int queueCapacity) { + this.queueCapacity = queueCapacity; + this.tenantId = tenantId; + } + + @Override + public ThreadPoolExecutor create() { + return new SingleThreadPoolExecutor(new ArrayBlockingQueue<>(queueCapacity), + new WorkerThreadFactory("Bonita-Worker", tenantId)); + } + + public static class SingleThreadPoolExecutor extends ThreadPoolExecutor { + + public SingleThreadPoolExecutor(final BlockingQueue workQueue, + final ThreadFactory threadFactory) { + super(1, 1, 0, TimeUnit.MILLISECONDS, workQueue, threadFactory, new QueueRejectedExecutionHandler()); + } + + @Override + public Future submit(final Runnable task) { + // only submit if not shutdown + if (!isShutdown()) { + return super.submit(task); + + } + return null; + } + + } + + public static class QueueRejectedExecutionHandler implements RejectedExecutionHandler { + + private static final Logger logger = LoggerFactory.getLogger(QueueRejectedExecutionHandler.class); + + @Override + public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) { + if (executor.isShutdown()) { + logger.info( + "Tried to run work {} but the work service is shutdown. work will be restarted with the node", + task); + } else { + throw new RejectedExecutionException( + "Unable to run the task " + + task + + "\n your work queue is full you might consider changing your configuration to scale more. See parameter 'bonita.tenant.work.queueCapacity' in bonita-tenant-community.properties configuration files."); + } + } + + } +} diff --git a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/WorkDescriptor.java b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/WorkDescriptor.java index bc82988ceed..9f1a9dd41d8 100644 --- a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/WorkDescriptor.java +++ b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/WorkDescriptor.java @@ -30,10 +30,10 @@ */ public class WorkDescriptor implements Serializable { - private String uuid = UUID.randomUUID().toString(); - private String type; + private final String uuid = UUID.randomUUID().toString(); + private final String type; private Long tenantId; - private Map parameters; + private final Map parameters; private int retryCount = 0; private Instant executionThreshold; private int executionCount = 0; diff --git a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/WorkerThreadFactory.java b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/WorkerThreadFactory.java index 67728569061..ddd514ba3cb 100644 --- a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/WorkerThreadFactory.java +++ b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/WorkerThreadFactory.java @@ -36,6 +36,10 @@ public WorkerThreadFactory(final String name, final long tenantId, final int max this.padding = guessPadding(maximumPoolSize); } + public WorkerThreadFactory(final String name, final long tenantId) { + this(name, tenantId, 1); + } + /** * @param maximumPoolSize */ diff --git a/services/bonita-work/src/test/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutorTest.java b/services/bonita-work/src/test/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutorTest.java deleted file mode 100644 index 3eef82f0dec..00000000000 --- a/services/bonita-work/src/test/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutorTest.java +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Copyright (C) 2017-2019 Bonitasoft S.A. - * Bonitasoft, 32 rue Gustave Eiffel - 38000 Grenoble - * This library is free software; you can redistribute it and/or modify it under the terms - * of the GNU Lesser General Public License as published by the Free Software Foundation - * version 2.1 of the License. - * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; - * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU Lesser General Public License for more details. - * You should have received a copy of the GNU Lesser General Public License along with this - * program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth - * Floor, Boston, MA 02110-1301, USA. - **/ -package org.bonitasoft.engine.work; - -import static java.time.temporal.ChronoUnit.SECONDS; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -import java.time.Duration; -import java.time.Instant; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import io.micrometer.core.instrument.Clock; -import io.micrometer.core.instrument.Gauge; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import org.bonitasoft.engine.commons.time.FixedEngineClock; -import org.bonitasoft.engine.work.audit.WorkExecutionAuditor; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -/** - * @author Baptiste Mesta. - */ -public class BonitaThreadPoolExecutorTest { - - public static final long TENANT_ID = 13L; - @Rule - public MockitoRule mockitoRule = MockitoJUnit.rule(); - @Mock - private WorkExecutionAuditor workExecutionAuditor; - private MyWorkExecutionCallback workExecutionCallback = new MyWorkExecutionCallback(); - private BonitaThreadPoolExecutor bonitaThreadPoolExecutor; - private static int threadNumber = 3; - private FixedEngineClock engineClock = new FixedEngineClock(Instant.now()); - private WorkFactory workFactory = new LocalWorkFactory(2); - private SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry( - // So that micrometer updates its counters every 1 ms: - k -> k.equals("simple.step") ? Duration.ofMillis(1).toString() : null, - Clock.SYSTEM); - - @Before - public void before() throws Exception { - bonitaThreadPoolExecutor = new BonitaThreadPoolExecutor(threadNumber, threadNumber // - , 1_000, TimeUnit.SECONDS // - , new ArrayBlockingQueue<>(1_000) // - , new WorkerThreadFactory("test-worker", 1, threadNumber) // - , (r, executor) -> { - } // - , workFactory, engineClock, workExecutionCallback, workExecutionAuditor, - meterRegistry, TENANT_ID); - } - - @Test - public void should_call_on_success_callback_when_work_executed_properly() { - WorkDescriptor workDescriptor = WorkDescriptor.create("NORMAL"); - - bonitaThreadPoolExecutor.submit(workDescriptor); - - await().until(() -> workExecutionCallback.isOnSuccessCalled()); - assertThat(workExecutionCallback.isOnFailureCalled()).isFalse(); - } - - @Test - public void should_call_on_failure_callback_when_work_executed_properly() { - WorkDescriptor workDescriptor = WorkDescriptor.create("EXCEPTION"); - - bonitaThreadPoolExecutor.submit(workDescriptor); - - await().until(() -> workExecutionCallback.isOnFailureCalled()); - assertThat(workExecutionCallback.isOnSuccessCalled()).isFalse(); - } - - @Test - public void should_execute_work_after_specified_date() throws Exception { - WorkDescriptor workDescriptor = WorkDescriptor.create("NORMAL"); - workDescriptor.mustBeExecutedAfter(Instant.now().plus(5, SECONDS)); - - bonitaThreadPoolExecutor.submit(workDescriptor); - - //should still not be executed - engineClock.addTime(1, SECONDS); - Thread.sleep(50); - assertThat(workExecutionCallback.isOnSuccessCalled()).isFalse(); - - //add time, work should be executed - engineClock.addTime(5, SECONDS); - await().until(() -> workExecutionCallback.isOnSuccessCalled()); - } - - @Test - public void should_update_meter_when_work_executes() { - Gauge currentWorkQueue = meterRegistry.find(BonitaThreadPoolExecutor.NUMBER_OF_WORKS_PENDING).gauge(); - for (int i = 0; i <= threadNumber + 3; i++) { - WorkDescriptor workDescriptor = WorkDescriptor.create("SLEEP"); - bonitaThreadPoolExecutor.submit(workDescriptor); - } - await().until(() -> currentWorkQueue.value() > 0); - } - - @Test - public void should_update_works_counters_when_enqueuing_workDescriptor_with_long_processing_time() - throws Exception { - - //when: - bonitaThreadPoolExecutor.submit(WorkDescriptor.create("NORMAL")); - bonitaThreadPoolExecutor.submit(WorkDescriptor.create("SLEEP")); - bonitaThreadPoolExecutor.submit(WorkDescriptor.create("SLEEP")); - bonitaThreadPoolExecutor.submit(WorkDescriptor.create("SLEEP")); - bonitaThreadPoolExecutor.submit(WorkDescriptor.create("SLEEP")); - bonitaThreadPoolExecutor.submit(WorkDescriptor.create("NORMAL")); - TimeUnit.MILLISECONDS.sleep(50); // give some time to the executor to process the work - - //then: - // 1 executed because normal work is submitted first. - assertThat(meterRegistry.find(BonitaThreadPoolExecutor.NUMBER_OF_WORKS_EXECUTED).counter().count()) - .as("Executed works number").isEqualTo(1); - // 3 running works because we have 3 threads in the pool and sleeping works wait for 2s to execute: - assertThat(meterRegistry.find(BonitaThreadPoolExecutor.NUMBER_OF_WORKS_RUNNING).gauge().value()) - .as("Running works number").isEqualTo(3); - // 2 pending works because all 3 threads are busy, so the last 2 works are in the queue: - assertThat(meterRegistry.find(BonitaThreadPoolExecutor.NUMBER_OF_WORKS_PENDING).gauge().value()) - .as("Pending works number").isEqualTo(2); - } - - @Test - public void should_have_no_meters_after_shutdown() throws Exception { - bonitaThreadPoolExecutor.submit(WorkDescriptor.create("NORMAL")); - TimeUnit.MILLISECONDS.sleep(50); // give some time to the executor to process the work - - bonitaThreadPoolExecutor.shutdownAndEmptyQueue(); - - assertThat(meterRegistry.getMeters()).hasSize(0); - } - - @Test - public void should_have_tenant_id_in_all_meters() { - assertThat(meterRegistry.find(BonitaThreadPoolExecutor.NUMBER_OF_WORKS_EXECUTED) - .tag("tenant", String.valueOf(TENANT_ID)).counter()).isNotNull(); - assertThat(meterRegistry.find(BonitaThreadPoolExecutor.NUMBER_OF_WORKS_RUNNING) - .tag("tenant", String.valueOf(TENANT_ID)).gauge()).isNotNull(); - assertThat(meterRegistry.find(BonitaThreadPoolExecutor.NUMBER_OF_WORKS_PENDING) - .tag("tenant", String.valueOf(TENANT_ID)).gauge()).isNotNull(); - } - - @Test - public void should_call_on_success_callback_only_when_async_work_executed_properly() throws InterruptedException { - WorkDescriptor workDescriptor = WorkDescriptor.create("ASYNC"); - - bonitaThreadPoolExecutor.submit(workDescriptor); - - TimeUnit.MILLISECONDS.sleep(50); // give some time to the executor to process the work - assertThat(workExecutionCallback.isOnSuccessCalled()).isFalse(); - assertThat(workExecutionCallback.isOnFailureCalled()).isFalse(); - - await().until(() -> workExecutionCallback.isOnSuccessCalled()); - assertThat(workExecutionCallback.isOnFailureCalled()).isFalse(); - } - - @Test - public void should_call_on_failure_callback_ony_when_async_work_executed_properly() throws InterruptedException { - WorkDescriptor workDescriptor = WorkDescriptor.create("ASYNC_EXCEPTION"); - - bonitaThreadPoolExecutor.submit(workDescriptor); - - TimeUnit.MILLISECONDS.sleep(50); // give some time to the executor to process the work - assertThat(workExecutionCallback.isOnSuccessCalled()).isFalse(); - assertThat(workExecutionCallback.isOnFailureCalled()).isFalse(); - - await().until(() -> workExecutionCallback.isOnFailureCalled()); - assertThat(workExecutionCallback.isOnSuccessCalled()).isFalse(); - assertThat(workExecutionCallback.getThrown()).hasMessage("my exception").isInstanceOf(SWorkException.class); - } - - // ================================================================================================================= - // UTILS - // ================================================================================================================= - - private static class MyWorkExecutionCallback implements WorkExecutionCallback { - - private final AtomicBoolean onSuccessCalled = new AtomicBoolean(false); - private final AtomicBoolean onFailureCalled = new AtomicBoolean(false); - private Throwable thrown; - - @Override - public void onSuccess(WorkDescriptor workDescriptor) { - onSuccessCalled.set(true); - } - - @Override - public void onFailure(WorkDescriptor work, BonitaWork bonitaWork, Map context, - Throwable thrown) { - this.thrown = thrown; - onFailureCalled.set(true); - } - - public boolean isOnSuccessCalled() { - return onSuccessCalled.get(); - } - - public boolean isOnFailureCalled() { - return onFailureCalled.get(); - } - - public Throwable getThrown() { - return thrown; - } - } - - private static class LocalWorkFactory implements WorkFactory { - - private final long workSleepPeriodInSeconds; - - private LocalWorkFactory(long workSleepPeriodInSeconds) { - this.workSleepPeriodInSeconds = workSleepPeriodInSeconds; - } - - @Override - public BonitaWork create(WorkDescriptor workDescriptor) { - return new BonitaWork() { - - @Override - public String getDescription() { - return workDescriptor.toString(); - } - - @Override - public CompletableFuture work(Map context) throws Exception { - switch (workDescriptor.getType()) { - case "EXCEPTION": - throw new Exception("classic exception"); - case "SLEEP": - TimeUnit.SECONDS.sleep(workSleepPeriodInSeconds); - break; - case "ASYNC": - return CompletableFuture.supplyAsync(() -> { - try { - TimeUnit.MILLISECONDS.sleep(200); - } catch (InterruptedException ignored) { - } - return null; - }, Executors.newSingleThreadExecutor()); - case "ASYNC_EXCEPTION": - return CompletableFuture.supplyAsync(() -> { - try { - TimeUnit.MILLISECONDS.sleep(200); - } catch (InterruptedException ignored) { - } - throw new CompletionException(new SWorkException("my exception")); - }, Executors.newSingleThreadExecutor()); - case "NORMAL": - default: - } - return CompletableFuture.completedFuture(null); - } - - @Override - public void handleFailure(Throwable e, Map context) { - // do nothing - } - }; - } - - } - -} diff --git a/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactoryTest.java b/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactoryTest.java index 2bf13421b27..509a33d9a7b 100644 --- a/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactoryTest.java +++ b/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactoryTest.java @@ -16,8 +16,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; -import java.util.concurrent.ThreadPoolExecutor; - import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import org.bonitasoft.engine.commons.time.DefaultEngineClock; import org.bonitasoft.engine.monitoring.DefaultExecutorServiceMetricsProvider; @@ -39,16 +37,20 @@ public class DefaultBonitaExecutorServiceFactoryTest { public void threadNameInExecutorService_should_contain_tenantId() { long tenantId = 999; DefaultBonitaExecutorServiceFactory defaultBonitaExecutorServiceFactory = new DefaultBonitaExecutorServiceFactory( - workFactory, tenantId, 1, - 20, 15, 10, new DefaultEngineClock(), mock(WorkExecutionAuditor.class), new SimpleMeterRegistry(), - new DefaultExecutorServiceMetricsProvider()); + tenantId, + new SimpleMeterRegistry(), + new DefaultEngineClock(), + workFactory, + mock(WorkExecutionAuditor.class), + new DefaultExecutorServiceMetricsProvider(), + new SingleThreadPoolExecutorFactory(tenantId, 10)); BonitaExecutorService createExecutorService = defaultBonitaExecutorServiceFactory .createExecutorService(workExecutionCallback); Runnable r = () -> { }; - String name = ((ThreadPoolExecutor) createExecutorService).getThreadFactory().newThread(r).getName(); + String name = createExecutorService.getExecutor().getThreadFactory().newThread(r).getName(); assertThat(name).as("thread name should contains the tenantId").contains(Long.toString(tenantId)); } @@ -58,9 +60,13 @@ public void createExecutorService_should_register_ExecutorServiceMetrics() { long tenantId = 97L; final SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); DefaultBonitaExecutorServiceFactory defaultBonitaExecutorServiceFactory = new DefaultBonitaExecutorServiceFactory( - workFactory, tenantId, 1, - 20, 15, 10, new DefaultEngineClock(), mock(WorkExecutionAuditor.class), meterRegistry, - new DefaultExecutorServiceMetricsProvider()); + tenantId, + meterRegistry, + new DefaultEngineClock(), + workFactory, + mock(WorkExecutionAuditor.class), + new DefaultExecutorServiceMetricsProvider(), + new SingleThreadPoolExecutorFactory(tenantId, 10)); // when: defaultBonitaExecutorServiceFactory.createExecutorService(workExecutionCallback); @@ -79,9 +85,13 @@ public void should_not_have_metrics_when_unbind_is_called() { long tenantId = 97L; final SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry(); DefaultBonitaExecutorServiceFactory defaultBonitaExecutorServiceFactory = new DefaultBonitaExecutorServiceFactory( - workFactory, tenantId, 1, - 20, 15, 10, new DefaultEngineClock(), mock(WorkExecutionAuditor.class), meterRegistry, - new DefaultExecutorServiceMetricsProvider()); + tenantId, + meterRegistry, + new DefaultEngineClock(), + workFactory, + mock(WorkExecutionAuditor.class), + new DefaultExecutorServiceMetricsProvider(), + new SingleThreadPoolExecutorFactory(tenantId, 10)); // when: defaultBonitaExecutorServiceFactory.createExecutorService(workExecutionCallback); diff --git a/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceTest.java b/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceTest.java new file mode 100644 index 00000000000..dda5e76f108 --- /dev/null +++ b/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceTest.java @@ -0,0 +1,273 @@ +/** + * Copyright (C) 2017-2019 Bonitasoft S.A. + * Bonitasoft, 32 rue Gustave Eiffel - 38000 Grenoble + * This library is free software; you can redistribute it and/or modify it under the terms + * of the GNU Lesser General Public License as published by the Free Software Foundation + * version 2.1 of the License. + * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU Lesser General Public License for more details. + * You should have received a copy of the GNU Lesser General Public License along with this + * program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth + * Floor, Boston, MA 02110-1301, USA. + **/ +package org.bonitasoft.engine.work; + +import static java.time.temporal.ChronoUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.micrometer.core.instrument.Clock; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.bonitasoft.engine.commons.time.FixedEngineClock; +import org.bonitasoft.engine.work.audit.WorkExecutionAuditor; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +/** + * @author Baptiste Mesta. + */ +public class DefaultBonitaExecutorServiceTest { + + public static final long TENANT_ID = 13L; + @Rule + public MockitoRule mockitoRule = MockitoJUnit.rule(); + @Mock + private WorkExecutionAuditor workExecutionAuditor; + private final MyWorkExecutionCallback workExecutionCallback = new MyWorkExecutionCallback(); + private DefaultBonitaExecutorService bonitaExecutorService; + private final FixedEngineClock engineClock = new FixedEngineClock(Instant.now()); + private final WorkFactory workFactory = new LocalWorkFactory(2); + private final SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry( + // So that micrometer updates its counters every 1 ms: + k -> k.equals("simple.step") ? Duration.ofMillis(1).toString() : null, + Clock.SYSTEM); + + @Before + public void before() { + var threadPoolExecutor = new SingleThreadPoolExecutorFactory.SingleThreadPoolExecutor( + new LinkedBlockingQueue<>(10), + new WorkerThreadFactory("test-worker", 1, 1)); + bonitaExecutorService = new DefaultBonitaExecutorService(threadPoolExecutor, workFactory, engineClock, + workExecutionCallback, workExecutionAuditor, + meterRegistry, TENANT_ID); + } + + @Test + public void should_call_on_success_callback_when_work_executed_properly() { + WorkDescriptor workDescriptor = WorkDescriptor.create("NORMAL"); + + bonitaExecutorService.submit(workDescriptor); + + await().until(workExecutionCallback::isOnSuccessCalled); + assertThat(workExecutionCallback.isOnFailureCalled()).isFalse(); + } + + @Test + public void should_call_on_failure_callback_when_work_executed_properly() { + WorkDescriptor workDescriptor = WorkDescriptor.create("EXCEPTION"); + + bonitaExecutorService.submit(workDescriptor); + + await().until(workExecutionCallback::isOnFailureCalled); + assertThat(workExecutionCallback.isOnSuccessCalled()).isFalse(); + } + + @Test + public void should_execute_work_after_specified_date() { + WorkDescriptor workDescriptor = WorkDescriptor.create("NORMAL"); + workDescriptor.mustBeExecutedAfter(Instant.now().plus(5, SECONDS)); + + bonitaExecutorService.submit(workDescriptor); + + //should still not be executed + engineClock.addTime(1, SECONDS); + await().atLeast(50, TimeUnit.MILLISECONDS) + .untilAsserted(() -> assertThat(workExecutionCallback.isOnSuccessCalled()).isFalse()); + + //add time, work should be executed + engineClock.addTime(5, SECONDS); + await().until(workExecutionCallback::isOnSuccessCalled); + } + + @Test + public void should_update_meter_when_work_executes() { + Gauge currentWorkQueue = meterRegistry.find(DefaultBonitaExecutorService.NUMBER_OF_WORKS_PENDING).gauge(); + for (int i = 0; i <= 4; i++) { + WorkDescriptor workDescriptor = WorkDescriptor.create("SLEEP"); + bonitaExecutorService.submit(workDescriptor); + } + await().untilAsserted(() -> assertThat(currentWorkQueue.value()).isPositive()); + } + + @Test + public void should_update_works_counters_when_enqueuing_workDescriptor_with_long_processing_time() + throws Exception { + + //when: + bonitaExecutorService.submit(WorkDescriptor.create("NORMAL")); + bonitaExecutorService.submit(WorkDescriptor.create("SLEEP")); + bonitaExecutorService.submit(WorkDescriptor.create("SLEEP")); + bonitaExecutorService.submit(WorkDescriptor.create("SLEEP")); + bonitaExecutorService.submit(WorkDescriptor.create("SLEEP")); + bonitaExecutorService.submit(WorkDescriptor.create("NORMAL")); + TimeUnit.MILLISECONDS.sleep(50); // give some time to the executor to process the work + + //then: + // 1 executed because normal work is submitted first. + assertThat(meterRegistry.find(DefaultBonitaExecutorService.NUMBER_OF_WORKS_EXECUTED).counter().count()) + .as("Executed works number").isEqualTo(1); + // 1 running works because we have 1 threads in the pool and sleeping works wait for 2s to execute: + assertThat(meterRegistry.find(DefaultBonitaExecutorService.NUMBER_OF_WORKS_RUNNING).gauge().value()) + .as("Running works number").isEqualTo(1); + // 4 pending works because the single thread is busy, so the last 4 works are in the queue: + assertThat(meterRegistry.find(DefaultBonitaExecutorService.NUMBER_OF_WORKS_PENDING).gauge().value()) + .as("Pending works number").isEqualTo(4); + } + + @Test + public void should_have_no_meters_after_shutdown() { + var work = bonitaExecutorService.submit(WorkDescriptor.create("NORMAL")); + await().until(work::isDone); + + bonitaExecutorService.shutdownAndEmptyQueue(); + + assertThat(meterRegistry.getMeters()).isEmpty(); + } + + @Test + public void should_have_tenant_id_in_all_meters() { + assertThat(meterRegistry.find(DefaultBonitaExecutorService.NUMBER_OF_WORKS_EXECUTED) + .tag("tenant", String.valueOf(TENANT_ID)).counter()).isNotNull(); + assertThat(meterRegistry.find(DefaultBonitaExecutorService.NUMBER_OF_WORKS_RUNNING) + .tag("tenant", String.valueOf(TENANT_ID)).gauge()).isNotNull(); + assertThat(meterRegistry.find(DefaultBonitaExecutorService.NUMBER_OF_WORKS_PENDING) + .tag("tenant", String.valueOf(TENANT_ID)).gauge()).isNotNull(); + } + + @Test + public void should_call_on_success_callback_only_when_async_work_executed_properly() { + WorkDescriptor workDescriptor = WorkDescriptor.create("ASYNC"); + + var work = bonitaExecutorService.submit(workDescriptor); + await().until(work::isDone); + + assertThat(workExecutionCallback.isOnSuccessCalled()).isFalse(); + assertThat(workExecutionCallback.isOnFailureCalled()).isFalse(); + + await().until(workExecutionCallback::isOnSuccessCalled); + assertThat(workExecutionCallback.isOnFailureCalled()).isFalse(); + } + + @Test + public void should_call_on_failure_callback_ony_when_async_work_executed_properly() { + WorkDescriptor workDescriptor = WorkDescriptor.create("ASYNC_EXCEPTION"); + + var work = bonitaExecutorService.submit(workDescriptor); + await().until(work::isDone); + + assertThat(workExecutionCallback.isOnSuccessCalled()).isFalse(); + assertThat(workExecutionCallback.isOnFailureCalled()).isFalse(); + + await().until(workExecutionCallback::isOnFailureCalled); + assertThat(workExecutionCallback.isOnSuccessCalled()).isFalse(); + assertThat(workExecutionCallback.getThrown()).hasMessage("my exception").isInstanceOf(SWorkException.class); + } + + // ================================================================================================================= + // UTILS + // ================================================================================================================= + + private static class MyWorkExecutionCallback implements WorkExecutionCallback { + + private final AtomicBoolean onSuccessCalled = new AtomicBoolean(false); + private final AtomicBoolean onFailureCalled = new AtomicBoolean(false); + private Throwable thrown; + + @Override + public void onSuccess(WorkDescriptor workDescriptor) { + onSuccessCalled.set(true); + } + + @Override + public void onFailure(WorkDescriptor work, BonitaWork bonitaWork, Map context, + Throwable thrown) { + this.thrown = thrown; + onFailureCalled.set(true); + } + + public boolean isOnSuccessCalled() { + return onSuccessCalled.get(); + } + + public boolean isOnFailureCalled() { + return onFailureCalled.get(); + } + + public Throwable getThrown() { + return thrown; + } + } + + private record LocalWorkFactory(long workSleepPeriodInSeconds) implements WorkFactory { + + @Override + public BonitaWork create(WorkDescriptor workDescriptor) { + return new BonitaWork() { + + @Override + public String getDescription() { + return workDescriptor.toString(); + } + + @Override + public CompletableFuture work(Map context) throws Exception { + switch (workDescriptor.getType()) { + case "EXCEPTION": + throw new Exception("classic exception"); + case "SLEEP": + TimeUnit.SECONDS.sleep(workSleepPeriodInSeconds); + break; + case "ASYNC": + return CompletableFuture.supplyAsync(() -> { + try { + TimeUnit.MILLISECONDS.sleep(200); + } catch (InterruptedException ignored) { + } + return null; + }, Executors.newSingleThreadExecutor()); + case "ASYNC_EXCEPTION": + return CompletableFuture.supplyAsync(() -> { + try { + TimeUnit.MILLISECONDS.sleep(200); + } catch (InterruptedException ignored) { + } + throw new CompletionException(new SWorkException("my exception")); + }, Executors.newSingleThreadExecutor()); + case "NORMAL": + default: + } + return CompletableFuture.completedFuture(null); + } + + @Override + public void handleFailure(Throwable e, Map context) { + // do nothing + } + }; + } + +} + +}