Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update FIXMessageEncoder.java #47

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,36 @@

package quickfix.mina.message;

import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.mina.core.buffer.AbstractIoBuffer;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.buffer.IoBufferAllocator;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.demux.MessageEncoder;
import org.quickfixj.CharsetSupport;

import quickfix.Message;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
* Encodes a Message object or message string as a byte array to be
* transmitted on MINA connection.
*/
public class FIXMessageEncoder implements MessageEncoder<Object> {

private static final Set<Class<?>> TYPES;
private final String charsetEncoding;

static {
Set<Class<?>> types = new HashSet<Class<?>>();
Expand All @@ -49,36 +57,201 @@ public class FIXMessageEncoder implements MessageEncoder<Object> {
TYPES = Collections.unmodifiableSet(types);
}

public FIXMessageEncoder() {
charsetEncoding = CharsetSupport.getCharset();
}

public static Set<Class<?>> getMessageTypes() {
return TYPES;
}

public void encode(IoSession session, Object message, ProtocolEncoderOutput out)
throws ProtocolCodecException {
String fixMessageString;
if (message instanceof String) {
fixMessageString = (String) message;
} else if (message instanceof Message) {
fixMessageString = message.toString();
private static final int DEFAULT_BUFFER_SIZE = 8192;
private static final int MAX_BUFFERS_BY_THREAD = 16;

protected static final class ThreadEncoder {
private final ThreadBufferAllocator allocator = new ThreadBufferAllocator();
private final CharsetEncoder charsetEncoder;

private ThreadEncoder(Charset charset) {
charsetEncoder = charset.newEncoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE);
}
}

private final Charset charset;

private final ThreadLocal<ThreadEncoder> encoders = new ThreadLocal<ThreadEncoder>() {
@Override
protected ThreadEncoder initialValue() {
return new ThreadEncoder(charset);
}
};

public FIXMessageEncoder() {
String charsetName = CharsetSupport.getCharset();
try {
charset = Charset.forName(null != charsetName ? charsetName : "ISO-8859-1");
} catch (Exception e) {
throw new RuntimeException(new UnsupportedEncodingException(charsetName));
}
}

@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws ProtocolCodecException {
String text = message.toString();
if(text.length() > DEFAULT_BUFFER_SIZE) {
sendWrapMessage(text, out);
} else {
throw new ProtocolCodecException("Invalid FIX message object type: "
+ message.getClass());
ThreadEncoder threadEncoder = encoders.get();
IoBuffer ioBuffer = threadEncoder.allocator.getBuffer();
try {
encodeBytes(text, ioBuffer, threadEncoder.charsetEncoder);
} catch(ProtocolCodecException pce) {
try { // possible buffer overflow
ioBuffer.free();
ioBuffer = IoBuffer.wrap(text.getBytes(threadEncoder.charsetEncoder.charset()));
} catch(Exception e) {
throw new ProtocolCodecException(e);
}
}
out.write(ioBuffer.flip());
}
}

byte[] bytes;
private void sendWrapMessage(String message, ProtocolEncoderOutput out) throws ProtocolCodecException {
try {
bytes = fixMessageString.getBytes(charsetEncoding);
} catch (UnsupportedEncodingException e) {
out.write(IoBuffer.wrap(message.getBytes(charset)));
} catch (Exception e) {
throw new ProtocolCodecException(e);
}
}

private static final ProtocolCodecException overflowCodecException = new ProtocolCodecException("OVERFLOW");

IoBuffer buffer = IoBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
out.write(buffer);
private void encodeBytes(String message, IoBuffer output, CharsetEncoder charsetEncoder) throws ProtocolCodecException {
if(message.length() > 0) try {
CoderResult cr = charsetEncoder.encode(CharBuffer.wrap(message), output.buf(), true);
if(!cr.isUnderflow())
throw overflowCodecException;
cr = charsetEncoder.flush(output.buf());
if(!cr.isUnderflow())
throw overflowCodecException;
} finally {
charsetEncoder.reset();
}
}

private static final class ThreadBufferAllocator implements IoBufferAllocator {

private final ArrayList<IoBuffer> ioBuffers = new ArrayList<IoBuffer>();

private ThreadBufferAllocator() {
for(int i = 0; i < MAX_BUFFERS_BY_THREAD; i++) {
ioBuffers.add(allocate(DEFAULT_BUFFER_SIZE, true, true));
}
}

private IoBuffer getBuffer() {
synchronized(ioBuffers) {
if(!ioBuffers.isEmpty())
return ioBuffers.remove(ioBuffers.size() - 1);
}
return allocate(DEFAULT_BUFFER_SIZE, true);
}

@Override
public IoBuffer allocate(int capacity, boolean direct) {
return allocate(capacity, direct, false);
}

private IoBuffer allocate(int capacity, boolean direct, boolean reuse) {
return wrap(allocateNioBuffer(capacity, direct), reuse);
}

@Override
public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
return direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
}

@Override
public IoBuffer wrap(ByteBuffer nioBuffer) {
return wrap(nioBuffer, false);
}

private IoBuffer wrap(ByteBuffer nioBuffer, boolean reuse) {
return new SimpleBuffer(this, nioBuffer, reuse);
}

@Override
public void dispose() {
synchronized (ioBuffers) {
ioBuffers.clear();
}
}

private final class SimpleBuffer extends AbstractIoBuffer {
private ByteBuffer buf;
private final boolean recycle;

protected SimpleBuffer(ThreadBufferAllocator allocator, ByteBuffer buf, boolean recycle) {
super(allocator, buf.capacity());
this.buf = buf;
this.recycle = recycle;
buf.order(ByteOrder.BIG_ENDIAN);
}

protected SimpleBuffer(SimpleBuffer parent, ByteBuffer buf) {
super(parent);
this.buf = buf;
recycle = false;
}

@Override
public ByteBuffer buf() {
return buf;
}

@Override
protected void buf(ByteBuffer buf) {
this.buf = buf;
}

@Override
protected IoBuffer duplicate0() {
return new SimpleBuffer(this, this.buf.duplicate());
}

@Override
protected IoBuffer slice0() {
return new SimpleBuffer(this, this.buf.slice());
}

@Override
protected IoBuffer asReadOnlyBuffer0() {
return new SimpleBuffer(this, this.buf.asReadOnlyBuffer());
}

@Override
public byte[] array() {
return buf.array();
}

@Override
public int arrayOffset() {
return buf.arrayOffset();
}

@Override
public boolean hasArray() {
return buf.hasArray();
}

@Override
public void free() {
clear();
if(recycle) {
synchronized (ioBuffers) {
ioBuffers.add(this);
}
}
}
}
}
}