Skip to content

ubffm/uri-resolver

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

URI Resolver (Wikidata, GND, …)

Ein hochparalleler, Redis-gestützter URI-Resolver für Authority-Daten (z. B. Wikidata, GND), konzipiert für den Einsatz in ETL-Pipelines mit Luigi und mehreren Worker-Prozessen.


Features

  • SPARQL-Abfrage gegen Wikidata
  • SPARQL-Abfrage gegen GND
  • Einheitliches Pydantic-Domain-Modell (Pydantic v2)
  • Redis Shared Cache (prozessübergreifend)
  • Distributed Locking (Stampede-Schutz)
  • Force-Refresh (force_load=True)
  • Batch-API (resolve_many)
  • Async-first, aber Luigi-kompatibel
  • TTL-basiertes Caching
  • Namespace-Versionierung für Cache-Invalidierung

Architektur

Luigi Worker
    ↓
UriResolver
    ↓
Redis Cache (Shared)
    ↓
Redis Lock (Singleflight)
    ↓
Provider
   ├── Wikidata (SPARQL)
   ├── GND (SPARQL SELECT)
   └── OCLC (JSON Download)

Architekturprinzipien

  • Cache-Aside
  • Distributed Singleflight
  • Idempotente Auflösung
  • Provider-Plugin-System
  • Authority-agnostisches Domain-Modell

Installation

pip install "pydantic>=2" "redis>=5" httpx rdflib luigi

