diff --git a/src/main/java/com/teragrep/rlp_01/RelpConnection.java b/src/main/java/com/teragrep/rlp_01/RelpConnection.java index adf6902..d2784a6 100644 --- a/src/main/java/com/teragrep/rlp_01/RelpConnection.java +++ b/src/main/java/com/teragrep/rlp_01/RelpConnection.java @@ -40,8 +40,8 @@ public class RelpConnection implements RelpSender { private int txBufferSize; private ByteBuffer preAllocatedTXBuffer; private ByteBuffer preAllocatedRXBuffer; - private static final int MAX_COMMAND_LENGTH = 11; private final RelpClientSocket relpClientSocket; + private final RelpParser parser = new RelpParser(); private final static byte[] OFFER; @@ -252,8 +252,6 @@ private void readAcks(RelpBatch relpBatch) throws IOException, TimeoutException, IllegalStateException { LOGGER.trace("relpConnection.readAcks> entry"); - RelpParser parser = null; - int readBytes; boolean notComplete = this.window.size() > 0; @@ -271,9 +269,6 @@ private void readAcks(RelpBatch relpBatch) // process it if (readBytes > 0) { while (preAllocatedRXBuffer.hasRemaining()) { - if (parser == null) { - parser = new RelpParser(); - } parser.parse(preAllocatedRXBuffer.get()); if (parser.isComplete()) { @@ -292,7 +287,7 @@ private void readAcks(RelpBatch relpBatch) window.removePending(txnId); } // this one is complete, ready for next - parser = null; + parser.reset(); if (window.size() == 0) { notComplete = false; break; diff --git a/src/main/java/com/teragrep/rlp_01/RelpParser.java b/src/main/java/com/teragrep/rlp_01/RelpParser.java index ca7227e..0fc62fd 100644 --- a/src/main/java/com/teragrep/rlp_01/RelpParser.java +++ b/src/main/java/com/teragrep/rlp_01/RelpParser.java @@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; /** A hand-made parser to process RELP messages. @@ -28,38 +29,32 @@ public class RelpParser { private static final Logger LOGGER = LoggerFactory.getLogger(RelpParser.class); + private static final int MAX_COMMAND_LENGTH = 11; + // perhaps antlr4 would be better for this and not some hand made parser - private relpParserState state; - private boolean isComplete; - - private String frameTxnIdString; - private int frameTxnId; - private String frameCommandString; - private String frameLengthString; - private int frameLength; + private RelpParserState state = RelpParserState.TXN; + private boolean isComplete = false; + private final ByteBuffer txnIdBuffer = ByteBuffer.allocateDirect(String.valueOf(TxID.MAX_ID).length()); + private int frameTxnId = -1; + private final ByteBuffer commandBuffer = ByteBuffer.allocateDirect(MAX_COMMAND_LENGTH); + private String frameCommand = ""; + private final ByteBuffer lengthBuffer = ByteBuffer.allocateDirect(String.valueOf(Integer.MAX_VALUE).length()); + private int frameLength = -1; private int frameLengthLeft; private ByteBuffer frameData; - private static final int MAX_COMMAND_LENGTH = 11; public RelpParser() { - this.state = relpParserState.TXN; - this.frameTxnIdString = ""; - this.frameTxnId = -1; - this.frameCommandString= ""; - this.frameLengthString= ""; - this.frameLength = -1; + } @Deprecated - public RelpParser( boolean debug ) - { - this.state = relpParserState.TXN; - this.frameTxnIdString = ""; - this.frameTxnId = -1; - this.frameCommandString = ""; - this.frameLengthString = ""; - this.frameLength = -1; + public RelpParser( boolean debug ) { + + } + + private String byteBufferToAsciiString(ByteBuffer byteBuffer) { + return StandardCharsets.US_ASCII.decode(byteBuffer).toString(); } public boolean isComplete() { @@ -71,7 +66,7 @@ public int getTxnId() { } public String getCommandString() { - return this.frameCommandString; + return frameCommand; } public int getLength() { @@ -82,9 +77,11 @@ public ByteBuffer getData() { return this.frameData; } - public relpParserState getState() { return this.state; } + public RelpParserState getState() { + return state; + } - private enum relpParserState { + private enum RelpParserState { TXN, COMMAND, LENGTH, @@ -92,6 +89,8 @@ private enum relpParserState { NL } + + /** Parse the message byte-by-byte and enter each byte as a string to the proper storage based on the state of the parser. @@ -103,35 +102,38 @@ public void parse(byte b) { switch (this.state) { case TXN: if (b == ' '){ - frameTxnId = Integer.parseInt(frameTxnIdString); + txnIdBuffer.flip(); + frameTxnId = Integer.parseInt(byteBufferToAsciiString(txnIdBuffer)); if (frameTxnId < 0) { - throw new IllegalArgumentException("TXNR must " + - "be >= 0"); + throw new IllegalArgumentException("TXNR must be >= 0"); } - state = relpParserState.COMMAND; LOGGER.trace( "relpParser> txnId <[{}]>", frameTxnId ); + state = RelpParserState.COMMAND; } else { - frameTxnIdString += new String(new byte[] {b}); + txnIdBuffer.put(b); } break; case COMMAND: if (b == ' '){ - state = relpParserState.LENGTH; - LOGGER.trace( "relpParser> command <[{}]>", frameCommandString ); + commandBuffer.flip(); // Spec constraints. - if( frameCommandString.length() > MAX_COMMAND_LENGTH && - !frameCommandString.equals(RelpCommand.OPEN) && - !frameCommandString.equals(RelpCommand.CLOSE) && - !frameCommandString.equals(RelpCommand.ABORT) && - !frameCommandString.equals(RelpCommand.SERVER_CLOSE) && - !frameCommandString.equals(RelpCommand.SYSLOG) && - !frameCommandString.equals(RelpCommand.RESPONSE)) { + frameCommand = byteBufferToAsciiString(commandBuffer); + LOGGER.trace( "relpParser> command <[{}]>", frameCommand); + + if( + !frameCommand.equals(RelpCommand.OPEN) && + !frameCommand.equals(RelpCommand.CLOSE) && + !frameCommand.equals(RelpCommand.ABORT) && + !frameCommand.equals(RelpCommand.SERVER_CLOSE) && + !frameCommand.equals(RelpCommand.SYSLOG) && + !frameCommand.equals(RelpCommand.RESPONSE)) { throw new IllegalStateException( "Invalid COMMAND." ); } + state = RelpParserState.LENGTH; } else { - frameCommandString += new String(new byte[] {b}); + commandBuffer.put(b); } break; case LENGTH: @@ -142,12 +144,11 @@ public void parse(byte b) { HEADER = TXNR SP COMMAND SP DATALEN LF; and LF is for relpParserState.NL */ if (b == ' ' || b == '\n') { - - frameLength = Integer.parseInt(frameLengthString); + lengthBuffer.flip(); + frameLength = Integer.parseInt(byteBufferToAsciiString(lengthBuffer)); if (frameLength < 0) { - throw new IllegalArgumentException("DATALEN must be " + - ">= 0"); + throw new IllegalArgumentException("DATALEN must be >= 0"); } frameLengthLeft = frameLength; @@ -155,11 +156,11 @@ public void parse(byte b) { // Length bytes done, move onto next state. if (frameLength == 0 ) { - state = relpParserState.NL; + state = RelpParserState.NL; } else { - state = relpParserState.DATA; + state = RelpParserState.DATA; } - LOGGER.trace( "relpParser> length <[{}]>", frameLengthString ); + LOGGER.trace( "relpParser> length <[{}]>", frameLength ); if (b == '\n') { if (frameLength == 0) { this.isComplete = true; @@ -170,11 +171,11 @@ public void parse(byte b) { } } else { - frameLengthString += new String(new byte[] {b}); + lengthBuffer.put(b); } break; case DATA: - if(this.isComplete) this.state = relpParserState.NL; + if(this.isComplete) this.state = RelpParserState.NL; // Parser will only read the given length of data. If the message // gives data bigger than the frameLength, bad luck for them. if (frameLengthLeft > 0) { @@ -187,7 +188,7 @@ public void parse(byte b) { if (frameLengthLeft == 0) { // make ready for consumer frameData.flip(); - state = relpParserState.NL; + state = RelpParserState.NL; LOGGER.trace("relpParser> data buffer <{}>", frameData); @@ -209,4 +210,17 @@ public void parse(byte b) { break; } } + + public void reset() { + state = RelpParserState.TXN; + isComplete = false; + txnIdBuffer.clear(); + frameTxnId = -1; + commandBuffer.clear(); + frameCommand = ""; + lengthBuffer.clear(); + frameLength = -1; + frameLengthLeft = 0; + frameData = null; + } } diff --git a/src/main/java/com/teragrep/rlp_01/TxID.java b/src/main/java/com/teragrep/rlp_01/TxID.java index b751f7b..38b9ade 100644 --- a/src/main/java/com/teragrep/rlp_01/TxID.java +++ b/src/main/java/com/teragrep/rlp_01/TxID.java @@ -25,7 +25,7 @@ public class TxID { private int transactionIdentifier; - public int MAX_ID = 999999999; + public static final int MAX_ID = 999999999; public TxID() { this.transactionIdentifier = 1;