Skip to content

Commit

Permalink
extract framing from RelpFrame to FrameClock (#186)
Browse files Browse the repository at this point in the history
* extract framing from RelpFrame to FrameClock

* give private methods of RelpReadImpl better names: innerLoop -> attemptFrameCompletion, processFrame -> delegateFrame

* remove left over comment
  • Loading branch information
kortemik authored May 20, 2024
1 parent 0de508a commit e9267a7
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 99 deletions.
48 changes: 18 additions & 30 deletions src/main/java/com/teragrep/rlp_03/channel/context/RelpReadImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,11 @@
*/
package com.teragrep.rlp_03.channel.context;

import com.teragrep.rlp_03.frame.*;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import com.teragrep.rlp_03.frame.delegate.FrameDelegate;
import com.teragrep.rlp_03.channel.buffer.BufferLease;
import com.teragrep.rlp_03.channel.buffer.BufferLeasePool;
import com.teragrep.rlp_03.frame.RelpFrameAccess;
import com.teragrep.rlp_03.frame.RelpFrameImpl;
import com.teragrep.rlp_03.frame.RelpFrameLeaseful;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tlschannel.NeedsReadException;
Expand All @@ -60,7 +58,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -76,7 +73,8 @@ final class RelpReadImpl implements RelpRead {
private final EstablishedContextImpl establishedContext;
private final FrameDelegate frameDelegate;
private final BufferLeasePool bufferLeasePool;
private final List<RelpFrameLeaseful> relpFrames;
private final FrameClockLeaseful frameClockLeaseful;
private final RelpFrameStub relpFrameStub;
private final LinkedList<BufferLease> activeBuffers;
private final Lock lock;
// tls
Expand All @@ -91,7 +89,8 @@ final class RelpReadImpl implements RelpRead {
this.frameDelegate = frameDelegate;
this.bufferLeasePool = bufferLeasePool;

this.relpFrames = new ArrayList<>(1);
this.frameClockLeaseful = new FrameClockLeaseful(bufferLeasePool, new FrameClock());
this.relpFrameStub = new RelpFrameStub();
this.activeBuffers = new LinkedList<>();
this.lock = new ReentrantLock();
this.needWrite = new AtomicBoolean();
Expand All @@ -107,49 +106,39 @@ public void run() {
}
while (true) {
LOGGER.debug("run loop start");
// TODO implement better state store?
RelpFrameLeaseful relpFrame;
if (relpFrames.isEmpty()) {
relpFrame = new RelpFrameLeaseful(bufferLeasePool, new RelpFrameImpl());
}
else {
relpFrame = relpFrames.remove(0);
}

boolean complete = false;
// resume if frame is present
RelpFrame frame = relpFrameStub;
if (!activeBuffers.isEmpty()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("resuming buffer <{}>, activeBuffers <{}>", activeBuffers.get(0), activeBuffers);
}
complete = innerLoop(relpFrame);
frame = attemptFrameCompletion();
}

while (activeBuffers.isEmpty() && !complete) {
while (activeBuffers.isEmpty() && frame.isStub()) {
// fill buffers for read
long readBytes = readData();

if (readBytesToOperation(readBytes)) {
LOGGER.debug("readBytesToOperation(readBytes) forces return");
relpFrames.add(relpFrame); // back to list, as incomplete it is
return; // TODO this is quite ugly return, single point of return is preferred!
}

if (innerLoop(relpFrame)) {
frame = attemptFrameCompletion();
if (!frame.isStub()) {
break;
}

}

if (relpFrame.endOfTransfer().isComplete()) {
LOGGER.trace("received relpFrame <[{}]>", relpFrame);
if (!frame.isStub()) {
LOGGER.trace("received relpFrame <[{}]>", frame);
LOGGER.debug("frame complete, activeBuffers <{}>", activeBuffers);
if (!processFrame(relpFrame)) {
if (!delegateFrame(frame)) {
break;
}
}
else {
relpFrames.add(relpFrame); // back to list, as incomplete it is
LOGGER.debug("frame partial, activeBuffers <{}>", activeBuffers);
}
LOGGER.debug("loop done!");
Expand All @@ -164,16 +153,15 @@ public void run() {
}
}

private boolean innerLoop(RelpFrameLeaseful relpFrame) {
boolean rv = false;
private RelpFrame attemptFrameCompletion() {
RelpFrame rv = relpFrameStub;
while (!activeBuffers.isEmpty()) {
// TODO redesign this, very coupled design here !
BufferLease buffer = activeBuffers.removeFirst();
LOGGER.debug("submitting buffer <{}> from activeBuffers <{}> to relpFrame", buffer, activeBuffers);

if (relpFrame.submit(buffer)) {
rv = true;

rv = frameClockLeaseful.submit(buffer);
if (!rv.isStub()) {
if (buffer.buffer().hasRemaining()) {
buffer.addRef(); // a shared buffer

Expand Down Expand Up @@ -225,7 +213,7 @@ else if (readBytes < 0) {
return false;
}

private boolean processFrame(RelpFrameLeaseful relpFrame) {
private boolean delegateFrame(RelpFrame relpFrame) {
boolean rv;

RelpFrameAccess relpFrameAccess = new RelpFrameAccess(relpFrame);
Expand Down
123 changes: 123 additions & 0 deletions src/main/java/com/teragrep/rlp_03/frame/FrameClock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Java Reliable Event Logging Protocol Library Server Implementation RLP-03
* Copyright (C) 2021-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.rlp_03.frame;

import com.teragrep.rlp_03.frame.fragment.Fragment;
import com.teragrep.rlp_03.frame.fragment.FragmentImpl;
import com.teragrep.rlp_03.frame.fragment.FragmentStub;
import com.teragrep.rlp_03.frame.function.*;

import java.nio.ByteBuffer;

public class FrameClock {

private final RelpFrame relpFrameStub;

private Fragment txn;
private Fragment command;
private Fragment payloadLength;
private Fragment payload;
private Fragment endOfTransfer;

public FrameClock() {
this.relpFrameStub = new RelpFrameStub();
clear();
}

private void clear() {
this.txn = new FragmentImpl(new TransactionFunction());
this.command = new FragmentImpl(new CommandFunction());
this.payloadLength = new FragmentImpl(new PayloadLengthFunction());
this.payload = new FragmentStub();
this.endOfTransfer = new FragmentImpl(new EndOfTransferFunction());
}

public synchronized RelpFrame submit(ByteBuffer input) {
boolean ready = false;

while (input.hasRemaining() && !ready) {

if (!txn.isComplete()) {
txn.accept(input);
}
else if (!command.isComplete()) {
command.accept(input);
}
else if (!payloadLength.isComplete()) {
payloadLength.accept(input);

if (payloadLength.isComplete()) {
// PayloadFunction depends on payload length and needs to by dynamically created
int payloadSize = payloadLength.toInt();
payload = new FragmentImpl(new PayloadFunction(payloadSize));
}
}
else if (!payload.isComplete()) {
payload.accept(input);
}
else if (!endOfTransfer.isComplete()) {
endOfTransfer.accept(input);

if (endOfTransfer.isComplete()) {
// all complete
ready = true;
}
}
else {
throw new IllegalStateException("submit not allowed on a complete frame");
}
}

if (ready) {
RelpFrame relpFrame = new RelpFrameImpl(txn, command, payloadLength, payload, endOfTransfer);
clear();
return relpFrame;
}
else {
return relpFrameStub;
}
}
}
83 changes: 83 additions & 0 deletions src/main/java/com/teragrep/rlp_03/frame/FrameClockLeaseful.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Java Reliable Event Logging Protocol Library Server Implementation RLP-03
* Copyright (C) 2021-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.rlp_03.frame;

import com.teragrep.rlp_03.channel.buffer.BufferLease;
import com.teragrep.rlp_03.channel.buffer.BufferLeasePool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

public class FrameClockLeaseful {

private final BufferLeasePool bufferLeasePool;
private final FrameClock frameClock;
private final List<BufferLease> leases;

public FrameClockLeaseful(BufferLeasePool bufferLeasePool, FrameClock frameClock) {
this.bufferLeasePool = bufferLeasePool;
this.frameClock = frameClock;
this.leases = new ArrayList<>();
}

public RelpFrame submit(BufferLease bufferLease) {
leases.add(bufferLease);
RelpFrame relpFrame = frameClock.submit(bufferLease.buffer());

RelpFrame rv;
if (relpFrame.isStub()) {
rv = new RelpFrameLeaseful(bufferLeasePool, relpFrame, Collections.emptyList());
}
else {
LinkedList<BufferLease> frameLeases = new LinkedList<>(leases);
rv = new RelpFrameLeaseful(bufferLeasePool, relpFrame, frameLeases);
leases.clear();
}
return rv;
}
}
Loading

0 comments on commit e9267a7

Please sign in to comment.