Skip to content

Commit f969a43

Browse files
committed
cleaning up batching
1 parent 5207c2f commit f969a43

File tree

3 files changed

+153
-102
lines changed

3 files changed

+153
-102
lines changed

lib/src/api.rs

Lines changed: 125 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ pub struct OntoEnv {
9999
dependency_graph: DiGraph<GraphIdentifier, (), petgraph::Directed>,
100100
config: Config,
101101
failed_resolutions: HashSet<NamedNode>,
102-
fetched_in_session: HashSet<OntologyLocation>,
103-
io_batch_depth: usize,
102+
batch_state: BatchState,
104103
}
105104

106105
impl std::fmt::Debug for OntoEnv {
@@ -116,18 +115,45 @@ impl std::fmt::Debug for OntoEnv {
116115
}
117116
}
118117

118+
#[derive(Default)]
119+
struct BatchState {
120+
depth: usize,
121+
seen_locations: HashSet<OntologyLocation>,
122+
}
123+
124+
impl BatchState {
125+
fn begin(&mut self) {
126+
if self.depth == 0 {
127+
self.seen_locations.clear();
128+
}
129+
self.depth += 1;
130+
}
131+
132+
fn end(&mut self) {
133+
self.depth = self.depth.saturating_sub(1);
134+
}
135+
136+
fn has_seen(&self, location: &OntologyLocation) -> bool {
137+
self.seen_locations.contains(location)
138+
}
139+
140+
fn mark_seen(&mut self, location: &OntologyLocation) {
141+
self.seen_locations.insert(location.clone());
142+
}
143+
}
144+
119145
struct BatchScope<'a> {
120146
env: &'a mut OntoEnv,
121147
completed: bool,
122148
}
123149

