Skip to content

Commit 511ddbc

Browse files
authored
Merge pull request #89 from yuiseki/feat/parallel-layer-list
Parallelize MBTiles layer aggregation
2 parents 80487d5 + a9524a2 commit 511ddbc

File tree

3 files changed

+148
-52
lines changed

3 files changed

+148
-52
lines changed

Cargo.lock

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ mvt = "0.10.3"
4545
protobuf = "3.7"
4646
geo-types = "0.7"
4747
crossbeam-channel = "0.5"
48+
rayon = "1"
4849

4950
[dev-dependencies]
5051

src/mbtiles.rs

Lines changed: 107 additions & 52 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ use geo_types::{
1616
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
1717
use mvt::{GeomData, GeomEncoder, GeomType, Tile};
1818
use mvt_reader::Reader;
19+
use rayon::prelude::*;
1920
use rusqlite::{Connection, OpenFlags, params};
2021
use serde::Serialize;
2122
use tracing::warn;
@@ -851,11 +852,23 @@ struct LayerAccum {
851852
property_values: HashSet<String>,
852853
}
853854

855+
impl LayerAccum {
856+
fn new() -> Self {
857+
Self {
858+
feature_count: 0,
859+
vertex_count: 0,
860+
property_keys: HashSet::new(),
861+
property_values: HashSet::new(),
862+
}
863+
}
864+
}
865+
854866
fn build_file_layer_list(
855867
conn: &Connection,
856868
sample: Option<&SampleSpec>,
857869
total_tiles: u64,
858870
zoom: Option<u8>,
871+
no_progress: bool,
859872
) -> Result<Vec<FileLayerSummary>> {
860873
let data_expr = tiles_data_expr(conn)?;
861874
let source = tiles_source_clause(conn)?;
@@ -869,7 +882,25 @@ fn build_file_layer_list(
869882
let mut rows = stmt.query([]).context("query layer list scan")?;
870883

871884
let mut index: u64 = 0;
872-
let mut map: BTreeMap<String, LayerAccum> = BTreeMap::new();
885+
let mut tiles: Vec<Vec<u8>> = Vec::new();
886+
let read_progress = if no_progress {
887+
ProgressBar::hidden()
888+
} else if total_tiles > 0 {
889+
let bar = make_progress_bar(total_tiles);
890+
bar.set_message("reading layers");
891+
bar
892+
} else {
893+
let spinner = ProgressBar::new_spinner();
894+
spinner.set_draw_target(ProgressDrawTarget::stderr_with_hz(20));
895+
spinner.set_style(
896+
ProgressStyle::with_template("[{elapsed_precise}] {spinner:.cyan} {msg}")
897+
.unwrap()
898+
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
899+
);
900+
spinner.set_message("reading layers");
901+
spinner.enable_steady_tick(Duration::from_millis(80));
902+
spinner
903+
};
873904

874905
while let Some(row) = rows.next().context("read layer list row")? {
875906
let row_zoom: u8 = row.get(0)?;
@@ -883,41 +914,82 @@ fn build_file_layer_list(
883914
continue;
884915
}
885916
let data: Vec<u8> = row.get(1)?;
886-
let payload = decode_tile_payload(&data)?;
887-
let reader =
888-
Reader::new(payload).map_err(|err| anyhow::anyhow!("decode vector tile: {err}"))?;
889-
let layers = reader
890-
.get_layer_metadata()
891-
.map_err(|err| anyhow::anyhow!("read layer metadata: {err}"))?;
892-
for layer in layers {
893-
let entry = map.entry(layer.name.clone()).or_insert_with(|| LayerAccum {
894-
feature_count: 0,
895-
vertex_count: 0,
896-
property_keys: HashSet::new(),
897-
property_values: HashSet::new(),
898-
});
899-
entry.feature_count += layer.feature_count as u64;
900-
let features = reader
901-
.get_features(layer.layer_index)
902-
.map_err(|err| anyhow::anyhow!("read layer features: {err}"))?;
903-
for feature in features {
904-
entry.vertex_count += count_vertices(&feature.geometry) as u64;
905-
if let Some(props) = feature.properties {
906-
for (key, value) in props {
907-
entry.property_keys.insert(key.clone());
908-
entry.property_values.insert(format_property_value(&value));
909-
}
910-
}
911-
}
912-
}
917+
tiles.push(data);
913918

914919
if let Some(SampleSpec::Count(limit)) = sample
915920
&& index >= *limit
916921
{
917922
break;
918923
}
924+
925+
if index == 1 || index.is_multiple_of(100) {
926+
read_progress.set_position(index);
927+
}
919928
}
920929

930+
read_progress.set_position(index);
931+
read_progress.finish();
932+
933+
let processing = if no_progress {
934+
ProgressBar::hidden()
935+
} else {
936+
let bar = make_progress_bar(tiles.len() as u64);
937+
bar.set_message("processing layers");
938+
bar
939+
};
940+
941+
let map = tiles
942+
.into_par_iter()
943+
.map(|data| {
944+
let result = (|| -> Result<BTreeMap<String, LayerAccum>> {
945+
let payload = decode_tile_payload(&data)?;
946+
let reader = Reader::new(payload)
947+
.map_err(|err| anyhow::anyhow!("decode vector tile: {err}"))?;
948+
let layers = reader
949+
.get_layer_metadata()
950+
.map_err(|err| anyhow::anyhow!("read layer metadata: {err}"))?;
951+
let mut local = BTreeMap::new();
952+
for layer in layers {
953+
let entry = local
954+
.entry(layer.name.clone())
955+
.or_insert_with(LayerAccum::new);
956+
entry.feature_count += layer.feature_count as u64;
957+
let features = reader
958+
.get_features(layer.layer_index)
959+
.map_err(|err| anyhow::anyhow!("read layer features: {err}"))?;
960+
for feature in features {
961+
entry.vertex_count += count_vertices(&feature.geometry) as u64;
962+
if let Some(props) = feature.properties {
963+
for (key, value) in props {
964+
entry.property_keys.insert(key);
965+
entry.property_values.insert(format_property_value(&value));
966+
}
967+
}
968+
}
969+
}
970+
Ok(local)
971+
})();
972+
processing.inc(1);
973+
result
974+
})
975+
.reduce(
976+
|| Ok(BTreeMap::new()),
977+
|left, right| -> Result<BTreeMap<String, LayerAccum>> {
978+
let mut left = left?;
979+
let right = right?;
980+
for (name, accum) in right {
981+
let entry = left.entry(name).or_insert_with(LayerAccum::new);
982+
entry.feature_count += accum.feature_count;
983+
entry.vertex_count += accum.vertex_count;
984+
entry.property_keys.extend(accum.property_keys);
985+
entry.property_values.extend(accum.property_values);
986+
}
987+
Ok(left)
988+
},
989+
)?;
990+
991+
processing.finish();
992+
921993
let mut result = map
922994
.into_iter()
923995
.map(|(name, accum)| FileLayerSummary {
@@ -1499,19 +1571,6 @@ fn make_progress_bar(total: u64) -> ProgressBar {
14991571
bar
15001572
}
15011573

1502-
fn make_spinner(message: &str) -> ProgressBar {
1503-
let spinner = ProgressBar::new_spinner();
1504-
spinner.set_draw_target(ProgressDrawTarget::stderr_with_hz(20));
1505-
spinner.set_style(
1506-
ProgressStyle::with_template("{spinner:.cyan} {msg}")
1507-
.unwrap()
1508-
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
1509-
);
1510-
spinner.set_message(message.to_string());
1511-
spinner.enable_steady_tick(Duration::from_millis(80));
1512-
spinner
1513-
}
1514-
15151574
pub fn inspect_mbtiles(path: &Path) -> Result<MbtilesReport> {
15161575
inspect_mbtiles_with_options(path, InspectOptions::default())
15171576
}
@@ -1786,17 +1845,13 @@ pub fn inspect_mbtiles_with_options(path: &Path, options: InspectOptions) -> Res
17861845
result.sort_by(|a, b| a.name.cmp(&b.name));
17871846
result
17881847
} else if options.include_layer_list && options.sample.is_none() {
1789-
let spinner = if options.no_progress {
1790-
None
1791-
} else {
1792-
Some(make_spinner("processing layers"))
1793-
};
1794-
let result =
1795-
build_file_layer_list(&conn, options.sample.as_ref(), total_tiles, options.zoom)?;
1796-
if let Some(spinner) = spinner {
1797-
spinner.finish_and_clear();
1798-
}
1799-
result
1848+
build_file_layer_list(
1849+
&conn,
1850+
options.sample.as_ref(),
1851+
total_tiles,
1852+
options.zoom,
1853+
options.no_progress,
1854+
)?
18001855
} else {
18011856
Vec::new()
18021857
};

0 commit comments

Comments
 (0)