Skip to content

feat(mbtiles) - Add validation to martin .mbtiles #741 #1689

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion martin/benches/bench.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use criterion::async_executor::FuturesExecutor;
use criterion::{Criterion, criterion_group, criterion_main};
use martin::file_config::{OnInvalid, ValidationLevel};
use martin::srv::DynTileSource;
use martin::{CatalogSourceEntry, MartinResult, Source, TileData, TileSources, UrlQuery};
use martin_tile_utils::{Encoding, Format, TileCoord, TileInfo};
Expand Down Expand Up @@ -42,6 +43,18 @@ impl Source for NullSource {
false
}

/// Per-source validation level override.
///
/// The benchmark's null source defines none, so this always returns `None`.
fn get_validation_level(&self) -> Option<ValidationLevel> {
None
}

/// Per-source override for how a validation failure is handled.
///
/// The benchmark's null source defines none, so this always returns `None`.
fn get_on_invalid(&self) -> Option<OnInvalid> {
None
}

async fn validate(&self, _validation_level: ValidationLevel) -> MartinResult<()> {
MartinResult::Ok(())
}

async fn get_tile(
&self,
_xyz: TileCoord,
Expand All @@ -63,7 +76,7 @@ async fn process_tile(sources: &TileSources) {
}

fn bench_null_source(c: &mut Criterion) {
let sources = TileSources::new(vec![vec![Box::new(NullSource::new())]]);
let sources = TileSources::new(vec![Box::new(NullSource::new())]);
c.bench_function("get_table_source_tile", |b| {
b.to_async(FuturesExecutor).iter(|| process_tile(&sources));
});
Expand Down
17 changes: 16 additions & 1 deletion martin/src/args/srv.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use clap::ValueEnum;
use serde::{Deserialize, Serialize};

use crate::srv::{KEEP_ALIVE_DEFAULT, LISTEN_ADDRESSES_DEFAULT, SrvConfig};
use crate::{
file_config::{OnInvalid, ValidationLevel},
srv::{KEEP_ALIVE_DEFAULT, LISTEN_ADDRESSES_DEFAULT, SrvConfig},
};

#[allow(clippy::doc_markdown)]
#[derive(clap::Args, Debug, PartialEq, Default)]
Expand Down Expand Up @@ -34,6 +37,12 @@ pub struct SrvArgs {
#[arg(short = 'u', long = "webui")]
#[cfg(feature = "webui")]
pub web_ui: Option<WebUiMode>,
/// Level of validation to apply to sources
#[arg(long)]
pub validate: Option<ValidationLevel>,
/// How to handle invalid source
#[arg(long)]
pub on_invalid: Option<OnInvalid>,
}

#[cfg(feature = "webui")]
Expand Down Expand Up @@ -81,6 +90,12 @@ impl SrvArgs {
if self.preferred_encoding.is_some() {
srv_config.preferred_encoding = self.preferred_encoding;
}
if let Some(v) = self.validate {
srv_config.validate = v;
}
if let Some(v) = self.on_invalid {
srv_config.on_invalid = v;
}
#[cfg(feature = "webui")]
if self.web_ui.is_some() {
srv_config.web_ui = self.web_ui;
Expand Down
20 changes: 17 additions & 3 deletions martin/src/cog/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use url::Url;
use super::source::CogSource;
use crate::Source;
use crate::config::UnrecognizedValues;
use crate::file_config::{ConfigExtras, FileResult, SourceConfigExtras};
use crate::file_config::{
ConfigExtras, FileResult, OnInvalid, SourceConfigExtras, ValidationLevel,
};

#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct CogConfig {
Expand All @@ -22,13 +24,25 @@ impl ConfigExtras for CogConfig {
}

impl SourceConfigExtras for CogConfig {
async fn new_sources(&self, id: String, path: PathBuf) -> FileResult<Box<dyn Source>> {
/// Builds a COG tile source named `id` from the file at `path`.
///
/// `_validation_level` and `_on_invalid` are accepted to match the trait
/// signature but are not used by this implementation.
///
/// # Errors
///
/// Propagates any error returned by `CogSource::new`.
async fn new_sources(
    &self,
    id: String,
    path: PathBuf,
    _validation_level: Option<ValidationLevel>,
    _on_invalid: Option<OnInvalid>,
) -> FileResult<Box<dyn Source>> {
    // Name the trait-object type explicitly so the unsizing coercion is visible.
    let source: Box<dyn Source> = Box::new(CogSource::new(id, path)?);
    Ok(source)
}

#[allow(clippy::no_effect_underscore_binding)]
async fn new_sources_url(&self, _id: String, _url: Url) -> FileResult<Box<dyn Source>> {
/// URL-based construction of a COG source.
///
/// # Panics
///
/// Always panics via `unreachable!()`; none of the arguments are used.
// NOTE(review): presumably URL sources are rejected for COG before this
// method can be reached — confirm against the caller.
async fn new_sources_url(
&self,
_id: String,
_url: Url,
_validation_level: Option<ValidationLevel>,
_on_invalid: Option<OnInvalid>,
) -> FileResult<Box<dyn Source>> {
unreachable!()
}

Expand Down
18 changes: 17 additions & 1 deletion martin/src/cog/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tiff::tags::Tag::{self, GdalNodata};
use tilejson::{TileJSON, tilejson};

use super::CogError;
use crate::file_config::{FileError, FileResult};
use crate::file_config::{FileError, FileResult, OnInvalid, ValidationLevel};
use crate::{MartinResult, Source, TileData, UrlQuery};

#[derive(Clone, Debug)]
Expand All @@ -32,6 +32,8 @@ pub struct CogSource {
meta: Meta,
tilejson: TileJSON,
tileinfo: TileInfo,
validation_level: Option<ValidationLevel>,
on_invalid: Option<OnInvalid>,
}

impl CogSource {
Expand All @@ -49,6 +51,8 @@ impl CogSource {
meta,
tilejson,
tileinfo,
validation_level: None,
on_invalid: None,
})
}
#[allow(clippy::cast_sign_loss)]
Expand Down Expand Up @@ -151,6 +155,18 @@ impl Source for CogSource {
Box::new(self.clone())
}

/// Returns the validation level stored on this source, if any.
// NOTE(review): the constructor visible in this change initializes the field
// to `None` — confirm whether anything ever sets it.
fn get_validation_level(&self) -> Option<ValidationLevel> {
self.validation_level
}

/// Returns the invalid-source handling policy stored on this source, if any.
// NOTE(review): the constructor visible in this change initializes the field
// to `None` — confirm whether anything ever sets it.
fn get_on_invalid(&self) -> Option<OnInvalid> {
self.on_invalid
}

async fn validate(&self, _validation_level: ValidationLevel) -> MartinResult<()> {
MartinResult::Ok(())
}

async fn get_tile(
&self,
xyz: TileCoord,
Expand Down
42 changes: 39 additions & 3 deletions martin/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::ffi::OsStr;
use std::fs::File;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::pin::Pin;

use futures::future::try_join_all;
use log::info;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use subst::VariableMap;

Expand All @@ -20,6 +20,7 @@ use crate::OptOneMany;
feature = "cog"
))]
use crate::file_config::FileConfigEnum;
use crate::file_config::OnInvalid;
use crate::source::{TileInfoSources, TileSources};
use crate::srv::{RESERVED_KEYWORDS, SrvConfig};
use crate::utils::{CacheValue, MainCache, OptMainCache, init_aws_lc_tls, parse_base_path};
Expand Down Expand Up @@ -209,7 +210,42 @@ impl Config {
sources.push(Box::pin(val));
}

Ok(TileSources::new(try_join_all(sources).await?))
let resolved_sources = try_join_all(sources)
.await?
.into_iter()
.flatten()
.collect::<TileInfoSources>();

let mut sources_to_prune: HashSet<usize> = HashSet::new();
for (idx, source) in resolved_sources.iter().enumerate() {
let validation_result = source
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validation should run in parallel across sources rather than sequentially like this.

.validate(source.get_validation_level().unwrap_or(self.srv.validate))
.await;
if let Err(e) = validation_result {
match source.get_on_invalid().unwrap_or(self.srv.on_invalid) {
OnInvalid::Abort => return MartinResult::Err(e),
OnInvalid::Warn => {
warn!(
"Source {} failed validation, this may cause performance issues: {}",
source.get_id(),
e
);
}
OnInvalid::Ignore => {
sources_to_prune.insert(idx);
}
}
}
}

Ok(TileSources::new(
resolved_sources
.into_iter()
.enumerate()
.filter(|e| !sources_to_prune.contains(&e.0))
.map(|(_, s)| s)
.collect(),
))
}

pub fn save_to_file(&self, file_name: PathBuf) -> MartinResult<()> {
Expand Down
66 changes: 62 additions & 4 deletions martin/src/file_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::fmt::Debug;
use std::mem;
use std::path::{Path, PathBuf};

use clap::ValueEnum;
use futures::TryFutureExt;
use log::{info, warn};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -46,6 +47,12 @@ pub enum FileError {
#[error(r"Unable to acquire connection to file: {0}")]
AcquireConnError(String),

#[error("Source {0} caused an abort due to validation error {1}")]
AbortOnInvalid(PathBuf, String),

#[error("Source {0} was ignored due to validation error {1}")]
IgnoreOnInvalid(PathBuf, String),

#[cfg(feature = "pmtiles")]
#[error(r"PMTiles error {0} processing {1}")]
PmtError(pmtiles::PmtError, String),
Expand All @@ -55,6 +62,31 @@ pub enum FileError {
CogError(#[from] crate::cog::CogError),
}

/// How much checking a source validation pass should perform.
///
/// Serialized by serde in lowercase (`fast`, `thorough`) and usable as a
/// command-line value through `ValueEnum`. `Fast` is the default.
#[derive(PartialEq, Eq, Debug, Clone, Copy, Default, Serialize, Deserialize, ValueEnum)]
#[serde(rename_all = "lowercase")]
pub enum ValidationLevel {
/// Quickly check the source
#[default]
Fast,

/// Do a slow check of everything
Thorough,
}

/// What to do with a source that fails validation.
///
/// Serialized by serde in lowercase (`warn`, `ignore`, `abort`) and usable as
/// a command-line value through `ValueEnum`. `Warn` is the default.
#[derive(PartialEq, Eq, Debug, Clone, Copy, Default, Serialize, Deserialize, ValueEnum)]
#[serde(rename_all = "lowercase")]
pub enum OnInvalid {
// NOTE(review): the handling visible in this change only logs a warning for
// this variant; the "abort if the error is critical" part is not shown —
// confirm where (or whether) that happens.
/// Print warning message, and abort if the error is critical
#[default]
Warn,

/// Skip this source
Ignore,

// NOTE(review): the visible handling aborts on a validation *error*, not on
// a warning — confirm the wording matches the intended behavior.
/// Do not start Martin on any warnings
Abort,
}

pub trait ConfigExtras: Clone + Debug + Default + PartialEq + Send {
fn init_parsing(&mut self, _cache: OptMainCache) -> FileResult<()> {
Ok(())
Expand All @@ -78,12 +110,16 @@ pub trait SourceConfigExtras: ConfigExtras {
&self,
id: String,
path: PathBuf,
validation_level: Option<ValidationLevel>,
on_invalid: Option<OnInvalid>,
) -> impl Future<Output = FileResult<TileInfoSource>> + Send;

/// Creates a tile source named `id` from the remote location `url`.
///
/// `validation_level` and `on_invalid` are optional validation settings
/// supplied by the caller; `None` means no value was configured at this
/// level. Implementations are not required to use them.
///
/// The returned future must be `Send` and resolves to the new source or a
/// file error.
fn new_sources_url(
&self,
id: String,
url: Url,
validation_level: Option<ValidationLevel>,
on_invalid: Option<OnInvalid>,
) -> impl Future<Output = FileResult<TileInfoSource>> + Send;
}

Expand Down Expand Up @@ -123,6 +159,8 @@ impl<T: ConfigExtras> FileConfigEnum<T> {
} else {
Some(configs)
},
validate: None,
on_invalid: None,
custom,
})
}
Expand Down Expand Up @@ -180,6 +218,10 @@ pub struct FileConfig<T> {
pub paths: OptOneMany<PathBuf>,
/// A map of source IDs to file paths or config objects
pub sources: Option<BTreeMap<String, FileConfigSrc>>,
#[serde(default)]
pub validate: Option<ValidationLevel>,
#[serde(default)]
pub on_invalid: Option<OnInvalid>,
/// Any customizations related to the specifics of the configuration section
#[serde(flatten)]
pub custom: T,
Expand Down Expand Up @@ -268,7 +310,11 @@ async fn resolve_int<T: SourceConfigExtras>(
let dup = if dup { "duplicate " } else { "" };
let id = idr.resolve(&id, url.to_string());
configs.insert(id.clone(), source);
results.push(cfg.custom.new_sources_url(id.clone(), url.clone()).await?);
results.push(
cfg.custom
.new_sources_url(id.clone(), url.clone(), cfg.validate, cfg.on_invalid)
.await?,
);
info!("Configured {dup}source {id} from {}", sanitize_url(&url));
} else {
let can = source.abs_path()?;
Expand All @@ -282,7 +328,11 @@ async fn resolve_int<T: SourceConfigExtras>(
let id = idr.resolve(&id, can.to_string_lossy().to_string());
info!("Configured {dup}source {id} from {}", can.display());
configs.insert(id.clone(), source.clone());
results.push(cfg.custom.new_sources(id, source.into_path()).await?);
results.push(
cfg.custom
.new_sources(id, source.into_path(), cfg.validate, cfg.on_invalid)
.await?,
);
}
}
}
Expand All @@ -306,7 +356,11 @@ async fn resolve_int<T: SourceConfigExtras>(

let id = idr.resolve(id, url.to_string());
configs.insert(id.clone(), FileConfigSrc::Path(path));
results.push(cfg.custom.new_sources_url(id.clone(), url.clone()).await?);
results.push(
cfg.custom
.new_sources_url(id.clone(), url.clone(), cfg.validate, cfg.on_invalid)
.await?,
);
info!("Configured source {id} from URL {}", sanitize_url(&url));
} else {
let is_dir = path.is_dir();
Expand Down Expand Up @@ -335,7 +389,11 @@ async fn resolve_int<T: SourceConfigExtras>(
info!("Configured source {id} from {}", can.display());
files.insert(can);
configs.insert(id.clone(), FileConfigSrc::Path(path.clone()));
results.push(cfg.custom.new_sources(id, path).await?);
results.push(
cfg.custom
.new_sources(id, path, cfg.validate, cfg.on_invalid)
.await?,
);
}
}
}
Expand Down
Loading
Loading