diff --git a/quickfixj-core/src/main/java/quickfix/mina/message/FIXMessageEncoder.java b/quickfixj-core/src/main/java/quickfix/mina/message/FIXMessageEncoder.java index d3fe41e2c0..d3654966cd 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/message/FIXMessageEncoder.java +++ b/quickfixj-core/src/main/java/quickfix/mina/message/FIXMessageEncoder.java @@ -19,20 +19,29 @@ package quickfix.mina.message; -import java.io.UnsupportedEncodingException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - +import org.apache.mina.core.buffer.AbstractIoBuffer; import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.buffer.IoBufferAllocator; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.demux.MessageEncoder; import org.quickfixj.CharsetSupport; - import quickfix.Message; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + /** * Encodes a Message object or message string as a byte array to be * transmitted on MINA connection. @@ -40,7 +49,6 @@ public class FIXMessageEncoder implements MessageEncoder { private static final Set> TYPES; - private final String charsetEncoding; static { Set> types = new HashSet>(); @@ -49,36 +57,201 @@ public class FIXMessageEncoder implements MessageEncoder { TYPES = Collections.unmodifiableSet(types); } - public FIXMessageEncoder() { - charsetEncoding = CharsetSupport.getCharset(); - } - public static Set> getMessageTypes() { return TYPES; } - public void encode(IoSession session, Object message, ProtocolEncoderOutput out) - throws ProtocolCodecException { - String fixMessageString; - if (message instanceof String) { - fixMessageString = (String) message; - } else if (message instanceof Message) { - fixMessageString = message.toString(); + private static final int DEFAULT_BUFFER_SIZE = 8192; + private static final int MAX_BUFFERS_BY_THREAD = 16; + + protected static final class ThreadEncoder { + private final ThreadBufferAllocator allocator = new ThreadBufferAllocator(); + private final CharsetEncoder charsetEncoder; + + private ThreadEncoder(Charset charset) { + charsetEncoder = charset.newEncoder() + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE); + } + } + + private final Charset charset; + + private final ThreadLocal encoders = new ThreadLocal() { + @Override + protected ThreadEncoder initialValue() { + return new ThreadEncoder(charset); + } + }; + + public FIXMessageEncoder() { + String charsetName = CharsetSupport.getCharset(); + try { + charset = Charset.forName(null != charsetName ? charsetName : "ISO-8859-1"); + } catch (Exception e) { + throw new RuntimeException(new UnsupportedEncodingException(charsetName)); + } + } + + @Override + public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws ProtocolCodecException { + String text = message.toString(); + if(text.length() > DEFAULT_BUFFER_SIZE) { + sendWrapMessage(text, out); } else { - throw new ProtocolCodecException("Invalid FIX message object type: " - + message.getClass()); + ThreadEncoder threadEncoder = encoders.get(); + IoBuffer ioBuffer = threadEncoder.allocator.getBuffer(); + try { + encodeBytes(text, ioBuffer, threadEncoder.charsetEncoder); + } catch(ProtocolCodecException pce) { + try { // possible buffer overflow + ioBuffer.free(); + ioBuffer = IoBuffer.wrap(text.getBytes(threadEncoder.charsetEncoder.charset())); + } catch(Exception e) { + throw new ProtocolCodecException(e); + } + } + out.write(ioBuffer.flip()); } + } - byte[] bytes; + private void sendWrapMessage(String message, ProtocolEncoderOutput out) throws ProtocolCodecException { try { - bytes = fixMessageString.getBytes(charsetEncoding); - } catch (UnsupportedEncodingException e) { + out.write(IoBuffer.wrap(message.getBytes(charset))); + } catch (Exception e) { throw new ProtocolCodecException(e); } + } + + private static final ProtocolCodecException overflowCodecException = new ProtocolCodecException("OVERFLOW"); - IoBuffer buffer = IoBuffer.allocate(bytes.length); - buffer.put(bytes); - buffer.flip(); - out.write(buffer); + private void encodeBytes(String message, IoBuffer output, CharsetEncoder charsetEncoder) throws ProtocolCodecException { + if(message.length() > 0) try { + CoderResult cr = charsetEncoder.encode(CharBuffer.wrap(message), output.buf(), true); + if(!cr.isUnderflow()) + throw overflowCodecException; + cr = charsetEncoder.flush(output.buf()); + if(!cr.isUnderflow()) + throw overflowCodecException; + } finally { + charsetEncoder.reset(); + } + } + + private static final class ThreadBufferAllocator implements IoBufferAllocator { + + private final ArrayList ioBuffers = new ArrayList(); + + private ThreadBufferAllocator() { + for(int i = 0; i < MAX_BUFFERS_BY_THREAD; i++) { + ioBuffers.add(allocate(DEFAULT_BUFFER_SIZE, true, true)); + } + } + + private IoBuffer getBuffer() { + synchronized(ioBuffers) { + if(!ioBuffers.isEmpty()) + return ioBuffers.remove(ioBuffers.size() - 1); + } + return allocate(DEFAULT_BUFFER_SIZE, true); + } + + @Override + public IoBuffer allocate(int capacity, boolean direct) { + return allocate(capacity, direct, false); + } + + private IoBuffer allocate(int capacity, boolean direct, boolean reuse) { + return wrap(allocateNioBuffer(capacity, direct), reuse); + } + + @Override + public ByteBuffer allocateNioBuffer(int capacity, boolean direct) { + return direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity); + } + + @Override + public IoBuffer wrap(ByteBuffer nioBuffer) { + return wrap(nioBuffer, false); + } + + private IoBuffer wrap(ByteBuffer nioBuffer, boolean reuse) { + return new SimpleBuffer(this, nioBuffer, reuse); + } + + @Override + public void dispose() { + synchronized (ioBuffers) { + ioBuffers.clear(); + } + } + + private final class SimpleBuffer extends AbstractIoBuffer { + private ByteBuffer buf; + private final boolean recycle; + + protected SimpleBuffer(ThreadBufferAllocator allocator, ByteBuffer buf, boolean recycle) { + super(allocator, buf.capacity()); + this.buf = buf; + this.recycle = recycle; + buf.order(ByteOrder.BIG_ENDIAN); + } + + protected SimpleBuffer(SimpleBuffer parent, ByteBuffer buf) { + super(parent); + this.buf = buf; + recycle = false; + } + + @Override + public ByteBuffer buf() { + return buf; + } + + @Override + protected void buf(ByteBuffer buf) { + this.buf = buf; + } + + @Override + protected IoBuffer duplicate0() { + return new SimpleBuffer(this, this.buf.duplicate()); + } + + @Override + protected IoBuffer slice0() { + return new SimpleBuffer(this, this.buf.slice()); + } + + @Override + protected IoBuffer asReadOnlyBuffer0() { + return new SimpleBuffer(this, this.buf.asReadOnlyBuffer()); + } + + @Override + public byte[] array() { + return buf.array(); + } + + @Override + public int arrayOffset() { + return buf.arrayOffset(); + } + + @Override + public boolean hasArray() { + return buf.hasArray(); + } + + @Override + public void free() { + clear(); + if(recycle) { + synchronized (ioBuffers) { + ioBuffers.add(this); + } + } + } + } } }