Skip to content

Commit

Permalink
Merge pull request #3 from SiggyF/fedor-missing
Browse files Browse the repository at this point in the history
Fedor missing
  • Loading branch information
SiggyF committed Mar 24, 2014
2 parents a1dce1a + 8f73fe9 commit 36a079e
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 8 deletions.
37 changes: 29 additions & 8 deletions mmi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,38 @@

def send_array(socket, A, flags=0, copy=False, track=False, metadata=None):
"""send a numpy array with metadata over zmq"""
md = dict(
dtype=str(A.dtype),
shape=A.shape,
timestamp=datetime.datetime.now().isoformat()
)

# create a metadata dictionary for the message
md = {}
# always add a timestamp
md['timestamp'] = datetime.datetime.now().isoformat()

# copy extra metadata
if metadata:
md.update(metadata)

# if we don't have an array
if A is None:
# send only json
socket.send_json(md, flags)
else:
# send json, followed by array
socket.send_json(md, flags | zmq.SNDMORE)
# and we're done
return

# add array metadata
md['dtype'] = str(A.dtype)
md['shape'] = A.shape
try:
# If an array has a fill value assume it's an array with missings
# store the fill_Value in the metadata and fill the array before sending.
# asscalar should work for scalar, 0d array or nd array of size 1
md['fill_value'] = np.asscalar(A.fill_value)
A = A.filled()
except AttributeError:
# no masked array, nothing to do
pass

# send json, followed by array
socket.send_json(md, flags | zmq.SNDMORE)
# Make a copy if required and pass along the buffer
msg = buffer(np.ascontiguousarray(A))
socket.send(msg, flags, copy=copy, track=track)
Expand All @@ -33,6 +52,8 @@ def recv_array(socket, flags=0, copy=False, track=False):
buf = buffer(msg)
A = np.frombuffer(buf, dtype=md['dtype'])
A = A.reshape(md['shape'])
if 'fill_value' in md:
A = np.ma.masked_equal(A, md['fill_value'])
else:
# No array expected
A = None
Expand Down
37 changes: 37 additions & 0 deletions tests/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
ctx = zmq.Context()
class TestRecv(unittest.TestCase):
def test_sndrcv(self):
"""send an array"""
A = np.array([1,2,3])
req = ctx.socket(zmq.REQ)
req.connect('tcp://localhost:9002')
Expand All @@ -19,5 +20,41 @@ def test_sndrcv(self):
B, metadata = mmi.recv_array(rep)
numpy.testing.assert_array_equal(A, B)

def test_metadata_only(self):
"""send a message with only metadata"""
req = ctx.socket(zmq.REQ)
req.connect('tcp://localhost:9002')
rep = ctx.socket(zmq.REP)
rep.bind('tcp://*:9002')
mmi.send_array(req, A=None)
_, metadata = mmi.recv_array(rep)
self.assertTrue('timestamp' in metadata)

def test_missing(self):
"""send an array with missing data"""
A = np.array([1, 2, 3, 4])
A = np.ma.masked_less(A, 2)
req = ctx.socket(zmq.REQ)
req.connect('tcp://localhost:9002')
rep = ctx.socket(zmq.REP)
rep.bind('tcp://*:9002')
mmi.send_array(req, A)
B, metadata = mmi.recv_array(rep)
numpy.testing.assert_array_equal(A, B)

def test_missing_scalar(self):
"""send an array with missing data as a scalar"""
A = np.array([1, 2, 3, 4])
A = np.ma.masked_less(A, 2)
# test if it works if we use a numpy scalar
A.fill_value = np.int32(9999)
req = ctx.socket(zmq.REQ)
req.connect('tcp://localhost:9002')
rep = ctx.socket(zmq.REP)
rep.bind('tcp://*:9002')
mmi.send_array(req, A)
B, metadata = mmi.recv_array(rep)
numpy.testing.assert_array_equal(A, B)



0 comments on commit 36a079e

Please sign in to comment.