Skip to content

Commit a1bfc1b

Browse files
authored
Merge pull request #353 from influxdata/tm/create-directory-at
feat: implement create_directory_at
2 parents 9be5fe5 + 465ffaf commit a1bfc1b

File tree

2 files changed

+366
-31
lines changed

2 files changed

+366
-31
lines changed

host/src/vfs/mod.rs

Lines changed: 340 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
//! running.
99
1010
use std::{
11-
collections::HashMap,
11+
collections::{HashMap, hash_map::Entry},
1212
hash::Hash,
1313
io::{Cursor, Read},
1414
sync::{
@@ -435,6 +435,36 @@ impl<'a> VfsCtxView<'a> {
435435
};
436436
VfsNode::traverse(start, directions)
437437
}
438+
439+
/// Get the parent node and base name for a given path.
440+
fn parent_node_and_name(
441+
&self,
442+
node: SharedVfsNode,
443+
path: &str,
444+
) -> FsResult<(SharedVfsNode, PathSegment)> {
445+
let (is_root, directions) = PathTraversal::parse(path, &self.vfs_state.limits)?;
446+
let mut directions = directions.collect::<Vec<_>>();
447+
448+
let start = if is_root {
449+
Arc::clone(&self.vfs_state.root)
450+
} else {
451+
node
452+
};
453+
454+
let name = match directions
455+
.pop()
456+
.ok_or_else(|| FsError::trap(ErrorCode::Invalid))?
457+
{
458+
Ok(PathTraversal::Down(segment)) => segment,
459+
_other @ (Ok(PathTraversal::Stay) | Ok(PathTraversal::Up) | Err(_)) => {
460+
return Err(FsError::trap(ErrorCode::Invalid));
461+
}
462+
};
463+
464+
let parent = VfsNode::traverse(start, directions.into_iter())?;
465+
466+
Ok((parent, name))
467+
}
438468
}
439469

