How can you implement DAQmx streaming & callback functions using betterproto stubs? #398
Per the suggestion in the Wiki here https://github.com/ni/grpc-device/wiki/Creating-a-gRPC-Client, I am using the betterproto compiler to generate Python NI clients. (This has proved critical in order to support both typing and asyncio behavior.)

I read the Wiki section here on NI DAQmx callback functions, and have looked closely at the … I have attempted to replicate this behavior using betterproto clients; however, I haven't figured out a straightforward way to "finish" the read task once the "DoneEvent" occurs. It appears that betterproto does not provide access to the gRPC stream while using the async generator from the "register_" methods (see source code for the …). As such, when receiving a done message in …

Here's some example code I have put together which contains the meat of the relevant logic from the example above (also configured to collect 1000 samples per channel at 10000 Hz):

    response = await raise_if_error_async(
        daq_client.get_task_attribute_u_int32(
            task=task,
            attribute_raw=nidaqmx_grpc.TaskUInt32Attributes.TASK_ATTRIBUTE_NUM_CHANS,
        )
    )
    number_of_channels = response.value

    async def read_data():
        async for every_n_samples_response in daq_client.register_every_n_samples_event(
            task=task,
            n_samples=100,
            every_n_samples_event_type_raw=nidaqmx_grpc.EveryNSamplesEventType.EVERY_N_SAMPLES_EVENT_TYPE_ACQUIRED_INTO_BUFFER,
        ):
            await _raise_if_error_sync(every_n_samples_response)
            read_response = await raise_if_error_async(
                daq_client.read_analog_f64(
                    task=task,
                    num_samps_per_chan=100,
                    fill_mode_raw=nidaqmx_grpc.GroupBy.GROUP_BY_GROUP_BY_CHANNEL,
                    array_size_in_samps=number_of_channels * 100,
                )
            )
            print(
                f"Acquired {len(read_response.read_array)} samples",
                f"({read_response.samps_per_chan_read} samples per channel)",
            )
            print("Read Data (first 10 samples):", read_response.read_array[:10])
        print("read_data Finished")

    async def wait_for_done():
        async for done_response in daq_client.register_done_event(task=task):
            await _raise_if_error_sync(done_response)
            break
        print("wait_for_done Finished")

    start_task = raise_if_error_async(daq_client.start_task(task=task))
    print("Starting")
    await asyncio.gather(read_data(), wait_for_done(), start_task)
    print("Done")

This will end up hanging indefinitely. The print statements for … Is there some way around this, or a suggested approach here? (I'm guessing it could be done by re-implementing the logic of …)

Side question: is the …
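For readers without DAQ hardware, the hang can be reproduced with plain asyncio. This is only a sketch under stated assumptions: `fake_every_n_samples_stream` and `fake_done_stream` are hypothetical stand-ins for the generators returned by the "register_" methods, and the premise that the every-N-samples stream never ends on its own is an assumption about why the original gather never returns, not something confirmed against betterproto or the server.

```python
import asyncio


async def fake_every_n_samples_stream():
    """Hypothetical stand-in: a server stream that never ends on its own."""
    while True:
        await asyncio.sleep(0.01)
        yield "every_n_samples"


async def fake_done_stream():
    """Hypothetical stand-in: one done message, then the stream stays open."""
    await asyncio.sleep(0.05)
    yield "done"
    await asyncio.Event().wait()  # never set; mimics a stream left open


async def read_data(log):
    async for _ in fake_every_n_samples_stream():
        log.append("read")
    log.append("read_data finished")  # never reached: the loop has no exit


async def wait_for_done(log):
    async for _ in fake_done_stream():
        break  # leaves this loop only; read_data keeps iterating
    log.append("wait_for_done finished")


async def main():
    log = []
    try:
        # The timeout exists only so the demonstration terminates.
        await asyncio.wait_for(
            asyncio.gather(read_data(log), wait_for_done(log)), timeout=0.3
        )
    except asyncio.TimeoutError:
        log.append("timed out")
    return log


log = asyncio.run(main())
print(log[-3:])
```

In this sketch `wait_for_done` returns, but `asyncio.gather` keeps waiting on `read_data`, so without the timeout the program would never finish.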
Replies: 1 comment 3 replies
Tough question, thanks!

You can get it to work using aclose the same way the other example uses cancel in wait_for_done. But I can't figure out a way to get past your "Side question" without adding a sleep. That's frustrating. Maybe that could be added as a feature to betterproto (not sure how that would work). That project hasn't been very active lately, unfortunately.

The snippet below worked for me. I think you're also correct that you could mix in lower-level calls as an alternative. betterproto wraps grpclib internally.

You could also consider using mypy-protobuf to add type hints to the default protoc gen code. And/or use blocking reads instead of events if that can work for you.

This is an unfortunate combination of …

    every_n_samples_stream = daq_service.register_every_n_samples_event(
        task=task,
        n_samples=100,
        every_n_samples_event_type_raw=nidaqmx_grpc.EveryNSamplesEventType.EVERY_N_SAMPLES_EVENT_TYPE_ACQUIRED_INTO_BUFFER,
    )
    done_stream = daq_service.register_done_event(task=task)

    async def read_data():
        async for every_n_samples_response in every_n_samples_stream:
            await raise_if_error(every_n_samples_response)
            read_response = await raise_if_error(
                await daq_service.read_analog_f64(
                    task=task,
                    num_samps_per_chan=100,
                    fill_mode_raw=nidaqmx_grpc.GroupBy.GROUP_BY_GROUP_BY_CHANNEL,
                    array_size_in_samps=100,
                )
            )
            print(
                f"Acquired {len(read_response.read_array)} samples",
                f"({read_response.samps_per_chan_read} samples per channel)",
            )
            print("Read Data (first 10 samples):", read_response.read_array[:10])
        print("read_data Finished")

    async def wait_for_done():
        async for done_response in done_stream:
            await every_n_samples_stream.aclose()
            await done_stream.aclose()
            await raise_if_error(done_response)
        print("wait_for_done Finished")

    acquisition = asyncio.gather(read_data(), wait_for_done())
    # How to wait for initial metadata in betterproto?
    await sleep(2)
    await raise_if_error(await daq_service.start_task(task=task))
    print("Starting")
    await acquisition
    print("Done")
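Another option that may be worth trying is to run the reader as an asyncio task and cancel that task once the done event arrives, instead of closing the generators from a second coroutine. The sketch below uses hypothetical fake streams rather than betterproto stubs, so it only illustrates the control flow. Whether cancelling the task also tears down the underlying gRPC stream cleanly in betterproto is an assumption that has not been verified here.

```python
import asyncio


async def fake_every_n_samples_stream():
    """Hypothetical stand-in for register_every_n_samples_event(...)."""
    while True:
        await asyncio.sleep(0.01)
        yield "every_n_samples"


async def fake_done_stream():
    """Hypothetical stand-in for register_done_event(...)."""
    await asyncio.sleep(0.1)
    yield "done"


async def read_data(chunks):
    async for event in fake_every_n_samples_stream():
        # A real client would await its read call (e.g. read_analog_f64) here.
        chunks.append(event)


async def main():
    chunks = []
    reader = asyncio.create_task(read_data(chunks))

    # Wait for the first done message, then stop listening.
    async for _ in fake_done_stream():
        break

    # Cancel the reader; it is interrupted at its current await point.
    reader.cancel()
    try:
        await reader
    except asyncio.CancelledError:
        pass
    return chunks, reader.cancelled()


chunks, was_cancelled = asyncio.run(main())
print(f"collected {len(chunks)} events, reader cancelled: {was_cancelled}")
```

One design caveat: cancellation can land in the middle of a read, so any samples still in the buffer at that moment would need a final explicit read if they matter.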