diff --git a/src/ophyd_async/epics/signal/signal.py b/src/ophyd_async/epics/signal/signal.py index d4259c66c8..3aa071ad0d 100644 --- a/src/ophyd_async/epics/signal/signal.py +++ b/src/ophyd_async/epics/signal/signal.py @@ -58,6 +58,25 @@ def epics_signal_rw( return SignalRW(backend) +def epics_signal_rw_rbv( + datatype: Type[T], + write_pv: Optional[str] = None, + read_suffix: str = "_RBV", +) -> SignalRW[T]: + """Create a `SignalRW` backed by 1 or 2 EPICS PVs, with a suffix on the readback pv + + Parameters + ---------- + datatype: + Check that the PV is of this type + write_pv: + The PV to write to + read_suffix: + Append this suffix to the write pv to create the readback pv + """ + return epics_signal_rw(datatype, f"{write_pv}{read_suffix}", write_pv) + + def epics_signal_r(datatype: Type[T], read_pv: str) -> SignalR[T]: """Create a `SignalR` backed by 1 EPICS PV diff --git a/tests/core/test_signal.py b/tests/core/test_signal.py index 5393cb1a78..e7460ac4a9 100644 --- a/tests/core/test_signal.py +++ b/tests/core/test_signal.py @@ -14,6 +14,13 @@ wait_for_value, ) from ophyd_async.core.utils import DEFAULT_TIMEOUT +from ophyd_async.epics.signal.signal import ( + epics_signal_r, + epics_signal_rw, + epics_signal_rw_rbv, + epics_signal_w, + epics_signal_x, +) class MySignal(Signal): @@ -119,3 +126,30 @@ async def test_set_and_wait_for_value(): assert not st.done set_sim_put_proceeds(sim_signal, True) assert await time_taken_by(st) < 0.1 + + +def test_signal_helpers(): + read_write = epics_signal_rw(int, "ReadWrite") + assert read_write._backend.read_pv == "ReadWrite" + assert read_write._backend.write_pv == "ReadWrite" + + read_write_rbv_manual = epics_signal_rw(int, "ReadWrite_RBV", "ReadWrite") + assert read_write_rbv_manual._backend.read_pv == "ReadWrite_RBV" + assert read_write_rbv_manual._backend.write_pv == "ReadWrite" + + read_write_rbv = epics_signal_rw_rbv(int, "ReadWrite") + assert read_write_rbv._backend.read_pv == "ReadWrite_RBV" + assert read_write_rbv._backend.write_pv == "ReadWrite" + + read_write_rbv_suffix = epics_signal_rw_rbv(int, "ReadWrite", read_suffix=":RBV") + assert read_write_rbv_suffix._backend.read_pv == "ReadWrite:RBV" + assert read_write_rbv_suffix._backend.write_pv == "ReadWrite" + + read = epics_signal_r(int, "Read") + assert read._backend.read_pv == "Read" + + write = epics_signal_w(int, "Write") + assert write._backend.write_pv == "Write" + + execute = epics_signal_x("Execute") + assert execute._backend.write_pv == "Execute"