Anforderungen:

  • Python >= 3.12
  • Redis Server (z. B. redis://localhost:6379/0)

Domain-Modell

Zentrales Modell:

class ResolvedEntity(BaseModel):
    uri: HttpUrl
    canonical_uri: HttpUrl
    labels: dict[str, list[str]]
    alt_labels: dict[str, list[str]]
    descriptions: dict[str, list[str]]
    professions: dict[str, list[str]]
    types: list[str]
    same_as: list[SameAs]
    subject_categories: list[str]
    date_of_birth: datetime | None
    date_of_death: datetime | None
    provider: str
    retrieved_at: datetime

Labels

  • labels: bevorzugte Labels pro Sprache
  • alt_labels: alternative Labels pro Sprache
  • Deduplicated
  • Normalisiert

Beschreibungen

  • descriptions: mehrsprachige beschreibende Freitexte analog zu labels
  • Deduplicated
  • Normalisiert

Berufe / Tätigkeiten

  • professions: mehrsprachige Berufsnennungen analog zu labels
  • aktuell insbesondere vom GND-Provider aus gndo:professionOrOccupation extrahiert
  • Deduplicated
  • Normalisiert

Typen

  • types: eindeutige Liste von Typ-Bezeichnern (z. B. "Person")
  • Reihenfolge entspricht der Quelle

same_as

Normalisiert auf URI-Ebene.

Unterstützte Schemes:

  • wikidata
  • gnd
  • oclc
  • viaf
  • orcid
  • isni
  • unknown

Sachkategorien (GND)

  • subject_categories: Liste der Sachkategorie-URIs (gnd-sc), bereitgestellt vom GND-Provider.
  • Struktur: URI-Strings; keine Sprachvariante.
  • Duplikate werden entfernt, Reihenfolge entspricht der Quelle.

Lebensdaten

  • date_of_birth und date_of_death sind UTC-Zeitstempel (datetime) oder None, abhängig davon, was der Provider liefern kann.

Label-Sprachfilter

Standardmäßig werden alle verfügbaren Sprachvarianten gespeichert. Über die Resolver-Konfiguration lässt sich das einschränken:

resolver = UriResolver(
    providers=[WikidataProvider(), GNDProvider(), OclcEntityProvider()],
    redis_cache=RedisCache(redis),
    singleflight=RedisSingleflight(redis),
    config=ResolverConfig(label_languages=("de", "en")),
)

Die Reihenfolge der konfigurierten Sprachen wird beibehalten; doppelte oder leere Einträge werden automatisch entfernt. Der Filter wirkt ebenfalls auf alt_labels, descriptions und professions.


Verwendung

Minimalbeispiel

import asyncio
from redis.asyncio import Redis

from uri_resolver.cache.redis import RedisCache
from uri_resolver.resolver.redis_singleflight import RedisSingleflight
from uri_resolver.resolver.resolver import UriResolver
from uri_resolver.providers.wikidata import WikidataProvider
from uri_resolver.providers.gnd import GNDProvider
from uri_resolver.providers.oclc import OclcEntityProvider

async def main():
    redis = Redis.from_url("redis://localhost:6379/0", decode_responses=True)

    resolver = UriResolver(
        providers=[WikidataProvider(), GNDProvider(), OclcEntityProvider()],
        redis_cache=RedisCache(redis),
        singleflight=RedisSingleflight(redis),
    )

    entity = await resolver.resolve("http://d-nb.info/gnd/1211648338")
    print(entity.model_dump_json(indent=2))


asyncio.run(main())

CLI-Tool

Nach der Installation steht ein Kommandozeilenwerkzeug zur Verfügung:

uri-resolver https://d-nb.info/gnd/118626523 --format json-pretty

Argumente und Optionen:

  • uri (Pflicht): zu auflösende URI.
  • --format: "json" (Standard) oder "json-pretty" mit Einrückung.
  • --output <pfad>: schreibt die Ausgabe in eine Datei statt nach stdout.
  • --force-load: überspringt den Cache und lädt die URI direkt vom Provider.

Alternativ kann das Modul direkt gestartet werden:

python -m uri_resolver.main https://d-nb.info/gnd/118626523

Force Refresh

Standardmäßig wird Cache-first gearbeitet:

entity = await resolver.resolve(uri)

Um immer frisch vom Provider zu laden:

entity = await resolver.resolve(uri, force_load=True)

Semantik:

  • ignoriert vorhandene Cache-Einträge
  • nutzt weiterhin Distributed Locking
  • überschreibt Cache nach erfolgreichem Fetch

Batch-Verarbeitung

results = await resolver.resolve_many(
    uris,
    force_load=False,
    concurrency=100,
)

Eigenschaften:

  • Bounded Concurrency
  • Fehler pro URI isoliert
  • Redis-stampede-sicher

Luigi-Integration

Beispiel:

import asyncio
import luigi


class ResolveUriTask(luigi.Task):
    uri = luigi.Parameter()

    def run(self):
        result = asyncio.run(resolve_async(self.uri))
        with self.output().open("w") as f:
            f.write(result)

Wichtig:

  • Redis wird zwischen Worker-Prozessen geteilt
  • Locking verhindert doppelte Provider-Requests
  • force_load=True ist pro Task steuerbar

Redis-Strategie

Keyspace

uri_resolver:v1:cache:<canonical_uri>
uri_resolver:v1:lock:<canonical_uri>

TTL

Standard: 14 Tage (Provider-abhängig).

Override möglich über:

ResolverConfig(ttl_override=timedelta(days=7))

Versionierung

Namespace uri_resolver:v1 erlaubt Breaking Changes durch Anheben auf v2.


Distributed Singleflight

Problem:

Mehrere Luigi-Worker treffen gleichzeitig auf denselben Cache-Miss.

Lösung:

SET lock_key token NX PX <ttl>

Nur ein Worker lädt vom Provider. Andere Worker warten auf Cache-Füllung.


Provider

Wikidata

GND

  • SPARQL Endpoint: standardmäßig https://d-nb.info/sparql (konfigurierbar über URI_RESOLVER_GND_SPARQL_ENDPOINT)
  • Abfragemodus: SPARQL SELECT (enriched)
  • Labels:
    • gndo:preferredNameForThePerson
    • gndo:preferredNameForTheCorporateBody
    • gndo:preferredNameForTheConferenceOrEvent
    • gndo:preferredNameForThePlaceOrGeographicName
    • gndo:preferredNameForTheWork
    • bei Personen zusätzlich Fallback aus gndo:forename + gndo:surname
  • Alternative Labels:
    • über Prädikate mit variantName* / altLabel-Semantik
  • Berufe:
    • gndo:professionOrOccupation (RDF-Container-Mitglieder rdf:_1 ... rdf:_n)
    • Label-Auflösung über gndo:preferredNameForTheSubjectHeading
    • Ausgabe in ResolvedEntity.professions
  • sameAs:
    • owl:sameAs
    • skos:exactMatch
    • skos:closeMatch

Performance-Charakteristik

Bereich Verhalten
Cache-Hit Redis O(1)
Cache-Miss 1 Provider-Call
Multi-Worker max. 1 Fetch pro URI
Throughput abhängig von Endpoint-Limits

Empfohlene Concurrency

Kontext Empfehlung
resolve_many 50–100
Luigi Worker 4–16
SPARQL Endpoints vorsichtig mit >100 parallelen Calls

Erweiterbarkeit

Neue Provider:

class MyProvider(Provider):
    def can_resolve(self, uri: str) -> bool: ...
    async def resolve(self, uri: str) -> ResolvedEntity: ...

Einfach in UriResolver(providers=[...]) registrieren.


About

Ein hochparalleler, Redis-gestützter URI-Resolver für Authority-Daten (z. B. Wikidata, GND), konzipiert für den Einsatz in ETL-Pipelines mit Luigi und mehreren Worker-Prozessen.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages