diff --git a/src/fastcs/transport/epics/ca/ioc.py b/src/fastcs/transport/epics/ca/ioc.py index ac9ca00d7..36e4ca014 100644 --- a/src/fastcs/transport/epics/ca/ioc.py +++ b/src/fastcs/transport/epics/ca/ioc.py @@ -240,6 +240,7 @@ async def wrapped_method(_: Any): record = builder.Action( f"{pv_prefix}:{pv_name}", on_update=wrapped_method, + blocking=True, ) _add_attr_pvi_info(record, pv_prefix, attr_name, "x") diff --git a/src/fastcs/transport/epics/gui.py b/src/fastcs/transport/epics/gui.py index d08f8ce5e..ff586def1 100644 --- a/src/fastcs/transport/epics/gui.py +++ b/src/fastcs/transport/epics/gui.py @@ -1,4 +1,4 @@ -from pvi._format.dls import DLSFormatter +from pvi._format.dls import DLSFormatter # type: ignore from pvi.device import ( LED, ButtonPanel, diff --git a/src/fastcs/transport/epics/pva/_pv_handlers.py b/src/fastcs/transport/epics/pva/_pv_handlers.py index 801d4e63e..db6a1c6aa 100644 --- a/src/fastcs/transport/epics/pva/_pv_handlers.py +++ b/src/fastcs/transport/epics/pva/_pv_handlers.py @@ -81,11 +81,21 @@ async def put(self, pv: SharedPV, op: ServerOperation): "Maybe the command should spawn an asyncio task?" ) + # Check if record block request recieved + match op.pvRequest().todict(): + case {"record": {"_options": {"block": "true"}}}: + blocking = True + case _: + blocking = False + # Flip to true once command task starts pv.post({"value": True, **p4p_timestamp_now(), **p4p_alarm_states()}) - op.done() + if not blocking: + op.done() alarm_states = await self._run_command() pv.post({"value": False, **p4p_timestamp_now(), **alarm_states}) + if blocking: + op.done() else: raise RuntimeError("Commands should only take the value `True`.") diff --git a/tests/transport/epics/ca/test_softioc.py b/tests/transport/epics/ca/test_softioc.py index 9c758bfea..b27703d49 100644 --- a/tests/transport/epics/ca/test_softioc.py +++ b/tests/transport/epics/ca/test_softioc.py @@ -288,7 +288,9 @@ def test_ioc(mocker: MockerFixture, epics_controller_api: ControllerAPI): epics_controller_api.attributes["write_bool"].datatype ), ) - ioc_builder.Action.assert_any_call(f"{DEVICE}:Go", on_update=mocker.ANY) + ioc_builder.Action.assert_any_call( + f"{DEVICE}:Go", on_update=mocker.ANY, blocking=True + ) # Check info tags are added add_pvi_info.assert_called_once_with(f"{DEVICE}:PVI") @@ -474,8 +476,7 @@ def test_long_pv_names_discarded(mocker: MockerFixture): short_command_pv_name = "command_short_name".title().replace("_", "") ioc_builder.Action.assert_called_once_with( - f"{DEVICE}:{short_command_pv_name}", - on_update=mocker.ANY, + f"{DEVICE}:{short_command_pv_name}", on_update=mocker.ANY, blocking=True ) with pytest.raises(AssertionError): long_command_pv_name = long_command_name.title().replace("_", "") diff --git a/tests/transport/epics/pva/test_p4p.py b/tests/transport/epics/pva/test_p4p.py index ce192998d..bd98c4a80 100644 --- a/tests/transport/epics/pva/test_p4p.py +++ b/tests/transport/epics/pva/test_p4p.py @@ -574,11 +574,15 @@ async def some_task(): async def put_pvs(): await asyncio.sleep(0.1) ctxt = Context("pva") - await ctxt.put(f"{pv_prefix}:CommandSpawnsATask", True) - await ctxt.put(f"{pv_prefix}:CommandSpawnsATask", True) - await ctxt.put(f"{pv_prefix}:CommandRunsForAWhile", True) + await asyncio.gather( + ctxt.put(f"{pv_prefix}:CommandSpawnsATask", True), + ctxt.put(f"{pv_prefix}:CommandSpawnsATask", True), + ) assert expected_error_string not in caplog.text - await ctxt.put(f"{pv_prefix}:CommandRunsForAWhile", True) + await asyncio.gather( + ctxt.put(f"{pv_prefix}:CommandRunsForAWhile", True), + ctxt.put(f"{pv_prefix}:CommandRunsForAWhile", True), + ) assert expected_error_string in caplog.text serve = asyncio.ensure_future(fastcs.serve()) @@ -623,3 +627,46 @@ async def put_pvs(): pytest.approx((coro_end_time - coro_start_time).total_seconds(), abs=0.05) == 0.1 ) + + +def test_block_flag_waits_for_callback_completion(): + class SomeController(Controller): + @command() + async def command_runs_for_a_while(self): + await asyncio.sleep(0.2) + + controller = SomeController() + pv_prefix = str(uuid4()) + fastcs = make_fastcs(pv_prefix, controller) + command_runs_for_a_while_times = [] + + async def put_pvs(): + ctxt = Context("pva") + for block in [True, False]: + start_time = datetime.now() + await ctxt.put( + f"{pv_prefix}:CommandRunsForAWhile", + True, + wait=block, + ) + command_runs_for_a_while_times.append((start_time, datetime.now())) + + serve = asyncio.ensure_future(fastcs.serve()) + try: + asyncio.get_event_loop().run_until_complete( + asyncio.wait_for( + asyncio.gather(serve, put_pvs()), + timeout=0.5, + ) + ) + except TimeoutError: + ... + serve.cancel() + + assert len(command_runs_for_a_while_times) == 2 + + for put_call, expected_duration in enumerate([0.2, 0]): + start, end = command_runs_for_a_while_times[put_call] + assert ( + pytest.approx((end - start).total_seconds(), abs=0.05) == expected_duration + )