1
1
import os
2
+ import time
3
+ from multiprocessing import Process , Queue
2
4
3
5
import pandas as pd
4
- import numpy as np
5
6
import pytest
6
- import sys
7
-
8
7
from arcticdb_ext import set_config_int
9
- from tests .util .mark import REAL_S3_TESTS_MARK
10
8
11
- import time
12
-
13
- from multiprocessing import Process , Queue
9
+ from arcticdb .options import LibraryOptions , EnterpriseLibraryOptions
10
+ from arcticdb_ext .storage import KeyType
14
11
12
+ from python .arcticdb .config import set_log_level
15
13
16
14
one_sec = 1_000_000_000
17
15
18
16
19
- def write_symbols_worker (real_s3_storage_factory , lib_name , result_queue , run_time , step_symbol_id , first_symbol_id ):
20
- fixture = real_s3_storage_factory .create_fixture ()
21
- lib = fixture .create_arctic ()[lib_name ]
17
+ def write_symbols_worker (lib , result_queue ):
22
18
df = pd .DataFrame ({"col" : [1 , 2 , 3 ]})
23
- cnt = 0
24
- start_time = time . time ()
25
- while time . time () - start_time < run_time :
26
- id = cnt * step_symbol_id + first_symbol_id
27
- lib . write ( f"sym_ { id } " , df )
28
- cnt += 1
29
-
30
- result_queue . put (( first_symbol_id , cnt ) )
31
-
32
-
33
- def compact_symbol_list_worker ( real_s3_storage_factory , lib_name , run_time ):
34
- # Decrease the lock wait times to make lock failures more likely
35
- set_config_int ( "StorageLock.WaitMs" , 1 )
36
- # Trigger symbol list compaction on every list_symbols call
37
- set_config_int ( "SymbolList.MaxDelta" , 1 )
38
- fixture = real_s3_storage_factory . create_fixture ()
39
- lib = fixture . create_arctic ()[ lib_name ]
40
-
41
- start_time = time . time ( )
42
- while time . time () - start_time < run_time :
43
- lib . list_symbols ( )
19
+ id = os . getpid ()
20
+ sym = f"sym_ { id } "
21
+ lib . write ( sym , df )
22
+ result_queue . put ( id )
23
+
24
+ @ pytest . fixture ( params = [
25
+ #(0, 0, 0)
26
+ #(0.1, 10, 50) # (prob, min_ms, max_ms )
27
+ #(0.5, 700, 1500)
28
+ ( 0.5 , 1100 , 1700 )
29
+ ])
30
+ def slow_writing_library ( request , real_s3_storage , lib_name ):
31
+ write_slowdown_prob , write_slowdown_min_ms , write_slowdown_max_ms = request . param
32
+ arctic = real_s3_storage . create_arctic ()
33
+ cfg = arctic . _library_adapter . get_library_config ( lib_name , LibraryOptions (), EnterpriseLibraryOptions () )
34
+ cfg . lib_desc . version . failure_sim . write_slowdown_prob = write_slowdown_prob
35
+ cfg . lib_desc . version . failure_sim . slow_down_min_ms = write_slowdown_min_ms
36
+ cfg . lib_desc . version . failure_sim . slow_down_max_ms = write_slowdown_max_ms
37
+ arctic . _library_manager . write_library_config ( cfg , lib_name )
38
+ yield arctic . get_library ( lib_name )
39
+ arctic . delete_library ( lib_name )
44
40
45
41
#@REAL_S3_TESTS_MARK
46
- @pytest .mark .parametrize ("num_writers, num_compactors" , [(2 , 10 ), (10 , 100 )])
47
- def test_stress_only_add_v0 (real_s3_storage_factory , lib_name , num_writers , num_compactors ):
48
- run_time = 60
49
- fixture = real_s3_storage_factory .create_fixture ()
50
- ac = fixture .create_arctic ()
51
- ac .delete_library (lib_name ) # To make sure we have a clean slate
52
- lib = ac .create_library (lib_name )
42
+ @pytest .mark .parametrize ("num_writers, num_compactors" , [(2 , 10 )])
43
+ def test_stress_only_add_v0 (slow_writing_library , lib_name , num_writers , num_compactors ):
44
+ set_config_int ("SymbolList.MaxDelta" , 1 ) # Trigger symbol list compaction on every list_symbols call
45
+ set_log_level (specific_log_levels = {"lock" : "DEBUG" })
53
46
results_queue = Queue ()
54
47
55
48
writers = [
56
- Process (target = write_symbols_worker , args = (real_s3_storage_factory , lib_name , results_queue , run_time , num_writers , i ))
49
+ Process (target = write_symbols_worker , args = (slow_writing_library , results_queue ))
57
50
for i in range (num_writers )
58
51
]
59
52
60
53
compactors = [
61
- Process (target = compact_symbol_list_worker , args = ( real_s3_storage_factory , lib_name , run_time ) )
54
+ Process (target = slow_writing_library . list_symbols )
62
55
for i in range (num_compactors )
63
56
]
64
57
@@ -69,13 +62,18 @@ def test_stress_only_add_v0(real_s3_storage_factory, lib_name, num_writers, num_
69
62
70
63
for p in processes :
71
64
p .join ()
65
+ if p .exitcode != 0 :
66
+ raise RuntimeError (f"Process { p .pid } failed with exit code { p .exitcode } " )
72
67
73
68
expected_symbol_list = set ()
74
69
75
70
while not results_queue .empty ():
76
- first_id , cnt = results_queue .get ()
77
- expected_symbol_list .update ([f"sym_{ first_id + i * num_writers } " for i in range (cnt )])
71
+ expected_symbol_list .add (f"sym_{ results_queue .get ()} " )
78
72
79
- result_symbol_list = set (lib .list_symbols ())
73
+ result_symbol_list = set (slow_writing_library .list_symbols ())
80
74
assert len (result_symbol_list ) == len (expected_symbol_list )
81
75
assert result_symbol_list == expected_symbol_list
76
+
77
+ lt = slow_writing_library ._dev_tools .library_tool ()
78
+ compacted_keys = lt .find_keys_for_id (KeyType .SYMBOL_LIST , "__symbols__" )
79
+ assert len (compacted_keys ) == 1
0 commit comments