@@ -132,7 +132,17 @@ impl LogWriter {
132
132
} )
133
133
}
134
134
135
- /// Open or try once to initialize the log.
135
+ /// Given a contiguous subset of data from some other location (preferably another log),
136
+ /// construct a new log under storage/prefix using the provided options.
137
+ ///
138
+ /// This function is safe to run again on failure and will not bootstrap over a partially
139
+ /// bootstrapped collection.
140
+ ///
141
+ /// It is my intention to make this more robust as time goes on. Concretely, that means that
142
+ /// as we encounter partial failures left by the tool we fix them. There are 3 failure points
143
+ /// and I'd prefer to manually inspect failures than get the automation right to do it always
144
+ /// automatically. Bootstrap is intended only to last as long as there is a migration from the
145
+ /// go to the rust log services.
136
146
pub async fn bootstrap < D : MarkDirty > (
137
147
options : & LogWriterOptions ,
138
148
storage : & Arc < Storage > ,
@@ -141,14 +151,23 @@ impl LogWriter {
141
151
mark_dirty : D ,
142
152
first_record_offset : LogPosition ,
143
153
messages : Vec < Vec < u8 > > ,
144
- ) -> Result < Self , Error > {
154
+ ) -> Result < ( ) , Error > {
145
155
let num_records = messages. len ( ) ;
146
156
let start = first_record_offset;
147
157
let limit = first_record_offset + num_records;
158
+ // SAFETY(rescrv): This is a speculative load to narrow the window in which we would see a
159
+ // race between writers.
148
160
let manifest = Manifest :: load ( & ThrottleOptions :: default ( ) , storage, prefix) . await ?;
149
161
if manifest. is_some ( ) {
150
162
return Err ( Error :: LogContention ) ;
151
163
}
164
+ // SAFETY(rescrv): This will only succeed if the file doesn't exist. Technically the log
165
+ // could be initialized and garbage collected to leave a prefix hole, but our timing
166
+ // assumption is that every op happens in less than 1/2 the GC interval, so there's no way
167
+ // for that to happen.
168
+ //
169
+ // If the file exists, this will fail with LogContention, which fails us with
170
+ // LogContention. Other errors fail transparently, too.
152
171
let ( unprefixed_path, setsum, num_bytes) = upload_parquet (
153
172
options,
154
173
storage,
@@ -158,7 +177,9 @@ impl LogWriter {
158
177
messages,
159
178
)
160
179
. await ?;
180
+ // SAFETY(rescrv): Any error here is an error.
161
181
Manifest :: initialize ( options, storage, prefix, writer) . await ?;
182
+ // SAFETY(rescrv): We just initialized, so we should be able to load---done to get e_tag.
162
183
let Some ( ( manifest, e_tag) ) =
163
184
Manifest :: load ( & ThrottleOptions :: default ( ) , storage, prefix) . await ?
164
185
else {
@@ -177,11 +198,13 @@ impl LogWriter {
177
198
setsum,
178
199
} ;
179
200
let mut new_manifest = manifest. clone ( ) ;
201
+ // SAFETY(rescrv): This is unit tested to never happen. If it happens, add more tests.
180
202
if !new_manifest. can_apply_fragment ( & frag) {
181
203
tracing:: error!( "Cannot apply frag to a clean manifest." ) ;
182
204
return Err ( Error :: Internal ) ;
183
205
}
184
206
new_manifest. apply_fragment ( frag) ;
207
+ // SAFETY(rescrv): If this fails, there's nothing left to do.
185
208
manifest
186
209
. install (
187
210
& ThrottleOptions :: default ( ) ,
@@ -191,8 +214,11 @@ impl LogWriter {
191
214
& new_manifest,
192
215
)
193
216
. await ?;
217
+ // Not Safety:
218
+ // We mark dirty, but if we lose that we lose that.
219
+ // Failure to mark dirty fails the bootstrap.
194
220
mark_dirty. mark_dirty ( limit, num_records) . await ?;
195
- todo ! ( ) ;
221
+ Ok ( ( ) )
196
222
}
197
223
198
224
/// This will close the log.
@@ -563,6 +589,9 @@ pub async fn upload_parquet(
563
589
loop {
564
590
let ( buffer, setsum) = construct_parquet ( log_position, & messages) ?;
565
591
tracing:: info!( "upload_parquet: {:?} with {} bytes" , path, buffer. len( ) ) ;
592
+ // NOTE(rescrv): This match block has been thoroughly reasoned through within the
593
+ // `bootstrap` call above. Don't change the error handling here without re-reasoning
594
+ // there.
566
595
match storage
567
596
. put_bytes (
568
597
& path,
0 commit comments