Skip to content

Commit

Permalink
ingress interested (#209)
Browse files Browse the repository at this point in the history
* ingress interested

* fix typo

* move register(clock) out from EstablishedContextImpl constructor

* allow Clock to re-register into a connection

* add note about Clock's responsibility to read all data

* add note about Clock's responsibility to read all data

* fix thread safety of IngressImpl register(clock) and unregister(clock)

* clarify exception messages, remove obsolete TODO

* apply spotless
  • Loading branch information
kortemik authored Jun 25, 2024
1 parent ab9e063 commit 99ca9e6
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 84 deletions.
6 changes: 5 additions & 1 deletion src/main/java/com/teragrep/net_01/channel/context/Clock.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,9 @@

public interface Clock extends AutoCloseable {

boolean advance(BufferLease bufferLease);
/**
* @param bufferLease to be consumed by the Clock. IMPORTANT: current tls implementation will not work properly if
* {@link BufferLease} is not fully consumed.
*/
void advance(BufferLease bufferLease);
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,12 @@ public void handleEvent(SelectionKey selectionKey) {
EstablishedContext establishedContext = new EstablishedContextImpl(
executorService,
socketFactory.create(socketChannel),
interestOps,
clockFactory
interestOps
);

selectionKey.attach(establishedContext);

interestOps.add(SelectionKey.OP_READ);
establishedContext.ingress().register(clockFactory.create(establishedContext));

LOGGER.debug("establishedContext <{}>", establishedContext);
establishedContextConsumer.accept(establishedContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ public interface EstablishedContext extends Context {
Socket socket();

/**
* @return RelpWrite of the connection for sending egress data.
* @return Ingress of the connection
*/
Ingress ingress();

/**
* @return Egress of the connection for sending egress data.
*/
Egress egress();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,18 @@ final class EstablishedContextImpl implements EstablishedContext {
private final ExecutorService executorService;
private final Socket socket;
private final InterestOps interestOps;
private final Clock clock;

private final BufferLeasePool bufferLeasePool;
private final Ingress ingress;
private final Egress egress;

EstablishedContextImpl(
ExecutorService executorService,
Socket socket,
InterestOps interestOps,
ClockFactory clockFactory
) {
EstablishedContextImpl(ExecutorService executorService, Socket socket, InterestOps interestOps) {
this.interestOps = interestOps;
this.executorService = executorService;
this.socket = socket;
this.clock = clockFactory.create(this);

this.bufferLeasePool = new BufferLeasePool();
this.ingress = new IngressImpl(this, this.bufferLeasePool, clock);
this.ingress = new IngressImpl(this, this.bufferLeasePool);
this.egress = new EgressImpl(this);

}
Expand All @@ -105,7 +98,7 @@ public void close() {
}

try {
clock.close();
ingress.close();
}
catch (Exception exception) {
LOGGER.warn("FrameDelegate close threw exception <{}>", exception.getMessage());
Expand Down Expand Up @@ -250,6 +243,11 @@ public Socket socket() {
return socket;
}

@Override
public Ingress ingress() {
return ingress;
}

@Override
public Egress egress() {
return egress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public Socket socket() {
throw new IllegalArgumentException("EstablishedContextStub does not implement this");
}

@Override
public Ingress ingress() {
throw new IllegalArgumentException("EstablishedContextStub does not implement this");
}

@Override
public Egress egress() {
throw new IllegalArgumentException("EstablishedContextStub does not implement this");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,11 @@ public interface Ingress extends Runnable {
void run();

AtomicBoolean needWrite();

void register(Clock clock);

void unregister(Clock clock);

void close() throws Exception;

}
61 changes: 57 additions & 4 deletions src/main/java/com/teragrep/net_01/channel/context/IngressImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ final class IngressImpl implements Ingress {
// tls
public final AtomicBoolean needWrite;

private final Clock clock;
private final List<Clock> interestedClocks;

IngressImpl(EstablishedContextImpl establishedContext, BufferLeasePool bufferLeasePool, Clock clock) {
IngressImpl(EstablishedContextImpl establishedContext, BufferLeasePool bufferLeasePool) {
this.establishedContext = establishedContext;
this.bufferLeasePool = bufferLeasePool;

this.activeBuffers = new LinkedList<>();
this.lock = new ReentrantLock();
this.needWrite = new AtomicBoolean();

this.clock = clock;
this.interestedClocks = new LinkedList<>();
}

@Override
Expand Down Expand Up @@ -115,7 +115,16 @@ public void run() {
activeBuffers
);

continueReading = clock.advance(bufferLease);
if (!interestedClocks.isEmpty()) {
for (Clock clock : interestedClocks) {
clock.advance(bufferLease);
}
}

if (interestedClocks.isEmpty()) {
continueReading = false;
}

LOGGER.debug("clock returned continueReading <{}>", continueReading);
if (!bufferLease.isTerminated() && bufferLease.buffer().hasRemaining()) {
// return back as it has some remaining
Expand Down Expand Up @@ -250,4 +259,48 @@ private void activateBuffers(List<BufferLease> bufferLeases) {
public AtomicBoolean needWrite() {
return needWrite;
}

@Override
public void register(Clock clock) {
lock.lock();
try {
if (!interestedClocks.isEmpty()) {
throw new IllegalStateException(
"Unable to register ingress clock, only one interested clock is allowed"
);
}
interestedClocks.add(clock);
establishedContext.interestOps().add(OP_READ);
}
finally {
lock.unlock();
}
}

@Override
public void unregister(Clock clock) {
lock.lock();
try {
if (!interestedClocks.contains(clock)) {
throw new IllegalStateException("Unable to unregister ingress clock, it is not registered");
}
interestedClocks.remove(clock);
}
finally {
lock.unlock();
}
}

@Override
public void close() throws Exception {
lock.lock();
try {
for (Clock clock : interestedClocks) {
clock.close();
}
}
finally {
lock.unlock();
}
}
}
133 changes: 78 additions & 55 deletions src/main/java/com/teragrep/net_01/channel/context/InterestOpsImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.slf4j.LoggerFactory;

import java.nio.channels.SelectionKey;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class InterestOpsImpl implements InterestOps {

Expand All @@ -58,81 +60,102 @@ final class InterestOpsImpl implements InterestOps {

private int currentOps;

private final Lock lock;

InterestOpsImpl(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
this.currentOps = selectionKey.interestOps();
this.lock = new ReentrantLock();
}

@Override
public void add(int op) {
int keysOps = selectionKey.interestOps();
int newOps = currentOps | op;
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(
"Adding op <{}> to currentOps <{}>, newOps <{}>, keyOps <{}>, validOps <{}>", op,
currentOps, newOps, selectionKey.interestOps(), selectionKey.channel().validOps()
);
lock.lock();
try {
int keysOps = selectionKey.interestOps();
int newOps = currentOps | op;
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(
"Adding op <{}> to currentOps <{}>, newOps <{}>, keyOps <{}>, validOps <{}>", op,
currentOps, newOps, selectionKey.interestOps(), selectionKey.channel().validOps()
);
}
currentOps = newOps;

selectionKey.interestOps(newOps); // CancelledKeyException

selectionKey.selector().wakeup();
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(
"Added op <{}>, currentOps <{}>, keyOps <{}>, validOps <{}>", op, currentOps, keysOps,
selectionKey.channel().validOps()
);
}
}
currentOps = newOps;

selectionKey.interestOps(newOps); // CancelledKeyException

selectionKey.selector().wakeup();
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(
"Added op <{}>, currentOps <{}>, keyOps <{}>, validOps <{}>", op, currentOps, keysOps,
selectionKey.channel().validOps()
);
finally {
lock.unlock();
}
}

@Override
public void remove(int op) {
int newOps = currentOps & ~op;
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(
"Removing op <{}> from currentOps <{}>, newOps <{}>, keyOps <{}>, validOps <{}>", op,
currentOps, newOps, selectionKey.interestOps(), selectionKey.channel().validOps()
);
lock.lock();
try {
int newOps = currentOps & ~op;
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(
"Removing op <{}> from currentOps <{}>, newOps <{}>, keyOps <{}>, validOps <{}>", op,
currentOps, newOps, selectionKey.interestOps(), selectionKey.channel().validOps()
);
}
currentOps = newOps;

selectionKey.interestOps(newOps); // CancelledKeyException

selectionKey.selector().wakeup();
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(
"Removed op <{}>, currentOps <{}>, keyOps <{}>, validOps <{}>", op, currentOps,
selectionKey.interestOps(), selectionKey.channel().validOps()
);
}
}
currentOps = newOps;

selectionKey.interestOps(newOps); // CancelledKeyException

selectionKey.selector().wakeup();
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(
"Removed op <{}>, currentOps <{}>, keyOps <{}>, validOps <{}>", op, currentOps,
selectionKey.interestOps(), selectionKey.channel().validOps()
);
finally {
lock.unlock();
}
}

@Override
public void removeAll() {
int keysOps = selectionKey.interestOps();
int newOps = 0;
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(
"Removing all currentOps <{}>, newOps <{}>, keyOps <{}>, validOps <{}>", currentOps, newOps,
keysOps, selectionKey.channel().validOps()
);
lock.lock();
try {
int keysOps = selectionKey.interestOps();
int newOps = 0;
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(
"Removing all currentOps <{}>, newOps <{}>, keyOps <{}>, validOps <{}>", currentOps,
newOps, keysOps, selectionKey.channel().validOps()
);
}

selectionKey.interestOps(newOps); // CancelledKeyException

selectionKey.selector().wakeup();
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(
"Removed all ops. currentOps <{}>, keyOps <{}>, validOps <{}>", currentOps, keysOps,
selectionKey.channel().validOps()
);
}
}

selectionKey.interestOps(newOps); // CancelledKeyException

selectionKey.selector().wakeup();
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(
"Removed all ops. currentOps <{}>, keyOps <{}>, validOps <{}>", currentOps, keysOps,
selectionKey.channel().validOps()
);
finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,12 @@ public void handleEvent(SelectionKey selectionKey) {
EstablishedContext establishedContext = new EstablishedContextImpl(
executorService,
socket,
interestOps,
clockFactory
interestOps
);

clientSelectionKey.attach(establishedContext);

// proper attachment attached, now it is safe to use
clientSelectionKey.interestOps(SelectionKey.OP_READ);
establishedContext.ingress().register(clockFactory.create(establishedContext));
}
}
catch (CancelledKeyException cke) {
Expand Down
Loading

0 comments on commit 99ca9e6

Please sign in to comment.