-
Notifications
You must be signed in to change notification settings - Fork 369
Expand file tree
/
Copy pathartifact_tree.rs
More file actions
408 lines (374 loc) · 16.3 KB
/
Copy pathartifact_tree.rs
File metadata and controls
408 lines (374 loc) · 16.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is dual-licensed under either the MIT license found in the
* LICENSE-MIT file in the root directory of this source tree or the Apache
* License, Version 2.0 found in the LICENSE-APACHE file in the root directory
* of this source tree. You may select, at your option, one of the
* above-listed licenses.
*/
use std::sync::Arc;
use buck2_core::fs::project_rel_path::ProjectRelativePathBuf;
use buck2_directory::directory::directory_ref::DirectoryRef;
use buck2_directory::directory::entry::DirectoryEntry;
use buck2_error::BuckErrorContext;
use buck2_error::internal_error;
use buck2_execute::digest_config::DigestConfig;
use buck2_execute::directory::ActionDirectoryEntry;
use buck2_execute::directory::ActionDirectoryMember;
use buck2_execute::directory::ActionSharedDirectory;
use buck2_execute::materialize::materializer::ArtifactNotMaterializedReason;
use buck2_execute::materialize::materializer::CasDownloadInfo;
use buck2_execute::materialize::materializer::CopiedArtifact;
use buck2_execute::materialize::materializer::HttpDownloadInfo;
use buck2_execute::output_size::OutputSize;
use chrono::DateTime;
use chrono::Utc;
use derive_more::Display;
use dupe::Dupe;
use futures::future::BoxFuture;
use futures::future::Shared;
use tracing::instrument;
use crate::materializers::deferred::SharedMaterializingError;
use crate::materializers::deferred::WriteFile;
use crate::materializers::deferred::file_tree::FileTree;
use crate::sqlite::materializer_db::MaterializerState;
use crate::sqlite::materializer_db::MaterializerStateEntry;
use crate::sqlite::materializer_db::MaterializerStateSqliteDb;
/// A future that is materializing on a separate task spawned by the materializer.
///
/// It is a `Shared` boxed future, so clones of it can be awaited from several places.
/// It resolves to `Ok(())` or to a `SharedMaterializingError`.
pub(crate) type MaterializingFuture =
Shared<BoxFuture<'static, Result<(), SharedMaterializingError>>>;
/// A future that is cleaning paths on a separate task spawned by the materializer.
///
/// It is a `Shared` boxed future that resolves to a `buck2_error::Result<()>`.
pub(crate) type CleaningFuture = Shared<BoxFuture<'static, buck2_error::Result<()>>>;
/// A future that is currently processing a path: either a materialization or a
/// cleanup. Cloning it clones the underlying shared future handle.
#[derive(Clone)]
pub(crate) enum ProcessingFuture {
/// A materialization is in progress.
Materializing(MaterializingFuture),
/// A cleanup of paths is in progress.
Cleaning(CleaningFuture),
}
/// Tree that stores materialization data for each artifact. Used internally by
/// the `DeferredMaterializer` to keep track of artifacts and how to
/// materialize them.
///
/// Each value stored in the tree is a boxed `ArtifactMaterializationData`.
pub(crate) type ArtifactTree = FileTree<Box<ArtifactMaterializationData>>;
/// The version of a processing future associated with an artifact. We use this to know if we can
/// clear the processing field when a callback is received, or if more work is expected.
///
/// This is a thin wrapper around a `u64`. The derived ordering is what lets two
/// versions be compared to decide which one is newer.
#[derive(Eq, PartialEq, Copy, Clone, Dupe, Debug, Ord, PartialOrd, Display)]
pub struct Version(pub u64);
/// Materialization state tracked for a single artifact. This is the (boxed) value
/// type stored in `ArtifactTree`.
pub struct ArtifactMaterializationData {
/// Taken from `deps` of `ArtifactValue`. Used to materialize deps of the artifact.
pub(crate) deps: Option<ActionSharedDirectory>,
/// The stage this artifact is currently in (declared or materialized).
pub(crate) stage: ArtifactMaterializationStage,
/// Tracks the future (if any) that is processing something at the current path
/// (for example, materializing or deleting), together with its version. Any other
/// future that needs to process this path would need to wait on the existing
/// future to finish.
/// TODO(scottcao): Turn this into a queue of pending futures.
pub(crate) processing: Processing,
}
/// Represents a processing future + the version at which it was issued. When receiving
/// notifications about processing futures that finish, the version carried by the
/// notification is compared with the current version to decide whether its changes
/// still apply. For example, `cleanup_finished` ignores a notification when the
/// current version is greater than the notification's version.
///
/// The version is an internal counter that is shared between the current processing_fut and
/// this data. When multiple operations are queued on a ArtifactMaterializationData, this
/// allows us to identify which one is current.
pub(crate) enum Processing {
/// No future is attached to the path; only the current version is kept.
Done(Version),
/// A future is attached to the path.
Active {
/// The future doing the work.
future: ProcessingFuture,
/// The version at which `future` was issued.
version: Version,
},
}
impl Processing {
    /// Returns the version stored in this state, regardless of whether a future is
    /// still attached.
    pub(crate) fn current_version(&self) -> Version {
        match self {
            Self::Done(version) | Self::Active { version, .. } => *version,
        }
    }

