Commit 405aeee

Merge remote-tracking branch 'origin/main' into feat/forest-write-lock-semaphore

# Conflicts:
#	client/blockchain-service/src/handler_bsp.rs

2 parents: 63f7ed9 + 50d8c4c

40 files changed: +1114 -211 lines

.github/workflows/network.yml

Lines changed: 5 additions & 0 deletions

@@ -860,6 +860,10 @@ jobs:
     name: "Integration Tests: Fisherman"
     runs-on: blacksmith-4vcpu-ubuntu-2404
     timeout-minutes: 30
+    strategy:
+      fail-fast: false
+      matrix:
+        shard: [1, 2]
     steps:
       - uses: actions/checkout@v4
       - uses: ./.github/workflow-templates/setup-pnpm
@@ -904,6 +908,7 @@ jobs:
           --test-reporter=spec \
           --test-reporter-destination=stdout \
           --test-concurrency=1 \
+          --test-shard=${{ matrix.shard }}/2 \
           ./suites/integration/fisherman/**.test.ts

       - name: Collect Docker logs on failure

.github/workflows/sdk.yml

Lines changed: 1 addition & 1 deletion

@@ -278,7 +278,7 @@ jobs:
         pnpm dlx create-next-app@latest /tmp/hello_world_nextjs --yes --ts --eslint --app --tailwind false --use-pnpm
       - name: Install SDK packages
         working-directory: /tmp/hello_world_nextjs
-        run: pnpm add -w @storagehub-sdk/core @storagehub-sdk/msp-client
+        run: pnpm add @storagehub-sdk/core @storagehub-sdk/msp-client
       - name: Smoke build
         working-directory: /tmp/hello_world_nextjs
         env:
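
Context for the one-line fix above: pnpm's -w (--workspace-root) flag is only valid inside a pnpm workspace, and the scratch Next.js app created in /tmp is a standalone project, so the install has to run without it.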

backend/lib/src/api/handlers/files.rs

Lines changed: 8 additions & 7 deletions

@@ -70,14 +70,15 @@ pub async fn internal_upload_by_key(
     };

     // Stream chunks to channel
+    // Note: Session cleanup is handled automatically by the DownloadSessionGuard
+    // in the download_by_key spawned task, so we don't need to manually remove it here
     let mut stream = body.into_data_stream();
     while let Some(chunk_result) = stream.next().await {
         match chunk_result {
             Ok(chunk) => {
                 if tx.send(Ok(chunk)).await.is_err() {
                     // Client disconnected
                     tracing::info!("Client disconnected for session {}", session_id);
-                    services.download_sessions.remove_session(&session_id);
                     return (StatusCode::OK, "Client disconnected".to_string());
                 }
             }
@@ -89,7 +90,6 @@ pub async fn internal_upload_by_key(
                     e.to_string(),
                 )))
                 .await;
-                services.download_sessions.remove_session(&session_id);
                 return (
                     StatusCode::INTERNAL_SERVER_ERROR,
                     "Stream error".to_string(),
@@ -98,7 +98,6 @@ pub async fn internal_upload_by_key(
         }
     }

-    services.download_sessions.remove_session(&session_id);
     (StatusCode::OK, "Upload successful".to_string())
 }

@@ -139,14 +138,16 @@ pub async fn download_by_key(
     // have more than 1 Mb of allocated memory per download session (defined by MAX_BUFFER_BYTES)
     let (tx, rx) = mpsc::channel::<Result<Bytes, std::io::Error>>(QUEUE_BUFFER_SIZE);

-    // Add the transmitter to the active download sessions
-    let _ = services
+    // Create a download session and get its guard.
+    // This ensures automatic cleanup when the download completes, fails, or if the client disconnects
+    let guard = services
         .download_sessions
-        .add_session(&session_id, tx)
+        .start_session(session_id.clone(), tx)
         .map_err(|e| Error::BadRequest(e.to_string()))?;

+    // Move guard into spawned task so when the task ends, the guard is dropped and the session is automatically removed
     tokio::spawn(async move {
-        // We trigger the download process via RPC call
+        let _guard = guard;
         _ = services.msp.get_file(&session_id, file_info).await;
     });
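
The reason the handler can drop all three manual remove_session calls is easiest to see in a standalone sketch. This is illustrative only (CleanupGuard and the Vec-based registry are made-up stand-ins, not types from this codebase): a Drop guard moved into a spawned task runs its cleanup however the task exits, including on panic.

```rust
use std::sync::{Arc, Mutex};

struct CleanupGuard {
    sessions: Arc<Mutex<Vec<String>>>,
    id: String,
}

impl Drop for CleanupGuard {
    fn drop(&mut self) {
        // Runs on normal return, early return, and panic unwinding alike.
        self.sessions.lock().unwrap().retain(|s| s != &self.id);
    }
}

#[tokio::main]
async fn main() {
    let sessions = Arc::new(Mutex::new(vec!["session-1".to_string()]));
    let guard = CleanupGuard {
        sessions: Arc::clone(&sessions),
        id: "session-1".into(),
    };

    let handle = tokio::spawn(async move {
        let _guard = guard; // the task owns the guard now
        panic!("simulated RPC failure");
    });
    let _ = handle.await; // the panic is contained in the JoinError

    // The session was still removed, because Drop ran during unwinding.
    assert!(sessions.lock().unwrap().is_empty());
}
```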

backend/lib/src/services/download_session.rs

Lines changed: 159 additions & 11 deletions

@@ -6,9 +6,10 @@ use tokio::sync::mpsc;

 /// Manages active download sessions for streaming files from MSP nodes to clients.
 ///
-/// Each session maps a file key to a channel sender, allowing the internal upload
+/// Each session maps a session ID to a channel sender, allowing the internal upload
 /// endpoint (which receives chunks from the MSP node) to forward them to the
 /// download endpoint (which streams them to the client).
+#[derive(Debug)]
 pub struct DownloadSessionManager {
     sessions: Arc<RwLock<HashMap<String, mpsc::Sender<Result<Bytes, std::io::Error>>>>>,
     max_sessions: usize,
@@ -22,13 +23,15 @@ impl DownloadSessionManager {
         }
     }

-    /// Atomically adds a new download session for the given file key.
-    /// Fails if there is already an active session for the file.
-    pub fn add_session(
+    /// Atomically registers a new download session for the given session ID.
+    /// Returns a guard that will automatically clean up the session when dropped.
+    /// Returns an error if there is already an active session with this ID
+    /// or if the maximum number of concurrent downloads has been reached.
+    pub fn start_session(
         &self,
-        id: &String,
+        session_id: String,
         sender: mpsc::Sender<Result<Bytes, std::io::Error>>,
-    ) -> Result<(), String> {
+    ) -> Result<DownloadSessionGuard, String> {
         let mut sessions = self
             .sessions
             .write()
@@ -41,22 +44,32 @@ impl DownloadSessionManager {
             ));
         }

-        match sessions.entry(id.clone()) {
-            Entry::Occupied(_) => Err("File is already being downloaded".to_string()),
+        match sessions.entry(session_id.clone()) {
+            Entry::Occupied(_) => Err(format!(
+                "Session ID {} is already active. Please retry with a new session ID.",
+                session_id
+            )),
             Entry::Vacant(entry) => {
                 entry.insert(sender);
-                Ok(())
+                Ok(DownloadSessionGuard {
+                    manager: self.clone(),
+                    session_id,
+                })
             }
         }
     }

-    pub fn remove_session(&self, id: &str) {
+    /// Removes a download session for the given session ID.
+    /// This is called automatically by the guard's Drop implementation.
+    fn end_session(&self, session_id: &str) {
         self.sessions
             .write()
             .expect("Download sessions lock poisoned")
-            .remove(id);
+            .remove(session_id);
     }

+    /// Retrieves the channel sender for the given session ID.
+    /// Used by internal_upload_by_key to forward chunks to the client.
     pub fn get_session(&self, id: &str) -> Option<mpsc::Sender<Result<Bytes, std::io::Error>>> {
         self.sessions
             .read()
@@ -65,3 +78,138 @@ impl DownloadSessionManager {
             .cloned()
     }
 }
+
+impl Clone for DownloadSessionManager {
+    fn clone(&self) -> Self {
+        DownloadSessionManager {
+            sessions: Arc::clone(&self.sessions),
+            max_sessions: self.max_sessions,
+        }
+    }
+}
+
+/// RAII guard that ensures download sessions are always cleaned up.
+/// The download session will be automatically removed when this guard is dropped,
+/// regardless of whether the download succeeded, failed, or panicked.
+#[derive(Debug)]
+pub struct DownloadSessionGuard {
+    manager: DownloadSessionManager,
+    session_id: String,
+}
+
+impl Drop for DownloadSessionGuard {
+    fn drop(&mut self) {
+        self.manager.end_session(&self.session_id);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_start_session_success() {
+        let manager = DownloadSessionManager::new(100);
+        let session_id = "test_session_123";
+        let (tx, _rx) = mpsc::channel(10);
+
+        let _guard = manager.start_session(session_id.to_string(), tx).unwrap();
+
+        // Session should exist
+        assert!(manager.get_session(session_id).is_some());
+    }
+
+    #[test]
+    fn test_start_session_duplicate_fails() {
+        let manager = DownloadSessionManager::new(100);
+        let session_id = "test_session_123";
+        let (tx1, _rx1) = mpsc::channel(10);
+        let (tx2, _rx2) = mpsc::channel(10);
+
+        let _guard = manager.start_session(session_id.to_string(), tx1).unwrap();
+        let result = manager.start_session(session_id.to_string(), tx2);
+
+        assert!(result.is_err());
+        assert!(result.unwrap_err().contains("is already active"));
+    }
+
+    #[test]
+    fn test_guard_cleanup_on_drop() {
+        let manager = DownloadSessionManager::new(100);
+        let session_id = "test_session_123";
+        let (tx1, _rx1) = mpsc::channel(10);
+        let (tx2, _rx2) = mpsc::channel(10);
+
+        {
+            let _guard = manager.start_session(session_id.to_string(), tx1).unwrap();
+            assert!(manager.get_session(session_id).is_some());
+        } // guard dropped here
+
+        assert!(manager.get_session(session_id).is_none());
+
+        // Should be able to start a new session after guard is dropped
+        let _guard = manager.start_session(session_id.to_string(), tx2).unwrap();
+        assert!(manager.get_session(session_id).is_some());
+    }
+
+    #[test]
+    fn test_max_sessions_limit() {
+        let manager = DownloadSessionManager::new(2);
+        let (tx1, _rx1) = mpsc::channel(10);
+        let (tx2, _rx2) = mpsc::channel(10);
+        let (tx3, _rx3) = mpsc::channel(10);
+
+        let _guard1 = manager.start_session("session1".to_string(), tx1).unwrap();
+        let _guard2 = manager.start_session("session2".to_string(), tx2).unwrap();
+
+        // Third session should fail due to max sessions reached
+        let result = manager.start_session("session3".to_string(), tx3);
+        assert!(result.is_err());
+        assert!(result.unwrap_err().contains("Maximum number"));
+    }
+
+    #[test]
+    fn test_multiple_different_sessions() {
+        let manager = DownloadSessionManager::new(100);
+        let (tx1, _rx1) = mpsc::channel(10);
+        let (tx2, _rx2) = mpsc::channel(10);
+        let (tx3, _rx3) = mpsc::channel(10);
+
+        let _guard1 = manager.start_session("session1".to_string(), tx1).unwrap();
+        let _guard2 = manager.start_session("session2".to_string(), tx2).unwrap();
+        let _guard3 = manager.start_session("session3".to_string(), tx3).unwrap();
+
+        assert!(manager.get_session("session1").is_some());
+        assert!(manager.get_session("session2").is_some());
+        assert!(manager.get_session("session3").is_some());
+
+        drop(_guard2);
+        assert!(manager.get_session("session1").is_some());
+        assert!(manager.get_session("session2").is_none());
+        assert!(manager.get_session("session3").is_some());
+    }
+
+    #[tokio::test]
+    async fn test_guard_cleanup_on_task_failure() {
+        let manager = DownloadSessionManager::new(100);
+        let session_id = "test_session_123";
+        let (tx, _rx) = mpsc::channel(10);
+
+        let guard = manager.start_session(session_id.to_string(), tx).unwrap();
+        assert!(manager.get_session(session_id).is_some());
+
+        // Simulate what happens in download_by_key: move guard into task
+        let manager_clone = manager.clone();
+        let handle = tokio::spawn(async move {
+            let _guard = guard;
+            // Simulate RPC failure
+            Err::<(), String>("RPC call failed".to_string())
+        });
+
+        // Wait for task to complete
+        let _ = handle.await;
+
+        // Session should be cleaned up even though task failed
+        assert!(manager_clone.get_session(session_id).is_none());
+    }
+}
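
One design note on the manual Clone impl: the clone is shallow. It copies the Arc handle rather than the map, which is exactly what lets DownloadSessionGuard hold its own manager handle and still mutate the shared session table. A hypothetical extra test (not part of this diff) would pin that behavior down:

```rust
#[test]
fn test_clones_share_session_map() {
    let manager = DownloadSessionManager::new(10);
    let clone = manager.clone();
    let (tx, _rx) = mpsc::channel(1);

    // A session started through the clone...
    let guard = clone.start_session("s1".to_string(), tx).unwrap();
    // ...is visible through the original handle,
    assert!(manager.get_session("s1").is_some());

    // and dropping the guard removes it from the shared map.
    drop(guard);
    assert!(manager.get_session("s1").is_none());
}
```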

client/blockchain-service/src/handler.rs

Lines changed: 8 additions & 0 deletions

@@ -212,6 +212,12 @@ where
     pub enable_msp_distribute_files: bool,
     /// Optional Postgres URL for the pending transactions DB. If None, DB is disabled.
     pub pending_db_url: Option<String>,
+
+    /// Maximum number of BSP confirm storing requests to batch together.
+    pub bsp_confirm_file_batch_size: u32,
+
+    /// Maximum number of MSP respond storage requests to batch together.
+    pub msp_respond_storage_batch_size: u32,
 }

 impl<Runtime> Default for BlockchainServiceConfig<Runtime>
@@ -225,6 +231,8 @@ where
             peer_id: None,
             enable_msp_distribute_files: false,
             pending_db_url: None,
+            bsp_confirm_file_batch_size: 20,
+            msp_respond_storage_batch_size: 20,
         }
     }
 }

client/blockchain-service/src/handler_bsp.rs

Lines changed: 6 additions & 6 deletions

@@ -1,11 +1,11 @@
 use log::{debug, error, info, trace, warn};
 use shc_common::traits::StorageEnableRuntime;
-use std::{sync::Arc, u32};
+use std::sync::Arc;

 use sc_client_api::HeaderBackend;
 use sp_api::ProvideRuntimeApi;
 use sp_blockchain::TreeRoute;
-use sp_core::{Get, U256};
+use sp_core::U256;
 use sp_runtime::traits::{Block as BlockT, Zero};

 use pallet_proofs_dealer_runtime_api::{
@@ -18,8 +18,8 @@ use shc_common::{
     consts::CURRENT_FOREST_KEY,
     typed_store::CFDequeAPI,
     types::{
-        BackupStorageProviderId, BlockNumber, FileKey, Fingerprint, MaxBatchConfirmStorageRequests,
-        StorageEnableEvents, TrieMutation,
+        BackupStorageProviderId, BlockNumber, FileKey, Fingerprint, StorageEnableEvents,
+        TrieMutation,
     },
 };
 use shc_forest_manager::traits::{ForestStorage, ForestStorageHandler};
@@ -472,9 +472,9 @@ where

         // If we have no pending submit proof requests, we can also check for pending confirm storing requests.
         if next_event_data.is_none() {
-            let max_batch_confirm = <MaxBatchConfirmStorageRequests<Runtime> as Get<u32>>::get();
+            let max_batch_confirm = self.config.bsp_confirm_file_batch_size;

-            // Batch multiple confirm file storing taking the runtime maximum.
+            // Batch multiple confirm file storing taking the configured maximum.
             let mut confirm_storing_requests = Vec::new();
             for _ in 0..max_batch_confirm {
                 if let Some(request) = state_store_context
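
The for _ in 0..max_batch_confirm loop above is a drain-up-to-N pattern; stripped of the state-store machinery (a plain VecDeque stands in for the typed store here, so this is a sketch, not the service's actual code), it reduces to:

```rust
use std::collections::VecDeque;

/// Pop at most `max_batch` items, stopping early once the queue is empty.
fn drain_batch<T>(queue: &mut VecDeque<T>, max_batch: u32) -> Vec<T> {
    let mut batch = Vec::new();
    for _ in 0..max_batch {
        match queue.pop_front() {
            Some(request) => batch.push(request),
            None => break,
        }
    }
    batch
}
```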

client/blockchain-service/src/handler_msp.rs

Lines changed: 1 addition & 4 deletions

@@ -40,9 +40,6 @@ use crate::{
     BlockchainService,
 };

-// TODO: Make this configurable in the config file
-const MAX_BATCH_MSP_RESPOND_STORE_REQUESTS: u32 = 100;
-
 impl<FSH, Runtime> BlockchainService<FSH, Runtime>
 where
     FSH: ForestStorageHandler<Runtime> + Clone + Send + Sync + 'static,
@@ -521,7 +518,7 @@ where
             msp_handler.pending_respond_storage_request_file_keys
         );

-        let max_batch_respond = MAX_BATCH_MSP_RESPOND_STORE_REQUESTS;
+        let max_batch_respond = self.config.msp_respond_storage_batch_size;

         // Batch multiple respond storing requests up to the runtime configured maximum.
         let mut respond_storage_requests = Vec::new();
