-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathtest_request_queue.py
123 lines (97 loc) · 5.93 KB
/
test_request_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from __future__ import annotations
import secrets
import string
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from apify_client import ApifyClient, ApifyClientAsync
def random_string(length: int = 10) -> str:
return ''.join(secrets.choice(string.ascii_letters) for _ in range(length))
def random_queue_name() -> str:
return f'python-client-test-queue-{random_string(5)}'
class TestRequestQueueSync:
def test_request_queue_lock(self, apify_client: ApifyClient) -> None:
created_queue = apify_client.request_queues().get_or_create(name=random_queue_name())
queue = apify_client.request_queue(created_queue['id'], client_key=random_string(10))
# Add requests and check if correct number of requests was locked
for i in range(15):
queue.add_request({'url': f'http://test-lock.com/{i}', 'uniqueKey': f'http://test-lock.com/{i}'})
locked_requests_list = queue.list_and_lock_head(limit=10, lock_secs=10)
locked_requests = locked_requests_list['items']
for locked_request in locked_requests:
assert locked_request['lockExpiresAt'] is not None
# Check if the delete request works
queue.delete_request_lock(locked_requests[1]['id'])
delete_lock_request = queue.get_request(locked_requests[1]['id'])
assert delete_lock_request is not None
assert delete_lock_request.get('lockExpiresAt') is None
queue.delete_request_lock(locked_requests[2]['id'], forefront=True)
delete_lock_request2 = queue.get_request(locked_requests[2]['id'])
assert delete_lock_request2 is not None
assert delete_lock_request2.get('lockExpiresAt') is None
# Check if the prolong request works
assert queue.prolong_request_lock(locked_requests[3]['id'], lock_secs=15)['lockExpiresAt'] is not None
queue.delete()
assert apify_client.request_queue(created_queue['id']).get() is None
def test_request_batch_operations(self, apify_client: ApifyClient) -> None:
created_queue = apify_client.request_queues().get_or_create(name=random_queue_name())
queue = apify_client.request_queue(created_queue['id'])
# Add requests to queue and check if they were added
requests_to_add = [
{'url': f'http://test-batch.com/{i}', 'uniqueKey': f'http://test-batch.com/{i}'} for i in range(25)
]
added_requests = queue.batch_add_requests(requests_to_add)
assert len(added_requests.get('processedRequests', [])) > 0
requests_in_queue = queue.list_requests()
assert len(requests_in_queue['items']) == len(added_requests['processedRequests'])
# Delete requests from queue and check if they were deleted
requests_to_delete = requests_in_queue['items'][:20]
delete_response = queue.batch_delete_requests(
[{'uniqueKey': req.get('uniqueKey')} for req in requests_to_delete]
)
requests_in_queue2 = queue.list_requests()
assert len(requests_in_queue2['items']) == 25 - len(delete_response['processedRequests'])
queue.delete()
class TestRequestQueueAsync:
async def test_request_queue_lock(self, apify_client_async: ApifyClientAsync) -> None:
created_queue = await apify_client_async.request_queues().get_or_create(name=random_queue_name())
queue = apify_client_async.request_queue(created_queue['id'], client_key=random_string(10))
# Add requests and check if correct number of requests was locked
for i in range(15):
await queue.add_request({'url': f'http://test-lock.com/{i}', 'uniqueKey': f'http://test-lock.com/{i}'})
locked_requests_list = await queue.list_and_lock_head(limit=10, lock_secs=10)
locked_requests = locked_requests_list['items']
for locked_request in locked_requests:
assert locked_request['lockExpiresAt'] is not None
# Check if the delete request works
await queue.delete_request_lock(locked_requests[1]['id'])
delete_lock_request = await queue.get_request(locked_requests[1]['id'])
assert delete_lock_request is not None
assert delete_lock_request.get('lockExpiresAt') is None
await queue.delete_request_lock(locked_requests[2]['id'], forefront=True)
delete_lock_request2 = await queue.get_request(locked_requests[2]['id'])
assert delete_lock_request2 is not None
assert delete_lock_request2.get('lockExpiresAt') is None
# Check if the prolong request works
prolonged_request = await queue.prolong_request_lock(locked_requests[3]['id'], lock_secs=15)
assert prolonged_request['lockExpiresAt'] is not None
await queue.delete()
assert await apify_client_async.request_queue(created_queue['id']).get() is None
async def test_request_batch_operations(self, apify_client_async: ApifyClientAsync) -> None:
created_queue = await apify_client_async.request_queues().get_or_create(name=random_queue_name())
queue = apify_client_async.request_queue(created_queue['id'])
# Add requests to queue and check if they were added
requests_to_add = [
{'url': f'http://test-batch.com/{i}', 'uniqueKey': f'http://test-batch.com/{i}'} for i in range(25)
]
added_requests = await queue.batch_add_requests(requests_to_add)
assert len(added_requests.get('processedRequests', [])) > 0
requests_in_queue = await queue.list_requests()
assert len(requests_in_queue['items']) == len(added_requests['processedRequests'])
# Delete requests from queue and check if they were deleted
requests_to_delete = requests_in_queue['items'][:20]
delete_response = await queue.batch_delete_requests(
[{'uniqueKey': req.get('uniqueKey')} for req in requests_to_delete]
)
requests_in_queue2 = await queue.list_requests()
assert len(requests_in_queue2['items']) == 25 - len(delete_response['processedRequests'])
await queue.delete()