Skip to content

Commit

Permalink
Add health and status checks
Browse files Browse the repository at this point in the history
  • Loading branch information
agentgt committed Aug 9, 2024
1 parent 4333dc0 commit e58c096
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 58 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/io/jstach/rainbowgum/LogAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ sealed interface LogAction {

enum StandardAction implements LogAction {

REOPEN, FLUSH;
REOPEN, FLUSH, STATUS;

}

Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/io/jstach/rainbowgum/LogAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ default List<LogResponse> _request(LogAction action) {
case LogAction.StandardAction a -> switch (a) {
case LogAction.StandardAction.REOPEN -> List.of(reopen());
case LogAction.StandardAction.FLUSH -> List.of(flush());
case LogAction.StandardAction.STATUS -> List.of(status());
};
default -> throw new IllegalArgumentException(); // TODO fucking eclipse
};
Expand All @@ -447,6 +448,17 @@ default LogResponse flush() {
return new Response(name(), LogResponse.Status.StandardStatus.OK);
}

default LogResponse status() {
Status status;
try {
status = output().status();
}
catch (Exception e) {
status = LogResponse.Status.ofError(e);
}
return new Response(name(), status);
}

static List<DirectLogAppender> findAppenders(ServiceRegistry registry) {
List<DirectLogAppender> appenders = new ArrayList<>();
for (var a : registry.find(LogAppender.class)) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/jstach/rainbowgum/LogConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ final class DefaultLogConfig implements LogConfig {
this.changePublisher = changeable ? new DefaultChangePublisher() : IgnoreChangePublisher.INSTANT;
this.outputRegistry = DefaultOutputRegistry.of(registry);
this.encoderRegistry = DefaultEncoderRegistry.of();
this.publisherRegistry = DefaultPublisherRegistry.of();
this.publisherRegistry = DefaultPublisherRegistry.of(registry);
}

class DefaultChangePublisher extends AbstractChangePublisher {
Expand Down
30 changes: 15 additions & 15 deletions core/src/main/java/io/jstach/rainbowgum/LogOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
Expand Down Expand Up @@ -59,24 +58,12 @@ public interface LogOutput extends LogLifecycle, Flushable {
/**
* Standard OUT URI.
*/
static final URI STDOUT_URI = uri(STDOUT_SCHEME + ":///");
static final URI STDOUT_URI = URI.create(STDOUT_SCHEME + ":///");

/**
* Standard ERR URI
*/
static final URI STDERR_URI = uri(STDERR_SCHEME + ":///");

/*
* This is because URI create is nullable.
*/
private static URI uri(String s) {
try {
return new URI(s);
}
catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
static final URI STDERR_URI = URI.create(STDERR_SCHEME + ":///");

/**
* Provides a lazy loaded output from a URI.
Expand Down Expand Up @@ -183,6 +170,19 @@ public String contentType() {
*/
public URI uri() throws UnsupportedOperationException;

/**
* Requests the health of this output. If no exception is thrown the returned value is
* used. If an exception is thrown the status is considered to be error. This call
* follows the write rules where there will never be overlapping calls. The default
* implementation will return {@link LogResponse.Status.StandardStatus#OK}. which will
* check previous meta log error entries.
* @return status of this output.
* @throws Exception if status check fails which will be an error status.
*/
default LogResponse.Status status() throws Exception {
return LogResponse.Status.StandardStatus.OK;
}

@Override
default void start(LogConfig config) {
}
Expand Down
46 changes: 25 additions & 21 deletions core/src/main/java/io/jstach/rainbowgum/LogOutputRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ public sealed interface LogOutputRegistry extends OutputProvider permits Default
*/
public List<LogResponse> flush();

/**
* Will retrieve the status of all outputs usually for health checking.
* @return list of status of outputs.
*/
public List<LogResponse> status();

}

final class DefaultOutputRegistry implements LogOutputRegistry {
Expand All @@ -85,15 +91,26 @@ public DefaultOutputRegistry(ServiceRegistry serviceRegistry) {

@Override
public List<LogResponse> reopen() {
return requestIO(LogAction.StandardAction.REOPEN);
}

@Override
public List<LogResponse> flush() {
return requestIO(LogAction.StandardAction.FLUSH);
}

@Override
public List<LogResponse> status() {
/*
* TODO check rainbowgum is actually running.
* TODO should we queue status requests with a lock? Probably not.
*/
if (reopenLock.tryLock()) {
return _request(LogAction.StandardAction.STATUS);
}

private List<LogResponse> requestIO(LogAction action) {
if (reopenLock.tryLock()) {
try {
return Actor.act(
serviceRegistry.find(LogAppender.class).stream().map(a -> InternalLogAppender.of(a)).toList(),
LogAction.StandardAction.REOPEN);
return _request(action);
}
finally {
reopenLock.unlock();
Expand All @@ -104,25 +121,12 @@ public List<LogResponse> reopen() {
}
}

@Override
public List<LogResponse> flush() {
private List<LogResponse> _request(LogAction action) {
/*
* TODO check rainbowgum is actually running.
*/
if (reopenLock.tryLock()) {

try {
return Actor.act(
serviceRegistry.find(LogAppender.class).stream().map(a -> InternalLogAppender.of(a)).toList(),
LogAction.StandardAction.FLUSH);
}
finally {
reopenLock.unlock();
}
}
else {
return List.of();
}
return Actor.act(serviceRegistry.find(LogAppender.class).stream().map(a -> InternalLogAppender.of(a)).toList(),
action);
}

@Override
Expand Down
24 changes: 14 additions & 10 deletions core/src/main/java/io/jstach/rainbowgum/LogPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

/**
* Publishers push logs to appenders either synchronously or asynchronously.
* Implementations are required to be threadsafe and <strong>overlapping calls are
* expected!</strong> The publisher may report health issues such as metrics of dropped
* events with {@link #status()}.
*/
public sealed interface LogPublisher extends LogEventLogger, LogLifecycle {

Expand All @@ -18,16 +21,17 @@ public sealed interface LogPublisher extends LogEventLogger, LogLifecycle {
*/
public boolean synchronous();

// /**
// * The publisher simply forwards this call to the appenders.
// * TODO maybe we only need the appender call.
// * An appender can act on actions. One of the key
// * actions is reopening files.
// * @param action action to run.
// * @return responses.
// */
// @Override
// public List<LogAction.Response> act(LogAction action);
/**
* Requests the health of this publisher. If no exception is thrown the returned value
* is used. If an exception is thrown the status is considered to be error. The
* default implementation will return {@link LogResponse.Status.StandardStatus#OK}.
* which will check previous meta log error entries.
* @return status of this output.
* @throws Exception if status check fails which will be an error status.
*/
default LogResponse.Status status() throws Exception {
return LogResponse.Status.StandardStatus.OK;
}

/**
* A factory for a publisher from config and appenders.
Expand Down
38 changes: 35 additions & 3 deletions core/src/main/java/io/jstach/rainbowgum/LogPublisherRegistry.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package io.jstach.rainbowgum;

import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import io.jstach.rainbowgum.LogProperty.Property;
import io.jstach.rainbowgum.LogPublisher.PublisherFactory;
import io.jstach.rainbowgum.LogPublisher.PublisherProvider;
import io.jstach.rainbowgum.LogResponse.Status;
import io.jstach.rainbowgum.publisher.BlockingQueueAsyncLogPublisher;
import io.jstach.rainbowgum.spi.RainbowGumServiceProvider;

Expand All @@ -35,6 +38,12 @@ public sealed interface LogPublisherRegistry extends LogPublisher.PublisherProvi
*/
public void register(String scheme, LogPublisher.PublisherProvider publisherProvider);

/**
* Will retrieve the status of all publishers usually for health checking.
* @return list of status of publishers.
*/
public List<LogResponse> status();

/**
* This is URI scheme for the async publisher used by the publisher builder. Call
* {@link #register(String, io.jstach.rainbowgum.LogPublisher.PublisherProvider)} with
Expand Down Expand Up @@ -83,7 +92,14 @@ public sealed interface LogPublisherRegistry extends LogPublisher.PublisherProvi

final class DefaultPublisherRegistry implements LogPublisherRegistry {

ConcurrentHashMap<String, PublisherProvider> providers = new ConcurrentHashMap<String, LogPublisher.PublisherProvider>();
private final ServiceRegistry serviceRegistry;

private final ConcurrentHashMap<String, PublisherProvider> providers = new ConcurrentHashMap<String, LogPublisher.PublisherProvider>();

public DefaultPublisherRegistry(ServiceRegistry serviceRegistry) {
super();
this.serviceRegistry = serviceRegistry;
}

@Override
public PublisherFactory provide(LogProviderRef ref) {
Expand All @@ -106,15 +122,31 @@ public void register(String scheme, LogPublisher.PublisherProvider publisherProv
* Creates a publisher registry instance.
* @return publisher registry.
*/
static LogPublisherRegistry of() {
var r = new DefaultPublisherRegistry();
static LogPublisherRegistry of(ServiceRegistry serviceRegistry) {
var r = new DefaultPublisherRegistry(serviceRegistry);
for (var p : DefaultPublisherProviders.values()) {
r.register(p.scheme(), p);
r.register(BUILTIN_SCHEME_PREFIX + p.scheme(), p);
}
return r;
}

@Override
public List<LogResponse> status() {
List<LogResponse> responses = new ArrayList<>();
serviceRegistry.forEach(LogPublisher.class, (name, pub) -> {
Status status;
try {
status = pub.status();
}
catch (Exception e) {
status = LogResponse.Status.ofError(e);
}
responses.add(new Response(name, status));
});
return responses;
}

}

enum DefaultPublisherProviders implements LogPublisher.PublisherProvider {
Expand Down
Loading

0 comments on commit e58c096

Please sign in to comment.