@@ -14,7 +14,7 @@ use tracing::Instrument;
1414use crate :: {
1515 unprefixed_fragment_path, BatchManager , CursorStore , CursorStoreOptions , Error ,
1616 ExponentialBackoff , Fragment , FragmentSeqNo , LogPosition , LogReader , LogReaderOptions ,
17- LogWriterOptions , Manifest , ManifestManager ,
17+ LogWriterOptions , Manifest , ManifestManager , ThrottleOptions ,
1818} ;
1919
2020/// The epoch writer is a counting writer. Every epoch exists. An epoch goes
@@ -132,6 +132,69 @@ impl LogWriter {
132132 } )
133133 }
134134
135+ /// Open or try once to initialize the log.
136+ pub async fn bootstrap < D : MarkDirty > (
137+ options : & LogWriterOptions ,
138+ storage : & Arc < Storage > ,
139+ prefix : & str ,
140+ writer : & str ,
141+ mark_dirty : D ,
142+ first_record_offset : LogPosition ,
143+ messages : Vec < Vec < u8 > > ,
144+ ) -> Result < Self , Error > {
145+ let num_records = messages. len ( ) ;
146+ let start = first_record_offset;
147+ let limit = first_record_offset + num_records;
148+ let manifest = Manifest :: load ( & ThrottleOptions :: default ( ) , storage, prefix) . await ?;
149+ if manifest. is_some ( ) {
150+ return Err ( Error :: LogContention ) ;
151+ }
152+ let ( unprefixed_path, setsum, num_bytes) = upload_parquet (
153+ options,
154+ storage,
155+ prefix,
156+ FragmentSeqNo ( 1 ) ,
157+ first_record_offset,
158+ messages,
159+ )
160+ . await ?;
161+ Manifest :: initialize ( options, storage, prefix, writer) . await ?;
162+ let Some ( ( manifest, e_tag) ) =
163+ Manifest :: load ( & ThrottleOptions :: default ( ) , storage, prefix) . await ?
164+ else {
165+ tracing:: error!( "Manifest was initialized and then was None." ) ;
166+ return Err ( Error :: Internal ) ;
167+ } ;
168+ let path = unprefixed_path;
169+ let seq_no = FragmentSeqNo ( 1 ) ;
170+ let num_bytes = num_bytes as u64 ;
171+ let frag = Fragment {
172+ path,
173+ seq_no,
174+ start,
175+ limit,
176+ num_bytes,
177+ setsum,
178+ } ;
179+ let mut new_manifest = manifest. clone ( ) ;
180+ if !new_manifest. can_apply_fragment ( & frag) {
181+ tracing:: error!( "Cannot apply frag to a clean manifest." ) ;
182+ return Err ( Error :: Internal ) ;
183+ }
184+ new_manifest. apply_fragment ( frag) ;
185+ manifest
186+ . install (
187+ & ThrottleOptions :: default ( ) ,
188+ storage,
189+ prefix,
190+ Some ( & e_tag) ,
191+ & new_manifest,
192+ )
193+ . await ?;
194+ mark_dirty. mark_dirty ( limit, num_records) . await ?;
195+ todo ! ( ) ;
196+ }
197+
135198 /// This will close the log.
136199 pub async fn close ( self ) -> Result < ( ) , Error > {
137200 // SAFETY(rescrv): Mutex poisoning.
0 commit comments