440470
impl<'a> filesystem::types::HostDescriptor for VfsCtxView<'a> {
@@ -600,10 +630,51 @@ impl<'a> filesystem::types::HostDescriptor for VfsCtxView<'a> {
600630

601631
async fn create_directory_at(
602632
&mut self,
603-
_self_: Resource<Descriptor>,
604-
_path: String,
633+
self_: Resource<Descriptor>,
634+
path: String,
605635
) -> FsResult<()> {
606-
Err(FsError::trap(ErrorCode::ReadOnly))
636+
let desc = self.get_descriptor(self_)?;
637+
if !desc.flags.contains(DescriptorFlags::MUTATE_DIRECTORY) {
638+
return Err(FsError::trap(ErrorCode::ReadOnly));
639+
}
640+
641+
let (parent_node, name) = self.parent_node_and_name(Arc::clone(&desc.node), &path)?;
642+
643+
let new_dir = Arc::new(RwLock::new(VfsNode {
644+
kind: VfsNodeKind::Directory {
645+
children: HashMap::new(),
646+
},
647+
parent: Some(Arc::downgrade(&parent_node)),
648+
}));
649+
650+
self.vfs_state
651+
.inodes_allocation
652+
.inc(1)
653+
.map_err(FsError::trap)?;
654+
self.vfs_state
655+
.limiter
656+
.grow(name.len())
657+
.map_err(|_| FsError::trap(ErrorCode::InsufficientMemory))?;
658+
self.vfs_state
659+
.limiter
660+
.grow(std::mem::size_of_val(&new_dir))
661+
.map_err(|_| FsError::trap(ErrorCode::InsufficientMemory))?;
662+
663+
match &mut parent_node.write().unwrap().kind {
664+
VfsNodeKind::File { .. } => {
665+
return Err(FsError::trap(ErrorCode::NotDirectory));
666+
}
667+
VfsNodeKind::Directory { children } => match children.entry(name) {
668+
Entry::Vacant(entry) => {
669+
entry.insert(new_dir);
670+
}
671+
Entry::Occupied(_) => {
672+
return Err(FsError::trap(ErrorCode::Exist));
673+
}
674+
},
675+
}
676+
677+
Ok(())
607678
}
608679

609680
async fn stat(&mut self, self_: Resource<Descriptor>) -> FsResult<DescriptorStat> {
@@ -797,7 +868,9 @@ impl<'a> filesystem::preopens::Host for VfsCtxView<'a> {
797868
// Create new preopen descriptor for root with read-write access
798869
let desc = VfsDescriptor {
799870
node: Arc::clone(&self.vfs_state.root),
800-
flags: DescriptorFlags::READ,
871+
flags: DescriptorFlags::READ
872+
| DescriptorFlags::MUTATE_DIRECTORY
873+
| DescriptorFlags::WRITE,
801874
};
802875

803876
let res = self.table.push(desc)?;
@@ -840,3 +913,265 @@ pub(crate) struct HasFs;
840913
impl HasData for HasFs {
841914
type Data<'a> = VfsCtxView<'a>;
842915
}
916+
917+
#[cfg(test)]
918+
mod tests {
919+
use std::sync::Arc;
920+
921+
use crate::vfs::DescriptorFlags;
922+
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool};
923+
use wasmtime_wasi::p2::bindings::filesystem::types::HostDescriptor;
924+
925+
use super::*;
926+
use crate::limiter::{Limiter, StaticResourceLimits};
927+
928+
/// Create a test VfsCtxView with default limits
929+
fn create_test_vfs() -> (ResourceTable, VfsState) {
930+
let limits = VfsLimits::default();
931+
let pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
932+
let limiter = Limiter::new(StaticResourceLimits::default(), &pool);
933+
let vfs_state = VfsState::new(limits, limiter);
934+
let table = ResourceTable::new();
935+
936+
(table, vfs_state)
937+
}
938+
939+
/// Create a test descriptor with the given flags
940+
fn create_test_descriptor(
941+
ctx: &mut VfsCtxView<'_>,
942+
flags: DescriptorFlags,
943+
) -> Resource<Descriptor> {
944+
let desc = VfsDescriptor {
945+
node: Arc::clone(&ctx.vfs_state.root),
946+
flags,
947+
};
948+
let res = ctx.table.push(desc).unwrap();
949+
res.cast()
950+
}
951+
952+
#[tokio::test]
953+
async fn test_create_directory_readonly_descriptor_fails() {
954+
let (mut table, mut vfs_state) = create_test_vfs();
955+
let mut ctx = VfsCtxView {
956+
table: &mut table,
957+
vfs_state: &mut vfs_state,
958+
};
959+
960+
// Create descriptor with READ flags only (no MUTATE_DIRECTORY)
961+
let desc = create_test_descriptor(&mut ctx, DescriptorFlags::READ);
962+
963+
// Attempt to create directory should fail with ReadOnly error
964+
let result = ctx.create_directory_at(desc, "testdir".to_string()).await;
965+
assert!(result.is_err());
966+
967+
let err = result.unwrap_err();
968+
assert_eq!(*err.downcast_ref().unwrap(), ErrorCode::ReadOnly);
969+
}
970+
971+
#[tokio::test]
972+
async fn test_create_directory_already_exists_fails() {
973+
let (mut table, mut vfs_state) = create_test_vfs();
974+
let mut ctx = VfsCtxView {
975+
table: &mut table,
976+
vfs_state: &mut vfs_state,
977+
};
978+
979+
let desc = create_test_descriptor(
980+
&mut ctx,
981+
DescriptorFlags::READ | DescriptorFlags::MUTATE_DIRECTORY,
982+
);
983+
984+
// First creation should succeed
985+
let result = ctx.create_directory_at(desc, "testdir".to_string()).await;
986+
assert!(result.is_ok());
987+
988+
let desc = create_test_descriptor(
989+
&mut ctx,
990+
DescriptorFlags::READ | DescriptorFlags::MUTATE_DIRECTORY,
991+
);
992+
993+
// Second creation should fail with Exist error
994+
let result = ctx.create_directory_at(desc, "testdir".to_string()).await;
995+
assert!(result.is_err());
996+
997+
let err = result.unwrap_err();
998+
assert_eq!(*err.downcast_ref().unwrap(), ErrorCode::Exist);
999+
}
1000+
1001+
#[tokio::test]
1002+
async fn test_create_directory_success() {
1003+
let (mut table, mut vfs_state) = create_test_vfs();
1004+
let mut ctx = VfsCtxView {
1005+
table: &mut table,
1006+
vfs_state: &mut vfs_state,
1007+
};
1008+
1009+
let desc = create_test_descriptor(
1010+
&mut ctx,
1011+
DescriptorFlags::READ | DescriptorFlags::MUTATE_DIRECTORY,
1012+
);
1013+
1014+
let result = ctx.create_directory_at(desc, "testdir".to_string()).await;
1015+
assert!(result.is_ok());
1016+
1017+
let desc = create_test_descriptor(
1018+
&mut ctx,
1019+
DescriptorFlags::READ | DescriptorFlags::MUTATE_DIRECTORY,
1020+
);
1021+
1022+
// Verify the directory was created by checking it exists
1023+
let node_result = ctx.node_at(desc, "testdir");
1024+
assert!(node_result.is_ok());
1025+
1026+
let node = node_result.unwrap();
1027+
let node_guard = node.read().unwrap();
1028+
match &node_guard.kind {
1029+
VfsNodeKind::Directory { .. } => {
1030+
// Success - it's a directory
1031+
}
1032+
VfsNodeKind::File { .. } => {
1033+
panic!("Expected directory, got file");
1034+
}
1035+
}
1036+
}
1037+
1038+
#[tokio::test]
1039+
async fn test_create_directory_insufficient_inodes_fails() {
1040+
// Create VFS with very limited inodes (1 inode, does not include root)
1041+
let limits = VfsLimits {
1042+
inodes: 1,
1043+
max_path_length: 255,
1044+
max_path_segment_size: 50,
1045+
};
1046+
let pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
1047+
let limiter = Limiter::new(StaticResourceLimits::default(), &pool);
1048+
let mut vfs_state = VfsState::new(limits, limiter);
1049+
let mut table = ResourceTable::new();
1050+
1051+
let mut ctx = VfsCtxView {
1052+
table: &mut table,
1053+
vfs_state: &mut vfs_state,
1054+
};
1055+
1056+
// Create descriptor with proper flags
1057+
let desc = create_test_descriptor(
1058+
&mut ctx,
1059+
DescriptorFlags::READ | DescriptorFlags::MUTATE_DIRECTORY,
1060+
);
1061+
1062+
// Directory creation should fail due to insufficient inodes
1063+
let result = ctx.create_directory_at(desc, "testdir".to_string()).await;
1064+
assert!(result.is_ok());
1065+
1066+
let desc = create_test_descriptor(
1067+
&mut ctx,
1068+
DescriptorFlags::READ | DescriptorFlags::MUTATE_DIRECTORY,
1069+
);
1070+
1071+
let result = ctx.create_directory_at(desc, "testdir2".to_string()).await;
1072+
assert!(result.is_err());
1073+
}
1074+
1075+
#[tokio::test]
1076+
async fn test_create_directory_insufficient_space_fails() {
1077+
// Create VFS with very limited space (10 bytes)
1078+
let limits = VfsLimits::default();
1079+
let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(2));
1080+
let static_limits = StaticResourceLimits {
1081+
n_elements_per_table: 1,
1082+
n_instances: 1,
1083+
n_tables: 1,
1084+
n_memories: 1,
1085+
};
1086+
let limiter = Limiter::new(static_limits, &pool);
1087+
let mut vfs_state = VfsState::new(limits, limiter);
1088+
let mut table = ResourceTable::new();
1089+
1090+
let mut ctx = VfsCtxView {
1091+
table: &mut table,
1092+
vfs_state: &mut vfs_state,
1093+
};
1094+
1095+
// Create descriptor with proper flags
1096+
let desc = create_test_descriptor(
1097+
&mut ctx,
1098+
DescriptorFlags::READ | DescriptorFlags::MUTATE_DIRECTORY,
1099+
);
1100+
1101+
// Directory creation should fail due to insufficient space for name
1102+
let result = ctx
1103+
.create_directory_at(
1104+
desc,
1105+
"very_long_directory_name_with_a_bunch_of_limits".to_string(),
1106+
)
1107+
.await;
1108+
assert!(result.is_err());
1109+
}
1110+
1111+
#[tokio::test]
1112+
async fn test_create_directory_nested_path_success() {
1113+
let (mut table, mut vfs_state) = create_test_vfs();
1114+
let mut ctx = VfsCtxView {
1115+
table: &mut table,
1116+
vfs_state: &mut vfs_state,
1117+
};
1118+
1119+
// Create descriptor with proper flags
1120+
let desc = create_test_descriptor(
1121+
&mut ctx,
1122+
DescriptorFlags::READ | DescriptorFlags::MUTATE_DIRECTORY,
1123+
);
1124+
1125+
// First create parent directory
1126+
let result = ctx.create_directory_at(desc, "parent".to_string()).await;
1127+
assert!(result.is_ok());
1128+
1129+
let desc = create_test_descriptor(
1130+
&mut ctx,
1131+
DescriptorFlags::READ | DescriptorFlags::MUTATE_DIRECTORY,
1132+
);
1133+
1134+
// Then create child directory using relative path
1135+
let result = ctx
1136+
.create_directory_at(desc, "parent/child".to_string())
1137+
.await;
1138+
assert!(result.is_ok());
1139+
1140+
// Verify both directories were created
1141+
let desc = create_test_descriptor(
1142+
&mut ctx,
1143+
DescriptorFlags::READ | DescriptorFlags::MUTATE_DIRECTORY,
1144+
);
1145+
assert!(ctx.node_at(desc, "parent").is_ok());
1146+
1147+
let desc = create_test_descriptor(
1148+
&mut ctx,
1149+
DescriptorFlags::READ | DescriptorFlags::MUTATE_DIRECTORY,
1150+
);
1151+
assert!(ctx.node_at(desc, "parent/child").is_ok());
1152+
}
1153+
1154+
#[tokio::test]
1155+
async fn test_create_directory_invalid_parent_fails() {
1156+
let (mut table, mut vfs_state) = create_test_vfs();
1157+
let mut ctx = VfsCtxView {
1158+
table: &mut table,
1159+
vfs_state: &mut vfs_state,
1160+
};
1161+
1162+
// Create descriptor with proper flags
1163+
let desc = create_test_descriptor(
1164+
&mut ctx,
1165+
DescriptorFlags::READ | DescriptorFlags::MUTATE_DIRECTORY,
1166+
);
1167+
1168+
// Try to create directory with non-existent parent
1169+
let result = ctx
1170+
.create_directory_at(desc, "nonexistent/child".to_string())
1171+
.await;
1172+
assert!(result.is_err());
1173+
1174+
let err = result.unwrap_err();
1175+
assert_eq!(*err.downcast_ref().unwrap(), ErrorCode::NoEntry);
1176+
}
1177+
}

0 commit comments

Comments
 (0)