Skip to content

Reduce allocator usage in Gossip implementation #4640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 39 additions & 19 deletions gossip/src/epoch_slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,50 @@ impl std::convert::From<flate2::DecompressError> for Error {
}

impl Flate2 {
fn deflate(mut unc: Uncompressed) -> Result<Self> {
let mut compressed = Vec::with_capacity(unc.slots.block_capacity());
fn deflate(unc: Uncompressed) -> Result<Self> {
// Next operation loses the number of occupied bytes so we save it
let num_bytes = unc.slots.block_len();
// Extract the slots array as raw bytes
let slots_array = unc.slots.into_boxed_slice();
// Assume compressed version will be smaller than the original allocation
let mut compressed = Vec::with_capacity(slots_array.len());
// Perform compression and check status to make sure everything fits
let mut compressor = Compress::new(Compression::best(), false);
let first_slot = unc.first_slot;
let num = unc.num;
unc.slots.shrink_to_fit();
let bits = unc.slots.into_boxed_slice();
compressor.compress_vec(&bits, &mut compressed, FlushCompress::Finish)?;
let status = compressor.compress_vec(
&slots_array[0..num_bytes],
&mut compressed,
FlushCompress::Finish,
)?;
if status != flate2::Status::StreamEnd {
return Err(Error::CompressError);
}

let rv = Self {
first_slot,
num,
first_slot: unc.first_slot,
num: unc.num,
compressed,
};
let _ = rv.inflate()?;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently this was a hack to validate that compression worked out. A crude but effective one I guess.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it was a sanity check.
I don't know how often it does fail though.
Have you checked if it ever fails or not?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we keep this line as a debug check:

debug_assert_matches!(rv.inflate(), Ok(_));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we worried about forcing sanity check on the receiver of this EpochSlot? Looks like maybe no since you added the check here:

 if status != flate2::Status::StreamEnd {
        return Err(Error::DecompressError);
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the original idea was to validate that compression worked (no idea why else it would be here). But since flate2 can tell us that "it worked", I see no point running decompression to achieve the same result. The check you refer is exactly that, StreamEnd according to docs indicates that all input was consumed and all output fits into buffer provided without reallocation.

Ok(rv)
}

pub fn inflate(&self) -> Result<Uncompressed> {
//add some head room for the decompressor which might spill more bits
let mut uncompressed = Vec::with_capacity(32 + (self.num + 4) / 8);
// Perform the actual decompression and check that the result fits into provided buffer
let mut decompress = Decompress::new(false);
decompress.decompress_vec(&self.compressed, &mut uncompressed, FlushDecompress::Finish)?;
let status = decompress.decompress_vec(
&self.compressed,
&mut uncompressed,
FlushDecompress::Finish,
)?;
if status != flate2::Status::StreamEnd {
return Err(Error::DecompressError);
}

Ok(Uncompressed {
first_slot: self.first_slot,
num: self.num,
slots: BitVec::from_bits(&uncompressed),
slots: BitVec::from_bits(uncompressed),
})
}
}
Expand All @@ -123,13 +142,13 @@ impl Uncompressed {
let start = if min_slot < self.first_slot {
0
} else {
(min_slot - self.first_slot) as usize
min_slot - self.first_slot
};
for i in start..self.num {
if i >= self.slots.len() as usize {
for i in start..self.num as u64 {
if i >= self.slots.len() {
break;
}
if self.slots.get(i as u64) {
if self.slots.get(i) {
rv.push(self.first_slot + i as Slot);
Comment on lines +145 to 152

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these just stylistic changes?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its to keep clippy happy.

}
}
Expand All @@ -146,11 +165,12 @@ impl Uncompressed {
if *s < self.first_slot {
return i;
}
if *s - self.first_slot >= self.slots.len() {
let offset = *s - self.first_slot;
if offset >= self.slots.len() {
return i;
}
self.slots.set(*s - self.first_slot, true);
self.num = std::cmp::max(self.num, 1 + (*s - self.first_slot) as usize);
self.slots.set(offset, true);
self.num = std::cmp::max(self.num, 1 + offset as usize);
Comment on lines +168 to +173

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is purely to make the code more readable.

}
slots.len()
}
Expand Down
Loading