Skip to content

Commit 87efbba

Browse files
feat: add fetch module for RDF over HTTP with fallbacks
Co-authored-by: aider (openai/gpt-5) <[email protected]>
1 parent 25217e0 commit 87efbba

File tree

5 files changed

+302
-86
lines changed

5 files changed

+302
-86
lines changed

lib/src/fetch.rs

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
use crate::errors::OfflineRetrievalError;
2+
use anyhow::{anyhow, Result};
3+
use chrono::prelude::*;
4+
use oxigraph::io::RdfFormat;
5+
use reqwest::blocking::Client;
6+
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE, LINK};
7+
use std::time::Duration;
8+
9+
/// Settings shared by the HTTP helpers in this module.
#[derive(Debug, Clone)]
pub struct FetchOptions {
    /// When `true`, the public helpers return an `OfflineRetrievalError`
    /// instead of issuing any request.
    pub offline: bool,
    /// Timeout handed to the HTTP client builder.
    pub timeout: Duration,
    /// Media types to ask for, most preferred first. Used to build the
    /// `Accept` header and to filter `Link` alternates by their `type`.
    pub accept_order: Vec<&'static str>,
    /// Suffixes offered to the fallback-URL builder when the first response
    /// is not usable RDF.
    pub extension_candidates: Vec<&'static str>,
}
16+
17+
impl Default for FetchOptions {
18+
fn default() -> Self {
19+
Self {
20+
offline: false,
21+
timeout: Duration::from_secs(30),
22+
accept_order: vec![
23+
"text/turtle",
24+
"application/rdf+xml",
25+
"application/n-triples",
26+
],
27+
extension_candidates: vec![".ttl", ".rdf", ".owl", "index.ttl", "index.rdf"],
28+
}
29+
}
30+
}
31+
32+
#[derive(Debug, Clone)]
33+
pub struct FetchResult {
34+
pub bytes: Vec<u8>,
35+
pub format: Option<RdfFormat>,
36+
pub final_url: String,
37+
pub content_type: Option<String>,
38+
}
39+
40+
fn detect_format(ct: &str) -> Option<RdfFormat> {
41+
let ct = ct.split(';').next().unwrap_or("").trim().to_ascii_lowercase();
42+
match ct.as_str() {
43+
"text/turtle" | "application/x-turtle" => Some(RdfFormat::Turtle),
44+
"application/rdf+xml" => Some(RdfFormat::RdfXml),
45+
"application/n-triples" | "application/ntriples" | "text/plain" => Some(RdfFormat::NTriples),
46+
_ => None,
47+
}
48+
}
49+
50+
/// Builds an `Accept` header value from media types listed most preferred first.
///
/// The first type gets `q=1.0` and each following one drops by 0.1, never
/// going below 0.1. A trailing `*/*; q=0.1` is always appended. An empty
/// list yields plain `*/*`.
fn build_accept(accept_order: &[&'static str]) -> String {
    if accept_order.is_empty() {
        return "*/*".to_string();
    }
    let weighted = accept_order.iter().enumerate().map(|(rank, media_type)| {
        let stepped = 1.0f32 - 0.1f32 * rank as f32;
        let quality = if stepped < 0.1 { 0.1 } else { stepped };
        format!("{media_type}; q={quality:.1}")
    });
    let mut entries: Vec<String> = weighted.collect();
    entries.push("*/*; q=0.1".to_string());
    entries.join(", ")
}
64+
65+
/// Produces alternative URLs to try when `orig` did not yield RDF.
///
/// * The fragment (`#...`) is dropped: it is never sent to a server, so
///   appending an extension after it would only re-request the same resource.
/// * A query string (`?...`) is kept and re-attached after the rewritten path.
/// * Directory-style locations (ending in `/`, or a bare `scheme://host` with
///   no path) get every entry of `exts` appended inside that directory.
/// * Any other location has its last path segment's extension replaced by
///   (or, if it has none, extended with) `.ttl`, `.rdf` and `.owl`.
/// * A candidate identical to `orig` is omitted, since the caller has
///   already requested it.
fn build_extension_candidates(orig: &str, exts: &[&str]) -> Vec<String> {
    const REPLACEMENTS: [&str; 3] = [".ttl", ".rdf", ".owl"];

    let without_fragment = orig.split('#').next().unwrap_or(orig);
    let (location, query) = match without_fragment.find('?') {
        Some(q) => without_fragment.split_at(q),
        None => (without_fragment, ""),
    };

    // With no '/' after "scheme://" the last "segment" is the host name;
    // rewriting its "extension" would point the request at another host.
    let authority_only = match location.find("://") {
        Some(i) => !location[i + 3..].contains('/'),
        None => false,
    };

    let mut cands: Vec<String> = if location.ends_with('/') || authority_only {
        let sep = if location.ends_with('/') { "" } else { "/" };
        exts.iter()
            .map(|e| format!("{location}{sep}{e}{query}"))
            .collect()
    } else {
        let slash_pos = location.rfind('/').map(|i| i + 1).unwrap_or(0);
        let (prefix, filename) = location.split_at(slash_pos);
        let stem = match filename.rfind('.') {
            Some(dot) => &filename[..dot],
            None => filename,
        };
        REPLACEMENTS
            .iter()
            .map(|rep| format!("{prefix}{stem}{rep}{query}"))
            .collect()
    };

    cands.retain(|c| c.as_str() != orig);
    cands
}
89+
90+
fn parse_link_alternates(headers: &HeaderMap, accept_order: &[&'static str]) -> Vec<String> {
91+
let mut out = Vec::new();
92+
if let Some(link_val) = headers.get(LINK) {
93+
if let Ok(link_str) = link_val.to_str() {
94+
for part in link_str.split(',') {
95+
let part = part.trim();
96+
if !part.contains("rel=\"alternate\"") {
97+
continue;
98+
}
99+
// Try to extract type and URL
100+
let has_rdf_type = accept_order
101+
.iter()
102+
.any(|typ| part.contains(&format!("type=\"{}\"", typ)));
103+
if !has_rdf_type {
104+
continue;
105+
}
106+
if let Some(start) = part.find('<') {
107+
if let Some(end) = part[start + 1..].find('>') {
108+
let url = &part[start + 1..start + 1 + end];
109+
out.push(url.to_string());
110+
}
111+
}
112+
}
113+
}
114+
}
115+
out
116+
}
117+
118+
fn try_get(
119+
url: &str,
120+
client: &Client,
121+
accept: &str,
122+
) -> Result<(Vec<u8>, Option<String>, Option<String>, String, reqwest::StatusCode)> {
123+
let resp = client.get(url).header(ACCEPT, accept).send()?;
124+
let status = resp.status();
125+
let final_url = resp.url().to_string();
126+
let ct = resp
127+
.headers()
128+
.get(CONTENT_TYPE)
129+
.and_then(|h| h.to_str().ok())
130+
.map(|s| s.to_string());
131+
let link = resp
132+
.headers()
133+
.get(LINK)
134+
.and_then(|h| h.to_str().ok())
135+
.map(|s| s.to_string());
136+
let bytes = resp.bytes()?.to_vec();
137+
Ok((bytes, ct, link, final_url, status))
138+
}
139+
140+
pub fn fetch_rdf(url: &str, opts: &FetchOptions) -> Result<FetchResult> {
141+
if opts.offline {
142+
return Err(anyhow!(OfflineRetrievalError {
143+
file: url.to_string()
144+
}));
145+
}
146+
let client = Client::builder().timeout(opts.timeout).build()?;
147+
let accept = build_accept(&opts.accept_order);
148+
149+
// First attempt
150+
let (bytes, ct, link, final_url, status) = try_get(url, &client, &accept)?;
151+
152+
// If success and looks RDF by Content-Type, return
153+
if status.is_success() {
154+
if let Some(ref cts) = ct {
155+
if let Some(fmt) = detect_format(cts) {
156+
return Ok(FetchResult {
157+
bytes,
158+
format: Some(fmt),
159+
final_url,
160+
content_type: ct,
161+
});
162+
}
163+
}
164+
// Unknown or HTML content-type: fall through to alternates with Link header hints
165+
}
166+
167+
// Try Link: rel="alternate" first if present
168+
if let Some(link_header) = link {
169+
let mut headers = HeaderMap::new();
170+
headers.insert(
171+
LINK,
172+
HeaderValue::from_str(&link_header).unwrap_or(HeaderValue::from_static("")),
173+
);
174+
for alt in parse_link_alternates(&headers, &opts.accept_order) {
175+
let (b2, ct2, _link2, fu2, st2) = try_get(&alt, &client, &accept)?;
176+
if st2.is_success() {
177+
let fmt = ct2.as_deref().and_then(detect_format);
178+
return Ok(FetchResult {
179+
bytes: b2,
180+
format: fmt,
181+
final_url: fu2,
182+
content_type: ct2,
183+
});
184+
}
185+
}
186+
}
187+
188+
// Status-based or type-based fallbacks
189+
if !status.is_success() || ct.as_deref().map(|s| s.contains("html")).unwrap_or(true) {
190+
for candidate in build_extension_candidates(url, &opts.extension_candidates) {
191+
let (b2, ct2, _link2, fu2, st2) = try_get(&candidate, &client, &accept)?;
192+
if st2.is_success() {
193+
let fmt = ct2.as_deref().and_then(detect_format);
194+
return Ok(FetchResult {
195+
bytes: b2,
196+
format: fmt,
197+
final_url: fu2,
198+
content_type: ct2,
199+
});
200+
}
201+
}
202+
}
203+
204+
// As a last resort, if the original was successful but with unknown CT, return it.
205+
if status.is_success() {
206+
let fmt = ct.as_deref().and_then(detect_format);
207+
return Ok(FetchResult {
208+
bytes,
209+
format: fmt,
210+
final_url,
211+
content_type: ct,
212+
});
213+
}
214+
215+
Err(anyhow!(
216+
"Failed to retrieve RDF from {} (HTTP {}) and fallbacks",
217+
url,
218+
status
219+
))
220+
}
221+
222+
pub fn head_last_modified(url: &str, opts: &FetchOptions) -> Result<Option<DateTime<Utc>>> {
223+
if opts.offline {
224+
return Err(anyhow!(OfflineRetrievalError {
225+
file: url.to_string()
226+
}));
227+
}
228+
let client = Client::builder().timeout(opts.timeout).build()?;
229+
let accept = build_accept(&opts.accept_order);
230+
let resp = client.head(url).header(ACCEPT, accept).send()?;
231+
if !resp.status().is_success() {
232+
return Ok(None);
233+
}
234+
if let Some(h) = resp.headers().get("Last-Modified") {
235+
if let Ok(s) = h.to_str() {
236+
if let Ok(dt) = DateTime::parse_from_rfc2822(s) {
237+
return Ok(Some(dt.with_timezone(&Utc)));
238+
}
239+
}
240+
}
241+
Ok(None)
242+
}
243+
244+
pub fn head_exists(url: &str, opts: &FetchOptions) -> Result<bool> {
245+
if opts.offline {
246+
return Err(anyhow!(OfflineRetrievalError {
247+
file: url.to_string()
248+
}));
249+
}
250+
let client = Client::builder().timeout(opts.timeout).build()?;
251+
let accept = build_accept(&opts.accept_order);
252+
let resp = client.head(url).header(ACCEPT, accept).send()?;
253+
Ok(resp.status().is_success())
254+
}

lib/src/io.rs

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,30 @@ pub struct StoreStats {
2727
pub num_triples: usize,
2828
}
2929

30+
fn load_staging_store_from_bytes(bytes: &[u8], preferred: Option<RdfFormat>) -> Result<Store> {
31+
// Try preferred first, then fall back to other formats with a fresh store each time
32+
let mut candidates = vec![RdfFormat::Turtle, RdfFormat::RdfXml, RdfFormat::NTriples];
33+
if let Some(p) = preferred {
34+
candidates.retain(|f| *f != p);
35+
candidates.insert(0, p);
36+
}
37+
for fmt in candidates {
38+
let store = Store::new()?;
39+
let staging_graph = NamedNode::new_unchecked("temp:graph");
40+
let parser = RdfParser::from_format(fmt)
41+
.with_default_graph(GraphNameRef::NamedNode(staging_graph.as_ref()))
42+
.without_named_graphs();
43+
if store
44+
.bulk_loader()
45+
.load_from_reader(parser, std::io::Cursor::new(bytes))
46+
.is_ok()
47+
{
48+
return Ok(store);
49+
}
50+
}
51+
Err(anyhow!("Failed to parse RDF bytes in any supported format"))
52+
}
53+
3054
/// A helper function to read an ontology from a location, add it to a store,
3155
/// and return the parsed ontology metadata. This is used by multiple GraphIO implementations.
3256
fn add_ontology_to_store(
@@ -36,27 +60,23 @@ fn add_ontology_to_store(
3660
offline: bool,
3761
strict: bool,
3862
) -> Result<Ontology> {
39-
// Read source into bytes and detect format
40-
let (bytes, format) = match &location {
41-
OntologyLocation::File(path) => get_file_contents(path)?,
63+
// Read source into bytes (via fetch for URLs) and load into a staging store with format fallback
64+
let t0 = Instant::now();
65+
let staging_graph = NamedNode::new_unchecked("temp:graph");
66+
let tmp_store = match &location {
67+
OntologyLocation::File(path) => {
68+
let (bytes, format) = get_file_contents(path)?;
69+
load_staging_store_from_bytes(&bytes, format)?
70+
}
4271
OntologyLocation::Url(url) => {
4372
if offline {
4473
return Err(Error::new(OfflineRetrievalError { file: url.clone() }));
4574
}
46-
get_url_contents(url.as_str())?
75+
let opts = crate::fetch::FetchOptions::default();
76+
let fetched = crate::fetch::fetch_rdf(url.as_str(), &opts)?;
77+
load_staging_store_from_bytes(&fetched.bytes, fetched.format)?
4778
}
4879
};
49-
50-
// Parse once into a temporary, isolated store to discover ontology metadata
51-
let tmp_store = Store::new()?;
52-
let staging_graph = NamedNode::new_unchecked("temp:graph");
53-
let parser = RdfParser::from_format(format.unwrap_or(RdfFormat::Turtle))
54-
.with_default_graph(GraphNameRef::NamedNode(staging_graph.as_ref()))
55-
.without_named_graphs();
56-
let t0 = Instant::now();
57-
tmp_store
58-
.bulk_loader()
59-
.load_from_reader(parser, bytes.as_slice())?;
6080
debug!(
6181
"Loaded {} into staging store in {:?}",
6282
location.as_str(),
@@ -180,14 +200,9 @@ pub trait GraphIO: Send + Sync {
180200
modified
181201
}
182202
OntologyLocation::Url(url) => {
183-
let response = reqwest::blocking::Client::new().head(url).send()?;
184-
let url_last_modified = response.headers().get("Last-Modified");
185-
match url_last_modified {
186-
Some(date) => {
187-
let date = date.to_str()?;
188-
let date = DateTime::parse_from_rfc2822(date)?;
189-
date.with_timezone(&Utc)
190-
}
203+
let opts = crate::fetch::FetchOptions::default();
204+
match crate::fetch::head_last_modified(url, &opts)? {
205+
Some(dt) => dt,
191206
None => Utc::now(),
192207
}
193208
}

lib/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pub mod io;
101101
pub mod ontology;
102102
pub mod options;
103103
pub mod policy;
104+
pub mod fetch;
104105
#[macro_use]
105106
pub mod util;
106107
pub mod transform;

lib/src/ontology.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -276,12 +276,8 @@ impl Ontology {
276276
match &self.location {
277277
Some(OntologyLocation::File(p)) => p.exists(),
278278
Some(OntologyLocation::Url(u)) => {
279-
// check if the URL is reachable
280-
let res = reqwest::blocking::get(u);
281-
match res {
282-
Ok(r) => r.status().is_success(),
283-
Err(_) => false,
284-
}
279+
let opts = crate::fetch::FetchOptions::default();
280+
crate::fetch::head_exists(u, &opts).unwrap_or(false)
285281
}
286282
None => false,
287283
}

0 commit comments

Comments
 (0)