@@ -21,6 +21,7 @@ use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
2121use risingwave_object_store:: object:: {
2222 InMemObjectStore , MonitoredObjectStore , ObjectError , ObjectStoreImpl , ObjectStoreRef ,
2323} ;
24+ use tokio:: sync:: RwLock ;
2425
2526use crate :: meta_snapshot:: { MetaSnapshot , Metadata } ;
2627use crate :: {
@@ -42,7 +43,7 @@ pub trait MetaSnapshotStorage: 'static + Sync + Send {
4243 async fn get < S : Metadata > ( & self , id : MetaSnapshotId ) -> BackupResult < MetaSnapshot < S > > ;
4344
4445 /// Gets local snapshot manifest.
45- fn manifest ( & self ) -> Arc < MetaSnapshotManifest > ;
46+ async fn manifest ( & self ) -> Arc < MetaSnapshotManifest > ;
4647
4748 /// Refreshes local snapshot manifest.
4849 async fn refresh_manifest ( & self ) -> BackupResult < ( ) > ;
@@ -55,7 +56,7 @@ pub trait MetaSnapshotStorage: 'static + Sync + Send {
5556pub struct ObjectStoreMetaSnapshotStorage {
5657 path : String ,
5758 store : ObjectStoreRef ,
58- manifest : Arc < parking_lot :: RwLock < Arc < MetaSnapshotManifest > > > ,
59+ manifest : Arc < RwLock < Arc < MetaSnapshotManifest > > > ,
5960}
6061
6162// TODO #6482: purge stale snapshots that is not in manifest.
@@ -70,13 +71,18 @@ impl ObjectStoreMetaSnapshotStorage {
7071 Ok ( instance)
7172 }
7273
73- async fn update_manifest ( & self , new_manifest : MetaSnapshotManifest ) -> BackupResult < ( ) > {
74+ async fn update_manifest (
75+ & self ,
76+ update : impl FnOnce ( MetaSnapshotManifest ) -> MetaSnapshotManifest ,
77+ ) -> BackupResult < ( ) > {
78+ let mut guard = self . manifest . write ( ) . await ;
79+ let new_manifest = update ( ( * * guard) . clone ( ) ) ;
7480 let bytes =
7581 serde_json:: to_vec ( & new_manifest) . map_err ( |e| BackupError :: Encoding ( e. into ( ) ) ) ?;
7682 self . store
7783 . upload ( & self . get_manifest_path ( ) , bytes. into ( ) )
7884 . await ?;
79- * self . manifest . write ( ) = Arc :: new ( new_manifest) ;
85+ * guard = Arc :: new ( new_manifest) ;
8086 Ok ( ( ) )
8187 }
8288
@@ -124,19 +130,17 @@ impl MetaSnapshotStorage for ObjectStoreMetaSnapshotStorage {
124130 ) -> BackupResult < ( ) > {
125131 let path = self . get_snapshot_path ( snapshot. id ) ;
126132 self . store . upload ( & path, snapshot. encode ( ) ?. into ( ) ) . await ?;
127-
128- // update manifest last
129- let mut new_manifest = ( * * self . manifest . read ( ) ) . clone ( ) ;
130- new_manifest. manifest_id += 1 ;
131- new_manifest
132- . snapshot_metadata
133- . push ( MetaSnapshotMetadata :: new (
133+ self . update_manifest ( |mut manifest : MetaSnapshotManifest | {
134+ manifest. manifest_id += 1 ;
135+ manifest. snapshot_metadata . push ( MetaSnapshotMetadata :: new (
134136 snapshot. id ,
135137 snapshot. metadata . hummock_version_ref ( ) ,
136138 snapshot. format_version ,
137139 remarks,
138140 ) ) ;
139- self . update_manifest ( new_manifest) . await ?;
141+ manifest
142+ } )
143+ . await ?;
140144 Ok ( ( ) )
141145 }
142146
@@ -146,13 +150,13 @@ impl MetaSnapshotStorage for ObjectStoreMetaSnapshotStorage {
146150 MetaSnapshot :: decode ( & data)
147151 }
148152
149- fn manifest ( & self ) -> Arc < MetaSnapshotManifest > {
150- self . manifest . read ( ) . clone ( )
153+ async fn manifest ( & self ) -> Arc < MetaSnapshotManifest > {
154+ self . manifest . read ( ) . await . clone ( )
151155 }
152156
153157 async fn refresh_manifest ( & self ) -> BackupResult < ( ) > {
154158 if let Some ( manifest) = self . get_manifest ( ) . await ? {
155- let mut guard = self . manifest . write ( ) ;
159+ let mut guard = self . manifest . write ( ) . await ;
156160 if manifest. manifest_id > guard. manifest_id {
157161 * guard = Arc :: new ( manifest) ;
158162 }
@@ -161,15 +165,15 @@ impl MetaSnapshotStorage for ObjectStoreMetaSnapshotStorage {
161165 }
162166
163167 async fn delete ( & self , ids : & [ MetaSnapshotId ] ) -> BackupResult < ( ) > {
164- // update manifest first
165168 let to_delete: HashSet < MetaSnapshotId > = HashSet :: from_iter ( ids. iter ( ) . cloned ( ) ) ;
166- let mut new_manifest = ( * * self . manifest . read ( ) ) . clone ( ) ;
167- new_manifest. manifest_id += 1 ;
168- new_manifest
169- . snapshot_metadata
170- . retain ( |m| !to_delete. contains ( & m. id ) ) ;
171- self . update_manifest ( new_manifest) . await ?;
172-
169+ self . update_manifest ( |mut manifest : MetaSnapshotManifest | {
170+ manifest. manifest_id += 1 ;
171+ manifest
172+ . snapshot_metadata
173+ . retain ( |m| !to_delete. contains ( & m. id ) ) ;
174+ manifest
175+ } )
176+ . await ?;
173177 let paths = ids
174178 . iter ( )
175179 . map ( |id| self . get_snapshot_path ( * id) )
0 commit comments