Skip to content

Commit

Permalink
upgrade rlp_03 to 4.0.0 (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
kortemik authored Apr 17, 2024
1 parent 78ff2f0 commit f5a07a5
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 65 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<dependency>
<groupId>com.teragrep</groupId>
<artifactId>rlp_03</artifactId>
<version>3.0.0</version>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>com.teragrep</groupId>
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/com/teragrep/cfe_35/router/MessageParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@
import com.teragrep.cfe_35.router.targets.DeadLetter;
import com.teragrep.cfe_35.router.targets.Inspection;
import com.teragrep.rlo_06.*;
import com.teragrep.rlp_03.FrameContext;
import com.teragrep.rlp_03.TransportInfo;

import com.teragrep.rlp_03.channel.socket.TransportInfo;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -124,7 +125,7 @@ public class MessageParser implements Consumer<FrameContext>, AutoCloseable {

@Override
public void accept(FrameContext frameContext) {
transportInfo = frameContext.connectionContext().socket().getTransportInfo();
transportInfo = frameContext.establishedContext().socket().getTransportInfo();
byte[] payload = frameContext.relpFrame().payload().toBytes();
try (final Timer.Context context = responseLatency.time()) {
// increment counters
Expand Down
20 changes: 11 additions & 9 deletions src/main/java/com/teragrep/cfe_35/router/Router.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@
import com.codahale.metrics.Slf4jReporter;
import com.codahale.metrics.jmx.JmxReporter;
import com.teragrep.cfe_35.config.RoutingConfig;
import com.teragrep.rlp_03.delegate.FrameDelegate;
import com.teragrep.rlp_03.Server;
import com.teragrep.rlp_03.ServerFactory;
import com.teragrep.rlp_03.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.config.Config;
import com.teragrep.rlp_03.channel.socket.PlainFactory;
import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.frame.delegate.FrameDelegate;
import com.teragrep.rlp_03.server.Server;
import com.teragrep.rlp_03.server.ServerFactory;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.exporter.MetricsServlet;
Expand All @@ -64,6 +64,8 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand Down Expand Up @@ -104,10 +106,10 @@ public Router(RoutingConfig routingConfig) throws IOException {
return new DefaultFrameDelegate(messageParser);
};

Config config = new Config(routingConfig.getListenPort(), routingConfig.getServerThreads());

ServerFactory serverFactory = new ServerFactory(config, routingInstanceSupplier);
this.server = serverFactory.create();
ExecutorService executorService = Executors.newFixedThreadPool(routingConfig.getServerThreads());
PlainFactory plainFactory = new PlainFactory();
ServerFactory serverFactory = new ServerFactory(executorService, plainFactory, routingInstanceSupplier);
this.server = serverFactory.create(routingConfig.getListenPort());
Thread serverThread = new Thread(server);
serverThread.start();

Expand Down
30 changes: 20 additions & 10 deletions src/test/java/com/teragrep/cfe_35/router/EmptyTagTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,22 @@

import com.codahale.metrics.MetricRegistry;
import com.teragrep.cfe_35.config.RoutingConfig;
import com.teragrep.rlp_03.*;
import com.teragrep.rlp_03.config.Config;
import com.teragrep.rlp_03.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.delegate.FrameDelegate;

import com.teragrep.rlp_03.channel.socket.PlainFactory;
import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import com.teragrep.rlp_03.frame.delegate.FrameDelegate;
import com.teragrep.rlp_03.server.Server;
import com.teragrep.rlp_03.server.ServerFactory;

import org.junit.jupiter.api.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -94,9 +100,13 @@ public void cleanTargets() {
private void setup(int port, List<byte[]> recordList) throws IOException {
Consumer<FrameContext> cbFunction = relpFrameServerRX -> recordList
.add(relpFrameServerRX.relpFrame().payload().toBytes());
Config config = new Config(port, 1);
ServerFactory serverFactory = new ServerFactory(config, () -> new DefaultFrameDelegate(cbFunction));
Server server = serverFactory.create();
ExecutorService executorService = Executors.newSingleThreadExecutor();
ServerFactory serverFactory = new ServerFactory(
executorService,
new PlainFactory(),
() -> new DefaultFrameDelegate(cbFunction)
);
Server server = serverFactory.create(port);
Thread serverThread = new Thread(server);
serverThread.start();
}
Expand Down Expand Up @@ -129,9 +139,9 @@ private void setupTestServer() throws IOException {
return new DefaultFrameDelegate(messageParser);
};

Config config = new Config(port, 1);
ServerFactory serverFactory = new ServerFactory(config, routingInstanceSupplier);
Server server = serverFactory.create();
ExecutorService executorService = Executors.newSingleThreadExecutor();
ServerFactory serverFactory = new ServerFactory(executorService, new PlainFactory(), routingInstanceSupplier);
Server server = serverFactory.create(port);
Thread serverThread = new Thread(server);
serverThread.start();
}
Expand Down
29 changes: 19 additions & 10 deletions src/test/java/com/teragrep/cfe_35/router/MessageParserTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,21 @@

import com.codahale.metrics.MetricRegistry;
import com.teragrep.cfe_35.config.RoutingConfig;
import com.teragrep.rlp_03.*;
import com.teragrep.rlp_03.config.Config;
import com.teragrep.rlp_03.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.delegate.FrameDelegate;

import com.teragrep.rlp_03.channel.socket.PlainFactory;
import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import com.teragrep.rlp_03.server.Server;
import com.teragrep.rlp_03.server.ServerFactory;
import com.teragrep.rlp_03.frame.delegate.FrameDelegate;
import org.junit.jupiter.api.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -94,9 +99,13 @@ public void cleanTargets() {
private void setup(int port, List<byte[]> recordList) throws IOException {
Consumer<FrameContext> cbFunction = relpFrameServerRX -> recordList
.add(relpFrameServerRX.relpFrame().payload().toBytes());
Config config = new Config(port, 1);
ServerFactory serverFactory = new ServerFactory(config, () -> new DefaultFrameDelegate(cbFunction));
Server server = serverFactory.create();
ExecutorService executorService = Executors.newSingleThreadExecutor();
ServerFactory serverFactory = new ServerFactory(
executorService,
new PlainFactory(),
() -> new DefaultFrameDelegate(cbFunction)
);
Server server = serverFactory.create(port);
Thread serverThread = new Thread(server);
serverThread.start();
}
Expand Down Expand Up @@ -128,9 +137,9 @@ private void setupTestServer() throws IOException {
);
return new DefaultFrameDelegate(messageParser);
};
Config config = new Config(port, 1);
ServerFactory serverFactory = new ServerFactory(config, routingInstanceSupplier);
Server server = serverFactory.create();
ExecutorService executorService = Executors.newSingleThreadExecutor();
ServerFactory serverFactory = new ServerFactory(executorService, new PlainFactory(), routingInstanceSupplier);
Server server = serverFactory.create(port);
Thread serverThread = new Thread(server);
serverThread.start();
}
Expand Down
23 changes: 15 additions & 8 deletions src/test/java/com/teragrep/cfe_35/router/NoSuchTargetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@

import com.codahale.metrics.MetricRegistry;
import com.teragrep.cfe_35.config.RoutingConfig;
import com.teragrep.rlp_03.FrameContext;
import com.teragrep.rlp_03.ServerFactory;
import com.teragrep.rlp_03.config.Config;
import com.teragrep.rlp_03.Server;

import com.teragrep.rlp_03.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.channel.socket.PlainFactory;
import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import com.teragrep.rlp_03.server.Server;
import com.teragrep.rlp_03.server.ServerFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand All @@ -63,6 +63,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Expand All @@ -87,9 +89,14 @@ public void setupTargets() throws IOException {
private void setup(int port, List<byte[]> recordList) throws IOException {
Consumer<FrameContext> cbFunction = relpFrameServerRX -> recordList
.add(relpFrameServerRX.relpFrame().payload().toBytes());
Config config = new Config(port, 1);
ServerFactory serverFactory = new ServerFactory(config, () -> new DefaultFrameDelegate(cbFunction));
Server server = serverFactory.create();

ExecutorService executorService = Executors.newSingleThreadExecutor();
ServerFactory serverFactory = new ServerFactory(
executorService,
new PlainFactory(),
() -> new DefaultFrameDelegate(cbFunction)
);
Server server = serverFactory.create(port);
Thread serverThread = new Thread(server);
serverThread.start();
}
Expand Down
23 changes: 15 additions & 8 deletions src/test/java/com/teragrep/cfe_35/router/OutputFailureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,22 @@
import com.teragrep.cfe_35.config.RoutingConfig;
import com.teragrep.rlp_01.RelpBatch;
import com.teragrep.rlp_01.RelpConnection;
import com.teragrep.rlp_03.FrameContext;
import com.teragrep.rlp_03.ServerFactory;
import com.teragrep.rlp_03.Server;
import com.teragrep.rlp_03.delegate.DefaultFrameDelegate;

import com.teragrep.rlp_03.channel.socket.PlainFactory;
import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import com.teragrep.rlp_03.server.Server;
import com.teragrep.rlp_03.server.ServerFactory;
import org.junit.jupiter.api.*;
import org.opentest4j.AssertionFailedError;
import com.teragrep.rlp_03.config.Config;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

Expand Down Expand Up @@ -109,9 +112,13 @@ private Server setup(int port, List<byte[]> recordList) throws IOException {
Consumer<FrameContext> cbFunction = relpFrameServerRX -> recordList
.add(relpFrameServerRX.relpFrame().payload().toBytes());

Config config = new Config(port, 1);
ServerFactory serverFactory = new ServerFactory(config, () -> new DefaultFrameDelegate(cbFunction));
Server server = serverFactory.create();
ExecutorService executorService = Executors.newSingleThreadExecutor();
ServerFactory serverFactory = new ServerFactory(
executorService,
new PlainFactory(),
() -> new DefaultFrameDelegate(cbFunction)
);
Server server = serverFactory.create(port);
Thread serverThread = new Thread(server);
serverThread.start();
return server;
Expand Down
22 changes: 14 additions & 8 deletions src/test/java/com/teragrep/cfe_35/router/PerformanceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,20 @@
import com.teragrep.cfe_35.config.RoutingConfig;
import com.teragrep.rlp_01.RelpBatch;
import com.teragrep.rlp_01.RelpConnection;
import com.teragrep.rlp_03.FrameContext;
import com.teragrep.rlp_03.ServerFactory;
import com.teragrep.rlp_03.config.Config;
import com.teragrep.rlp_03.Server;
import com.teragrep.rlp_03.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.channel.socket.PlainFactory;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import com.teragrep.rlp_03.server.ServerFactory;
import com.teragrep.rlp_03.server.Server;
import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand Down Expand Up @@ -113,9 +115,13 @@ private void setup(int port, AtomicInteger count) throws IOException {
Consumer<FrameContext> cbFunction = (message) -> {
count.getAndIncrement();
};
Config config = new Config(port, 1);
ServerFactory serverFactory = new ServerFactory(config, () -> new DefaultFrameDelegate(cbFunction));
Server server = serverFactory.create();
ExecutorService executorService = Executors.newSingleThreadExecutor();
ServerFactory serverFactory = new ServerFactory(
executorService,
new PlainFactory(),
() -> new DefaultFrameDelegate(cbFunction)
);
Server server = serverFactory.create(port);
Thread serverThread = new Thread(server);
serverThread.start();
}
Expand Down
24 changes: 16 additions & 8 deletions src/test/java/com/teragrep/cfe_35/router/TargetRoutingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@

import com.codahale.metrics.MetricRegistry;
import com.teragrep.cfe_35.config.RoutingConfig;
import com.teragrep.rlp_03.FrameContext;
import com.teragrep.rlp_03.ServerFactory;
import com.teragrep.rlp_03.config.Config;
import com.teragrep.rlp_03.Server;
import com.teragrep.rlp_03.delegate.DefaultFrameDelegate;

import com.teragrep.rlp_03.channel.socket.PlainFactory;
import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import com.teragrep.rlp_03.server.Server;
import com.teragrep.rlp_03.server.ServerFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand All @@ -62,6 +63,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Expand All @@ -87,9 +90,14 @@ public void setupTargets() throws IOException {
private void setup(int port, List<byte[]> recordList) throws IOException {
Consumer<FrameContext> cbFunction = relpFrameServerRX -> recordList
.add(relpFrameServerRX.relpFrame().payload().toBytes());
Config config = new Config(port, 1);
ServerFactory serverFactory = new ServerFactory(config, () -> new DefaultFrameDelegate(cbFunction));
Server server = serverFactory.create();

ExecutorService executorService = Executors.newSingleThreadExecutor();
ServerFactory serverFactory = new ServerFactory(
executorService,
new PlainFactory(),
() -> new DefaultFrameDelegate(cbFunction)
);
Server server = serverFactory.create(port);
Thread serverThread = new Thread(server);
serverThread.start();
}
Expand Down

0 comments on commit f5a07a5

Please sign in to comment.