Skip to content

Commit 2b9aa00

Browse files
authored
[Storage] Remove unneeded bounds on journal::contiguous::Contiguous (#2064)
1 parent 5f5cdfd commit 2b9aa00

3 files changed

Lines changed: 63 additions & 70 deletions

File tree

storage/src/journal/contiguous/fixed.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -620,9 +620,7 @@ impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> Journal<E, A> {
620620
}
621621

622622
// Implement Contiguous trait for fixed-length journals
623-
impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()> + Send + Sync> super::Contiguous
624-
for Journal<E, A>
625-
{
623+
impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> super::Contiguous for Journal<E, A> {
626624
type Item = A;
627625

628626
async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {

storage/src/journal/contiguous/mod.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,14 @@ pub trait Contiguous {
3232
///
3333
/// Returns an error if the underlying storage operation fails or if the item cannot
3434
/// be encoded.
35-
fn append(
36-
&mut self,
37-
item: Self::Item,
38-
) -> impl std::future::Future<Output = Result<u64, Error>> + Send;
35+
fn append(&mut self, item: Self::Item)
36+
-> impl std::future::Future<Output = Result<u64, Error>>;
3937

4038
/// Return the total number of items that have been appended to the journal.
4139
///
4240
/// This count is NOT affected by pruning. The next appended item will receive this
4341
/// position as its value.
44-
fn size(&self) -> impl std::future::Future<Output = u64> + Send;
42+
fn size(&self) -> impl std::future::Future<Output = u64>;
4543

4644
/// Return the position of the oldest item still retained in the journal.
4745
///
@@ -50,9 +48,7 @@ pub trait Contiguous {
5048
/// After pruning, this returns the position of the first item that remains.
5149
/// Note that due to section/blob alignment, this may be less than the `min_position`
5250
/// passed to `prune()`.
53-
fn oldest_retained_pos(
54-
&self,
55-
) -> impl std::future::Future<Output = Result<Option<u64>, Error>> + Send;
51+
fn oldest_retained_pos(&self) -> impl std::future::Future<Output = Result<Option<u64>, Error>>;
5652

5753
/// Prune items at positions strictly less than `min_position`.
5854
///
@@ -72,7 +68,7 @@ pub trait Contiguous {
7268
fn prune(
7369
&mut self,
7470
min_position: u64,
75-
) -> impl std::future::Future<Output = Result<bool, Error>> + Send;
71+
) -> impl std::future::Future<Output = Result<bool, Error>>;
7672

7773
/// Rewind the journal to the given size, discarding items from the end.
7874
///
@@ -95,7 +91,7 @@ pub trait Contiguous {
9591
///
9692
/// Returns [Error::InvalidRewind] if size is invalid (too large or points to pruned data).
9793
/// Returns an error if the underlying storage operation fails.
98-
fn rewind(&mut self, size: u64) -> impl std::future::Future<Output = Result<(), Error>> + Send;
94+
fn rewind(&mut self, size: u64) -> impl std::future::Future<Output = Result<(), Error>>;
9995

10096
/// Return a stream of all items in the journal starting from `start_pos`.
10197
///
@@ -112,31 +108,28 @@ pub trait Contiguous {
112108
buffer: NonZeroUsize,
113109
) -> impl std::future::Future<
114110
Output = Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error>,
115-
> + Send;
111+
>;
116112

117113
/// Read the item at the given position.
118114
///
119115
/// # Errors
120116
///
121117
/// - Returns [Error::ItemPruned] if the item at `position` has been pruned.
122118
/// - Returns [Error::ItemOutOfRange] if the item at `position` does not exist.
123-
fn read(
124-
&self,
125-
position: u64,
126-
) -> impl std::future::Future<Output = Result<Self::Item, Error>> + Send;
119+
fn read(&self, position: u64) -> impl std::future::Future<Output = Result<Self::Item, Error>>;
127120

128121
/// Sync all pending writes to storage.
129122
///
130123
/// This ensures all previously appended items are durably persisted.
131-
fn sync(&mut self) -> impl std::future::Future<Output = Result<(), Error>> + Send;
124+
fn sync(&mut self) -> impl std::future::Future<Output = Result<(), Error>>;
132125

133126
/// Close the journal, syncing all pending writes and releasing resources.
134-
fn close(self) -> impl std::future::Future<Output = Result<(), Error>> + Send;
127+
fn close(self) -> impl std::future::Future<Output = Result<(), Error>>;
135128

136129
/// Destroy the journal, removing all associated storage.
137130
///
138131
/// This method consumes the journal and deletes all persisted data including blobs,
139132
/// metadata, and any other storage artifacts. Use this for cleanup in tests or when
140133
/// permanently removing a journal.
141-
fn destroy(self) -> impl std::future::Future<Output = Result<(), Error>> + Send;
134+
fn destroy(self) -> impl std::future::Future<Output = Result<(), Error>>;
142135
}

storage/src/journal/contiguous/variable.rs

Lines changed: 51 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ pub struct Journal<E: Storage + Metrics, V: Codec> {
151151
oldest_retained_pos: u64,
152152
}
153153

154-
impl<E: Storage + Metrics, V: Codec + Send> Journal<E, V> {
154+
impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
155155
/// Initialize a contiguous variable journal.
156156
///
157157
/// # Crash Recovery
@@ -475,53 +475,6 @@ impl<E: Storage + Metrics, V: Codec + Send> Journal<E, V> {
475475
Ok(pruned)
476476
}
477477

478-
/// Return a stream of all items in the journal starting from `start_pos`.
479-
///
480-
/// Each item is yielded as a tuple `(position, item)` where position is the item's
481-
/// position in the journal.
482-
///
483-
/// # Errors
484-
///
485-
/// Returns an error if `start_pos` exceeds the journal size or if any storage/decoding
486-
/// errors occur during replay.
487-
pub async fn replay(
488-
&self,
489-
start_pos: u64,
490-
buffer_size: NonZeroUsize,
491-
) -> Result<Pin<Box<dyn Stream<Item = Result<(u64, V), Error>> + Send + '_>>, Error> {
492-
// Validate start position is within bounds.
493-
if start_pos < self.oldest_retained_pos {
494-
return Err(Error::ItemPruned(start_pos));
495-
}
496-
if start_pos > self.size {
497-
return Err(Error::ItemOutOfRange(start_pos));
498-
}
499-
500-
// If replaying at exactly size, return empty stream
501-
if start_pos == self.size {
502-
return Ok(Box::pin(stream::empty()));
503-
}
504-
505-
// Use offsets index to find offset to start from, calculate section from position
506-
let start_offset = self.offsets.read(start_pos).await?;
507-
let start_section = position_to_section(start_pos, self.items_per_section);
508-
let data_stream = self
509-
.data
510-
.replay(start_section, start_offset, buffer_size)
511-
.await?;
512-
513-
// Transform the stream to include position information
514-
let transformed = data_stream.enumerate().map(move |(idx, result)| {
515-
result.map(|(_section, _offset, _size, item)| {
516-
// Calculate position: start_pos + items read
517-
let pos = start_pos + idx as u64;
518-
(pos, item)
519-
})
520-
});
521-
522-
Ok(Box::pin(transformed))
523-
}
524-
525478
/// Read the item at the given position.
526479
///
527480
/// # Errors
@@ -803,8 +756,57 @@ impl<E: Storage + Metrics, V: Codec + Send> Journal<E, V> {
803756
}
804757
}
805758

759+
impl<E: Storage + Metrics, V: Codec + Send> Journal<E, V> {
760+
/// Return a stream of all items in the journal starting from `start_pos`.
761+
///
762+
/// Each item is yielded as a tuple `(position, item)` where position is the item's
763+
/// position in the journal.
764+
///
765+
/// # Errors
766+
///
767+
/// Returns an error if `start_pos` exceeds the journal size or if any storage/decoding
768+
/// errors occur during replay.
769+
pub async fn replay(
770+
&self,
771+
start_pos: u64,
772+
buffer_size: NonZeroUsize,
773+
) -> Result<Pin<Box<dyn Stream<Item = Result<(u64, V), Error>> + '_>>, Error> {
774+
// Validate start position is within bounds.
775+
if start_pos < self.oldest_retained_pos {
776+
return Err(Error::ItemPruned(start_pos));
777+
}
778+
if start_pos > self.size {
779+
return Err(Error::ItemOutOfRange(start_pos));
780+
}
781+
782+
// If replaying at exactly size, return empty stream
783+
if start_pos == self.size {
784+
return Ok(Box::pin(stream::empty()));
785+
}
786+
787+
// Use offsets index to find offset to start from, calculate section from position
788+
let start_offset = self.offsets.read(start_pos).await?;
789+
let start_section = position_to_section(start_pos, self.items_per_section);
790+
let data_stream = self
791+
.data
792+
.replay(start_section, start_offset, buffer_size)
793+
.await?;
794+
795+
// Transform the stream to include position information
796+
let transformed = data_stream.enumerate().map(move |(idx, result)| {
797+
result.map(|(_section, _offset, _size, item)| {
798+
// Calculate position: start_pos + items read
799+
let pos = start_pos + idx as u64;
800+
(pos, item)
801+
})
802+
});
803+
804+
Ok(Box::pin(transformed))
805+
}
806+
}
807+
806808
// Implement Contiguous trait for variable-length items
807-
impl<E: Storage + Metrics, V: Codec + Send + Sync> Contiguous for Journal<E, V> {
809+
impl<E: Storage + Metrics, V: Codec + Send> Contiguous for Journal<E, V> {
808810
type Item = V;
809811

810812
async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {

0 commit comments

Comments
 (0)