Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ containers = [
"edgehog-service?/containers",
]
# Enable the file transfer feature
file-transfer = []
file-transfer = ["windows-sys"]
# Enable the forwarder service
forwarder = ["dep:edgehog-forwarder"]
# Enable connecting to astarte through the Message Hub
Expand Down Expand Up @@ -172,6 +172,9 @@ procfs.workspace = true
udev = { workspace = true, optional = true }
systemd = { workspace = true, optional = true }

[target.'cfg(target_os = "windows")'.dependencies]
windows-sys = { workspace = true, optional = true, features = ["Win32_Storage_FileSystem"] }

[build-dependencies]
rustc_version.workspace = true

Expand Down Expand Up @@ -259,4 +262,5 @@ uuid = "1.23.1"
walkdir = "2.5.0"
webpki-roots = "1.0.4"
wifiscanner = "0.5.1"
windows-sys = "0.61.2"
zbus = { version = "5.14.0", default-features = false }
5 changes: 5 additions & 0 deletions src/file_transfer/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ impl Percentage {
None
}
}

/// Calculate the percentage.
pub fn calculate(&self, value: u64) -> u64 {
value.saturating_mul(self.0.into()).div_ceil(100)
}
}

impl Deref for Percentage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::fmt::Debug;
use std::io;
use std::path::{Path, PathBuf};

use cfg_if::cfg_if;
use edgehog_store::models::job::job_type::JobType;
use eyre::WrapErr;
use futures::{Stream, TryStreamExt};
Expand All @@ -34,12 +35,16 @@ use uuid::Uuid;
use crate::file_transfer::config::Percentage;
use crate::file_transfer::encoding::Paths;
use crate::file_transfer::interface::file::StoredFile;
use crate::file_transfer::request::FileDigest;
use crate::file_transfer::request::TransferJobTag;
use crate::file_transfer::request::{FileDigest, TransferJobTag};
use crate::jobs::Queue;

use super::{FileOptions, WriteHandle};

#[cfg(unix)]
mod unix;
#[cfg(windows)]
mod windows;

