diff --git a/mmi/__init__.py b/mmi/__init__.py index c233aba..f21c6f0 100644 --- a/mmi/__init__.py +++ b/mmi/__init__.py @@ -6,19 +6,38 @@ def send_array(socket, A, flags=0, copy=False, track=False, metadata=None): """send a numpy array with metadata over zmq""" - md = dict( - dtype=str(A.dtype), - shape=A.shape, - timestamp=datetime.datetime.now().isoformat() - ) + + # create a metadata dictionary for the message + md = {} + # always add a timestamp + md['timestamp'] = datetime.datetime.now().isoformat() + + # copy extra metadata if metadata: md.update(metadata) + + # if we don't have an array if A is None: # send only json socket.send_json(md, flags) - else: - # send json, followed by array - socket.send_json(md, flags | zmq.SNDMORE) + # and we're done + return + + # add array metadata + md['dtype'] = str(A.dtype) + md['shape'] = A.shape + try: + # If an array has a fill value assume it's an array with missings + # store the fill_Value in the metadata and fill the array before sending. + # asscalar should work for scalar, 0d array or nd array of size 1 + md['fill_value'] = np.asscalar(A.fill_value) + A = A.filled() + except AttributeError: + # no masked array, nothing to do + pass + + # send json, followed by array + socket.send_json(md, flags | zmq.SNDMORE) # Make a copy if required and pass along the buffer msg = buffer(np.ascontiguousarray(A)) socket.send(msg, flags, copy=copy, track=track) @@ -33,6 +52,8 @@ def recv_array(socket, flags=0, copy=False, track=False): buf = buffer(msg) A = np.frombuffer(buf, dtype=md['dtype']) A = A.reshape(md['shape']) + if 'fill_value' in md: + A = np.ma.masked_equal(A, md['fill_value']) else: # No array expected A = None diff --git a/tests/test_simple.py b/tests/test_simple.py index dac364b..5811649 100644 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -10,6 +10,7 @@ ctx = zmq.Context() class TestRecv(unittest.TestCase): def test_sndrcv(self): + """send an array""" A = np.array([1,2,3]) req = ctx.socket(zmq.REQ) req.connect('tcp://localhost:9002') @@ -19,5 +20,41 @@ def test_sndrcv(self): B, metadata = mmi.recv_array(rep) numpy.testing.assert_array_equal(A, B) + def test_metadata_only(self): + """send a message with only metadata""" + req = ctx.socket(zmq.REQ) + req.connect('tcp://localhost:9002') + rep = ctx.socket(zmq.REP) + rep.bind('tcp://*:9002') + mmi.send_array(req, A=None) + _, metadata = mmi.recv_array(rep) + self.assertTrue('timestamp' in metadata) + + def test_missing(self): + """send an array with missing data""" + A = np.array([1, 2, 3, 4]) + A = np.ma.masked_less(A, 2) + req = ctx.socket(zmq.REQ) + req.connect('tcp://localhost:9002') + rep = ctx.socket(zmq.REP) + rep.bind('tcp://*:9002') + mmi.send_array(req, A) + B, metadata = mmi.recv_array(rep) + numpy.testing.assert_array_equal(A, B) + + def test_missing_scalar(self): + """send an array with missing data as a scalar""" + A = np.array([1, 2, 3, 4]) + A = np.ma.masked_less(A, 2) + # test if it works if we use a numpy scalar + A.fill_value = np.int32(9999) + req = ctx.socket(zmq.REQ) + req.connect('tcp://localhost:9002') + rep = ctx.socket(zmq.REP) + rep.bind('tcp://*:9002') + mmi.send_array(req, A) + B, metadata = mmi.recv_array(rep) + numpy.testing.assert_array_equal(A, B) +