@@ -161,6 +161,294 @@ def test_reclaiming_01(self):
161161 # now the writing of key=16 should success
162162 self ._write (16 )
163163
164+ def test_persist_recover_00 (self ):
165+ """Test e2e persist/recover: cache locations and metadata
166+ survive a normal server restart.
167+
168+ The meta indexer is configured with the dummy backend with
169+ filesystem persistence enabled so that
170+
171+ 1. cache locations are always flushed to disk, and
172+ 2. metadata like key_count and storage usage accounting data are
173+ flushed to disk *before* every metadata READ.
174+
175+ After a controlled server restart the instance group is
176+ re-registered (which reinitialise the MetaIndexer from the
177+ persisted file), and the test verifies:
178+
179+ 1. All block meta written before the restart are still
180+ addressable.
181+ 2. key_count was recovered (not reset to zero), so the capacity
182+ limit is enforced correctly.
183+ """
184+ add_storage_req = {
185+ "trace_id" : self ._trace_id ,
186+ "storage" : self ._make_dummy_storage (),
187+ }
188+ self ._admin_client .add_storage (add_storage_req )
189+
190+ # add instance group
191+ # start with the reclaiming trigger would not happen
192+ ig = self ._make_dummy_instance_group ()
193+ create_ig_req = {
194+ "trace_id" : self ._trace_id ,
195+ "instance_group" : ig ,
196+ }
197+ self ._admin_client .create_instance_group (create_ig_req )
198+
199+ # register instance
200+ reg_ins_data_req = self ._make_dummy_ins_req ()
201+ self ._client .register_instance (reg_ins_data_req )
202+
203+ # write 8 blocks (half of max_key_count=16)
204+ write_count = 8
205+ for i in range (write_count ):
206+ self ._write (i )
207+
208+ # --- restart ---
209+ self .worker_manager .stop_worker (0 )
210+ self .assertTrue (self .worker_manager .start_worker (0 ))
211+
212+ # reconnect clients: update_ports() assigns fresh ports on every
213+ # start
214+ self ._admin_client .close ()
215+ self ._client .close ()
216+ self ._admin_client , self ._client = self ._get_manager_client ()
217+
218+ # the registry is in-memory only, so re-add the storage and
219+ # instance group after restart
220+ # crucially the same storage_uri is used so that
221+ # MetaIndexer.Init -> RecoverMetaData will reload key_count and
222+ # storage_usage_data from the persisted file
223+ self ._admin_client .add_storage (add_storage_req )
224+ create_ig_req ["trace_id" ] = self ._trace_id + "_restart"
225+ self ._admin_client .create_instance_group (create_ig_req )
226+ self ._client .register_instance (reg_ins_data_req )
227+
228+ # 1. verify that all blocks written before the restart are still
229+ # addressable (cache locations was persisted and recovered)
230+ for i in range (write_count ):
231+ get_location_req = {
232+ "trace_id" : f"{ self ._trace_id } _verify_{ i } " ,
233+ "query_type" : "QT_PREFIX_MATCH" ,
234+ "block_keys" : [i ],
235+ "instance_id" : self ._instance_id ,
236+ "block_mask" : {"offset" : 0 },
237+ }
238+ resp = self ._client .get_cache_location (get_location_req )
239+ self .assertGreater (
240+ len (resp ["locations" ]),
241+ 0 ,
242+ f"block { i } should be accessible after restart" ,
243+ )
244+
245+ # 2. verify key_count was recovered (not reset to zero)
246+ # write the remaining 8 blocks (keys 8-15) to reach
247+ # max_key_count=16
248+ for i in range (write_count , 16 ):
249+ self ._write (i )
250+ # key_count is now:
251+ # 8 (recovered) + 8 (just written) = 16 = max_key_count
252+ # so the next write must be rejected
253+ self ._start_write_expect_fail (16 )
254+
255+ def test_persist_recover_01 (self ):
256+ """Test e2e persist/recover: storage usage data survives a
257+ normal server restart.
258+
259+ After writing all 16 blocks (filling key_count to
260+ max_key_count=16), each block contributes 1024 bytes to
261+ StorageUsageData for the NFS storage type. StorageUsageData is
262+ persisted to the dummy backend together with key_count.
263+
264+ After a controlled server restart the instance group is
265+ re-registered, which reloads StorageUsageData from the persisted
266+ file. The test verifies recovery by observing the reclaimer's
267+ byte-usage trigger:
268+
269+ * If StorageUsageData IS recovered (grp_used_byte_sz_ > 0):
270+ the group byte-usage ratio is 0.5 which exceeds the 0.1
271+ threshold → the reclaimer fires → some blocks are freed → a
272+ new write succeeds.
273+
274+ * If StorageUsageData is NOT recovered (grp_used_byte_sz_ == 0):
275+ cache_reclaimer.cc line 480 returns early with water-level
276+ = false even though key_count was recovered; the reclaimer
277+ does NOT trigger → the write still fails.
278+ """
279+ add_storage_req = {
280+ "trace_id" : self ._trace_id ,
281+ "storage" : self ._make_dummy_storage (),
282+ }
283+ self ._admin_client .add_storage (add_storage_req )
284+
285+ # add instance group
286+ # start with the reclaiming trigger would not happen
287+ ig = self ._make_dummy_instance_group ()
288+ create_ig_req = {
289+ "trace_id" : self ._trace_id ,
290+ "instance_group" : ig ,
291+ }
292+ self ._admin_client .create_instance_group (create_ig_req )
293+
294+ # register instance
295+ reg_ins_data_req = self ._make_dummy_ins_req ()
296+ self ._client .register_instance (reg_ins_data_req )
297+
298+ # write all 16 blocks to fill key_count to max_key_count=16
299+ # each block also adds 1024 bytes to StorageUsageData (NFS type)
300+ for i in range (16 ):
301+ self ._write (i )
302+
303+ # key_count is now at max; the next write must be rejected
304+ self ._start_write_expect_fail (16 )
305+
306+ # --- restart ---
307+ self .worker_manager .stop_worker (0 )
308+ self .assertTrue (self .worker_manager .start_worker (0 ))
309+
310+ # reconnect clients after restart
311+ self ._admin_client .close ()
312+ self ._client .close ()
313+ self ._admin_client , self ._client = self ._get_manager_client ()
314+
315+ # re-register storage and instance group using the same storage_uri
316+ # so that MetaIndexer.Init -> RecoverMetaData reloads both key_count
317+ # and storage_usage_data from the persisted file
318+ self ._admin_client .add_storage (add_storage_req )
319+ create_ig_req ["trace_id" ] = self ._trace_id + "_restart"
320+ self ._admin_client .create_instance_group (create_ig_req )
321+ self ._client .register_instance (reg_ins_data_req )
322+
323+ # key_count is recovered to 16 = max, so write must still fail
324+ self ._start_write_expect_fail (16 )
325+
326+ # lower the reclaim trigger so the reclaimer fires ONLY if
327+ # storage_usage_data was recovered:
328+ # not recovered → grp_used_byte_sz_ == 0 → trigger checker
329+ # early-return false → reclaimer does NOT run
330+ # → write fails
331+ # recovered → grp_used_byte_sz_ == 16 * 1024 = 16384 bytes
332+ # group ratio = 16384 / 32768 = 0.5 > 0.1
333+ # → reclaimer fires → blocks freed → write
334+ # succeeds
335+ curr_ver = ig ["version" ]
336+ ig ["version" ] = curr_ver + 1
337+ ig [
338+ "cache_config"
339+ ][
340+ "reclaim_strategy"
341+ ][
342+ "trigger_strategy"
343+ ][
344+ "used_percentage"
345+ ] = 0.1
346+ update_ig_req = {
347+ "trace_id" : self ._trace_id + "_update_ig" ,
348+ "instance_group" : ig ,
349+ "current_version" : curr_ver ,
350+ }
351+ self ._admin_client .update_instance_group (update_ig_req )
352+
353+ # 2 seconds is enough for the background reclaimer to run
354+ time .sleep (2 )
355+
356+ # write should succeed only if reclaiming happened, which
357+ # requires storage_usage_data to have survived the restart
358+ self ._write (16 )
359+
360+ def test_persist_recover_02 (self ):
361+ """Test metadata persistence-recovery backward compatibility
362+ handling.
363+
364+ Scenario
365+ --------
366+ 1. Setup: Fill test_instance_01 with 16 blocks but do not
367+ trigger reclaiming.
368+ 2. Restart the server with intentionally crafted version 0
369+ metadata file, that is, only key_count is included.
370+ 3. Re-register test_instance_01, lower the threshold for
371+ test_group_01; make the reclaimer fires.
372+ 4. Assert: a new write to test_instance_01 should success, which
373+ means the reclaiming worked as expected, thus the backward
374+ compatibility is properly handled.
375+ """
376+ # add storage
377+ add_storage_req = {
378+ "trace_id" : self ._trace_id ,
379+ "storage" : self ._make_dummy_storage (),
380+ }
381+ self ._admin_client .add_storage (add_storage_req )
382+
383+ # add instance group; reclaim trigger is intentionally too high
384+ # to fire at this point
385+ ig = self ._make_dummy_instance_group ()
386+ create_ig_req = {
387+ "trace_id" : self ._trace_id ,
388+ "instance_group" : ig ,
389+ }
390+ self ._admin_client .create_instance_group (create_ig_req )
391+
392+ # register test_instance_01
393+ reg_ins_01_req = self ._make_dummy_ins_req ()
394+ self ._client .register_instance (reg_ins_01_req )
395+
396+ # write 16 keys into test_instance_01
397+ for i in range (16 ):
398+ self ._write (i )
399+
400+ # storage quota is full
401+ self ._start_write_expect_fail (16 )
402+
403+ # restart the server
404+ self .worker_manager .stop_worker (0 )
405+ meta_storage_backend_config = ig [
406+ "cache_config"
407+ ][
408+ "meta_indexer_config"
409+ ][
410+ "meta_storage_backend_config"
411+ ]
412+ self ._make_v0_persist_data (meta_storage_backend_config )
413+ self .assertTrue (self .worker_manager .start_worker (0 ))
414+
415+ # reconnect clients: update_ports() assigns fresh ports on every
416+ # start
417+ self ._admin_client .close ()
418+ self ._client .close ()
419+ self ._admin_client , self ._client = self ._get_manager_client ()
420+
421+ # re-add the storage and instance group after restart
422+ # re-register test_instance_01 to bring the recovered indexer
423+ # back online
424+ self ._admin_client .add_storage (add_storage_req )
425+ create_ig_req ["trace_id" ] = self ._trace_id + "_restart"
426+ self ._admin_client .create_instance_group (create_ig_req )
427+ self ._client .register_instance (reg_ins_01_req )
428+
429+ # fire the reclaimer for test_group_01
430+ curr_ver = ig ["version" ]
431+ ig ["version" ] = curr_ver + 1
432+ ig [
433+ "cache_config"
434+ ][
435+ "reclaim_strategy"
436+ ][
437+ "trigger_strategy"
438+ ][
439+ "used_percentage"
440+ ] = 0.1
441+ self ._admin_client .update_instance_group ({
442+ "trace_id" : self ._trace_id + "_update_ig" ,
443+ "instance_group" : ig ,
444+ "current_version" : curr_ver ,
445+ })
446+
447+ time .sleep (2 )
448+
449+ # this write can succeed only when v0 meta is properly handled
450+ self ._write (16 )
451+
164452 def _get_manager_client (self ):
165453 self ._admin_http_port = self .worker_manager .get_worker (
166454 0 ).env .admin_http_port
@@ -276,7 +564,7 @@ def _make_dummy_storage(self) -> Dict:
276564 return {
277565 "global_unique_name" : self ._storage_name ,
278566 "nfs" : {
279- "root_path" : f"/tmp/ { self ._storage_name } " ,
567+ "root_path" : f"{ self . get_workdir () } / { self ._storage_name } / " ,
280568 }
281569 }
282570
@@ -312,13 +600,14 @@ def _make_dummy_instance_group(self) -> Dict:
312600 "max_key_count" : 16 , # start with 16 max key
313601 "mutex_shard_num" : 16 ,
314602 "meta_storage_backend_config" : {
315- "storage_type" : "local " ,
316- "storage_uri" : "" , # disable persistence
603+ "storage_type" : "dummy " ,
604+ "storage_uri" : f"file:// { self . get_workdir () } /meta_storage_ { self . _instance_group_name } " ,
317605 },
318606 "meta_cache_policy_config" : {
319607 "capacity" : 1024 * 1024 * 1024 ,
320608 "type" : "LRU" ,
321- }
609+ },
610+ "persist_metadata_interval_time_ms" : 0 ,
322611 }
323612 },
324613 "user_data" : "user-defined info" ,
@@ -350,6 +639,34 @@ def _make_dummy_model_deployment(self):
350639 "pp_size" : 1 ,
351640 }
352641
642+ def _make_v0_persist_data (self , meta_storage_backend_conf ):
643+ if meta_storage_backend_conf ["storage_type" ] == "dummy" :
644+ # rewrite the persisted metadata file
645+ import json
646+ d = {}
647+
648+ persist_path = meta_storage_backend_conf ["storage_uri" ]
649+ persist_path = persist_path .removeprefix ("file://" )
650+ persist_path = "_" .join ((persist_path , self ._instance_id ,))
651+
652+ with open (persist_path , 'r' ) as f :
653+ meta_key = "__metadata__"
654+ key = "__storage_usage_data__"
655+ d = json .load (f )
656+ d1 = json .loads (d [meta_key ])
657+ if key in d1 :
658+ del d1 [key ]
659+ d [meta_key ] = json .dumps (d1 )
660+
661+ with open (persist_path , 'w' ) as f :
662+ json .dump (d , f )
663+
664+ elif meta_storage_backend_conf ["storage_type" ] == "local" :
665+ pass
666+ elif meta_storage_backend_conf ["storage_type" ] == "redis" :
667+ # TODO
668+ pass
669+
353670
# allow running this test module directly
if __name__ == "__main__":
    unittest.main()
0 commit comments