@@ -14,7 +14,7 @@ use tracing::Instrument;
14
14
use crate :: {
15
15
unprefixed_fragment_path, BatchManager , CursorStore , CursorStoreOptions , Error ,
16
16
ExponentialBackoff , Fragment , FragmentSeqNo , LogPosition , LogReader , LogReaderOptions ,
17
- LogWriterOptions , Manifest , ManifestManager ,
17
+ LogWriterOptions , Manifest , ManifestManager , ThrottleOptions ,
18
18
} ;
19
19
20
20
/// The epoch writer is a counting writer. Every epoch exists. An epoch goes
@@ -132,6 +132,69 @@ impl LogWriter {
132
132
} )
133
133
}
134
134
135
+ /// Open or try once to initialize the log.
136
+ pub async fn bootstrap < D : MarkDirty > (
137
+ options : & LogWriterOptions ,
138
+ storage : & Arc < Storage > ,
139
+ prefix : & str ,
140
+ writer : & str ,
141
+ mark_dirty : D ,
142
+ first_record_offset : LogPosition ,
143
+ messages : Vec < Vec < u8 > > ,
144
+ ) -> Result < Self , Error > {
145
+ let num_records = messages. len ( ) ;
146
+ let start = first_record_offset;
147
+ let limit = first_record_offset + num_records;
148
+ let manifest = Manifest :: load ( & ThrottleOptions :: default ( ) , storage, prefix) . await ?;
149
+ if manifest. is_some ( ) {
150
+ return Err ( Error :: LogContention ) ;
151
+ }
152
+ let ( unprefixed_path, setsum, num_bytes) = upload_parquet (
153
+ options,
154
+ storage,
155
+ prefix,
156
+ FragmentSeqNo ( 1 ) ,
157
+ first_record_offset,
158
+ messages,
159
+ )
160
+ . await ?;
161
+ Manifest :: initialize ( options, storage, prefix, writer) . await ?;
162
+ let Some ( ( manifest, e_tag) ) =
163
+ Manifest :: load ( & ThrottleOptions :: default ( ) , storage, prefix) . await ?
164
+ else {
165
+ tracing:: error!( "Manifest was initialized and then was None." ) ;
166
+ return Err ( Error :: Internal ) ;
167
+ } ;
168
+ let path = unprefixed_path;
169
+ let seq_no = FragmentSeqNo ( 1 ) ;
170
+ let num_bytes = num_bytes as u64 ;
171
+ let frag = Fragment {
172
+ path,
173
+ seq_no,
174
+ start,
175
+ limit,
176
+ num_bytes,
177
+ setsum,
178
+ } ;
179
+ let mut new_manifest = manifest. clone ( ) ;
180
+ if !new_manifest. can_apply_fragment ( & frag) {
181
+ tracing:: error!( "Cannot apply frag to a clean manifest." ) ;
182
+ return Err ( Error :: Internal ) ;
183
+ }
184
+ new_manifest. apply_fragment ( frag) ;
185
+ manifest
186
+ . install (
187
+ & ThrottleOptions :: default ( ) ,
188
+ storage,
189
+ prefix,
190
+ Some ( & e_tag) ,
191
+ & new_manifest,
192
+ )
193
+ . await ?;
194
+ mark_dirty. mark_dirty ( limit, num_records) . await ?;
195
+ todo ! ( ) ;
196
+ }
197
+
135
198
/// This will close the log.
136
199
pub async fn close ( self ) -> Result < ( ) , Error > {
137
200
// SAFETY(rescrv): Mutex poisoning.
0 commit comments