Skip to content

Commit 6d6a200

Browse files
committed
feat(reader): implement chunking for optimized tile streaming in pmtiles reader
1 parent 764c4c4 commit 6d6a200

1 file changed

Lines changed: 78 additions & 33 deletions

File tree

  • versatiles_container/src/container/pmtiles

versatiles_container/src/container/pmtiles/reader.rs

Lines changed: 78 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@
4747
//! `PMTiles` header/directories cannot be parsed or decompressed, or a requested tile is missing.
4848
4949
use super::types::{EntriesV3, HeaderV3};
50-
use crate::{SourceType, Tile, TileSource, TileSourceMetadata, TilesRuntime, Traversal, TraversalOrder, TraversalSize};
50+
use crate::{
51+
SourceType, Tile, TileSource, TileSourceMetadata, TilesRuntime, Traversal, TraversalOrder, TraversalSize,
52+
container::tile_chunking::{coalesce_into_chunks, stream_from_chunks},
53+
};
5154
use anyhow::{Result, bail};
5255
use async_trait::async_trait;
5356
use futures::lock::Mutex;
@@ -221,6 +224,73 @@ impl PMTilesReader {
221224
}
222225
bail!("not found")
223226
}
227+
228+
/// Resolve a tile's byte range by Hilbert index without reading tile data.
229+
///
230+
/// Like `lookup_tile_by_id` but returns only the `ByteRange` (shifted to the
231+
/// tile data section) instead of reading and wrapping the blob. Used by
232+
/// `get_chunks` to collect ranges before coalescing into bulk reads.
233+
async fn resolve_tile_range(
234+
tile_id: u64,
235+
root_entries: Arc<EntriesV3>,
236+
leaves_cache: &Mutex<LimitedCache<ByteRange, Arc<EntriesV3>>>,
237+
leaves_bytes: &Blob,
238+
tile_data_offset: u64,
239+
internal_compression: TileCompression,
240+
) -> Result<Option<ByteRange>> {
241+
let mut entries = root_entries;
242+
243+
for _depth in 0..3 {
244+
let Some(entry) = entries.find_tile(tile_id) else {
245+
return Ok(None);
246+
};
247+
248+
if entry.range.length == 0 {
249+
return Ok(None);
250+
}
251+
252+
if entry.run_length > 0 {
253+
return Ok(Some(entry.range.shifted_forward(tile_data_offset)));
254+
}
255+
256+
let range = entry.range;
257+
let mut cache = leaves_cache.lock().await;
258+
entries = cache.get_or_set(&range, || {
259+
let mut blob = leaves_bytes.read_range(&range)?;
260+
blob = decompress(blob, internal_compression)?;
261+
Ok(Arc::new(EntriesV3::from_blob(&blob)?))
262+
})?;
263+
}
264+
265+
Ok(None)
266+
}
267+
268+
/// Build read chunks by resolving tile byte ranges and coalescing nearby ones.
269+
async fn get_chunks(&self, bbox: TileBBox) -> Result<Vec<crate::container::tile_chunking::Chunk>> {
270+
let mut tile_ranges: Vec<(TileCoord, ByteRange)> = Vec::new();
271+
272+
// Collect coords first so the non-Send iterator is not held across await.
273+
let coords: Vec<TileCoord> = bbox.iter_coords().collect();
274+
for coord in coords {
275+
let Ok(tile_id) = coord.get_hilbert_index() else {
276+
continue;
277+
};
278+
if let Some(range) = Self::resolve_tile_range(
279+
tile_id,
280+
Arc::clone(&self.root_entries),
281+
&self.leaves_cache,
282+
&self.leaves_bytes,
283+
self.header.tile_data.offset,
284+
self.internal_compression,
285+
)
286+
.await?
287+
{
288+
tile_ranges.push((coord, range));
289+
}
290+
}
291+
292+
Ok(coalesce_into_chunks(tile_ranges))
293+
}
224294
}
225295

226296
/// Build the per‑zoom bounding box pyramid by traversing `PMTiles` directory entries.
@@ -341,38 +411,13 @@ impl TileSource for PMTilesReader {
341411
}
342412

343413
async fn get_tile_stream(&self, bbox: TileBBox) -> Result<TileStream<'static, Tile>> {
344-
let data_reader = Arc::clone(&self.data_reader);
345-
let root_entries = Arc::clone(&self.root_entries);
346-
let leaves_cache = Arc::clone(&self.leaves_cache);
347-
let leaves_bytes = Arc::clone(&self.leaves_bytes);
348-
let tile_data_offset = self.header.tile_data.offset;
349-
let tile_compression = self.metadata.tile_compression;
350-
let tile_format = self.metadata.tile_format;
351-
let internal_compression = self.internal_compression;
352-
353-
Ok(TileStream::from_bbox_async_parallel(bbox, move |coord| {
354-
let data_reader = Arc::clone(&data_reader);
355-
let root_entries = Arc::clone(&root_entries);
356-
let leaves_cache = Arc::clone(&leaves_cache);
357-
let leaves_bytes = Arc::clone(&leaves_bytes);
358-
async move {
359-
let tile_id = coord.get_hilbert_index().ok()?;
360-
let tile = PMTilesReader::lookup_tile_by_id(
361-
tile_id,
362-
&data_reader,
363-
root_entries,
364-
&leaves_cache,
365-
&leaves_bytes,
366-
tile_data_offset,
367-
tile_compression,
368-
tile_format,
369-
internal_compression,
370-
)
371-
.await
372-
.ok()??;
373-
Some((coord, tile))
374-
}
375-
}))
414+
let chunks = self.get_chunks(bbox).await?;
415+
Ok(stream_from_chunks(
416+
chunks,
417+
Arc::clone(&self.data_reader),
418+
self.metadata.tile_compression,
419+
self.metadata.tile_format,
420+
))
376421
}
377422

378423
// deep probe of container meta

0 commit comments

Comments
 (0)