-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_asyncqueue.py
121 lines (100 loc) · 3.38 KB
/
test_asyncqueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import pytest # type: ignore
from queue import Queue
# from asyncio.queues import QueueEmpty, QueueFull
from asyncio import (
Task,
create_task,
sleep,
timeout,
TimeoutError,
QueueEmpty,
QueueFull,
)
from queutils import AsyncQueue
QSIZE: int = 10
N: int = 100 # N >> QSIZE
# N : int = int(1e10)
@pytest.fixture
def test_asyncqueue_int() -> AsyncQueue[int]:
Q: Queue[int] = Queue[int](maxsize=QSIZE) # queue.Queue / non-async
return AsyncQueue(queue=Q)
async def _producer_int(Q: AsyncQueue[int], n: int):
for i in range(n):
await Q.put(i)
async def _consumer_int(Q: AsyncQueue[int], n: int = -1):
while n != 0:
_ = await Q.get()
Q.task_done()
n -= 1
# Test: put(), get(), join(), qsize()
@pytest.mark.timeout(5)
@pytest.mark.asyncio
async def test_1_put_get_async(test_asyncqueue_int: AsyncQueue[int]):
Q = test_asyncqueue_int
consumer: Task = create_task(_consumer_int(Q))
try:
async with timeout(3):
await _producer_int(Q, N)
except TimeoutError:
assert False, "AsyncQueue got stuck"
await Q.join()
assert Q.qsize() == 0, "queue not empty"
assert Q.empty(), "queue not empty"
assert Q.items == N, "Queue items counted wrong"
consumer.cancel()
@pytest.mark.timeout(10)
@pytest.mark.asyncio
async def test_2_put_get_nowait(test_asyncqueue_int: AsyncQueue[int]):
Q = test_asyncqueue_int
producer: Task = create_task(_producer_int(Q, N))
await sleep(1)
# In theory this could fail without a real error
# if QSIZE is huge and/or system is slow
assert Q.qsize() == Q.maxsize, "Queue was supposed to be at maxsize"
assert Q.full(), "Queue should be full"
assert Q.items == Q.maxsize, "Queue items counted wrong"
assert not Q.empty(), "Queue should not be empty"
assert Q.done == 0, "No queue items have been done"
try:
Q.put_nowait(1)
assert False, "Queue was supposed to be full, but was not"
except QueueFull:
pass # OK, Queue was supposed to be full
try:
while True:
_ = Q.get_nowait()
Q.task_done()
await sleep(0.01)
except QueueEmpty:
assert Q.qsize() == 0, "Queue size should be zero"
assert Q.done == N, f"Items done: {Q.done}"
try:
async with timeout(5):
await Q.join()
except TimeoutError:
assert False, "Queue.join() took longer than it should"
assert producer.done(), "producer has not finished"
assert Q.qsize() == 0, "queue not empty"
assert Q.empty(), "queue not empty"
# @pytest.mark.timeout(10)
# @pytest.mark.asyncio
# async def test_3_from_Queue() -> None:
# queue: Queue[int] = Queue()
# Q: AsyncQueue[int] = AsyncQueue.from_queue(queue)
# assert Q.maxsize == 0, "maxsize should be 0 since queue.Queue does not support maxsize property"
# try:
# Q.task_done()
# assert False, "This should raise ValueError"
# except ValueError:
# pass # OK
# await Q.put(1)
# assert Q.done == 0, "done should be 0"
# assert Q.items == 1, "Queue items counted wrong"
# _ = await Q.get()
# assert Q.done == 0, "done should be 0"
# try:
# Q.task_done()
# assert True, "This should not raise ValueError"
# except ValueError:
# assert False, "This should not raise ValueError"
# assert Q.done == 1, "done should be 1"