@@ -11,7 +11,7 @@ use taskbroker::{
11
11
} ;
12
12
use tokio:: task:: JoinSet ;
13
13
14
- async fn process_activations ( num_activations : u32 , num_workers : u32 ) {
14
+ async fn get_pending_activations ( num_activations : u32 , num_workers : u32 ) {
15
15
let url = if cfg ! ( feature = "bench-with-mnt-disk" ) {
16
16
let mut rng = rand:: thread_rng ( ) ;
17
17
format ! (
@@ -48,23 +48,12 @@ async fn process_activations(num_activations: u32, num_workers: u32) {
48
48
join_set. spawn ( async move {
49
49
let mut num_activations_processed = 0 ;
50
50
51
- let mut pending = store
51
+ while store
52
52
. get_pending_activation ( Some ( "namespace" ) )
53
53
. await
54
- . unwrap ( ) ;
55
-
56
- while let Some ( ref activation) = pending {
57
- store
58
- . set_status (
59
- & activation. activation . id ,
60
- InflightActivationStatus :: Complete ,
61
- )
62
- . await
63
- . unwrap ( ) ;
64
- pending = store
65
- . get_pending_activation ( Some ( "namespace" ) )
66
- . await
67
- . unwrap ( ) ;
54
+ . unwrap ( )
55
+ . is_some ( )
56
+ {
68
57
num_activations_processed += 1 ;
69
58
}
70
59
num_activations_processed
@@ -76,20 +65,77 @@ async fn process_activations(num_activations: u32, num_workers: u32) {
76
65
) ;
77
66
}
78
67
68
+ async fn set_status ( num_activations : u32 , num_workers : u32 ) {
69
+ assert ! ( num_activations % num_workers == 0 ) ;
70
+
71
+ let url = if cfg ! ( feature = "bench-with-mnt-disk" ) {
72
+ let mut rng = rand:: thread_rng ( ) ;
73
+ format ! (
74
+ "/mnt/disks/sqlite/{}-{}.sqlite" ,
75
+ Utc :: now( ) ,
76
+ rng. r#gen:: <u64 >( )
77
+ )
78
+ } else {
79
+ generate_temp_filename ( )
80
+ } ;
81
+ let store = Arc :: new (
82
+ InflightActivationStore :: new (
83
+ & url,
84
+ InflightActivationStoreConfig {
85
+ max_processing_attempts : 1 ,
86
+ } ,
87
+ )
88
+ . await
89
+ . unwrap ( ) ,
90
+ ) ;
91
+
92
+ for chunk in make_activations ( num_activations) . chunks ( 1024 ) {
93
+ store. store ( chunk. to_vec ( ) ) . await . unwrap ( ) ;
94
+ }
95
+
96
+ assert_eq ! (
97
+ store. count_pending_activations( ) . await . unwrap( ) ,
98
+ num_activations as usize
99
+ ) ;
100
+
101
+ let mut join_set = JoinSet :: new ( ) ;
102
+ for i in 0 ..num_workers {
103
+ let store = store. clone ( ) ;
104
+ join_set. spawn ( async move {
105
+ for idx in 0 ..num_activations {
106
+ if idx % num_workers == i {
107
+ store
108
+ . set_status ( & format ! ( "id_{}" , i) , InflightActivationStatus :: Complete )
109
+ . await
110
+ . unwrap ( ) ;
111
+ }
112
+ }
113
+ } ) ;
114
+ }
115
+ }
116
+
79
117
fn store_bench ( c : & mut Criterion ) {
80
118
let num_activations: u32 = 4_096 ;
81
119
let num_workers = 64 ;
82
120
83
121
c. benchmark_group ( "bench_InflightActivationStore" )
84
122
. sample_size ( 256 )
85
123
. throughput ( criterion:: Throughput :: Elements ( num_activations. into ( ) ) )
86
- . bench_function ( "process_activations" , |b| {
124
+ . bench_function ( "get_pending_activation" , |b| {
125
+ let runtime = tokio:: runtime:: Builder :: new_multi_thread ( )
126
+ . enable_all ( )
127
+ . build ( )
128
+ . unwrap ( ) ;
129
+ b. to_async ( runtime)
130
+ . iter ( || get_pending_activations ( num_activations, num_workers) ) ;
131
+ } )
132
+ . bench_function ( "set_status" , |b| {
87
133
let runtime = tokio:: runtime:: Builder :: new_multi_thread ( )
88
134
. enable_all ( )
89
135
. build ( )
90
136
. unwrap ( ) ;
91
137
b. to_async ( runtime)
92
- . iter ( || process_activations ( num_activations, num_workers) ) ;
138
+ . iter ( || set_status ( num_activations, num_workers) ) ;
93
139
} ) ;
94
140
}
95
141
0 commit comments