Pleiades as a Source #241
@@ -0,0 +1,15 @@
import os
from pipeline.process.base.downloader import BaseDownloader


class PleiadesDownloader(BaseDownloader):
    """
    Types url: https://atlantides.org/downloads/pleiades/rdf/place-types.ttl
    Places url: https://atlantides.org/downloads/pleiades/json/pleiades-places-latest.json.gz
    """
    def get_urls(self):
        place_types_url = self.config['input_files']["records"][0]['url']
        places_url = self.config['input_files']["records"][1]['url']
        dumps_dir = self.config['dumps_dir']
        place_types_path = os.path.join(dumps_dir, place_types_url.rsplit('/')[-1])
        places_path = os.path.join(dumps_dir, places_url.rsplit('/')[-1])
        return [{"url": place_types_url, "path": place_types_path}, {"url": places_url, "path": places_path}]
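For context, `get_urls` reads the place-types TTL URL from `records[0]` and the places JSON URL from `records[1]`, so the config is assumed to list them in that order. A minimal sketch of a config fragment that satisfies this (only the keys the downloader reads are shown; the pipeline's real config format may differ):

```python
# Hypothetical config fragment, for illustration only.
config = {
    "dumps_dir": "/data/dumps/pleiades",
    "input_files": {
        "records": [
            # index 0: place-types vocabulary (TTL)
            {"url": "https://atlantides.org/downloads/pleiades/rdf/place-types.ttl"},
            # index 1: gzipped places dump (JSON)
            {"url": "https://atlantides.org/downloads/pleiades/json/pleiades-places-latest.json.gz"},
        ],
    },
}
# With this config, get_urls() returns entries whose "path" values are
# /data/dumps/pleiades/place-types.ttl and /data/dumps/pleiades/pleiades-places-latest.json.gz.
```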
@@ -0,0 +1,205 @@
# from lux_pipeline.process.base.mapper import Mapper
from pipeline.process.base.mapper import Mapper
from cromulent import model, vocab
import re


class PleiadesMapper(Mapper):
    def __init__(self, config):
        Mapper.__init__(self, config)
        self.factory.auto_assign_id = False
        # pass
    def guess_type(self, data):
        # Check the @type field first
        if data.get("@type") == "Concept":
            return model.Type
        # Default to Place for everything else, including records with placeTypeURIs
        return model.Place
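    # For example (hypothetical inputs): guess_type({"@type": "Concept"}) returns model.Type,
    # while guess_type({"title": "Example Place", "placeTypeURIs": [...]}) returns model.Place.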
    def geojson_to_wkt(self, geom):
        """Convert a GeoJSON geometry dict to a WKT string."""
        t = geom.get("type")
        coords = geom.get("coordinates")
        if t == "Point":
            return f"POINT ({coords[0]} {coords[1]})"
        elif t == "Polygon":
            # Polygon: list of linear rings (first is exterior)
            rings = []
            for ring in coords:
                rings.append(", ".join(f"{x} {y}" for x, y in ring))
            return f"POLYGON (({rings[0]}))"
        elif t == "MultiPolygon":
            polys = []
            for poly in coords:
                rings = []
                for ring in poly:
                    rings.append(", ".join(f"{x} {y}" for x, y in ring))
                polys.append(f"(({rings[0]}))")
            return f"MULTIPOLYGON ({', '.join(polys)})"
        # Add more types as needed
        return None
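    # For example (hypothetical values): geojson_to_wkt({"type": "Point", "coordinates": [23.72, 37.97]})
    # returns "POINT (23.72 37.97)". Note that for Polygon/MultiPolygon only the exterior ring
    # (rings[0]) is emitted, interior rings (holes) are dropped, and unsupported types return None.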
    def bbox_to_wkt(self, bbox):
        """Convert a bounding box [minLon, minLat, maxLon, maxLat] to a WKT Polygon string."""
        minx, miny, maxx, maxy = bbox
        # Polygon: lower left, lower right, upper right, upper left, close
        return (
            f"POLYGON (({minx} {miny}, {maxx} {miny}, {maxx} {maxy}, {minx} {maxy}, {minx} {miny}))"
        )
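    # For example (hypothetical values): bbox_to_wkt([12.0, 41.0, 13.0, 42.0]) returns
    # "POLYGON ((12.0 41.0, 13.0 41.0, 13.0 42.0, 12.0 42.0, 12.0 41.0))".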
    def parse_types(self, ttl_section):
        """Parse a TTL section for a place type concept and create a Linked Art record."""
        # Extract the URI from the first line
        uri_match = re.search(r'<https://pleiades\.stoa\.org/vocabularies/([^>]+)>', ttl_section)
        if not uri_match:
            return None

        concept_id = uri_match.group(1)
        uri = f"https://pleiades.stoa.org/vocabularies/{concept_id}"

        # Extract prefLabel
        label_match = re.search(r'skos:prefLabel "([^"]+)"@en', ttl_section)
        if not label_match:
            return None

        label = label_match.group(1)

        # Extract scopeNote (description)
        scope_match = re.search(r'skos:scopeNote "([^"]+)"@en', ttl_section)
        description = scope_match.group(1) if scope_match else None

        # Extract all owl:sameAs URIs (handling multiline format)
        # First find the owl:sameAs section
        same_as_match = re.search(r'owl:sameAs\s+(.+?);', ttl_section, re.DOTALL)
        same_as_matches = []
        if same_as_match:
            same_as_section = same_as_match.group(1)
            # Extract all URIs from this section
            same_as_matches = re.findall(r'<([^>]+)>', same_as_section)

        # Create the Type record
        top = model.Type(ident=uri)

        # Add primary name
        primary_name = vocab.PrimaryName(content=label)
        primary_name.language = model.Language(ident="http://vocab.getty.edu/aat/300388277")  # English
        top.identified_by = primary_name

        # Add description if present
        if description:
            desc = vocab.Description(content=description)
            desc.language = model.Language(ident="http://vocab.getty.edu/aat/300388277")  # English
            top.referred_to_by = desc

        # Add equivalents from owl:sameAs statements
        for same_as_uri in same_as_matches:
            # Skip self-references
            if same_as_uri != uri:
                top.equivalent = model.Type(ident=same_as_uri)

        data = model.factory.toJSON(top)
        return {"identifier": concept_id, "data": data, "source": "pleiades"}
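    # Illustrative shape of a TTL section this parser accepts (inferred from the regexes above;
    # the real place-types.ttl may differ, and the owl:sameAs target here is a placeholder):
    #   <https://pleiades.stoa.org/vocabularies/place-types/settlement>
    #       skos:prefLabel "settlement"@en ;
    #       skos:scopeNote "A permanently inhabited place."@en ;
    #       owl:sameAs <http://example.org/equivalent-concept> ;
    #       a skos:Concept .
    # This would yield {"identifier": "place-types/settlement", "data": <Linked Art Type>, "source": "pleiades"}.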
    def parse_place(self, record):
        rec = record["data"]
        recid = record["identifier"]

        # Handle places
        top = model.Place(ident=rec["uri"])
        # Collect all available names
        all_names = []
        title = rec.get("title")
        if title:
            all_names.append({"content": title, "language": "en", "source": "title"})

        # Add names from the names array
        names = rec.get("names", [])
        for n in names:
            if "attested" in n and n["attested"]:
                lang = n.get("language", "en")
                all_names.append({"content": n["attested"], "language": lang, "source": "names"})

        if not all_names:
            return None

        # Assign first name as primary
        primary_name_data = all_names[0]
        primary_name = vocab.PrimaryName(content=primary_name_data["content"])
        if primary_name_data["language"] == "en":
            primary_name.language = model.Language(ident="http://vocab.getty.edu/aat/300388277")  # English
        else:
            primary_name.language = model.Language(label=primary_name_data["language"])
        top.identified_by = primary_name

        # Add remaining names as alternate names
        for name_data in all_names[1:]:
            alt_name = vocab.AlternateName(content=name_data["content"])
            if name_data["language"] == "en":
                alt_name.language = model.Language(ident="http://vocab.getty.edu/aat/300388277")  # English
            else:
                alt_name.language = model.Language(label=name_data["language"])
            top.identified_by = alt_name

        # Add description
        if "description" in rec:
            desc = vocab.Description(content=rec["description"])
            # Add English language
            desc.language = model.Language(ident="http://vocab.getty.edu/aat/300388277")  # English
            top.referred_to_by = desc

        # Add place types using placeTypeURIs directly
        if "placeTypeURIs" in rec:
            for pt_uri in rec["placeTypeURIs"]:
                top.classified_as = model.Type(ident=pt_uri)

        # Add geospatial data: geometry, bbox, reprPoint (in that order)
        wkt = None
        if "geometry" in rec and rec["geometry"]:
            wkt = self.geojson_to_wkt(rec["geometry"])
        elif "bbox" in rec and rec["bbox"]:
            wkt = self.bbox_to_wkt(rec["bbox"])
        elif "boundingBox" in rec and rec["boundingBox"]:
            wkt = self.bbox_to_wkt(rec["boundingBox"])
        elif "reprPoint" in rec and rec["reprPoint"]:
            coords = rec["reprPoint"]
            if len(coords) >= 2:
                wkt = f"POINT ({coords[0]} {coords[1]})"
        if wkt:
            top.defined_by = wkt

        # Add part_of relationships
        if "connections" in rec and rec["connections"]:
            for conn in rec["connections"]:
                # https://pleiades.stoa.org/vocabularies/relationship-types
                if conn.get("connectionType") in [
                    "part_of_physical",
                    "part_of_admin",
                    "part_of_regional",
                    "located_in",
                    "in_territory_of",
                    "located_at",
                    "port_of",
                    "member_of",
                    "part_of_analytical",
                    "capital_of"
                ]:
                    related = model.Place(ident=conn["connectsTo"])
                    if "title" in conn:
                        related._label = conn["title"]
                    top.part_of = related
        if "references" in rec and rec["references"]:
            for ref in rec["references"]:
                if "https://www.wikidata.org/wiki" in ref["accessURI"]:
                    top.equivalent = model.Place(ident=ref["accessURI"])
        data = model.factory.toJSON(top)
        return {"identifier": recid, "data": data, "source": "pleiades"}
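    # Minimal sketch of the input parse_place expects (field names are those read above;
    # the identifier and values are hypothetical):
    #   record = {
    #       "identifier": "123456",
    #       "data": {
    #           "uri": "https://pleiades.stoa.org/places/123456",
    #           "title": "Example Place",
    #           "names": [{"attested": "Exemplum", "language": "la"}],
    #           "placeTypeURIs": ["https://pleiades.stoa.org/vocabularies/place-types/settlement"],
    #           "reprPoint": [23.72, 37.97],
    #       },
    #   }
    #   parse_place(record) returns {"identifier": "123456", "data": <Linked Art Place>, "source": "pleiades"}.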
    def transform(self, record, rectype, reference=False):
        if rectype == "Place":
            return self.parse_place(record)
        elif rectype == "Type":
            return self.parse_concept(record)
        else:
            return None
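One thing the diff leaves implicit is how `parse_types` receives its `ttl_section` strings: `transform` routes `Type` records to `parse_concept`, which is not defined in this file, while the TTL parser above is named `parse_types`. Below is a minimal, hypothetical driver (not part of the PR) showing the input shape each parser expects, assuming place-type concepts in place-types.ttl are separated by blank lines and that the places dump is a single JSON document with a `@graph` array; both are unverified assumptions about the dumps.

```python
# Hypothetical driver, not part of the PR; illustrates the inputs each parser expects.
import gzip
import json

mapper = PleiadesMapper(config)  # config assumed to be the same pipeline config the downloader uses

# Place-type concepts: feed each blank-line-separated TTL section to parse_types
with open("/data/dumps/pleiades/place-types.ttl") as fh:
    for section in fh.read().split("\n\n"):
        type_rec = mapper.parse_types(section)  # returns None when no URI or prefLabel is found

# Places: wrap each dump record and route it through transform
with gzip.open("/data/dumps/pleiades/pleiades-places-latest.json.gz", "rt") as fh:
    dump = json.load(fh)
for place in dump.get("@graph", []):  # "@graph" layout is an assumption about the dump
    place_rec = mapper.transform({"identifier": place.get("id"), "data": place}, "Place")
```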
Are you sure about this? Could be [1] [0]?
Double checked a week ago and it's right, I believe.