forked from ISISNeutronMuon/digital-muon-pipeline
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathflush_to_archive.rs
More file actions
97 lines (91 loc) · 3.15 KB
/
flush_to_archive.rs
File metadata and controls
97 lines (91 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use crate::{
error::{ErrorCodeLocation, NexusWriterError, NexusWriterResult},
NexusSettings,
};
use std::path::{Path, PathBuf};
use tokio::{
signal::unix::{signal, SignalKind},
task::JoinHandle,
time::Interval,
};
use tracing::{debug, info, warn};
/// Moves a single file to the archive
#[tracing::instrument(skip_all, level = "info", fields(
from_path = from_path.to_string_lossy().to_string(),
to_path
))]
fn move_file_to_archive(from_path: &Path, archive_path: &Path) -> NexusWriterResult<()> {
let mut to_path = archive_path.to_path_buf();
if let Some(file_name) = from_path.file_name() {
to_path.push(file_name);
tracing::Span::current().record("to_path", to_path.to_string_lossy().to_string());
}
match std::fs::copy(from_path, to_path) {
Ok(bytes) => info!("File Move Succesful. {bytes} byte(s) moved."),
Err(e) => {
warn!("File Move Error {e}");
return Err(e.into());
}
};
if let Err(e) = std::fs::remove_file(from_path) {
warn!("Error removing temporary file: {e}");
return Err(e.into());
}
Ok(())
}
/// Flushes all files in the local completed directory to the archive
#[tracing::instrument(level = "debug", fields(
glob_pattern = glob_pattern,
archive_path = archive_path.to_string_lossy().to_string()
))]
pub(crate) async fn flush_to_archive(
glob_pattern: &str,
archive_path: &Path,
) -> NexusWriterResult<()> {
for file_path in glob::glob(glob_pattern)? {
move_file_to_archive(file_path?.as_path(), archive_path)?;
}
Ok(())
}
/// This task periodically moves any files in the completed directory to the archive
#[tracing::instrument(skip_all, level = "info", fields(
glob_pattern = glob_pattern,
archive_path = archive_path.to_string_lossy().to_string()
))]
async fn archive_flush_task(
glob_pattern: String,
archive_path: PathBuf,
mut interval: Interval,
) -> NexusWriterResult<()> {
// Is used to await any sigint signals
let mut sigint = signal(SignalKind::interrupt())?;
debug!("Finding files matched to {glob_pattern}");
loop {
tokio::select! {
_ = interval.tick() => flush_to_archive(&glob_pattern, &archive_path).await?,
_ = sigint.recv() => return Ok(())
}
}
}
/// If the user specified an archive path,
/// creates the archive flush task and returns the JoinHandle
/// Otherwise returns None
#[tracing::instrument(skip_all, level = "info")]
pub(crate) fn create_archive_flush_task(
nexus_settings: &NexusSettings,
) -> NexusWriterResult<Option<JoinHandle<NexusWriterResult<()>>>> {
let local_completed_glob_pattern =
nexus_settings
.get_local_completed_glob_pattern()
.map_err(|path| NexusWriterError::CannotConvertPath {
path: path.to_path_buf(),
location: ErrorCodeLocation::FlushToArchive,
})?;
Ok(nexus_settings.get_archive_path().map(|archive_path| {
tokio::spawn(archive_flush_task(
local_completed_glob_pattern,
archive_path.to_path_buf(),
nexus_settings.get_archive_flush_interval(),
))
}))
}