Skip to content

Commit

Permalink
Merge pull request #32 from kortemik/faster-parser
Browse files Browse the repository at this point in the history
refactor parser to use bytebuffers, make parser re-usable
  • Loading branch information
StrongestNumber9 authored May 11, 2023
2 parents b61cdf2 + 436f16f commit d2617df
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 59 deletions.
9 changes: 2 additions & 7 deletions src/main/java/com/teragrep/rlp_01/RelpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public class RelpConnection implements RelpSender {
private int txBufferSize;
private ByteBuffer preAllocatedTXBuffer;
private ByteBuffer preAllocatedRXBuffer;
private static final int MAX_COMMAND_LENGTH = 11;
private final RelpClientSocket relpClientSocket;
private final RelpParser parser = new RelpParser();

private final static byte[] OFFER;

Expand Down Expand Up @@ -252,8 +252,6 @@ private void readAcks(RelpBatch relpBatch)
throws IOException, TimeoutException, IllegalStateException {
LOGGER.trace("relpConnection.readAcks> entry");

RelpParser parser = null;

int readBytes;

boolean notComplete = this.window.size() > 0;
Expand All @@ -271,9 +269,6 @@ private void readAcks(RelpBatch relpBatch)
// process it
if (readBytes > 0) {
while (preAllocatedRXBuffer.hasRemaining()) {
if (parser == null) {
parser = new RelpParser();
}
parser.parse(preAllocatedRXBuffer.get());

if (parser.isComplete()) {
Expand All @@ -292,7 +287,7 @@ private void readAcks(RelpBatch relpBatch)
window.removePending(txnId);
}
// this one is complete, ready for next
parser = null;
parser.reset();
if (window.size() == 0) {
notComplete = false;
break;
Expand Down
116 changes: 65 additions & 51 deletions src/main/java/com/teragrep/rlp_01/RelpParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,40 @@
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
A hand-made parser to process RELP messages.
*/
public class RelpParser {
private static final Logger LOGGER = LoggerFactory.getLogger(RelpParser.class);

private static final int MAX_COMMAND_LENGTH = 11;

// perhaps antlr4 would be better for this and not some hand made parser
private relpParserState state;
private boolean isComplete;

private String frameTxnIdString;
private int frameTxnId;
private String frameCommandString;
private String frameLengthString;
private int frameLength;
private RelpParserState state = RelpParserState.TXN;
private boolean isComplete = false;
private final ByteBuffer txnIdBuffer = ByteBuffer.allocateDirect(String.valueOf(TxID.MAX_ID).length());
private int frameTxnId = -1;
private final ByteBuffer commandBuffer = ByteBuffer.allocateDirect(MAX_COMMAND_LENGTH);
private String frameCommand = "";
private final ByteBuffer lengthBuffer = ByteBuffer.allocateDirect(String.valueOf(Integer.MAX_VALUE).length());
private int frameLength = -1;
private int frameLengthLeft;
private ByteBuffer frameData;

private static final int MAX_COMMAND_LENGTH = 11;

public RelpParser() {
this.state = relpParserState.TXN;
this.frameTxnIdString = "";
this.frameTxnId = -1;
this.frameCommandString= "";
this.frameLengthString= "";
this.frameLength = -1;

}

@Deprecated
public RelpParser( boolean debug )
{
this.state = relpParserState.TXN;
this.frameTxnIdString = "";
this.frameTxnId = -1;
this.frameCommandString = "";
this.frameLengthString = "";
this.frameLength = -1;
public RelpParser( boolean debug ) {

}

/**
 * Decodes the remaining content of the given buffer as US-ASCII text.
 *
 * @param byteBuffer buffer whose remaining bytes are decoded; callers in this
 *                   class flip their accumulation buffer before passing it in
 * @return the decoded characters as a String
 */
private String byteBufferToAsciiString(ByteBuffer byteBuffer) {
    final CharSequence decoded = StandardCharsets.US_ASCII.decode(byteBuffer);
    return decoded.toString();
}

public boolean isComplete() {
Expand All @@ -71,7 +66,7 @@ public int getTxnId() {
}

public String getCommandString() {
return this.frameCommandString;
return frameCommand;
}

public int getLength() {
Expand All @@ -82,16 +77,20 @@ public ByteBuffer getData() {
return this.frameData;
}

public relpParserState getState() { return this.state; }
/**
 * Reports which part of a frame the parser is currently expecting.
 *
 * @return the current parser state
 */
public RelpParserState getState() {
    return this.state;
}

private enum relpParserState {
/**
 * Parser states, one per section of a frame. The parser moves through them
 * in the order TXN, COMMAND, LENGTH, then DATA and/or NL.
 */
private enum RelpParserState {
// collecting transaction number bytes; a space ends the section
TXN,
// collecting command bytes; a space ends the section
COMMAND,
// collecting data length bytes; a space or a newline ends the section
LENGTH,
// copying payload bytes until the announced length has been consumed
DATA,
// expecting the LF that terminates the frame
// NOTE(review): the NL handling itself is not visible in this view - confirm
NL
}



/**
Parse the message byte-by-byte and store each byte in the proper
buffer based on the state of the parser.
Expand All @@ -103,35 +102,38 @@ public void parse(byte b) {
switch (this.state) {
case TXN:
if (b == ' '){
frameTxnId = Integer.parseInt(frameTxnIdString);
txnIdBuffer.flip();
frameTxnId = Integer.parseInt(byteBufferToAsciiString(txnIdBuffer));
if (frameTxnId < 0) {
throw new IllegalArgumentException("TXNR must " +
"be >= 0");
throw new IllegalArgumentException("TXNR must be >= 0");
}
state = relpParserState.COMMAND;
LOGGER.trace( "relpParser> txnId <[{}]>", frameTxnId );
state = RelpParserState.COMMAND;
}
else {
frameTxnIdString += new String(new byte[] {b});
txnIdBuffer.put(b);
}
break;
case COMMAND:
if (b == ' '){
state = relpParserState.LENGTH;
LOGGER.trace( "relpParser> command <[{}]>", frameCommandString );
commandBuffer.flip();
// Spec constraints.
if( frameCommandString.length() > MAX_COMMAND_LENGTH &&
!frameCommandString.equals(RelpCommand.OPEN) &&
!frameCommandString.equals(RelpCommand.CLOSE) &&
!frameCommandString.equals(RelpCommand.ABORT) &&
!frameCommandString.equals(RelpCommand.SERVER_CLOSE) &&
!frameCommandString.equals(RelpCommand.SYSLOG) &&
!frameCommandString.equals(RelpCommand.RESPONSE)) {
frameCommand = byteBufferToAsciiString(commandBuffer);
LOGGER.trace( "relpParser> command <[{}]>", frameCommand);

if(
!frameCommand.equals(RelpCommand.OPEN) &&
!frameCommand.equals(RelpCommand.CLOSE) &&
!frameCommand.equals(RelpCommand.ABORT) &&
!frameCommand.equals(RelpCommand.SERVER_CLOSE) &&
!frameCommand.equals(RelpCommand.SYSLOG) &&
!frameCommand.equals(RelpCommand.RESPONSE)) {
throw new IllegalStateException( "Invalid COMMAND." );
}
state = RelpParserState.LENGTH;
}
else {
frameCommandString += new String(new byte[] {b});
commandBuffer.put(b);
}
break;
case LENGTH:
Expand All @@ -142,24 +144,23 @@ public void parse(byte b) {
HEADER = TXNR SP COMMAND SP DATALEN LF; and LF is for RelpParserState.NL
*/
if (b == ' ' || b == '\n') {

frameLength = Integer.parseInt(frameLengthString);
lengthBuffer.flip();
frameLength = Integer.parseInt(byteBufferToAsciiString(lengthBuffer));

if (frameLength < 0) {
throw new IllegalArgumentException("DATALEN must be " +
">= 0");
throw new IllegalArgumentException("DATALEN must be >= 0");
}

frameLengthLeft = frameLength;
frameData = ByteBuffer.allocateDirect(frameLength);

// Length bytes done, move onto next state.
if (frameLength == 0 ) {
state = relpParserState.NL;
state = RelpParserState.NL;
} else {
state = relpParserState.DATA;
state = RelpParserState.DATA;
}
LOGGER.trace( "relpParser> length <[{}]>", frameLengthString );
LOGGER.trace( "relpParser> length <[{}]>", frameLength );
if (b == '\n') {
if (frameLength == 0) {
this.isComplete = true;
Expand All @@ -170,11 +171,11 @@ public void parse(byte b) {
}
}
else {
frameLengthString += new String(new byte[] {b});
lengthBuffer.put(b);
}
break;
case DATA:
if(this.isComplete) this.state = relpParserState.NL;
if(this.isComplete) this.state = RelpParserState.NL;
// Parser will only read the given length of data. If the message
// gives data bigger than the frameLength, bad luck for them.
if (frameLengthLeft > 0) {
Expand All @@ -187,7 +188,7 @@ public void parse(byte b) {
if (frameLengthLeft == 0) {
// make ready for consumer
frameData.flip();
state = relpParserState.NL;
state = RelpParserState.NL;

LOGGER.trace("relpParser> data buffer <{}>", frameData);

Expand All @@ -209,4 +210,17 @@ public void parse(byte b) {
break;
}
}

/**
 * Puts the parser back into its initial state so the same instance can be
 * used for the next frame.
 *
 * After this call the parser expects a transaction number again, reports
 * itself as not complete, and {@link #getData()} returns {@code null}
 * until the length section of a new frame has been parsed.
 */
public void reset() {
    // make the header accumulation buffers writable from the start again
    this.txnIdBuffer.clear();
    this.commandBuffer.clear();
    this.lengthBuffer.clear();

    // header values go back to their "nothing parsed yet" defaults
    this.frameTxnId = -1;
    this.frameCommand = "";
    this.frameLength = -1;

    // forget the payload of the previous frame
    this.frameLengthLeft = 0;
    this.frameData = null;

    // start over at the first section of a frame
    this.isComplete = false;
    this.state = RelpParserState.TXN;
}
}
2 changes: 1 addition & 1 deletion src/main/java/com/teragrep/rlp_01/TxID.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class TxID {

private int transactionIdentifier;
public int MAX_ID = 999999999;
public static final int MAX_ID = 999999999;

public TxID() {
this.transactionIdentifier = 1;
Expand Down

0 comments on commit d2617df

Please sign in to comment.