Skip to content

Commit 764c4c4

Browse files
committed
refactor(reader): move tile chunking logic for optimized bulk reads into separate module
1 parent b5510ac commit 764c4c4

3 files changed

Lines changed: 139 additions & 106 deletions

File tree

versatiles_container/src/container/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,7 @@ pub use tar::{TarTilesReader, TarTilesWriter};
3333
mod directory;
3434
pub use directory::{DirectoryReader, DirectoryWriter};
3535

36+
mod tile_chunking;
37+
3638
mod versatiles;
3739
pub use versatiles::{VersaTilesReader, VersaTilesWriter};
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
//! Shared chunking logic for coalescing nearby tile byte ranges into bulk reads.
2+
//!
3+
//! Both the VersaTiles and PMTiles readers use this module to minimize I/O calls
4+
//! when streaming tiles for a bounding box. Nearby byte ranges are grouped into
5+
//! chunks (up to ~64 MiB each, with a small gap tolerance), which are then read
6+
//! as single large blobs and sliced into individual tiles.
7+
8+
use crate::Tile;
9+
use futures::stream::StreamExt;
10+
use std::sync::Arc;
11+
use versatiles_core::{Blob, ByteRange, TileCompression, TileCoord, TileFormat, TileStream, io::DataReader};
12+
13+
const MAX_CHUNK_SIZE: u64 = 64 * 1024 * 1024;
14+
const MAX_CHUNK_GAP: u64 = 256 * 1024;
15+
16+
/// A group of tile byte ranges that can be served from a single large read.
17+
/// `range` tracks the combined byte span in the container.
18+
#[derive(Debug)]
19+
pub struct Chunk {
20+
pub tiles: Vec<(TileCoord, ByteRange)>,
21+
pub range: ByteRange,
22+
}
23+
24+
impl Chunk {
25+
pub fn new(start: u64) -> Self {
26+
Self {
27+
tiles: Vec::new(),
28+
range: ByteRange::new(start, 0),
29+
}
30+
}
31+
32+
pub fn push(&mut self, entry: (TileCoord, ByteRange)) {
33+
assert!(
34+
entry.1.offset >= self.range.offset,
35+
"entry offset must be >= range offset"
36+
);
37+
self.range.length = self
38+
.range
39+
.length
40+
.max(entry.1.offset + entry.1.length - self.range.offset);
41+
self.tiles.push(entry);
42+
}
43+
}
44+
45+
/// Sort tile ranges by byte offset and coalesce into chunks.
46+
///
47+
/// Nearby ranges (within `MAX_CHUNK_GAP`) are grouped together as long as the
48+
/// total chunk size stays below `MAX_CHUNK_SIZE`.
49+
pub fn coalesce_into_chunks(mut tile_ranges: Vec<(TileCoord, ByteRange)>) -> Vec<Chunk> {
50+
if tile_ranges.is_empty() {
51+
return Vec::new();
52+
}
53+
54+
tile_ranges.sort_by_key(|e| e.1.offset);
55+
56+
let mut chunks: Vec<Chunk> = Vec::new();
57+
let mut chunk = Chunk::new(tile_ranges[0].1.offset);
58+
59+
for entry in tile_ranges {
60+
let chunk_start = chunk.range.offset;
61+
let chunk_end = chunk.range.offset + chunk.range.length;
62+
63+
let tile_start = entry.1.offset;
64+
let tile_end = entry.1.offset + entry.1.length;
65+
66+
if (chunk_start + MAX_CHUNK_SIZE > tile_end) && (chunk_end + MAX_CHUNK_GAP > tile_start) {
67+
chunk.push(entry);
68+
} else {
69+
chunks.push(chunk);
70+
chunk = Chunk::new(entry.1.offset);
71+
chunk.push(entry);
72+
}
73+
}
74+
75+
if !chunk.tiles.is_empty() {
76+
chunks.push(chunk);
77+
}
78+
79+
chunks
80+
}
81+
82+
/// Convert chunks into a `TileStream` by reading each chunk as a single blob
83+
/// and slicing out individual tiles.
84+
pub fn stream_from_chunks(
85+
chunks: Vec<Chunk>,
86+
reader: Arc<DataReader>,
87+
tile_compression: TileCompression,
88+
tile_format: TileFormat,
89+
) -> TileStream<'static, Tile> {
90+
TileStream::from_stream(
91+
futures::stream::iter(chunks)
92+
.then(move |chunk| {
93+
let reader = Arc::clone(&reader);
94+
async move {
95+
let big_blob = match reader.read_range(&chunk.range).await {
96+
Ok(blob) => blob,
97+
Err(e) => {
98+
log::error!("failed to read chunk range {:?}: {e}", chunk.range);
99+
return futures::stream::iter(Vec::new());
100+
}
101+
};
102+
103+
let entries: Vec<(TileCoord, Tile)> = chunk
104+
.tiles
105+
.into_iter()
106+
.map(|(coord, range)| {
107+
let start = usize::try_from(range.offset - chunk.range.offset)
108+
.expect("range offset difference should fit in usize");
109+
let end = start + usize::try_from(range.length).expect("range length should fit in usize");
110+
111+
let blob = Blob::from(big_blob.range(start..end));
112+
let tile = Tile::from_blob(blob, tile_compression, tile_format);
113+
114+
(coord, tile)
115+
})
116+
.collect();
117+
118+
futures::stream::iter(entries)
119+
}
120+
})
121+
.flatten()
122+
.boxed(),
123+
)
124+
}

