Skip to content

Commit

Permalink
wait_for_value_interface_change (#652)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZohebShaikh authored Nov 19, 2024
1 parent 9ea28ad commit e4b18e5
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 32 deletions.
45 changes: 45 additions & 0 deletions docs/how-to/use_set_and_wait_for_other_value.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# `set_and_wait_for_other_value`

The `set_and_wait_for_other_value` function (defined in `_signal.py`) is a utility designed to:

1. **Set a signal**.
2. **Wait for another signal to reach a specified value (`match_value`)**.

The behavior of the function depends on the value of the `wait_for_set_completion` parameter:

- **If `wait_for_set_completion = True`:**
The function returns at **1** (see diagram below), which occurs when the "set operation" is complete.

- **If `wait_for_set_completion = False`:**
The function returns at **2**, which occurs when the `match_signal` reaches the `match_value`.

In **AreaDetector**, the `wait_for_set_completion` parameter should generally be set to **`False`**, as the preferred behavior is to return when the `match_signal` achieves the `match_value`.

---

## Behavior Diagram:

![wait_for_set_completion](../images/set_and_wait_for_other_value.svg)

---

## Example Usage

```python
# Example code snippet for using set_and_wait_for_other_value in an AreaDetector driver
self._arm_status = set_and_wait_for_other_value(
self._drv.arm,
1,
self._drv.state,
"ready",
timeout=DEFAULT_TIMEOUT,
wait_for_set_completion=False,
)
```

In this **AreaDetector driver**, the function ensures that the detector's `arm_status` reaches `"ready"` before capturing data. To achieve this:

- The detector is instructed to transition to the "armed" state by setting the driver's `arm` signal to `1`.
- The function waits until the driver's `state` signal equals `"ready"`.

This approach ensures that data capture is only initiated once the detector is fully prepared.
10 changes: 10 additions & 0 deletions docs/images/set_and_wait_for_other_value.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions src/ophyd_async/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
assert_emitted,
assert_reading,
assert_value,
observe_signals_value,
observe_value,
set_and_wait_for_other_value,
set_and_wait_for_value,
Expand Down Expand Up @@ -158,6 +159,7 @@
"assert_reading",
"assert_value",
"observe_value",
"observe_signals_value",
"set_and_wait_for_value",
"set_and_wait_for_other_value",
"soft_signal_r_and_setter",
Expand Down
107 changes: 90 additions & 17 deletions src/ophyd_async/core/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
SignalDatatypeV,
)
from ._soft_signal_backend import SoftSignalBackend
from ._status import AsyncStatus
from ._status import AsyncStatus, completed_status
from ._utils import (
CALCULATE_TIMEOUT,
DEFAULT_TIMEOUT,
Expand Down Expand Up @@ -448,27 +448,79 @@ async def observe_value(
do_something_with(value)
"""

q: asyncio.Queue[SignalDatatypeT | Status] = asyncio.Queue()
async for _, value in observe_signals_value(
signal, timeout=timeout, done_status=done_status
):
yield value


async def observe_signals_value(
*signals: SignalR[SignalDatatypeT],
timeout: float | None = None,
done_status: Status | None = None,
) -> AsyncGenerator[tuple[SignalR[SignalDatatypeT], SignalDatatypeT], None]:
"""Subscribe to the value of a signal so it can be iterated from.
Parameters
----------
signals:
Call subscribe_value on all the signals at the start, and clear_sub on it at the
end
timeout:
If given, how long to wait for each updated value in seconds. If an update
is not produced in this time then raise asyncio.TimeoutError
done_status:
If this status is complete, stop observing and make the iterator return.
If it raises an exception then this exception will be raised by the iterator.
Notes
-----
Example usage::
async for signal,value in observe_signals_values(sig1,sig2,..):
if signal is sig1:
do_something_with(value)
elif signal is sig2:
do_something_else_with(value)
"""
q: asyncio.Queue[tuple[SignalR[SignalDatatypeT], SignalDatatypeT] | Status] = (
asyncio.Queue()
)
if timeout is None:
get_value = q.get
else:

async def get_value():
return await asyncio.wait_for(q.get(), timeout)

cbs: dict[SignalR, Callback] = {}
for signal in signals:

def queue_value(value: SignalDatatypeT, signal=signal):
q.put_nowait((signal, value))

cbs[signal] = queue_value
signal.subscribe_value(queue_value)

if done_status is not None:
done_status.add_callback(q.put_nowait)

signal.subscribe_value(q.put_nowait)
try:
while True:
# yield here in case something else is filling the queue
# like in test_observe_value_times_out_with_no_external_task()
await asyncio.sleep(0)
item = await asyncio.wait_for(q.get(), timeout)
item = await get_value()
if done_status and item is done_status:
if exc := done_status.exception():
raise exc
else:
break
else:
yield cast(SignalDatatypeT, item)
yield cast(tuple[SignalR[SignalDatatypeT], SignalDatatypeT], item)
finally:
signal.clear_sub(q.put_nowait)
for signal, cb in cbs.items():
signal.clear_sub(cb)


class _ValueChecker(Generic[SignalDatatypeT]):
Expand Down Expand Up @@ -533,30 +585,33 @@ async def wait_for_value(
async def set_and_wait_for_other_value(
set_signal: SignalW[SignalDatatypeT],
set_value: SignalDatatypeT,
read_signal: SignalR[SignalDatatypeV],
read_value: SignalDatatypeV,
match_signal: SignalR[SignalDatatypeV],
match_value: SignalDatatypeV | Callable[[SignalDatatypeV], bool],
timeout: float = DEFAULT_TIMEOUT,
set_timeout: float | None = None,
wait_for_set_completion: bool = True,
) -> AsyncStatus:
"""Set a signal and monitor another signal until it has the specified value.
This function sets a set_signal to a specified set_value and waits for
a read_signal to have the read_value.
a match_signal to have the match_value.
Parameters
----------
signal:
The signal to set
set_value:
The value to set it to
read_signal:
match_signal:
The signal to monitor
read_value:
match_value:
The value to wait for
timeout:
How long to wait for the signal to have the value
set_timeout:
How long to wait for the set to complete
wait_for_set_completion:
This will wait for set completion #More info in how-to docs
Notes
-----
Expand All @@ -565,36 +620,41 @@ async def set_and_wait_for_other_value(
set_and_wait_for_value(device.acquire, 1, device.acquire_rbv, 1)
"""
# Start monitoring before the set to avoid a race condition
values_gen = observe_value(read_signal)
values_gen = observe_value(match_signal)

# Get the initial value from the monitor to make sure we've created it
current_value = await anext(values_gen)

status = set_signal.set(set_value, timeout=set_timeout)

# If the value was the same as before no need to wait for it to change
if current_value != read_value:
if current_value != match_value:

async def _wait_for_value():
async for value in values_gen:
if value == read_value:
if value == match_value:
break

try:
await asyncio.wait_for(_wait_for_value(), timeout)
if wait_for_set_completion:
await status
return status
except asyncio.TimeoutError as e:
raise TimeoutError(
f"{read_signal.name} didn't match {read_value} in {timeout}s"
f"{match_signal.name} didn't match {match_value} in {timeout}s"
) from e

return status
return completed_status()


async def set_and_wait_for_value(
signal: SignalRW[SignalDatatypeT],
value: SignalDatatypeT,
match_value: SignalDatatypeT | Callable[[SignalDatatypeT], bool] | None = None,
timeout: float = DEFAULT_TIMEOUT,
status_timeout: float | None = None,
wait_for_set_completion: bool = True,
) -> AsyncStatus:
"""Set a signal and monitor it until it has that value.
Expand All @@ -609,17 +669,30 @@ async def set_and_wait_for_value(
The signal to set
value:
The value to set it to
match_value:
The expected value of the signal after the operation.
Used to verify that the set operation was successful.
timeout:
How long to wait for the signal to have the value
status_timeout:
How long the returned Status will wait for the set to complete
wait_for_set_completion:
This will wait for set completion #More info in how-to docs
Notes
-----
Example usage::
set_and_wait_for_value(device.acquire, 1)
"""
if match_value is None:
match_value = value
return await set_and_wait_for_other_value(
signal, value, signal, value, timeout, status_timeout
signal,
value,
signal,
match_value,
timeout,
status_timeout,
wait_for_set_completion,
)
4 changes: 3 additions & 1 deletion src/ophyd_async/epics/adcore/_core_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ async def start_acquiring_driver_and_ensure_status(
subsequent raising (if applicable) due to detector state.
"""

status = await set_and_wait_for_value(driver.acquire, True, timeout=timeout)
status = await set_and_wait_for_value(
driver.acquire, True, timeout=timeout, wait_for_set_completion=False
)

async def complete_acquisition() -> None:
"""NOTE: possible race condition here between the callback from
Expand Down
7 changes: 6 additions & 1 deletion src/ophyd_async/epics/eiger/_eiger_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ async def prepare(self, trigger_info: TriggerInfo):
async def arm(self):
# TODO: Detector state should be an enum see https://github.com/DiamondLightSource/eiger-fastcs/issues/43
self._arm_status = set_and_wait_for_other_value(
self._drv.arm, 1, self._drv.state, "ready", timeout=DEFAULT_TIMEOUT
self._drv.arm,
1,
self._drv.state,
"ready",
timeout=DEFAULT_TIMEOUT,
wait_for_set_completion=False,
)

async def wait_for_idle(self):
Expand Down
29 changes: 28 additions & 1 deletion tests/core/test_observe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@

import pytest

from ophyd_async.core import AsyncStatus, observe_value, soft_signal_r_and_setter
from ophyd_async.core import (
AsyncStatus,
observe_signals_value,
observe_value,
soft_signal_r_and_setter,
)


async def test_observe_value_working_correctly():
Expand All @@ -22,6 +27,28 @@ async def tick():
await status


async def test_observes_signals_values_working_correctly():
sig1, setter1 = soft_signal_r_and_setter(float)
sig2, setter2 = soft_signal_r_and_setter(float)

async def tick():
for i in range(2):
await asyncio.sleep(0.01)
setter1(i + 1)
setter2(i + 10)

recv1 = []
recv2 = []
status = AsyncStatus(tick())
async for signal, value in observe_signals_value(sig1, sig2, done_status=status):
if signal is sig1:
recv1.append(value)
elif signal is sig2:
recv2.append(value)
assert recv1 == [0, 1, 2] and recv2 == [0, 10, 11]
await status


async def test_observe_value_times_out():
sig, setter = soft_signal_r_and_setter(float)

Expand Down
Loading

0 comments on commit e4b18e5

Please sign in to comment.