Skip to content

Commit

Permalink
AsyncBool/Value: add held_for option to wait_value()
Browse files Browse the repository at this point in the history
  • Loading branch information
belm0 committed Jun 29, 2020
1 parent 0cb3df0 commit 2f15e56
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 14 deletions.
25 changes: 19 additions & 6 deletions src/trio_util/_async_bool.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,28 @@ def value(self, x):
self._edge_events[x].unpark_all()
self._edge_events[None].unpark_all()

async def wait_value(self, value):
"""Wait until given value."""
async def wait_value(self, value, *, held_for=0):
"""Wait until given value.
If held_for > 0, the value must match for that duration from the
time of the call. "held" means that the value is continuously in the
requested state.
"""
if not isinstance(value, _BOOL_TYPES):
raise TypeError
if value != self.value:
await self._level_events[value].park()
else:
await trio.sleep(0)
while True:
if value != self.value:
await self._level_events[value].park()
else:
await trio.sleep(0)
if held_for > 0:
with trio.move_on_after(held_for):
if value == self.value:
await self._level_events[not value].park()
continue
break

# TODO: held_for
async def wait_transition(self, value=None):
"""Wait until transition to given value (default None which means any)."""
if not isinstance(value, _BOOL_OR_NONE_TYPES):
Expand Down
29 changes: 21 additions & 8 deletions src/trio_util/_async_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,37 @@ def value(self, x):
result.event.unpark_all()
del self._edge_results[f]

async def wait_value(self, predicate):
async def wait_value(self, predicate, *, held_for=0):
"""
Wait until given predicate f(value) is True.
The predicate is tested immediately and, if false, whenever
the `value` property changes.
If held_for > 0, the predicate must match for that duration from the
time of the call. "held" means that the predicate is continuously true.
returns value which satisfied the predicate
(when held_for > 0, it's the most recent value)
"""
if not predicate(self._value):
result = self._level_results[predicate]
await result.event.park()
value = result.value
else:
value = self._value
await trio.sleep(0)
while True:
if not predicate(self._value):
result = self._level_results[predicate]
await result.event.park()
value = result.value
else:
value = self._value
await trio.sleep(0)
if held_for > 0:
with trio.move_on_after(held_for):
if predicate(self._value):
result = self._level_results[lambda v: not predicate(v)]
await result.event.park()
continue
break
return value

# TODO: held_for
async def wait_transition(self, predicate=None):
"""
Wait until given predicate f(value, old_value) is True.
Expand Down
30 changes: 30 additions & 0 deletions tests/test_async_bool.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import trio
from trio.testing import assert_checkpoints, wait_all_tasks_blocked

from trio_util import AsyncBool
Expand Down Expand Up @@ -93,3 +94,32 @@ async def listener(event: AsyncBool):
await wait_all_tasks_blocked()
x.value = False
await wait_all_tasks_blocked()


async def test_wait_value_held_for(nursery, autojump_clock):
test1_done = trio.Event()
test2_done = trio.Event()

async def listener(event: AsyncBool):
assert not event.value # condition already true
t0 = trio.current_time()
await event.wait_value(False, held_for=1)
assert trio.current_time() - t0 == 1
test1_done.set()

assert not event.value # condition not yet true
t0 = trio.current_time()
await event.wait_value(True, held_for=1)
assert trio.current_time() - t0 == 1.5
test2_done.set()

x = AsyncBool()
nursery.start_soon(listener, x)
await test1_done.wait()

x.value = True
await trio.sleep(.25)
x.value = False
await trio.sleep(.25)
x.value = True
await test2_done.wait()
30 changes: 30 additions & 0 deletions tests/test_async_value.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import trio
from trio.testing import assert_checkpoints, wait_all_tasks_blocked

from trio_util import AsyncValue
Expand Down Expand Up @@ -46,3 +47,32 @@ async def waiter(event: AsyncValue):
foo.value = 51
foo.value = 0
await wait_all_tasks_blocked()


async def test_wait_value_held_for(nursery, autojump_clock):
test1_done = trio.Event()
test2_done = trio.Event()

async def listener(event: AsyncValue):
assert event.value == 10 # condition already true
t0 = trio.current_time()
assert await event.wait_value(lambda x: x == 10, held_for=1) == 10
assert trio.current_time() - t0 == 1
test1_done.set()

assert event.value < 20 # condition not yet true
t0 = trio.current_time()
assert await event.wait_value(lambda x: x >= 20, held_for=1) == 22
assert trio.current_time() - t0 == 1.5
test2_done.set()

x = AsyncValue(10)
nursery.start_soon(listener, x)
await test1_done.wait()

x.value = 20
await trio.sleep(.25)
x.value = 5
await trio.sleep(.25)
x.value = 22
await test2_done.wait()

0 comments on commit 2f15e56

Please sign in to comment.