124150
impl<'a> BatchScope<'a> {
125151
fn enter(env: &'a mut OntoEnv) -> Result<Self> {
126-
if env.io_batch_depth == 0 {
127-
env.fetched_in_session.clear();
152+
env.batch_state.begin();
153+
if let Err(err) = env.io.begin_batch() {
154+
env.batch_state.end();
155+
return Err(err);
128156
}
129-
env.io_batch_depth += 1;
130-
env.io.begin_batch()?;
131157
Ok(Self {
132158
env,
133159
completed: false,
@@ -137,7 +163,7 @@ impl<'a> BatchScope<'a> {
137163
fn run<T>(mut self, f: impl FnOnce(&mut OntoEnv) -> Result<T>) -> Result<T> {
138164
let result = f(self.env);
139165
let end_result = self.env.io.end_batch();
140-
self.env.io_batch_depth = self.env.io_batch_depth.saturating_sub(1);
166+
self.env.batch_state.end();
141167
self.completed = true;
142168
match (result, end_result) {
143169
(Ok(value), Ok(())) => Ok(value),
@@ -159,10 +185,15 @@ impl<'a> Drop for BatchScope<'a> {
159185
if let Err(err) = self.env.io.end_batch() {
160186
error!("Failed to finalize batched RDF write: {err}");
161187
}
162-
self.env.io_batch_depth = self.env.io_batch_depth.saturating_sub(1);
188+
self.env.batch_state.end();
163189
}
164190
}
165191

192+
/// Outcome of `OntoEnv::fetch_location`: tells the caller whether an already
/// registered ontology was reused or a new one was read and still has to be
/// registered.
enum FetchOutcome {
193+
/// An existing ontology was reused; carries its identifier. No new
/// `Ontology` value was produced, so there is nothing to register.
Reused(GraphIdentifier),
194+
/// The ontology was obtained through `io.add`; it has not yet been added
/// to the environment, which is left to the caller.
Loaded(Ontology),
195+
}
196+
166197
impl OntoEnv {
167198
fn new(env: Environment, io: Box<dyn GraphIO>, config: Config) -> Self {
168199
Self {
@@ -171,8 +202,7 @@ impl OntoEnv {
171202
config,
172203
dependency_graph: DiGraph::new(),
173204
failed_resolutions: HashSet::new(),
174-
fetched_in_session: HashSet::new(),
175-
io_batch_depth: 0,
205+
batch_state: BatchState::default(),
176206
}
177207
}
178208

@@ -437,8 +467,7 @@ impl OntoEnv {
437467
config,
438468
dependency_graph,
439469
failed_resolutions: HashSet::new(),
440-
fetched_in_session: HashSet::new(),
441-
io_batch_depth: 0,
470+
batch_state: BatchState::default(),
442471
})
443472
}
444473

@@ -538,8 +567,7 @@ impl OntoEnv {
538567
dependency_graph: DiGraph::new(),
539568
config,
540569
failed_resolutions: HashSet::new(),
541-
fetched_in_session: HashSet::new(),
542-
io_batch_depth: 0,
570+
batch_state: BatchState::default(),
543571
};
544572

545573
let _ = ontoenv.update_all(false)?;
@@ -599,6 +627,28 @@ impl OntoEnv {
599627
})
600628
}
601629

630+
fn fetch_location(
631+
&mut self,
632+
location: OntologyLocation,
633+
overwrite: Overwrite,
634+
refresh: RefreshStrategy,
635+
) -> Result<FetchOutcome> {
636+
if let Some(existing_id) = self.try_reuse_cached(&location, refresh)? {
637+
self.batch_state.mark_seen(&location);
638+
return Ok(FetchOutcome::Reused(existing_id));
639+
}
640+
641+
if !refresh.is_force() && self.batch_state.has_seen(&location) {
642+
if let Some(existing) = self.env.get_ontology_by_location(&location) {
643+
return Ok(FetchOutcome::Reused(existing.id().clone()));
644+
}
645+
}
646+
647+
let ontology = self.io.add(location.clone(), overwrite)?;
648+
self.batch_state.mark_seen(&location);
649+
Ok(FetchOutcome::Loaded(ontology))
650+
}
651+
602652
fn register_ontologies(
603653
&mut self,
604654
ontologies: Vec<Ontology>,
@@ -607,7 +657,7 @@ impl OntoEnv {
607657
let mut ids = Vec::with_capacity(ontologies.len());
608658
for ontology in ontologies {
609659
let id = ontology.id().clone();
610-
self.env.add_ontology(ontology);
660+
self.env.add_ontology(ontology)?;
611661
ids.push(id);
612662
}
613663

@@ -628,33 +678,19 @@ impl OntoEnv {
628678
) -> Result<GraphIdentifier> {
629679
self.failed_resolutions.clear();
630680

631-
if let Some(existing_id) = self.try_reuse_cached(&location, refresh)? {
632-
debug!(
633-
"Reusing cached ontology {} for location {}",
634-
existing_id, location
635-
);
636-
self.fetched_in_session.insert(location);
637-
return Ok(existing_id);
638-
}
639-
640-
if !refresh.is_force() && self.fetched_in_session.contains(&location) {
641-
if let Some(existing) = self.env.get_ontology_by_location(&location) {
642-
debug!(
643-
"Skipping refetch for {} (already loaded this run)",
644-
location
645-
);
646-
return Ok(existing.id().clone());
681+
match self.fetch_location(location.clone(), overwrite, refresh)? {
682+
FetchOutcome::Reused(id) => {
683+
debug!("Reusing cached ontology {} for location {}", id, location);
684+
Ok(id)
685+
}
686+
FetchOutcome::Loaded(ont) => {
687+
let ids = self.register_ontologies(vec![ont], update_dependencies)?;
688+
Ok(ids
689+
.into_iter()
690+
.next()
691+
.expect("registered ontology list should contain new entry"))
647692
}
648693
}
649-
650-
let ont = self.io.add(location.clone(), overwrite)?;
651-
let ids = self.register_ontologies(vec![ont], update_dependencies)?;
652-
let id = ids
653-
.into_iter()
654-
.next()
655-
.expect("registered ontology list should contain new entry");
656-
self.fetched_in_session.insert(location);
657-
Ok(id)
658694
}
659695

660696
fn try_reuse_cached(
@@ -729,53 +765,10 @@ impl OntoEnv {
729765

730766
fn update_all_inner(&mut self, all: bool) -> Result<Vec<GraphIdentifier>> {
731767
self.failed_resolutions.clear();
732-
// remove ontologies which are no longer present in the search directories
733-
// remove ontologies which are no longer present in the search directories
734-
for graphid in self.missing_ontologies() {
735-
self.io.remove(&graphid)?;
736-
self.env.remove_ontology(&graphid);
737-
}
738-
739-
// now, find all the new and updated ontologies in the search directories
740-
// and add them to the environment
741-
let updated_files: Vec<OntologyLocation> = if all {
742-
let mut set: HashSet<OntologyLocation> = self
743-
.env
744-
.ontologies()
745-
.values()
746-
.filter_map(|o| o.location().cloned())
747-
.collect();
748-
for loc in self.find_files()? {
749-
set.insert(loc);
750-
}
751-
set.into_iter().collect()
752-
} else {
753-
self.get_updated_locations()?
754-
};
768+
self.remove_missing_ontologies()?;
755769

756-
// load all of these files into the environment
757-
let mut ontologies: Vec<Ontology> = vec![];
758-
for location in updated_files {
759-
// if 'strict' mode then fail on any errors when adding the ontology
760-
// otherwise just warn
761-
762-
let result = self.io.add(location.clone(), Overwrite::Allow);
763-
if result.is_err() {
764-
if self.config.strict {
765-
return Err(result.unwrap_err());
766-
} else {
767-
warn!(
768-
"Failed to read ontology file {}: {}",
769-
location,
770-
result.unwrap_err()
771-
);
772-
continue;
773-
}
774-
}
775-
776-
let new_ont = result.unwrap();
777-
ontologies.push(new_ont);
778-
}
770+
let updated_files = self.collect_updated_files(all)?;
771+
let ontologies = self.load_updated_ontologies(updated_files)?;
779772

780773
let update_ids = self.register_ontologies(ontologies, true)?;
781774
Ok(update_ids)
@@ -842,6 +835,51 @@ impl OntoEnv {
842835
.collect()
843836
}
844837

838+
/// Removes every ontology reported by `missing_ontologies`, first from the
/// `io` backend and then from the in-memory environment.
///
/// # Errors
/// Stops at the first failing removal and returns its error; removals that
/// already happened are not undone.
fn remove_missing_ontologies(&mut self) -> Result<()> {
    let stale = self.missing_ontologies();
    for stale_id in stale {
        self.io.remove(&stale_id)?;
        // Only a failure matters here; the removed `Ontology` is discarded.
        self.env.remove_ontology(&stale_id)?;
    }
    Ok(())
}
845+
846+
fn collect_updated_files(&mut self, all: bool) -> Result<Vec<OntologyLocation>> {
847+
if all {
848+
let mut set: HashSet<OntologyLocation> = self
849+
.env
850+
.ontologies()
851+
.values()
852+
.filter_map(|o| o.location().cloned())
853+
.collect();
854+
for loc in self.find_files()? {
855+
set.insert(loc);
856+
}
857+
Ok(set.into_iter().collect())
858+
} else {
859+
self.get_updated_locations()
860+
}
861+
}
862+
863+
fn load_updated_ontologies(
864+
&mut self,
865+
updated_files: Vec<OntologyLocation>,
866+
) -> Result<Vec<Ontology>> {
867+
let mut ontologies = Vec::with_capacity(updated_files.len());
868+
for location in updated_files {
869+
match self.io.add(location.clone(), Overwrite::Allow) {
870+
Ok(ont) => ontologies.push(ont),
871+
Err(err) => {
872+
if self.config.strict {
873+
return Err(err);
874+
} else {
875+
warn!("Failed to read ontology file {}: {}", location, err);
876+
}
877+
}
878+
}
879+
}
880+
Ok(ontologies)
881+
}
882+
845883
/// Returns a list of all files in the environment which have been updated (added or changed)
846884
/// Does not return files that have been removed
847885
pub fn get_updated_locations(&self) -> Result<Vec<OntologyLocation>> {
@@ -998,7 +1036,7 @@ impl OntoEnv {
9981036
match self.io.add(location, Overwrite::Preserve) {
9991037
Ok(new_ont) => {
10001038
let id = new_ont.id().clone();
1001-
self.env.add_ontology(new_ont);
1039+
self.env.add_ontology(new_ont)?;
10021040
stack.push_back(id);
10031041
}
10041042
Err(e) => {

lib/src/config.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use serde::{Deserialize, Serialize};
99
use std::io::{BufReader, Write};
1010
use std::path::{Path, PathBuf};
1111

12+
const DEFAULT_INCLUDE_PATTERNS: &[&str] = &["*.ttl", "*.xml", "*.n3"];
13+
1214
fn vec_pattern_ser<S>(patterns: &Vec<Pattern>, serializer: S) -> Result<S::Ok, S::Error>
1315
where
1416
S: serde::Serializer,
@@ -304,9 +306,12 @@ impl ConfigBuilder {
304306
self.locations.unwrap_or_else(|| vec![root.clone()])
305307
};
306308

307-
let includes_str = self
308-
.includes
309-
.unwrap_or_else(|| vec!["*.ttl".to_string(), "*.xml".to_string(), "*.n3".to_string()]);
309+
let includes_str = self.includes.unwrap_or_else(|| {
310+
DEFAULT_INCLUDE_PATTERNS
311+
.iter()
312+
.map(|s| s.to_string())
313+
.collect()
314+
});
310315
let excludes_str = self.excludes.unwrap_or_default();
311316

312317
let includes = includes_str

lib/src/environment.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
use crate::io::GraphIO;
55
use crate::ontology::{GraphIdentifier, Ontology, OntologyLocation};
66
use crate::policy;
7-
use anyhow::Result;
7+
use anyhow::{anyhow, Result};
88
use chrono::prelude::*;
9+
use log::warn;
910
use oxigraph::model::{Graph, NamedNodeRef};
1011
use serde::{Deserialize, Serialize};
1112
use std::collections::HashMap;
@@ -84,17 +85,26 @@ impl Environment {
8485
&self.ontologies
8586
}
8687

87-
pub fn add_ontology(&mut self, mut ontology: Ontology) {
88+
pub fn add_ontology(&mut self, mut ontology: Ontology) -> Result<()> {
8889
ontology.last_updated = Some(Utc::now());
89-
self.locations
90-
.insert(ontology.location().unwrap().clone(), ontology.id().clone());
90+
let location = ontology
91+
.location()
92+
.cloned()
93+
.ok_or_else(|| anyhow!("Cannot add ontology {} without a location", ontology.id()))?;
94+
self.locations.insert(location, ontology.id().clone());
9195
self.ontologies.insert(ontology.id().clone(), ontology);
96+
Ok(())
9297
}
9398

94-
pub fn remove_ontology(&mut self, id: &GraphIdentifier) -> Option<Ontology> {
95-
self.locations
96-
.remove(self.ontologies.get(id)?.location().unwrap());
97-
self.ontologies.remove(id)
99+
pub fn remove_ontology(&mut self, id: &GraphIdentifier) -> Result<Option<Ontology>> {
100+
if let Some(existing) = self.ontologies.get(id) {
101+
if let Some(location) = existing.location() {
102+
self.locations.remove(location);
103+
} else {
104+
warn!("Removing ontology {} without recorded location", id);
105+
}
106+
}
107+
Ok(self.ontologies.remove(id))
98108
}
99109

100110
pub fn get_modified_time(&self, id: &GraphIdentifier) -> Option<DateTime<Utc>> {
@@ -145,9 +155,7 @@ impl Environment {
145155

146156
/// Returns the first ontology with the given location
147157
pub fn get_ontology_by_location(&self, location: &OntologyLocation) -> Option<&Ontology> {
148-
// choose the first ontology with the given location
149-
self.ontologies
150-
.values()
151-
.find(|&ontology| ontology.location() == Some(location))
158+
let id = self.locations.get(location)?;
159+
self.ontologies.get(id)
152160
}
153161
}

0 commit comments

Comments
 (0)