    /// Consumes the state and hands back the attached future, or `None` when the
    /// state is `Done`.
    fn into_future(self) -> Option<ProcessingFuture> {
        if let Self::Active { future, .. } = self {
            Some(future)
        } else {
            None
        }
    }
}
/// Metadata used to identify an artifact entry and stored for every materialized artifact.
///
/// Being an `ActionDirectoryEntry`, it is either a directory or a leaf member.
pub type ArtifactMetadata = ActionDirectoryEntry<ActionSharedDirectory>;
/// Tells whether the stored `metadata` describes the same artifact as `entry`.
///
/// * Two directories match when their fingerprints are equal.
/// * Two leaves match when they compare equal, except that on Windows two files
///   are compared by digest only.
/// * A directory never matches a leaf.
pub(crate) fn artifact_metadata_matches_entry(
    metadata: &ArtifactMetadata,
    entry: &ArtifactMetadata,
) -> bool {
    match (metadata, entry) {
        (DirectoryEntry::Dir(lhs), DirectoryEntry::Dir(rhs)) => {
            lhs.fingerprint() == rhs.fingerprint()
        }
        // In Windows, the 'executable bit' absence can cause Buck2 to re-download
        // identical artifacts. To avoid this, we exclude the executable bit from the
        // comparison and look at the digest alone.
        (
            DirectoryEntry::Leaf(ActionDirectoryMember::File(lhs)),
            DirectoryEntry::Leaf(ActionDirectoryMember::File(rhs)),
        ) if cfg!(windows) => lhs.digest == rhs.digest,
        (DirectoryEntry::Leaf(lhs), DirectoryEntry::Leaf(rhs)) => lhs == rhs,
        _ => false,
    }
}
/// Returns the size in bytes attributed to `metadata`.
///
/// A file leaf reports the size of its digest, any other leaf reports `0`, and a
/// directory reports the `bytes` of `calc_output_count_and_bytes(false)`.
pub(crate) fn artifact_metadata_size(metadata: &ArtifactMetadata) -> u64 {
    if let DirectoryEntry::Leaf(member) = metadata {
        return match member {
            ActionDirectoryMember::File(file_metadata) => file_metadata.digest.size(),
            _ => 0,
        };
    }

    // Only the directory case is left at this point.
    metadata.calc_output_count_and_bytes(false).bytes
}
/// The stage an artifact tracked by the `ArtifactTree` is in.
pub enum ArtifactMaterializationStage {
/// The artifact was declared, but the materialization hasn't started yet.
/// If it did start but ended with an error, it returns to this stage.
/// When the artifact is declared, we spawn a deletion future to delete
/// all existing paths that conflict with the output paths.
Declared {
/// Taken from `entry` of `ArtifactValue`. Used to materialize the actual artifact.
entry: ActionDirectoryEntry<ActionSharedDirectory>,
/// How the artifact is to be materialized.
method: Arc<ArtifactMaterializationMethod>,
},
/// This artifact was materialized
Materialized {
/// Once the artifact is materialized, we don't need the full entry anymore.
/// We can throw away most of the entry and just keep some metadata used to
/// check if materialized artifact matches declared artifact.
metadata: ArtifactMetadata,
/// Used to clean older artifacts from buck-out.
last_access_time: DateTime<Utc>,
/// Artifact declared by running daemon.
/// Should not be deleted without invalidating DICE nodes, which currently
/// means killing the daemon.
active: bool,
},
}
/// Different ways to materialize the files of an artifact. Some artifacts need
/// to be fetched from the CAS, others copied locally.
///
/// The `Display` output of each variant is a short human-readable label.
#[derive(Debug, Display)]
pub enum ArtifactMaterializationMethod {
/// The files must be copied from a local path.
#[display("local copy")]
LocalCopy(
/// A map `[dest => src]`, meaning that a file at
/// `{artifact_path}/{dest}/{p}` needs to be copied from `{src}/{p}`.
FileTree<ProjectRelativePathBuf>,
/// Raw list of copied artifacts, as received in `declare_copy`.
Vec<CopiedArtifact>,
),
/// The artifact is produced by a write described by the contained `WriteFile`.
/// NOTE(review): `WriteFile` is defined elsewhere; confirm its exact semantics there.
#[display("write")]
Write(Arc<WriteFile>),
/// The files must be fetched from the CAS.
#[display("cas download (action: {})", info.origin)]
CasDownload {
/// The digest of the action that produced this output
info: Arc<CasDownloadInfo>,
},
/// The file must be fetched over HTTP.
#[display("http download ({})", info)]
HttpDownload { info: HttpDownloadInfo },
/// Placeholder method that only exists in test builds.
#[cfg(test)]
Test,
}
/// Conversion of a materialization method into its
/// `buck2_data::MaterializationMethod` counterpart.
pub(crate) trait MaterializationMethodToProto {
/// Returns the `buck2_data::MaterializationMethod` that corresponds to `self`.
fn to_proto(&self) -> buck2_data::MaterializationMethod;
}
impl MaterializationMethodToProto for ArtifactMaterializationMethod {
fn to_proto(&self) -> buck2_data::MaterializationMethod {
match self {
ArtifactMaterializationMethod::LocalCopy { .. } => {
buck2_data::MaterializationMethod::LocalCopy
}
ArtifactMaterializationMethod::CasDownload { .. } => {
buck2_data::MaterializationMethod::CasDownload
}
ArtifactMaterializationMethod::Write { .. } => buck2_data::MaterializationMethod::Write,
ArtifactMaterializationMethod::HttpDownload { .. } => {
buck2_data::MaterializationMethod::HttpDownload
}
#[cfg(test)]
ArtifactMaterializationMethod::Test => unimplemented!(),
}
}
}
impl ArtifactTree {
pub(crate) fn initialize(sqlite_state: Option<MaterializerState>) -> Self {
let mut tree = ArtifactTree::new();
if let Some(sqlite_state) = sqlite_state {
for entry in sqlite_state.into_iter() {
let MaterializerStateEntry {
path,
metadata,
last_access_time,
} = entry;
tree.insert(
path.iter().map(|f| f.to_owned()),
Box::new(ArtifactMaterializationData {
deps: None,
stage: ArtifactMaterializationStage::Materialized {
metadata,
last_access_time,
active: false,
},
processing: Processing::Done(Version(0)),
}),
);
}
}
tree
}
/// Given a path that's (possibly) not yet materialized, returns the path
/// `contents_path` where its contents can be found. Returns Err if the
/// contents cannot be found (ex. if it requires HTTP or CAS download)
///
/// Note that the returned `contents_path` could be the same as `path`:
/// this is the case when `path` is not covered by the tree, when the covering
/// artifact is already materialized, or when a local copy has no source
/// recorded for it.
///
/// # Panics
///
/// Panics if a local-copy source resolves to a file while `path` still has
/// components left below it.
#[instrument(level = "trace", skip(self), fields(path = %path))]
pub(crate) fn file_contents_path(
    &self,
    path: ProjectRelativePathBuf,
    digest_config: DigestConfig,
) -> Result<ProjectRelativePathBuf, ArtifactNotMaterializedReason> {
    let mut path_iter = path.iter();

    // Not in tree. Assume it's a source file that doesn't require materialization
    // from materializer.
    let Some(materialization_data) = self.prefix_get(&mut path_iter) else {
        return Ok(path);
    };

    // A materialized artifact can be read in place.
    let ArtifactMaterializationStage::Declared { entry, method } = &materialization_data.stage
    else {
        return Ok(path);
    };
    let (entry, method) = (entry.dupe(), method.dupe());

    match method.as_ref() {
        ArtifactMaterializationMethod::CasDownload { info } => {
            let root_entry: ActionDirectoryEntry<ActionSharedDirectory> = entry.dupe();

            // Check if the path we are asking for exists in this entry, descending
            // one directory per remaining path component.
            let mut found = Some(entry.as_ref());
            for name in path_iter {
                found = match found {
                    Some(DirectoryEntry::Dir(dir)) => dir.get(name),
                    _ => break,
                }
            }

            Err(match found {
                Some(found) => ArtifactNotMaterializedReason::RequiresCasDownload {
                    path,
                    // TODO (@torozco): A nicer API to get an Immutable directory here.
                    entry: found
                        .map_dir(|dir| {
                            dir.as_dyn()
                                .to_builder()
                                .fingerprint(digest_config.as_directory_serializer())
                        })
                        .map_leaf(|leaf| leaf.dupe()),
                    info: info.dupe(),
                },
                None => ArtifactNotMaterializedReason::DeferredMaterializerCorruption {
                    path,
                    entry: root_entry,
                    info: info.dupe(),
                },
            })
        }
        ArtifactMaterializationMethod::HttpDownload { .. }
        | ArtifactMaterializationMethod::Write { .. } => {
            // TODO: Do the write directly to RE instead of materializing locally?
            Err(ArtifactNotMaterializedReason::RequiresMaterialization { path })
        }
        // TODO: also record and check materialized_files for LocalCopy
        ArtifactMaterializationMethod::LocalCopy(srcs, _) => {
            let Some(src_path) = srcs.prefix_get(&mut path_iter) else {
                return Ok(path);
            };
            // This is not supposed to be reachable, and if it's, there
            // is a bug somewhere else. Panic to prevent the bug from
            // propagating.
            if let Some(part) = path_iter.next() {
                panic!(
                    "While getting materialized path of {path:?}: path {src_path:?} is a file, so subpath {part:?} doesn't exist within.",
                );
            }
            // The source may itself be an artifact tracked by the tree, so resolve it too.
            self.file_contents_path(src_path.clone(), digest_config)
        }
        #[cfg(test)]
        ArtifactMaterializationMethod::Test => unimplemented!(),
    }
}
/// Handles the notification that a cleanup issued at `version` for
/// `artifact_path` has finished with `result`.
///
/// * If nothing in the tree covers the path, this only logs at debug level.
/// * If the entry's current version is greater than `version`, the notification
///   is ignored, since clearing the state could drop a future from a newer version.
/// * On success the processing state becomes `Done(version)`; on failure it is
///   left as it is.
#[instrument(level = "debug", skip(self, result), fields(path = %artifact_path))]
pub(crate) fn cleanup_finished(
    &mut self,
    artifact_path: ProjectRelativePathBuf,
    version: Version,
    result: Result<(), SharedMaterializingError>,
) {
    let mut components = artifact_path.iter();
    let info = match self
        .prefix_get_mut(&mut components)
        .ok_or_else(|| internal_error!("Path is vacant"))
    {
        Ok(info) => info,
        Err(e) => {
            tracing::debug!("cleanup_finished_vacant: {}", e);
            return;
        }
    };

    // We can only unset the future if version matches.
    // Otherwise, we may be unsetting a different future from a newer version.
    if info.processing.current_version() > version {
        tracing::debug!("version conflict");
        return;
    }

    // On failure, leave the state alone so that we don't keep retrying.
    if result.is_ok() {
        info.processing = Processing::Done(version);
    }
}
/// Removes `paths` from the tree and returns the futures that were still attached
/// to the removed entries.
///
/// Each element of the returned vec pairs a removed path with the processing
/// future that was active on it; removed entries without an active future
/// contribute nothing. When `sqlite_db` is provided, every removed path is also
/// deleted from the materializer state table.
///
/// # Errors
///
/// Returns an error if deleting the removed paths from the materializer state
/// table fails.
pub(crate) fn invalidate_paths_and_collect_futures(
    &mut self,
    paths: Vec<ProjectRelativePathBuf>,
    sqlite_db: Option<&mut MaterializerStateSqliteDb>,
) -> buck2_error::Result<Vec<(ProjectRelativePathBuf, ProcessingFuture)>> {
    let mut pending = Vec::new();
    let mut removed_paths = Vec::new();

    for requested in &paths {
        for (removed, data) in self.remove_path(requested) {
            pending.extend(
                data.processing
                    .into_future()
                    .map(|fut| (removed.clone(), fut)),
            );
            removed_paths.push(removed);
        }
    }

    // Test-only hook that injects a failure for a well-known path.
    #[cfg(test)]
    {
        use buck2_error::buck2_error;
        let injected = removed_paths
            .iter()
            .any(|removed| removed.as_str() == "test/invalidate/failure");
        if injected {
            return Err(buck2_error!(buck2_error::ErrorTag::Tier0, "Injected error"));
        }
    }

    // We can invalidate the paths here even if materializations are currently running on
    // the underlying nodes, because when materialization finishes we'll check the version
    // number.
    if let Some(db) = sqlite_db {
        db.materializer_state_table()
            .delete(removed_paths)
            .buck_error_context("Error invalidating paths in materializer state")?;
    }

    Ok(pending)
}
}