Skip to content

833 alien partitions api #863

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Bob versions changelog
## [Unreleased]
#### Added
- Blob performs fsync if buffered bytes are larger than max_dirty_bytes_before_sync config param (#748)
- Partitions API for aliens (#833)

#### Changed
- Use cargo workspace to declare dependencies to avoid their duplication (#821)
Expand Down
2 changes: 1 addition & 1 deletion bob-backend/src/pearl/disk_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ impl DiskController {
.await
}

pub(crate) fn groups(&self) -> Arc<RwLock<Vec<Group>>> {
pub fn groups(&self) -> Arc<RwLock<Vec<Group>>> {
self.groups.clone()
}

Expand Down
23 changes: 22 additions & 1 deletion bob-backend/src/pearl/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ impl Group {
let mut total_count = 0;
// General assumption is that processing in order from new to old holders is better, but this is not strictly required
for holder in holders.into_iter().rev() {
let holder_id = holder.1.get_id();
let holder_id = holder.1.get_id().to_owned();
let delete = Self::delete_common(holder.1, key, meta, false).await;
total_count += match delete {
Ok(count) => {
Expand Down Expand Up @@ -517,6 +517,27 @@ impl Group {
Ok(removed)
}

/// Detaches the holder whose id equals `id` and closes its storage.
/// Fails if the holder is still inside its active time interval or if
/// no holder with such id exists in this group.
pub async fn detach_by_id(&self, id: &str) -> BackendResult<Holder> {
    let mut holders = self.holders.write().await;
    debug!("write lock acquired");
    let now = get_current_timestamp();
    for index in 0..holders.len() {
        let holder = match holders.get_child(index) {
            Some(h) => h,
            None => continue,
        };
        if holder.data.get_id() != id {
            continue;
        }
        // An active partition (current time falls into its interval) must not be detached.
        if holder.data.gets_into_interval(now) {
            return Err(Error::pearl_change_state(format!("Cannot detach active partition (pearl:{})", id)));
        }
        let detached = holders.remove(index).expect("should be presented");
        detached.close_storage().await;
        return Ok(detached);
    }
    Err(Error::pearl_change_state(format!("pearl:{} not found", id)))
}

pub async fn detach_all(&self) -> BackendResult<()> {
let mut holders_lock = self.holders.write().await;
let holders: Vec<_> = holders_lock.clear_and_get_values();
Expand Down
3 changes: 1 addition & 2 deletions bob-backend/src/pearl/holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,11 @@ impl Holder {
self.inner.end_timestamp
}

pub fn get_id(&self) -> String {
pub fn get_id(&self) -> &str {
self.inner.disk_path
.file_name()
.and_then(std::ffi::OsStr::to_str)
.unwrap_or("unparsable string")
.to_owned()
}

pub fn cloned_storage(&self) -> Arc<RwLock<PearlSync>> {
Expand Down
192 changes: 185 additions & 7 deletions bob/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ pub(crate) struct Partition {
records_count: usize,
}

/// Slim partition view exposed by the partitions listing endpoints:
/// only the holder id and its start timestamp.
#[derive(Debug, Serialize, Clone)]
pub(crate) struct PartitionSlim {
    // Partition (pearl holder) identifier
    id: String,
    // Partition start timestamp
    timestamp: u64
}

#[derive(Debug)]
pub struct StatusExt {
status: StatusCode,
Expand Down Expand Up @@ -235,6 +241,8 @@ where
get(vdisk_records_count::<A>),
),
("/vdisks/:vdisk_id/partitions", get(partitions::<A>)),
("/disks/:disk_name/vdisks/:vdisk_id/partitions", get(partitions_by_disk_vdisk::<A>)),
("/alien/nodes/:node_name/vdisks/:vdisk_id/partitions", get(alien_partitions_by_node_vdisk::<A>)),
(
"/vdisks/:vdisk_id/partitions/:partition_id",
get(partition_by_id::<A>),
Expand All @@ -248,6 +256,8 @@ where
"/vdisks/:vdisk_id/partitions/by_timestamp/:timestamp",
delete(delete_partition::<A>),
),
("/disks/:disk_name/vdisks/:vdisk_id/partitions/:partition_id", delete(delete_partition_by_id::<A>)),
("/alien/nodes/:node_name/vdisks/:vdisk_id/partitions/:partition_id", delete(alien_delete_partition_by_id::<A>)),
("/alien", get(alien)),
("/alien/detach", post(detach_alien_partitions::<A>)),
("/alien/dir", get(get_alien_directory::<A>)),
Expand Down Expand Up @@ -738,7 +748,7 @@ where
let holders = group.holders();
let pearls = holders.read().await;
debug!("get pearl holders: OK");
let partitions = pearls.iter().map(Holder::get_id).collect();
let partitions = pearls.iter().map(|h| h.get_id().to_owned()).collect();

let node_name = group.node_name().to_string();
let disk_name = group.disk_name().to_string();
Expand Down Expand Up @@ -880,7 +890,7 @@ where
let group = find_group(&bob, vdisk_id).await?;
let pearls = group.detach(timestamp).await.ok();
if let Some(holders) = pearls {
drop_directories(holders, timestamp, vdisk_id).await
drop_directories(holders, format!("timestamp {} on vdisk {}", timestamp, vdisk_id)).await
} else {
let msg = format!(
"partitions with timestamp {} not found on vdisk {} or it is active",
Expand All @@ -892,20 +902,19 @@ where

async fn drop_directories(
holders: Vec<Holder>,
timestamp: u64,
vdisk_id: u32,
partitions_description: String
) -> Result<StatusExt, StatusExt> {
let mut result_msg = String::new();
let mut is_error = false;
for holder in holders {
let msg = if let Some(err) = holder.drop_directory().await.err() {
is_error = true;
format!(
"partitions with timestamp {} delete failed on vdisk {}, error: {}",
timestamp, vdisk_id, err
"partitions {} delete failed, error: {}",
partitions_description, err
)
} else {
format!("partitions deleted with timestamp {}", timestamp)
format!("partitions {} deleted", partitions_description)
};
result_msg.push_str(&msg);
result_msg.push('\n');
Expand Down Expand Up @@ -1143,6 +1152,175 @@ where
Ok(StatusExt::new(StatusCode::OK, true, format!("Done")))
}

// GET /disks/:disk_name/vdisks/:vdisk_id/partitions
/// Lists slim partition descriptors for a vdisk located on a local disk.
/// Requires REST read permission.
async fn partitions_by_disk_vdisk<A>(
    bob: Extension<BobServer<A>>,
    AxumPath((disk_name, vdisk_id)): AxumPath<(String, u32)>,
    creds: CredentialsHolder<A>,
) -> Result<Json<Vec<PartitionSlim>>, StatusExt>
where
    A: Authenticator,
{
    let permissions = bob.auth().check_credentials_rest(creds.into())?;
    if !permissions.has_rest_read() {
        return Err(AuthError::PermissionDenied.into());
    }
    let group = find_group_on_disk(&bob, &disk_name, vdisk_id).await?;
    Ok(Json(create_slim_partitions(group).await))
}

// GET /alien/nodes/:node_name/vdisks/:vdisk_id/partitions
/// Lists slim partition descriptors for the alien group that stores
/// `node_name`'s data for the given vdisk. Requires REST read permission.
async fn alien_partitions_by_node_vdisk<A>(
    bob: Extension<BobServer<A>>,
    AxumPath((node_name, vdisk_id)): AxumPath<(String, u32)>,
    creds: CredentialsHolder<A>,
) -> Result<Json<Vec<PartitionSlim>>, StatusExt>
where
    A: Authenticator,
{
    let permissions = bob.auth().check_credentials_rest(creds.into())?;
    if !permissions.has_rest_read() {
        return Err(AuthError::PermissionDenied.into());
    }
    let group = find_alien_group_on_disk(&bob, &node_name, vdisk_id).await?;
    Ok(Json(create_slim_partitions(group).await))
}

// DELETE /disks/:disk_name/vdisks/:vdisk_id/partitions/:partition_id
/// Detaches the partition with `partition_id` from the vdisk group on the
/// given local disk and removes its directory.
/// Requires REST write permission. Returns 404 with the backend's reason
/// when the partition is missing or still active.
async fn delete_partition_by_id<A>(
    bob: Extension<BobServer<A>>,
    AxumPath((disk_name, vdisk_id, partition_id)): AxumPath<(String, u32, String)>,
    creds: CredentialsHolder<A>,
) -> Result<StatusExt, StatusExt>
where
    A: Authenticator,
{
    if !bob
        .auth()
        .check_credentials_rest(creds.into())?
        .has_rest_write()
    {
        return Err(AuthError::PermissionDenied.into());
    }
    let group = find_group_on_disk(&bob, &disk_name, vdisk_id).await?;
    // Match on the result instead of `.ok()` so the backend's reason
    // ("active partition" vs. "pearl not found") is not lost.
    match group.detach_by_id(&partition_id).await {
        Ok(holder) => {
            drop_directories(
                vec![holder],
                format!("id {} in vdisk {} on disk {}", partition_id, vdisk_id, disk_name),
            )
            .await
        }
        Err(e) => {
            let msg = format!(
                "partition {} not detached from vdisk {} on disk {}: {:?}",
                partition_id, vdisk_id, disk_name, e
            );
            Err(StatusExt::new(StatusCode::NOT_FOUND, true, msg))
        }
    }
}

// DELETE /alien/nodes/:node_name/vdisks/:vdisk_id/partitions/:partition_id
/// Detaches the partition with `partition_id` from the alien group that
/// stores `node_name`'s data for the given vdisk and removes its directory.
/// Requires REST write permission. Returns 404 with the backend's reason
/// when the partition is missing or still active.
async fn alien_delete_partition_by_id<A>(
    bob: Extension<BobServer<A>>,
    AxumPath((node_name, vdisk_id, partition_id)): AxumPath<(String, u32, String)>,
    creds: CredentialsHolder<A>,
) -> Result<StatusExt, StatusExt>
where
    A: Authenticator,
{
    if !bob
        .auth()
        .check_credentials_rest(creds.into())?
        .has_rest_write()
    {
        return Err(AuthError::PermissionDenied.into());
    }
    let group = find_alien_group_on_disk(&bob, &node_name, vdisk_id).await?;
    // Match on the result instead of `.ok()` so the backend's reason
    // ("active partition" vs. "pearl not found") is not lost.
    match group.detach_by_id(&partition_id).await {
        Ok(holder) => {
            drop_directories(
                vec![holder],
                format!("id {} in vdisk {} for node {}", partition_id, vdisk_id, node_name),
            )
            .await
        }
        Err(e) => {
            let msg = format!(
                "alien partition {} not detached from vdisk {} for node {}: {:?}",
                partition_id, vdisk_id, node_name, e
            );
            Err(StatusExt::new(StatusCode::NOT_FOUND, true, msg))
        }
    }
}

/// Resolves the pearl group for `vdisk_id` located on the local disk `disk_name`.
/// Fails with 406 when the backend has no disk controllers.
async fn find_group_on_disk<A: Authenticator>(
    bob: &BobServer<A>,
    disk_name: &str,
    vdisk_id: u32
) -> Result<PearlGroup, StatusExt> {
    let inner = bob.grinder().backend().inner();
    let controllers = inner.disk_controllers().ok_or_else(not_acceptable_backend)?;
    // Only local disk controllers are relevant here; the alien one is ignored.
    let (local_dcs, _alien_dc) = controllers;
    find_disk_vdisk_group(local_dcs, disk_name, vdisk_id).await
}

/// Resolves the alien pearl group that stores data of `node_name` for `vdisk_id`.
/// Fails with 406 when the backend has no disk controllers and 404 when no
/// matching alien group exists.
async fn find_alien_group_on_disk<A: Authenticator>(
    bob: &BobServer<A>,
    node_name: &str,
    vdisk_id: u32
) -> Result<PearlGroup, StatusExt> {
    let inner = bob.grinder().backend().inner();
    let (_, alien_dc) = inner.disk_controllers().ok_or_else(not_acceptable_backend)?;
    let groups = alien_dc.groups();
    let guard = groups.read().await;
    let found = guard
        .iter()
        .find(|group| group.node_name() == node_name && group.vdisk_id() == vdisk_id);
    match found {
        Some(group) => Ok(group.clone()),
        None => {
            let msg = format!("Alien vdisk group for node {} vdisk {} not found", node_name, vdisk_id);
            Err(StatusExt::new(StatusCode::NOT_FOUND, false, msg))
        }
    }
}

/// Looks up the disk controller named `disk_name` among `dcs` and fetches
/// its group for `vdisk_id`. Both failure cases produce a 404 whose message
/// enumerates what IS available, to ease troubleshooting.
async fn find_disk_vdisk_group(
    dcs: &[std::sync::Arc<bob_backend::pearl::DiskController>],
    disk_name: &str,
    vdisk_id: u32
) -> Result<PearlGroup, StatusExt> {
    let found = dcs.iter().find(|dc| dc.disk().name() == disk_name);
    let needed_dc = match found {
        Some(dc) => dc,
        None => {
            // Describe every controller and its vdisks in the error message.
            let available = dcs
                .iter()
                .map(|dc| {
                    let vdisks = dc
                        .vdisks()
                        .iter()
                        .map(|v| format!("#{}", v))
                        .collect::<Vec<_>>()
                        .join(", ");
                    format!("DC: {}, vdisks: {}", dc.disk().name(), vdisks)
                })
                .collect::<Vec<_>>()
                .join(", ");
            let err = format!("Disk Controller {} with vdisk #{} not found, available dcs: {}", disk_name, vdisk_id, available);
            warn!("{}", err);
            return Err(StatusExt::new(StatusCode::NOT_FOUND, false, err));
        }
    };
    needed_dc.vdisk_group(vdisk_id).await.map_err(|e| {
        let vdisks = needed_dc
            .vdisks()
            .iter()
            .map(|v| format!("#{}", v))
            .collect::<Vec<_>>()
            .join(", ");
        let err = format!("VDiskGroup #{} is missing on disk controller '{}', available vdisks: {}",
            vdisk_id,
            needed_dc.disk().name(),
            vdisks);
        warn!("{}. Error: {:?}", err, e);
        StatusExt::new(StatusCode::NOT_FOUND, false, err)
    })
}

/// Collects a slim (id + start timestamp) view of every holder in `group`.
async fn create_slim_partitions(group: PearlGroup) -> Vec<PartitionSlim> {
    let holders = group.holders();
    let pearls = holders.read().await;
    // Tail expression instead of an explicit `return ...;` statement.
    pearls
        .iter()
        .map(|h| PartitionSlim {
            id: h.get_id().to_owned(),
            timestamp: h.start_timestamp(),
        })
        .collect()
}

fn internal(message: String) -> StatusExt {
StatusExt::new(StatusCode::INTERNAL_SERVER_ERROR, false, message)
}
Expand Down
Loading