diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/RelpWrite.java b/src/main/java/com/teragrep/rlp_03/channel/context/Egress.java similarity index 97% rename from src/main/java/com/teragrep/rlp_03/channel/context/RelpWrite.java rename to src/main/java/com/teragrep/rlp_03/channel/context/Egress.java index c137401b..756455f4 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/RelpWrite.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/Egress.java @@ -53,7 +53,7 @@ /** * Egress {@link com.teragrep.rlp_03.frame.RelpFrame} are handled by this */ -public interface RelpWrite extends Consumer, Runnable { +public interface Egress extends Consumer, Runnable { /** * Sends asynchronously the writeable provided. Implementation is required to be thread-safe. diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/RelpWriteImpl.java b/src/main/java/com/teragrep/rlp_03/channel/context/EgressImpl.java similarity index 98% rename from src/main/java/com/teragrep/rlp_03/channel/context/RelpWriteImpl.java rename to src/main/java/com/teragrep/rlp_03/channel/context/EgressImpl.java index f32167b4..c6b2ff0b 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/RelpWriteImpl.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/EgressImpl.java @@ -67,9 +67,9 @@ import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_WRITE; -final class RelpWriteImpl implements RelpWrite { +final class EgressImpl implements Egress { - private static final Logger LOGGER = LoggerFactory.getLogger(RelpWriteImpl.class); + private static final Logger LOGGER = LoggerFactory.getLogger(EgressImpl.class); private final EstablishedContext establishedContext; @@ -83,7 +83,7 @@ final class RelpWriteImpl implements RelpWrite { private final List toWriteList; - RelpWriteImpl(EstablishedContext establishedContext) { + EgressImpl(EstablishedContext establishedContext) { this.establishedContext = establishedContext; this.queue = new ConcurrentLinkedQueue<>(); this.writeInProgressList = new ArrayList<>(); diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContext.java b/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContext.java index 4f5a94b3..e3d32473 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContext.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContext.java @@ -50,8 +50,8 @@ /** * Established type of {@link Context}. It produces ingress data into the provided - * {@link com.teragrep.rlp_03.frame.delegate.FrameDelegate} via {@link RelpRead}. Egress data can be written via - * {@link RelpWrite#accept(Writeable)}. + * {@link com.teragrep.rlp_03.frame.delegate.FrameDelegate} via {@link Ingress}. Egress data can be written via + * {@link Egress#accept(Writeable)}. */ public interface EstablishedContext extends Context { @@ -68,5 +68,5 @@ public interface EstablishedContext extends Context { /** * @return RelpWrite of the connection for sending egress data. */ - RelpWrite relpWrite(); + Egress egress(); } diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextImpl.java b/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextImpl.java index 3aae9c59..c1be78c9 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextImpl.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextImpl.java @@ -73,8 +73,8 @@ final class EstablishedContextImpl implements EstablishedContext { private final Clock clock; private final BufferLeasePool bufferLeasePool; - private final RelpRead relpRead; - private final RelpWrite relpWrite; + private final Ingress ingress; + private final Egress egress; EstablishedContextImpl( ExecutorService executorService, @@ -88,8 +88,8 @@ final class EstablishedContextImpl implements EstablishedContext { this.clock = clockFactory.create(this); this.bufferLeasePool = new BufferLeasePool(); - this.relpRead = new RelpReadImpl(this, this.bufferLeasePool, clock); - this.relpWrite = new RelpWriteImpl(this); + this.ingress = new IngressImpl(this, this.bufferLeasePool, clock); + this.egress = new EgressImpl(this); } @@ -154,7 +154,7 @@ public void handleEvent(SelectionKey selectionKey) { } LOGGER.debug("handleEvent submitting new runnable for read"); try { - executorService.submit(relpRead); + executorService.submit(ingress); } catch (RejectedExecutionException ree) { LOGGER.error("executorService.submit threw <{}> for read", ree.getMessage()); @@ -180,7 +180,7 @@ public void handleEvent(SelectionKey selectionKey) { } LOGGER.debug("handleEvent submitting new runnable for write"); try { - executorService.submit(relpWrite); + executorService.submit(egress); LOGGER.debug("submitted write!"); } catch (RejectedExecutionException ree) { @@ -207,12 +207,12 @@ public void handleEvent(SelectionKey selectionKey) { return; } // socket write may be pending a tls read - if (relpWrite.needRead().compareAndSet(true, false)) { - executorService.submit(relpWrite); + if (egress.needRead().compareAndSet(true, false)) { + executorService.submit(egress); } // read anyway - executorService.submit(relpRead); + executorService.submit(ingress); } if (selectionKey.isWritable()) { @@ -229,13 +229,13 @@ public void handleEvent(SelectionKey selectionKey) { close(); return; } - if (relpRead.needWrite().compareAndSet(true, false)) { + if (ingress.needWrite().compareAndSet(true, false)) { // socket read may be pending a tls write - executorService.submit(relpRead); + executorService.submit(ingress); } // write anyway - executorService.submit(relpWrite); + executorService.submit(egress); } } } @@ -251,7 +251,7 @@ public Socket socket() { } @Override - public RelpWrite relpWrite() { - return relpWrite; + public Egress egress() { + return egress; } } diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextStub.java b/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextStub.java index 5c3915b9..e163c38d 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextStub.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextStub.java @@ -90,7 +90,7 @@ public Socket socket() { } @Override - public RelpWrite relpWrite() { + public Egress egress() { throw new IllegalArgumentException("EstablishedContextStub does not implement this"); } } diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/RelpRead.java b/src/main/java/com/teragrep/rlp_03/channel/context/Ingress.java similarity index 98% rename from src/main/java/com/teragrep/rlp_03/channel/context/RelpRead.java rename to src/main/java/com/teragrep/rlp_03/channel/context/Ingress.java index d7cdf75f..e925855d 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/RelpRead.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/Ingress.java @@ -50,7 +50,7 @@ /** * Ingress {@link com.teragrep.rlp_03.frame.RelpFrame} are handled by this asynchronously. */ -public interface RelpRead extends Runnable { +public interface Ingress extends Runnable { @Override void run(); diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/RelpReadImpl.java b/src/main/java/com/teragrep/rlp_03/channel/context/IngressImpl.java similarity index 98% rename from src/main/java/com/teragrep/rlp_03/channel/context/RelpReadImpl.java rename to src/main/java/com/teragrep/rlp_03/channel/context/IngressImpl.java index c1c13460..c58b0bca 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/RelpReadImpl.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/IngressImpl.java @@ -64,9 +64,9 @@ import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_WRITE; -final class RelpReadImpl implements RelpRead { +final class IngressImpl implements Ingress { - private static final Logger LOGGER = LoggerFactory.getLogger(RelpReadImpl.class); + private static final Logger LOGGER = LoggerFactory.getLogger(IngressImpl.class); private final EstablishedContextImpl establishedContext; private final BufferLeasePool bufferLeasePool; @@ -77,7 +77,7 @@ final class RelpReadImpl implements RelpRead { private final Clock clock; - RelpReadImpl(EstablishedContextImpl establishedContext, BufferLeasePool bufferLeasePool, Clock clock) { + IngressImpl(EstablishedContextImpl establishedContext, BufferLeasePool bufferLeasePool, Clock clock) { this.establishedContext = establishedContext; this.bufferLeasePool = bufferLeasePool; diff --git a/src/main/java/com/teragrep/rlp_03/client/ClientImpl.java b/src/main/java/com/teragrep/rlp_03/client/ClientImpl.java index 5d5f699c..e3919bb5 100644 --- a/src/main/java/com/teragrep/rlp_03/client/ClientImpl.java +++ b/src/main/java/com/teragrep/rlp_03/client/ClientImpl.java @@ -91,7 +91,7 @@ public CompletableFuture transmit(RelpFrame relpFrame) { ); CompletableFuture future = transactionService.create(relpFrameToXmit); - establishedContext.relpWrite().accept(relpFrameToXmit.toWriteable()); + establishedContext.egress().accept(relpFrameToXmit.toWriteable()); return future; } diff --git a/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventClose.java b/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventClose.java index 4b9956b9..eb1b1f66 100644 --- a/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventClose.java +++ b/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventClose.java @@ -121,7 +121,7 @@ public void accept(FrameContext frameContext) { Writeables writeables = new Writeables(framesWriteables); WriteableClosure writeableClosure = new WriteableClosure(writeables, frameContext.establishedContext()); - frameContext.establishedContext().relpWrite().accept(writeableClosure); + frameContext.establishedContext().egress().accept(writeableClosure); } finally { frameContext.relpFrame().close(); diff --git a/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventOpen.java b/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventOpen.java index 360496e2..101899a0 100644 --- a/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventOpen.java +++ b/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventOpen.java @@ -85,7 +85,7 @@ public void accept(FrameContext frameContext) { responseFrameTemplate.endOfTransfer() ); - frameContext.establishedContext().relpWrite().accept(frame.toWriteable()); + frameContext.establishedContext().egress().accept(frame.toWriteable()); } finally { frameContext.relpFrame().close(); diff --git a/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventServerClose.java b/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventServerClose.java index 869a3983..07fcc78e 100644 --- a/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventServerClose.java +++ b/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventServerClose.java @@ -76,7 +76,7 @@ public void accept(FrameContext frameContext) { serverCloseFrame.toWriteable(), frameContext.establishedContext() ); - frameContext.establishedContext().relpWrite().accept(closingServerClose); + frameContext.establishedContext().egress().accept(closingServerClose); } finally { frameContext.relpFrame().close(); diff --git a/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventSyslog.java b/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventSyslog.java index 1fec9fe2..c008bedf 100644 --- a/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventSyslog.java +++ b/src/main/java/com/teragrep/rlp_03/frame/delegate/event/RelpEventSyslog.java @@ -130,7 +130,7 @@ public void accept(FrameContext frameContext) { ); } - frameContext.establishedContext().relpWrite().accept(relpFrame.toWriteable()); + frameContext.establishedContext().egress().accept(relpFrame.toWriteable()); } finally { frameContext.relpFrame().close(); diff --git a/src/test/java/com/teragrep/rlp_03/channel/context/RelpWriteFake.java b/src/test/java/com/teragrep/rlp_03/channel/context/EgressFake.java similarity index 97% rename from src/test/java/com/teragrep/rlp_03/channel/context/RelpWriteFake.java rename to src/test/java/com/teragrep/rlp_03/channel/context/EgressFake.java index cdf5dd6d..62a49927 100644 --- a/src/test/java/com/teragrep/rlp_03/channel/context/RelpWriteFake.java +++ b/src/test/java/com/teragrep/rlp_03/channel/context/EgressFake.java @@ -51,13 +51,13 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -public class RelpWriteFake implements RelpWrite { +public class EgressFake implements Egress { private final List writtenFrames; private final AtomicBoolean needRead; - RelpWriteFake() { + EgressFake() { this.writtenFrames = new LinkedList<>(); this.needRead = new AtomicBoolean(); } diff --git a/src/test/java/com/teragrep/rlp_03/channel/context/EstablishedContextFake.java b/src/test/java/com/teragrep/rlp_03/channel/context/EstablishedContextFake.java index c804254a..dbbed2fa 100644 --- a/src/test/java/com/teragrep/rlp_03/channel/context/EstablishedContextFake.java +++ b/src/test/java/com/teragrep/rlp_03/channel/context/EstablishedContextFake.java @@ -55,12 +55,12 @@ public class EstablishedContextFake implements EstablishedContext { private final InterestOps interestOps; private final Socket socket; - private final RelpWrite relpWrite; + private final Egress egress; - EstablishedContextFake(InterestOps interestOps, Socket socket, RelpWrite relpWrite) { + EstablishedContextFake(InterestOps interestOps, Socket socket, Egress egress) { this.interestOps = interestOps; this.socket = socket; - this.relpWrite = relpWrite; + this.egress = egress; } @Override @@ -94,7 +94,7 @@ public Socket socket() { } @Override - public RelpWrite relpWrite() { - return relpWrite; + public Egress egress() { + return egress; } } diff --git a/src/test/java/com/teragrep/rlp_03/client/StuckClientCloseTest.java b/src/test/java/com/teragrep/rlp_03/client/StuckClientCloseTest.java index 62c240c0..31d3da5b 100644 --- a/src/test/java/com/teragrep/rlp_03/client/StuckClientCloseTest.java +++ b/src/test/java/com/teragrep/rlp_03/client/StuckClientCloseTest.java @@ -86,7 +86,7 @@ public void init() { @Override public boolean accept(FrameContext frameContext) { - // received but will not reply via frameContext.establishedContext().relpWrite(); + // received but will not reply via frameContext.establishedContext().egress(); return true; } diff --git a/src/test/java/com/teragrep/rlp_03/readme/ReadmeDeferredTest.java b/src/test/java/com/teragrep/rlp_03/readme/ReadmeDeferredTest.java index c07002fb..ee6ea50a 100644 --- a/src/test/java/com/teragrep/rlp_03/readme/ReadmeDeferredTest.java +++ b/src/test/java/com/teragrep/rlp_03/readme/ReadmeDeferredTest.java @@ -233,7 +233,7 @@ public void run() { RelpFrame responseFrame = relpFrameFactory.create(relpFrame.txn().toBytes(), "rsp", "200 OK"); // WARNING: failing to respond causes transaction aware clients to wait - frameContext.establishedContext().relpWrite().accept(responseFrame.toWriteable()); + frameContext.establishedContext().egress().accept(responseFrame.toWriteable()); } } catch (Exception interruptedException) {