Simulates multiple thread pools of workers that share a single database table holding the tasks pending execution. This is the solution implemented in the project CANCHITO-WORKFLOW-MANAGER (CWM) to work around some "limitations" encountered while integrating Flowable BPM, when executing long-running tasks.
Help us find bugs, add new features, or simply feel free to use it. Download thread-pool-with-task-queue-in-database from our GitHub site.
The MIT License (MIT)
Copyright (c) 2018, canchito-dev
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
If you would like to become an active contributor to this project, please follow these simple steps:
- Fork it
- Create your feature branch
- Commit your changes
- Push to the branch
- Create new Pull Request
This solution is already implemented in CANCHITO-WORKFLOW-MANAGER (CWM), which can be downloaded from GitHub. Remember that CANCHITO-WORKFLOW-MANAGER (CWM) is a work in progress. If you only wish to use the asynchronous service by itself, it can also be downloaded from GitHub.
Put simply, the async job executors of thread-pool-with-task-queue-in-database are individual threads that are started once, when the application starts. Each thread creates a thread pool that reuses a (configurable) fixed number of threads operating off a database table called CWM_TASKS_QUEUE, which acts as a priority blocking list, and uses the provided ThreadFactory to create new threads when needed. At any point, at most n threads will be actively processing tasks.
Periodically, pending tasks are pulled from the database. The number of pending tasks pulled at once depends on the number of available threads for a specific task type. If additional tasks are submitted while all threads are active, they remain in the database until a thread becomes available.
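The sizing rule described above can be sketched as follows. This is a minimal illustration and not the project's actual code: the class AcquisitionSizeSketch and the helper tasksToAcquire are hypothetical names, and the sketch assumes that the number of available threads is the maximum pool size minus the threads currently busy, capped by maxTasksPerAcquisition.

```java
public class AcquisitionSizeSketch {

    // Hypothetical helper: how many pending tasks to request in one pull.
    // Assumes "available threads" = maximumPoolSize - activeThreads.
    static int tasksToAcquire(int maximumPoolSize, int activeThreads, int maxTasksPerAcquisition) {
        int freeThreads = Math.max(0, maximumPoolSize - activeThreads);
        return Math.min(freeThreads, maxTasksPerAcquisition);
    }

    public static void main(String[] args) {
        System.out.println(tasksToAcquire(5, 0, 2)); // 2: capped by the batch limit
        System.out.println(tasksToAcquire(5, 4, 2)); // 1: only one thread is free
        System.out.println(tasksToAcquire(5, 5, 2)); // 0: tasks stay in the database
    }
}
```

When the helper returns 0, nothing is pulled and the pending tasks simply wait in the table for the next acquisition cycle.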
If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool exist until the pool is explicitly shut down.
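For readers unfamiliar with this kind of pool, here is a rough sketch of how one could be built with the standard java.util.concurrent.ThreadPoolExecutor. It is an assumption that the project uses this class in this way; PoolSketch, namedFactory and newPool are hypothetical names, and the SynchronousQueue is chosen here only because the pending tasks are meant to wait in the database rather than in an in-memory queue.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSketch {

    // Hypothetical factory: names threads after the pool, e.g. "task1Queue-1".
    static ThreadFactory namedFactory(String poolName) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, poolName + "-" + counter.incrementAndGet());
    }

    // Builds a pool that hands each task directly to a thread (no in-memory backlog).
    static ThreadPoolExecutor newPool(String poolName, int corePoolSize,
                                      int maximumPoolSize, long keepAliveTimeInMillis) {
        return new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTimeInMillis, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),
                namedFactory(poolName));
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool("task1Queue", 2, 5, 300_000L);
        pool.execute(() -> System.out.println("Running on " + Thread.currentThread().getName()));
        pool.shutdown(); // threads exist until the pool is explicitly shut down
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With a direct hand-off queue, the executor rejects work once all threads are busy, so a design like this only stays safe if the caller never submits more tasks than there are free threads.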
Once a task is pulled by a thread, it is locked. Locking lets several async job executors run simultaneously on different servers, which gives us a fail-over, clustered system.
If two or more async job executors are started, they all race to pull the pending tasks. Consequently, only the first one that locks a task executes it. The other async job executors get an optimistic locking exception.
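The race can be illustrated with an in-memory stand-in for a single table row. This is only a sketch of the optimistic locking idea, not the project's persistence code: LockSketch, TaskRow, readVersion and lockIfUnchanged are hypothetical names, and the version counter plays the role that a version column would presumably play in the database.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LockSketch {

    // Hypothetical stand-in for one row of the task table.
    static final class TaskRow {
        private final AtomicInteger version = new AtomicInteger(0);

        // Each executor first reads the current version of the row.
        int readVersion() {
            return version.get();
        }

        // The lock succeeds only if nobody changed the row since it was read.
        boolean lockIfUnchanged(int expectedVersion) {
            return version.compareAndSet(expectedVersion, expectedVersion + 1);
        }
    }

    public static void main(String[] args) {
        TaskRow row = new TaskRow();
        int seenByA = row.readVersion(); // executor A reads the pending task
        int seenByB = row.readVersion(); // executor B reads the same task
        System.out.println("A locked: " + row.lockIfUnchanged(seenByA)); // A locked: true
        System.out.println("B locked: " + row.lockIfUnchanged(seenByB)); // B locked: false
    }
}
```

In a real persistence layer, the failed attempt would surface as an optimistic locking exception, and the losing executor would move on to another task.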
The async job executor is configured by modifying two XML files (found under src/main/resources/):
- task-queue-beans.xml: specifies how each async job executor, dedicated to processing a specific task, is configured
- task-runnable-beans.xml: here you will find the Runnable classes that are used by each async job executor to execute the task
As you can see, each queue used by the async job executor needs some configuration. Let's describe the parameters found in the task-queue-beans.xml file.
- runnableName: the id of the runnable of which an instance is initialized and afterward executed. This id relates to the information found in task-runnable-beans.xml.
- poolName: the name of the thread pool
- corePoolSize: the number of threads to keep in the pool, even if they are idle
- maximumPoolSize: the maximum number of threads to allow in the pool
- keepAliveTimeInMillis: when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating
- acquireWaitTimeInMillis: milliseconds to wait before new tasks are pulled from the database
- maxTasksPerAcquisition: maximum number of tasks that can be pulled from the database at once
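To show how acquireWaitTimeInMillis and maxTasksPerAcquisition might interact, here is a minimal polling sketch. It is an illustration under stated assumptions, not the project's AcquireTaskThread: AcquireCycleSketch and acquireOnce are hypothetical names, and an in-memory queue stands in for the database table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class AcquireCycleSketch {

    // Pulls at most maxTasksPerAcquisition entries from the pending store.
    static <T> List<T> acquireOnce(Queue<T> pending, int maxTasksPerAcquisition) {
        List<T> batch = new ArrayList<>();
        T next;
        while (batch.size() < maxTasksPerAcquisition && (next = pending.poll()) != null) {
            batch.add(next);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        // In-memory stand-in for the task table (assumption for this sketch).
        Queue<String> pending = new ConcurrentLinkedQueue<>(Arrays.asList("a", "b", "c"));
        long acquireWaitTimeInMillis = 50L; // shortened here to keep the demo quick
        int maxTasksPerAcquisition = 2;

        while (!pending.isEmpty()) {
            System.out.println(acquireOnce(pending, maxTasksPerAcquisition));
            Thread.sleep(acquireWaitTimeInMillis); // wait before the next pull
        }
    }
}
```

With three pending entries and a batch limit of 2, the loop needs two cycles to drain the store: the first pull returns two entries and the second returns the remaining one.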
For instance, task-queue-beans.xml:
<bean id="task1Queue" class="com.canchitodev.cwm.threadpool.service.TaskQueue" scope="prototype">
<property name="runnableName" value="task1Runnable" />
<property name="poolName" value="task1Queue" />
<property name="corePoolSize" value="2" />
<property name="maximumPoolSize" value="5" />
<property name="keepAliveTimeInMillis" value="300000"/>
<property name="acquireWaitTimeInMillis" value="5000"/>
<property name="maxTasksPerAcquisition" value="2"/>
</bean>
For instance, task-runnable-beans.xml:
<bean id="task1Runnable" class="com.canchitodev.cwm.tasks.runnable.Task1Runnable" scope="prototype"></bean>
A long-running task is implemented as a Runnable class that implements the TaskRunnable interface, so that it can override the execute() method. This method holds the business logic of the long-running task.
public class Task1Runnable implements TaskRunnable {

    private static final Logger logger = Logger.getLogger(Task1Runnable.class);

    // The task pulled from the queue that this runnable executes
    private GenericTaskEntity task;

    public Task1Runnable() {}

    public Task1Runnable(GenericTaskEntity task) {
        this.task = task;
    }

    public GenericTaskEntity getTask() {
        return task;
    }

    public void setTask(GenericTaskEntity task) {
        this.task = task;
    }

    @Override
    public void execute() {
        try {
            logger.info("Executing task " + task);
            // The business logic of the long-running task goes here
        } catch (RuntimeException e) {
            // Log unexpected failures raised by the business logic
            logger.error("Task failed: " + task, e);
        } finally {
            logger.info("Done executing task " + task);
        }
    }
}
That's it. Now just add the task queue to task-queue-beans.xml and the Runnable class to task-runnable-beans.xml, so that TaskQueueService can start its respective AcquireTaskThread.
In this GitHub project, we have included a JUnit class for testing the async job executor. Just remember to comment out the @Ignore annotation. If you would rather see it in action together with Flowable, have a look at CANCHITO-WORKFLOW-MANAGER (CWM).