versatiles_container/src/container/versatiles/reader.rs

Lines changed: 13 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,18 @@
5353
//! or when a requested tile is missing.
5454
5555
use super::types::{BlockDefinition, BlockIndex, FileHeader, TileIndex};
56-
use crate::{SourceType, Tile, TileSource, TileSourceMetadata, TilesRuntime, Traversal, TraversalOrder, TraversalSize};
56+
use crate::{
57+
SourceType, Tile, TileSource, TileSourceMetadata, TilesRuntime, Traversal, TraversalOrder, TraversalSize,
58+
container::tile_chunking::{Chunk, coalesce_into_chunks, stream_from_chunks},
59+
};
5760
use anyhow::Result;
5861
use async_trait::async_trait;
5962
use futures::{lock::Mutex, stream::StreamExt};
6063
use std::{fmt::Debug, ops::Shr, path::Path, sync::Arc};
6164
#[cfg(feature = "cli")]
6265
use versatiles_core::utils::PrettyPrint;
6366
use versatiles_core::{
64-
Blob, ByteRange, LimitedCache, TileBBox, TileCoord, TileJSON, TileStream,
67+
ByteRange, LimitedCache, TileBBox, TileCoord, TileJSON, TileStream,
6568
compression::decompress,
6669
io::{DataReader, DataReaderFile},
6770
};
@@ -193,9 +196,6 @@ impl VersaTilesReader {
193196
/// Coalesces nearby ranges into at most ~64 MiB chunks (with a small gap tolerance)
194197
/// to minimize I/O calls during streaming.
195198
async fn get_chunks(&self, bbox: TileBBox) -> Result<Vec<Chunk>> {
196-
const MAX_CHUNK_SIZE: u64 = 64 * 1024 * 1024;
197-
const MAX_CHUNK_GAP: u64 = 256 * 1024;
198-
199199
let block_coords: Vec<TileCoord> = bbox.scaled_down(256).iter_coords().collect();
200200

201201
let stream = futures::stream::iter(block_coords).then(|block_coord: TileCoord| {
@@ -223,7 +223,7 @@ impl VersaTilesReader {
223223
let tile_index: Arc<TileIndex> = self.get_block_tile_index(&block).await?;
224224
log::trace!("tile_index.len() {}", tile_index.len());
225225

226-
let mut tile_ranges: Vec<(TileCoord, ByteRange)> = tile_index
226+
let tile_ranges: Vec<(TileCoord, ByteRange)> = tile_index
227227
.iter()
228228
.enumerate()
229229
.filter_map(|(index, range)| {
@@ -236,38 +236,7 @@ impl VersaTilesReader {
236236
})
237237
.collect();
238238

239-
if tile_ranges.is_empty() {
240-
return Ok(Vec::new());
241-
}
242-
243-
tile_ranges.sort_by_key(|e| e.1.offset);
244-
245-
let mut chunks: Vec<Chunk> = Vec::new();
246-
let mut chunk = Chunk::new(tile_ranges[0].1.offset);
247-
248-
for entry in tile_ranges {
249-
let chunk_start = chunk.range.offset;
250-
let chunk_end = chunk.range.offset + chunk.range.length;
251-
252-
let tile_start = entry.1.offset;
253-
let tile_end = entry.1.offset + entry.1.length;
254-
255-
if (chunk_start + MAX_CHUNK_SIZE > tile_end) && (chunk_end + MAX_CHUNK_GAP > tile_start) {
256-
// chunk size is still inside the limits
257-
chunk.push(entry);
258-
} else {
259-
// chunk becomes to big, create a new one
260-
chunks.push(chunk);
261-
chunk = Chunk::new(entry.1.offset);
262-
chunk.push(entry);
263-
}
264-
}
265-
266-
if !chunk.tiles.is_empty() {
267-
chunks.push(chunk);
268-
}
269-
270-
Ok(chunks)
239+
Ok(coalesce_into_chunks(tile_ranges))
271240
}
272241
});
273242

@@ -283,34 +252,6 @@ impl VersaTilesReader {
283252
}
284253
}
285254

286-
// Internal helper to group tile reads: collects (coord, range) pairs that can be served
287-
// from a single large read. `range` tracks the combined byte span in the container.
288-
#[derive(Debug)]
289-
struct Chunk {
290-
tiles: Vec<(TileCoord, ByteRange)>,
291-
range: ByteRange,
292-
}
293-
294-
impl Chunk {
295-
fn new(start: u64) -> Self {
296-
Self {
297-
tiles: Vec::new(),
298-
range: ByteRange::new(start, 0),
299-
}
300-
}
301-
fn push(&mut self, entry: (TileCoord, ByteRange)) {
302-
self.tiles.push(entry);
303-
assert!(
304-
entry.1.offset >= self.range.offset,
305-
"entry offset must be >= range offset"
306-
);
307-
self.range.length = self
308-
.range
309-
.length
310-
.max(entry.1.offset + entry.1.length - self.range.offset);
311-
}
312-
}
313-
314255
#[async_trait]
315256
/// [`TileSource`] implementation — provides `container_name`, `parameters`, `tilejson`,
316257
/// on-the-fly `override_compression`, single-tile fetch via `get_tile`, and bbox streaming via
@@ -439,45 +380,11 @@ impl TileSource for VersaTilesReader {
439380
async fn get_tile_stream(&self, bbox: TileBBox) -> Result<TileStream<'static, Tile>> {
440381
log::debug!("get_tile_stream {bbox:?}");
441382
let chunks = self.get_chunks(bbox).await?;
442-
let reader = Arc::clone(&self.reader);
443-
let tile_compression = self.metadata.tile_compression;
444-
let tile_format = self.metadata.tile_format;
445-
446-
Ok(TileStream::from_stream(
447-
futures::stream::iter(chunks)
448-
.then(move |chunk| {
449-
let reader = Arc::clone(&reader);
450-
async move {
451-
let big_blob = match reader.read_range(&chunk.range).await {
452-
Ok(blob) => blob,
453-
Err(e) => {
454-
log::error!("failed to read chunk range {:?}: {e}", chunk.range);
455-
return futures::stream::iter(Vec::new());
456-
}
457-
};
458-
459-
let entries: Vec<(TileCoord, Tile)> = chunk
460-
.tiles
461-
.into_iter()
462-
.map(|(coord, range)| {
463-
debug_assert!(bbox.contains(&coord), "outer_bbox {bbox:?} does not contain {coord:?}");
464-
465-
let start = usize::try_from(range.offset - chunk.range.offset)
466-
.expect("range offset difference should fit in usize");
467-
let end = start + usize::try_from(range.length).expect("range length should fit in usize");
468-
469-
let blob = Blob::from(big_blob.range(start..end));
470-
let tile = Tile::from_blob(blob, tile_compression, tile_format);
471-
472-
(coord, tile)
473-
})
474-
.collect();
475-
476-
futures::stream::iter(entries)
477-
}
478-
})
479-
.flatten()
480-
.boxed(),
383+
Ok(stream_from_chunks(
384+
chunks,
385+
Arc::clone(&self.reader),
386+
self.metadata.tile_compression,
387+
self.metadata.tile_format,
481388
))
482389
}
483390

@@ -593,7 +500,7 @@ mod tests {
593500
use super::*;
594501
use crate::{MOCK_BYTES_PBF, MockReader, TilesRuntime, TilesWriter, VersaTilesWriter, make_test_file};
595502
use assert_fs::NamedTempFile;
596-
use versatiles_core::{TileBBoxPyramid, TileCompression, TileFormat, assert_wildcard, io::DataWriterBlob};
503+
use versatiles_core::{Blob, TileBBoxPyramid, TileCompression, TileFormat, assert_wildcard, io::DataWriterBlob};
597504

598505
// Helper to quickly create a test reader and bbox
599506
async fn mk_reader() -> Result<(NamedTempFile, VersaTilesReader)> {

0 commit comments

Comments
 (0)