/// Stores files in the storage
#[derive(Debug)]
pub(crate) struct FileStorage<F> {
Expand All @@ -59,7 +64,7 @@ impl FileStorage<Fs> {
impl<F> FileStorage<F> {
#[instrument(skip_all)]
pub(crate) async fn init(&self, queue: &Queue) -> eyre::Result<()> {
trace!(path = %self.dir.display(), "initializing store dir");
trace!(path = %self.dir.display(), "initialazing store dir");

tokio::fs::create_dir_all(&self.dir)
.await
Expand Down Expand Up @@ -166,45 +171,45 @@ impl<F> FileStorage<F> {
}

#[instrument(skip_all, fields(id = %opt.id))]
pub(crate) async fn create_write_handle(&self, opt: &FileOptions) -> io::Result<WriteHandle>
pub(crate) async fn create_write_handle(&mut self, opt: &FileOptions) -> io::Result<WriteHandle>
where
F: Space,
{
let file_path = self.file_path(&opt.id);

trace!(path = %file_path.display(), "opening file for write");

let handle = WriteHandle::open(file_path, opt).await?;

self.fs
.reserve_space(opt.id, &file_path, opt.file_size)
.reserve_space(opt.id, &handle.partial, opt.file_size)
.await?;

trace!(path = %file_path.display(), "opening file for write");

WriteHandle::open(file_path, opt).await
Ok(handle)
}

#[instrument(skip_all, fields(id = %opt.id))]
pub(crate) async fn finalize_write(
&self,
&mut self,
handle: &mut WriteHandle,
opt: &FileOptions,
) -> io::Result<StoredFile<Uuid>>
) -> io::Result<StoredFile>
where
F: Space,
{
handle.finalize(opt).await?;

self.fs.finalize(opt.id).await?;
self.fs.finalize(opt.id, &handle.path).await?;

let id = opt.id;
let size = opt.file_size;
let path = handle.path.clone();

Ok(StoredFile::create(id, path, size))
Ok(StoredFile::create(id.to_string(), path, size))
}

#[instrument(skip_all)]
pub(crate) async fn files(
&self,
) -> io::Result<impl Stream<Item = io::Result<StoredFile<String>>>> {
pub(crate) async fn files(&self) -> io::Result<impl Stream<Item = io::Result<StoredFile>>> {
let read_dir = tokio::fs::read_dir(&self.dir).await?;

let stream = ReadDirStream::new(read_dir).try_filter_map(async |e| {
Expand Down Expand Up @@ -232,55 +237,123 @@ pub(crate) trait Space {
/// It will make sure that at least the 10% of free space is available on the device the files
/// are stored on.
fn reserve_space(
&self,
&mut self,
id: Uuid,
path: &Path,
file_size: u64,
) -> impl Future<Output = io::Result<()>> + Send;

/// Marks the file as saved and refreshes the current quota
fn finalize(&self, _id: Uuid) -> impl Future<Output = io::Result<()>> + Send;
fn finalize(&mut self, id: Uuid, path: &Path) -> impl Future<Output = io::Result<()>> + Send;
}

#[derive(Debug)]
pub(crate) struct Fs {
#[expect(unused)]
reserved: Percentage,
}

impl Fs {}
impl Fs {
fn has_avail(&self, mut file_size: u64, stat: &FsStat) -> bool {
// Use next multiple of allocation_granularity
let rem = file_size % stat.fragment_size;
file_size = file_size.saturating_add(stat.fragment_size.saturating_sub(rem));

let reserved = self.reserved.calculate(stat.fs_total);

reserved < stat.user_avail.saturating_sub(file_size)
}

fn has_total_free(&self, stat: &FsStat) -> bool {
let reserved = self.reserved.calculate(stat.fs_total);

reserved < stat.user_avail
}
}

// TODO: should use tokio here
impl Space for Fs {
async fn reserve_space(&self, _id: Uuid, _path: &Path, _file_size: u64) -> io::Result<()> {
// TODO: ensure 10% of the free space on disk
// TODO: we could pre-allocate the file size?
#[instrument(skip(self, path))]
async fn reserve_space(&mut self, id: Uuid, path: &Path, file_size: u64) -> io::Result<()> {
let stat = FsStat::read(path)?;

if !self.has_avail(file_size, &stat) {
return Err(io::Error::new(
io::ErrorKind::FileTooLarge,
"file size exceeds the required reserved free space",
));
}

Ok(())
}

async fn finalize(&self, _id: Uuid) -> io::Result<()> {
// TODO: refresh the space
// TODO: we should cleanup the file
#[instrument(skip(self, path))]
async fn finalize(&mut self, id: Uuid, path: &Path) -> io::Result<()> {
let stat = FsStat::read(path)?;

if !self.has_total_free(&stat) {
return Err(io::Error::new(
io::ErrorKind::FileTooLarge,
"file size exceeds the required reserved free space",
));
}

Ok(())
}
}

/// Information of a mounted filesystem.
///
/// We use bytes to overcome cross system compatibility.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct FsStat {
/// Fragment size
///
/// Size of a single contiguous fragment of data that can be stored.
fragment_size: u64,
/// Filesystem user space.
///
/// Space on fs to store data for an unprivileged user, calculated as multiple of fragment size.
user_avail: u64,
/// Total filesystem space.
///
/// Total filesystem space calculated as a multiple of fragment size.
fs_total: u64,
}

impl FsStat {
fn read(path: &Path) -> io::Result<Self> {
cfg_if! {
if #[cfg(windows)] {
self::windows::get_disk_free_space(path)
} else {
self::unix::read_statvfs(path)
}
}
}
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
#[cfg(unix)]
use std::os::unix::fs::MetadataExt;

use mockall::{Sequence, predicate};
use tempdir::TempDir;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};

use crate::file_transfer::config::DEFAULT_MAX_FREE_PERCENTAGE;
#[cfg(unix)]
use crate::file_transfer::request::FilePermissions;

use super::*;

use pretty_assertions::assert_eq;

// NOTE: GitHub windows runner have a little less than 20% of free space available. So to not
// have the CI failing, we stay well under that limit with a 10% reserved space
pub(crate) const TEST_RESERVED_PERCENTAGE: Percentage = Percentage::new(10).unwrap();

fn partial_file_path(path: PathBuf) -> PathBuf {
let mut path = path.into_os_string();

Expand All @@ -293,7 +366,7 @@ mod tests {
let dir = TempDir::new("fs_storage").expect("couldn't create temp directory");

(
FileStorage::new(dir.path().to_path_buf(), DEFAULT_MAX_FREE_PERCENTAGE),
FileStorage::new(dir.path().to_path_buf(), TEST_RESERVED_PERCENTAGE),
dir,
)
}
Expand Down Expand Up @@ -350,12 +423,15 @@ mod tests {
.in_sequence(&mut seq)
.returning(|_, _, _| Box::pin(std::future::ready(Ok(()))));
mock.expect_finalize()
.with(predicate::eq(opt.id))
.with(
predicate::eq(opt.id),
predicate::function(move |p: &Path| p.to_str().unwrap().contains(&id.to_string())),
)
.once()
.in_sequence(&mut seq)
.returning(|_| Box::pin(std::future::ready(Ok(()))));
.returning(|_, _| Box::pin(std::future::ready(Ok(()))));

let (store, dir) = mock_fs_storage(mock);
let (mut store, dir) = mock_fs_storage(mock);

let mut write = store.create_write_handle(&opt).await.unwrap();

Expand Down Expand Up @@ -385,7 +461,7 @@ mod tests {

#[tokio::test]
async fn write_handle_new() {
let (store, dir) = fs_storage();
let (mut store, dir) = fs_storage();

let id = Uuid::new_v4();

Expand Down Expand Up @@ -430,7 +506,7 @@ mod tests {

#[tokio::test]
async fn write_handle_existing() {
let (store, dir) = fs_storage();
let (mut store, dir) = fs_storage();

let id = Uuid::new_v4();

Expand Down
36 changes: 36 additions & 0 deletions src/file_transfer/file_system/store/unix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// This file is part of Edgehog.
//
// Copyright 2026 SECO Mind Srl
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

use std::io;
use std::path::Path;

use super::FsStat;

pub(crate) fn read_statvfs(path: &Path) -> io::Result<FsStat> {
let stat = rustix::fs::statvfs(path)?;
let fragment_size = stat.f_frsize;

let user_avail = stat.f_bsize.saturating_mul(stat.f_bavail);
let fs_total = stat.f_frsize.saturating_mul(stat.f_blocks);

Ok(FsStat {
fragment_size,
user_avail,
fs_total,
})
}
Loading
Loading