-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtest_object_store.py
More file actions
209 lines (158 loc) · 7.28 KB
/
test_object_store.py
File metadata and controls
209 lines (158 loc) · 7.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# SPDX-FileCopyrightText: 2025-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
import time
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from deepset_mcp.tokonomics import (
InMemoryBackend,
ObjectStore,
RedisBackend,
)
@pytest.fixture(params=["memory", "redis"])
def backend(request: Any) -> InMemoryBackend | RedisBackend:
"""Fixture providing both InMemoryBackend and mocked RedisBackend."""
if request.param == "memory":
return InMemoryBackend()
else: # request.param == "redis"
mock_redis = MagicMock()
mock_redis.from_url.return_value = mock_redis
# Mock Redis operations for consistent behavior
stored_data: dict[str, bytes] = {}
def mock_set(key: str, value: bytes) -> None:
stored_data[key] = value
def mock_setex(key: str, ttl: int, value: bytes) -> None:
stored_data[key] = value
def mock_get(key: str) -> bytes | None:
return stored_data.get(key)
def mock_delete(key: str) -> int:
if key in stored_data:
del stored_data[key]
return 1
return 0
mock_redis.set.side_effect = mock_set
mock_redis.setex.side_effect = mock_setex
mock_redis.get.side_effect = mock_get
mock_redis.delete.side_effect = mock_delete
with patch("redis.from_url", return_value=mock_redis):
return RedisBackend("redis://localhost:6379")
class TestObjectStore:
"""Test ObjectStore with parametrized backends."""
def test_init_default_ttl(self, backend: InMemoryBackend | RedisBackend) -> None:
"""Test ObjectStore initialization with default TTL."""
store = ObjectStore(backend=backend)
assert store._ttl == 600
assert store._backend == backend
def test_init_custom_ttl(self, backend: InMemoryBackend | RedisBackend) -> None:
"""Test ObjectStore initialization with custom TTL."""
store = ObjectStore(backend=backend, ttl=7200)
assert store._ttl == 7200
def test_init_zero_ttl(self, backend: InMemoryBackend | RedisBackend) -> None:
"""Test ObjectStore initialization with zero TTL (no expiry)."""
store = ObjectStore(backend=backend, ttl=0)
assert store._ttl == 0
def test_put_get_single_object(self, backend: InMemoryBackend | RedisBackend) -> None:
"""Test storing and retrieving a single object."""
store = ObjectStore(backend=backend)
test_obj = {"test": "data"}
obj_id = store.put(test_obj)
retrieved_obj = store.get(obj_id)
assert retrieved_obj == test_obj
def test_put_get_multiple_objects(self, backend: InMemoryBackend | RedisBackend) -> None:
"""Test storing and retrieving multiple objects."""
store = ObjectStore(backend=backend)
obj1 = {"first": "object"}
obj2 = {"second": "object"}
obj3 = [1, 2, 3]
id1 = store.put(obj1)
id2 = store.put(obj2)
id3 = store.put(obj3)
# All IDs should be unique
assert len({id1, id2, id3}) == 3
assert store.get(id1) == obj1
assert store.get(id2) == obj2
assert store.get(id3) == obj3
def test_get_nonexistent_object(self, backend: InMemoryBackend | RedisBackend) -> None:
"""Test retrieving a non-existent object."""
store = ObjectStore(backend=backend)
result = store.get("nonexistent_key")
assert result is None
def test_delete_existing_object(self, backend: InMemoryBackend | RedisBackend) -> None:
"""Test deleting an existing object."""
store = ObjectStore(backend=backend)
test_obj = {"test": "data"}
obj_id = store.put(test_obj)
result = store.delete(obj_id)
assert result is True
assert store.get(obj_id) is None
def test_delete_nonexistent_object(self, backend: InMemoryBackend | RedisBackend) -> None:
"""Test deleting a non-existent object."""
store = ObjectStore(backend=backend)
result = store.delete("nonexistent_key")
assert result is False
def test_json_serialization_with_set(self, backend: InMemoryBackend | RedisBackend) -> None:
"""Test JSON serialization with set objects."""
store = ObjectStore(backend=backend)
test_obj = {"tags": {1, 2, 3}, "name": "test"}
obj_id = store.put(test_obj)
retrieved = store.get(obj_id)
# Set should be converted to list
assert retrieved is not None
assert retrieved["name"] == "test"
assert set(retrieved["tags"]) == {1, 2, 3}
def test_json_serialization_with_pydantic(self, backend: InMemoryBackend | RedisBackend) -> None:
"""Test JSON serialization with Pydantic models."""
from pydantic import BaseModel
class TestModel(BaseModel):
name: str
age: int
store = ObjectStore(backend=backend)
model = TestModel(name="test", age=25)
obj_id = store.put(model)
retrieved = store.get(obj_id)
assert retrieved == {"name": "test", "age": 25}
def test_get_with_zero_ttl(self, backend: InMemoryBackend | RedisBackend) -> None:
"""Test that objects don't expire with zero TTL."""
store = ObjectStore(backend=backend, ttl=0)
test_obj = {"test": "data"}
obj_id = store.put(test_obj)
# Even after time passes, object should still exist
with patch("time.time", return_value=time.time() + 10000):
retrieved = store.get(obj_id)
assert retrieved == test_obj
@pytest.mark.parametrize("backend_name", ["memory"])
def test_ttl_expiration_memory_only(self, backend_name: str) -> None:
"""Test that objects expire after TTL (InMemoryBackend only)."""
# TTL expiration testing only works with InMemoryBackend due to time mocking
backend = InMemoryBackend()
store = ObjectStore(backend=backend, ttl=1)
test_obj = {"test": "data"}
obj_id = store.put(test_obj)
# Object should exist immediately
assert store.get(obj_id) == test_obj
# Mock time to be after TTL expiration
with patch("time.time", return_value=time.time() + 2.0):
result = store.get(obj_id)
assert result is None
@pytest.mark.parametrize("backend_name", ["memory"])
def test_partial_expiration_memory_only(self, backend_name: str) -> None:
"""Test that only expired objects are evicted (InMemoryBackend only)."""
backend = InMemoryBackend()
store = ObjectStore(backend=backend, ttl=2)
# Put first object
obj1 = {"first": "object"}
id1 = store.put(obj1)
# Wait 1 second and put second object
with patch("time.time", return_value=time.time() + 1.0):
obj2 = {"second": "object"}
id2 = store.put(obj2)
# Wait another 1.5 seconds (total 2.5) - only first object should expire
with patch("time.time", return_value=time.time() + 2.5):
obj3 = {"third": "object"}
id3 = store.put(obj3)
# First object should be expired
assert store.get(id1) is None
# Second and third objects should exist
assert store.get(id2) == obj2
assert store.get(id3) == obj3