Skip to content

Commit

Permalink
Merge pull request #70 from avaje/feature/add-ScheduledTask
Browse files Browse the repository at this point in the history
Add ScheduledTask to help execute periodic reporting of metrics
  • Loading branch information
rbygrave authored Jun 12, 2024
2 parents 1941bb8 + 30b2f9f commit fa81736
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 0 deletions.
6 changes: 6 additions & 0 deletions metrics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

<dependencies>

<dependency>
<groupId>io.avaje</groupId>
<artifactId>avaje-applog</artifactId>
<version>1.0</version>
</dependency>

<dependency>
<groupId>io.avaje</groupId>
<artifactId>junit</artifactId>
Expand Down
99 changes: 99 additions & 0 deletions metrics/src/main/java/io/avaje/metrics/DScheduledTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.avaje.metrics;

import io.avaje.applog.AppLog;

import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import static java.lang.System.Logger.Level.ERROR;

final class DScheduledTask implements ScheduledTask {

private static final System.Logger log = AppLog.getLogger(DScheduledTask.class);

static final class DBuilder implements Builder {
private int initial = 60;
private int delay = 60;
private TimeUnit timeUnit = TimeUnit.SECONDS;

private Runnable task;

@Override
public DBuilder schedule(int initial, int delay, TimeUnit timeUnit) {
this.initial = initial;
this.delay = delay;
this.timeUnit = timeUnit;
return this;
}

@Override
public DBuilder task(Runnable task) {
this.task = task;
return this;
}

@Override
public DScheduledTask build() {
Objects.requireNonNull(task, "task is required");
return new DScheduledTask(task, initial, delay, timeUnit);
}
}


private final ReentrantLock activeLock = new ReentrantLock();
private final Runnable task;
private final int initial;
private final int delay;
private final TimeUnit timeUnit;

private final ScheduledExecutorService executor;
private ScheduledFuture<?> backgroundTask;

DScheduledTask(Runnable task, int initial, int delay, TimeUnit timeUnit) {
this.task = task;
this.initial = initial;
this.delay = delay;
this.timeUnit = timeUnit;
this.executor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("schTask"));
}

@Override
public void start() {
this.backgroundTask = executor.scheduleWithFixedDelay(this::runTask, initial, delay, timeUnit);
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return this.backgroundTask.cancel(mayInterruptIfRunning);
}

/**
* Wait for the task to complete if it is actively running .
*/
@Override
public void waitIfRunning(long timeout, TimeUnit timeUnit) {
try {
if (activeLock.tryLock(timeout, timeUnit)) {
activeLock.unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.log(ERROR, "interrupted while waiting for task to complete", e);
}
}

private void runTask() {
activeLock.lock();
try {
task.run();
} catch (Throwable e) {
log.log(ERROR, "Error stopping task", e);
} finally {
activeLock.unlock();
}
}
}
34 changes: 34 additions & 0 deletions metrics/src/main/java/io/avaje/metrics/DaemonThreadFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.avaje.metrics;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* ThreadFactory for Daemon threads.
* <p>
* Daemon threads do not stop a JVM stopping. If an application only has Daemon
* threads left it will shutdown.
* <p>
* In using Daemon threads you need to either not care about being interrupted
* on shutdown or register with the JVM shutdown hook to perform a nice shutdown
* of the daemon threads etc.
*/
final class DaemonThreadFactory implements ThreadFactory {

private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DaemonThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(null, r, namePrefix + threadNumber.getAndIncrement(), 0);
t.setDaemon(true);
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
59 changes: 59 additions & 0 deletions metrics/src/main/java/io/avaje/metrics/ScheduledTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.avaje.metrics;

import java.util.concurrent.TimeUnit;

/**
* A ScheduledTask that can run periodically, be cancelled, and
* also aware of when its running (for use with Lambda).
* <p>
* The ScheduledTask will use a Daemon thread and expect to just
* stop on JVM shutdown.
*/
public interface ScheduledTask {

/**
* Return the Builder for a ScheduledTask.
*/
static Builder builder() {
return new DScheduledTask.DBuilder();
}

/**
* Start the scheduled task.
*/
void start();

/**
* Cancel the scheduled task.
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* If the task is actively running wait for the task to complete.
*/
void waitIfRunning(long timeout, TimeUnit timeUnit);

/**
* The builder for a ScheduledTask.
*/
interface Builder {

/**
* Specify the schedule to run the task.
* @param initial The initial delay
* @param delay The delay between task execution
* @param timeUnit The timeunit of the scheduled delay
*/
Builder schedule(int initial, int delay, TimeUnit timeUnit);

/**
* Specify the task to execute periodically according to the schedule.
*/
Builder task(Runnable task);

/**
* Build the scheduled task.
*/
ScheduledTask build();
}
}
1 change: 1 addition & 0 deletions metrics/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
exports io.avaje.metrics.spi;
exports io.avaje.metrics.annotation;

requires transitive io.avaje.applog;
requires static java.management;

uses io.avaje.metrics.spi.SpiMetricBuilder;
Expand Down
44 changes: 44 additions & 0 deletions metrics/src/test/java/io/avaje/metrics/ScheduledTaskTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.avaje.metrics;

import org.junit.jupiter.api.Test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.assertj.core.api.Assertions.assertThat;

class ScheduledTaskTest {

@Test
void runIt() throws InterruptedException {

ScheduledTask task = ScheduledTask.builder()
.schedule(1, 1, TimeUnit.MILLISECONDS)
.task(ScheduledTaskTest::hello)
.build();

assertThat(counter.get()).isEqualTo(0);
System.out.println("start...");
task.start();
Thread.sleep(5);
assertThat(counter.get()).isGreaterThan(0);

task.waitIfRunning(10, TimeUnit.SECONDS);
Thread.sleep(30);

System.out.println("cancel...");
task.cancel(false);
long after0 = counter.get();
assertThat(after0).isGreaterThan(1);
Thread.sleep(10);
long after1 = counter.get();
assertThat(after1).isEqualTo(after0);
System.out.println("done");
}

private static final AtomicLong counter = new AtomicLong();

private static void hello() {
System.out.println("hi " + counter.incrementAndGet());
}
}

0 comments on commit fa81736

Please sign in to comment.