41
41
# Must use isolation_level=None for consistency between Python 3.8 and 3.12
42
42
# Can't use the STRICT keyword for tables, requires sqlite 3.37.0
43
43
# Can't use the octet_length() either, requires sqlite 3.43.0
44
+ # Can't use DELETE ... RETURNING, requires sqlite 3.35.0
44
45
#
45
46
# Ubuntu 20.04 Python 3.8.2 Sqlite 3.31.1 Adds UPSERT, window functions
46
47
# Ubuntu 22.04 Python 3.10.x Sqlite 3.37.2 Adds STRICT tables, JSON ops
@@ -80,6 +81,7 @@ class Buffer:
80
81
"""tracing data ids buffered during this dispatch invocation."""
81
82
observed = False
82
83
"""Marks that data from this dispatch invocation has been marked observed."""
84
+ stored : int | None = None
83
85
84
86
def __init__ (self , path : str ):
85
87
self .path = path
@@ -165,19 +167,17 @@ def pump(self, chunk: bytes | None = None) -> tuple[int, bytes] | None:
165
167
# - or a read transaction later upgraded to write (check space, then delete some)
166
168
# currently I've made `self.tx()` return a write transaction always
167
169
# which is safer, but may incur a filesystem modification cost.
170
+ collected_size = 0
171
+ chunklen = 0
168
172
with self .tx (readonly = not chunk ) as conn :
169
173
if chunk :
170
174
# Ensure that there's enough space in the buffer
171
175
chunklen = (len (chunk ) + 4095 ) // 4096 * 4096
172
- stored : int | None = conn .execute (
173
- """
174
- SELECT sum((length(data)+4095)/4096*4096)
175
- FROM tracing
176
- """
177
- ).fetchone ()[0 ]
176
+
178
177
# TODO: expose `stored` in metrics, one day
179
- excess = (stored or 0 ) + chunklen - BUFFER_SIZE
180
- logging .debug (f'{ excess = } ' )
178
+ if self .stored is None :
179
+ self .stored = self ._stored_size (conn )
180
+ excess = self .stored + chunklen - BUFFER_SIZE
181
181
182
182
if excess > 0 :
183
183
# Drop lower-priority, older data
@@ -190,15 +190,13 @@ def pump(self, chunk: bytes | None = None) -> tuple[int, bytes] | None:
190
190
)
191
191
192
192
collected_ids : set [int ] = set ()
193
- collected_size : int = 0
194
193
for id_ , size in cursor :
195
194
collected_ids .add (id_ )
196
195
collected_size += size
197
196
if collected_size > excess :
198
197
break
199
198
200
199
assert collected_ids
201
- logging .debug (f'{ len (collected_ids )= } ' )
202
200
conn .execute (
203
201
f"""
204
202
DELETE FROM tracing
@@ -222,7 +220,7 @@ def pump(self, chunk: bytes | None = None) -> tuple[int, bytes] | None:
222
220
self .ids .add (cursor .lastrowid )
223
221
224
222
# Return oldest important data
225
- return conn .execute (
223
+ rv = conn .execute (
226
224
"""
227
225
SELECT id, data
228
226
FROM tracing
@@ -231,14 +229,43 @@ def pump(self, chunk: bytes | None = None) -> tuple[int, bytes] | None:
231
229
"""
232
230
).fetchone ()
233
231
232
+ assert self .stored is not None
233
+ self .stored += chunklen - collected_size
234
+ return rv
235
+
236
+ def _stored_size (self , conn : sqlite3 .Connection ) -> int :
237
+ """Must be called in a transaction."""
238
+ stored : int | None = conn .execute (
239
+ """
240
+ SELECT sum((length(data)+4095)/4096*4096)
241
+ FROM tracing
242
+ """
243
+ ).fetchone ()[0 ]
244
+ return stored or 0
245
+
234
246
@retry
235
- def remove (self , id_ : int ):
247
+ def remove (self , id_ : int ) -> None :
236
248
with self .tx () as conn :
249
+ # NOTE: can't use the RETURNING clause
250
+ row = conn .execute (
251
+ """
252
+ SELECT (length(data)+4095)/4096*4096
253
+ FROM tracing
254
+ WHERE id = ?
255
+ """
256
+ ).fetchone ()
257
+
258
+ if not row :
259
+ return
260
+
237
261
conn .execute (
238
262
"""
239
263
DELETE FROM tracing
240
264
WHERE id = ?
241
265
""" ,
242
266
(id_ ,),
243
267
)
268
+
244
269
self .ids -= {id_ }
270
+ if self .stored is not None :
271
+ self .stored -= row [0 ]
0 commit comments