Skip to content

Commit

Permalink
Merge pull request #44 from TAK-Product-Center/upstream/5.1-RELEASE-8
Browse files Browse the repository at this point in the history
TAK Server 5.1-RELEASE-8
  • Loading branch information
takdeveloper authored May 8, 2024
2 parents 1f345ec + 28c311b commit a4e4453
Show file tree
Hide file tree
Showing 188 changed files with 11,040 additions and 6,629 deletions.
Binary file modified src/docs/TAK_Server_Configuration_Guide.odt
Binary file not shown.
Binary file modified src/docs/TAK_Server_Configuration_Guide.pdf
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private void sendHeadersInternal(Metadata headers) {
// Don't check if sendMessage has been called, since it requires that sendHeaders was already
// called.
sendHeadersCalled = true;
stream.writeHeaders(headers);
stream.writeHeaders(headers, !getMethodDescriptor().getType().serverSendsOneMessage());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,15 @@ public void throwDeadlineExceptionToClient() {
}
}

public void throwCanceledExceptionToClient() {
try {
if (clientStream != null)
clientStream.onError(new StatusRuntimeException(Status.CANCELLED));
} catch (Exception e) {
logger.warn("exception sending StatusRuntimeException - CANCELLED to client", e);
}
}

public void throwPermissionDeniedToClient() {
try {
if (clientStream != null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package tak.server.federation.hub;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class FederationHubIgniteConfig {

private int ignitePoolSize = -1;
private int ignitePoolSizeMultiplier = 2;

public int getIgnitePoolSize() {
return ignitePoolSize;
}
public void setIgnitePoolSize(int ignitePoolSize) {
this.ignitePoolSize = ignitePoolSize;
}
public int getIgnitePoolSizeMultiplier() {
return ignitePoolSizeMultiplier;
}
public void setIgnitePoolSizeMultiplier(int ignitePoolSizeMultiplier) {
this.ignitePoolSizeMultiplier = ignitePoolSizeMultiplier;
}

@Override
public String toString() {
return "FederationHubIgniteConfig [ignitePoolSize=" + ignitePoolSize + ", ignitePoolSizeMultiplier="
+ ignitePoolSizeMultiplier + "]";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package tak.server.federation.hub;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

public class FederationHubResources {
private static final int POOL_SIZE_INITIAL = 1;

private static final int NUM_AVAIL_CORES = Runtime.getRuntime().availableProcessors();

private static final int DEFAULT_POOL_MAX = NUM_AVAIL_CORES;

private static final int POOL_SIZE_MAX = DEFAULT_POOL_MAX < 8 ? 8 : DEFAULT_POOL_MAX;

public static final int EXEC_QUEUE_SIZE = 1024 * NUM_AVAIL_CORES;

public static final boolean IS_LOW_CORE = NUM_AVAIL_CORES < 4;

public static final ExecutorService lowCoreExecutorService;
public static final ScheduledExecutorService lowCoreScheduledExecutorService;
public static final ExecutorService lowCoreGrpcExecutorService;
public static final EventLoopGroup lowCoreGrpcEventLoopGroup;

// create a minimal set of executors if low core mode is enabled
static {
if (IS_LOW_CORE) {
lowCoreExecutorService = newExecutorService("federation-hub", POOL_SIZE_INITIAL, DEFAULT_POOL_MAX);
lowCoreScheduledExecutorService = newScheduledExecutor("federation-hub-scheduled", DEFAULT_POOL_MAX);
lowCoreGrpcExecutorService = newGrpcThreadPoolExecutor("federation-hub-grpc", POOL_SIZE_INITIAL, DEFAULT_POOL_MAX);
lowCoreGrpcEventLoopGroup = newGrpcEventLoopGroup("federation-hub-grpc-eventgroup", DEFAULT_POOL_MAX);
} else {
lowCoreExecutorService = null;
lowCoreScheduledExecutorService = null;
lowCoreGrpcExecutorService = null;
lowCoreGrpcEventLoopGroup = null;
}
}

public static final ExecutorService rolExecutor = !IS_LOW_CORE ? newGrpcThreadPoolExecutor("rol-federation-hub-executor", POOL_SIZE_INITIAL, NUM_AVAIL_CORES) : lowCoreGrpcExecutorService;

public static final ScheduledExecutorService mfdtScheduler = !IS_LOW_CORE ? newScheduledExecutor("mfdt-federation-hub-scheduler", POOL_SIZE_MAX) : lowCoreScheduledExecutorService;

public static final ScheduledExecutorService healthCheckScheduler = !IS_LOW_CORE ? newScheduledExecutor("health-check-federation-hub-scheduler", 1) : lowCoreScheduledExecutorService;

public static final ScheduledExecutorService retryScheduler = !IS_LOW_CORE ? newScheduledExecutor("outgoing-connection-federation-hub-scheduler", 1) : lowCoreScheduledExecutorService;

public static final ScheduledExecutorService dbRetentionScheduler = !IS_LOW_CORE ? newScheduledExecutor("db-retention-federation-hub-scheduler", 1) : lowCoreScheduledExecutorService;

public static final ScheduledExecutorService metricsScheduler = !IS_LOW_CORE ? newScheduledExecutor("metrics-federation-hub-scheduler", 1) : lowCoreScheduledExecutorService;

// Bounded Executor pool for federation grpc server and channel builders
public static final ExecutorService federationGrpcExecutor = !IS_LOW_CORE ? newGrpcThreadPoolExecutor("grpc-federation-hub-executor", POOL_SIZE_INITIAL, NUM_AVAIL_CORES) : lowCoreGrpcExecutorService;

// Bounded worker pool for federation grpc server and channel builders
public static final EventLoopGroup federationGrpcWorkerEventLoopGroup = !IS_LOW_CORE ? newGrpcEventLoopGroup("grpc-federation-hub-worker", NUM_AVAIL_CORES) : lowCoreGrpcEventLoopGroup;


public static ExecutorService newExecutorService(String name, int initialPoolSize, int maxPoolSize) {

return newExecutorService(name, initialPoolSize, maxPoolSize, EXEC_QUEUE_SIZE);
}

private static ExecutorService newExecutorService(String name, int initialPoolSize, int maxPoolSize, int queueSize) {

ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat(name + "-%1$d")
.setUncaughtExceptionHandler(new FederationHubExceptionHandler())
.build();

BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(queueSize);
return new FederationHubThreadPoolExecutor(initialPoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, workQueue, threadFactory);
}

private static ScheduledExecutorService newScheduledExecutor(String name, int size) {

ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat(name + "-%1$d")
.setUncaughtExceptionHandler(new FederationHubExceptionHandler())
.build();

return new ScheduledThreadPoolExecutor(size, threadFactory);
}

public static ThreadPoolTaskExecutor websocketExecutor() {

ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat("federation-hub-socket-%1$d")
.setUncaughtExceptionHandler(new FederationHubExceptionHandler())
.build();

ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(POOL_SIZE_INITIAL);
taskExecutor.setMaxPoolSize(POOL_SIZE_MAX);
taskExecutor.setQueueCapacity(EXEC_QUEUE_SIZE);
taskExecutor.setAllowCoreThreadTimeOut(true);
taskExecutor.setKeepAliveSeconds(120);
taskExecutor.setThreadFactory(threadFactory);

return taskExecutor;
}

private static EventLoopGroup newGrpcEventLoopGroup(String name, int maxPoolSize) {
ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat(name + "-%1$d")
.setUncaughtExceptionHandler(new FederationHubExceptionHandler())
.setDaemon(true)
.build();

if (Epoll.isAvailable()) {
return new EpollEventLoopGroup(maxPoolSize, threadFactory);
} else {
return new NioEventLoopGroup(maxPoolSize, threadFactory);
}
}

private static ExecutorService newGrpcThreadPoolExecutor(String name, int initialPoolSize, int maxPoolSize) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(EXEC_QUEUE_SIZE);

ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat("websocket-%1$d")
.setUncaughtExceptionHandler(new FederationHubExceptionHandler())
.build();


return new ThreadPoolExecutor(initialPoolSize, maxPoolSize, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
}

private static class FederationHubThreadPoolExecutor extends ThreadPoolExecutor {

FederationHubThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

Logger logger = LoggerFactory.getLogger(FederationHubThreadPoolExecutor.class);

public FederationHubThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// If submit() method is called instead of execute()
if (t == null && r instanceof Future<?>) {
try {
((Future<?>) r).get();
} catch (CancellationException e) {
t = e;
} catch (ExecutionException e) {
t = e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
// Exception occurred

logger.error("Uncaught exception ", t);
}
// can perform cleanup actions here
}
}

private static class FederationHubExceptionHandler implements Thread.UncaughtExceptionHandler {

Logger logger = LoggerFactory.getLogger(FederationHubExceptionHandler.class);

@Override
public void uncaughtException(Thread thread, Throwable t) {
logger.error("Uncaught exception", t);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,61 @@
package tak.server.federation.hub;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.NoOpFailureHandler;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Strings;

public class FederationHubUtils {
private static final Logger logger = LoggerFactory.getLogger(FederationHubUtils.class);

public static IgniteConfiguration getIgniteConfiguration(String profile, boolean isClient) {
FederationHubIgniteConfig igniteConfig = null;

String igniteFile = System.getProperty("fedhub.ignite.config");
if (Strings.isNullOrEmpty(igniteFile)) {
igniteFile = "/opt/tak/federation-hub/configs/ignite.yml";
logger.info("Ignite config file not supplied. Assigning default to: " + igniteFile);
} else {
logger.info("Ignite Config file supplied: " + igniteFile);
}

try {
igniteConfig = new FederationHubUtils().loadIgniteConfig(igniteFile);
logger.info("Loaded ignite config from file");
} catch (Exception e) {
logger.info("Ignite config not found, generating default one");
// failed to load file, use defaults
igniteConfig = new FederationHubIgniteConfig();
}

IgniteConfiguration conf = new IgniteConfiguration();

String defaultWorkDir = "/opt/tak/federation-hub";
try {
defaultWorkDir = U.defaultWorkDirectory();
} catch (IgniteCheckedException e) {
logger.error(" error getting Ignite work dir, default to /opt/tak/federation-hub ", e);
}

conf.setWorkDirectory(defaultWorkDir + "/" + profile + "-tmp-work");

String address = FederationHubConstants.FEDERATION_HUB_IGNITE_HOST + ":" +
FederationHubConstants.NON_MULTICAST_DISCOVERY_PORT + ".." +
Expand Down Expand Up @@ -44,7 +87,41 @@ public static IgniteConfiguration getIgniteConfiguration(String profile, boolean
profile));

conf.setFailureHandler(new NoOpFailureHandler());

int poolSize;
// dynamic
if (igniteConfig.getIgnitePoolSize() < 0) {
poolSize = Math.min(Runtime.getRuntime().availableProcessors() * igniteConfig.getIgnitePoolSizeMultiplier(), 1024);
} else {
poolSize = igniteConfig.getIgnitePoolSize();
}

if (isClient) {
ClientConnectorConfiguration ccc = conf.getClientConnectorConfiguration();
ccc.setThreadPoolSize(poolSize);
}

conf.setSystemThreadPoolSize(poolSize + 1);
conf.setPublicThreadPoolSize(poolSize);
conf.setQueryThreadPoolSize(poolSize);
conf.setServiceThreadPoolSize(poolSize);
conf.setStripedPoolSize(poolSize);
conf.setDataStreamerThreadPoolSize(poolSize);
conf.setRebalanceThreadPoolSize(poolSize);

return conf;
}

private FederationHubIgniteConfig loadIgniteConfig(String configFile)
throws JsonParseException, JsonMappingException, FileNotFoundException, IOException {
if (getClass().getResource(configFile) != null) {
// It's a resource.
return new ObjectMapper(new YAMLFactory()).readValue(getClass().getResourceAsStream(configFile),
FederationHubIgniteConfig.class);
}

// It's a file.
return new ObjectMapper(new YAMLFactory()).readValue(new FileInputStream(configFile),
FederationHubIgniteConfig.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@ public interface FederationHubBroker {
FederationHubBrokerMetrics getFederationHubBrokerMetrics();
List<String> getGroupsForNode(String federateId);
void deleteGroupCa(String groupId);
void disconnectFederate(String connectionId);
Map<String, X509Certificate> getCAsFromFile();
byte[] getSelfCaFile();
}
Loading

0 comments on commit a4e4453

Please sign in to comment.