From 274c2fed1b3b6d3f5c6fab7dfa0718ddb517836c Mon Sep 17 00:00:00 2001 From: Tim de Jager Date: Thu, 2 Jul 2026 18:58:19 +0000 Subject: [PATCH 1/5] feat(pipeline): harvest related legislation discovered during enrichment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Regelingen reached only through the machine-readable model (source.regulation, legal_basis, and open_terms/implements delegations like regeling_standaardpremie) are never auto-harvested: the recursive harvester follows only BWB hyperlinks in the source text, and those model links are a product of enrichment. The enrichment agent now returns the related legislation it needs in a result envelope (.enrichment-result.yaml) — deliberately kept OUT of the law YAML so the law artifact stays schema-conformant. After a successful enrich the worker reads the envelope, resolves each entry to a BWB id (agent bwb_id -> law_entries slug -> single-hit SRU title search), and enqueues a follow-up harvest for each. Newly harvested laws auto-enrich and return their own related legislation, so the dependency graph fills itself by recursion. - EnrichPayload.depth carries recursion depth (harvest -> enrich -> harvest); each related harvest is depth+1 at priority 40-(depth+1), so deeper nesting yields to shallower/interactive harvests. - Opt-in: HARVEST_RELATED_LEGISLATION (off by default) + RELATED_HARVEST_MAX_DEPTH (default 2); reuses ENRICH_DAILY_LIMIT for spend and create_harvest_job_if_not_exists for dedup. Best-effort: nothing here can fail the already-committed enrichment. - search_bwb_by_name extracted from the axum handler for reuse; find_bwb_id_by_slug made pub; slug hits re-validated as BWB (CVDR skipped). RFC-025 documents the pattern and its known limitations. 
--- .claude/skills/law-generate/SKILL.md | 38 ++++ docs/src/content/rfcs/rfc-025.md | 127 +++++++++++ packages/admin/src/handlers.rs | 2 + packages/pipeline/src/api/bwb_search.rs | 58 ++--- packages/pipeline/src/api/harvest.rs | 2 +- packages/pipeline/src/config.rs | 31 +++ packages/pipeline/src/enrich.rs | 185 ++++++++++++++- packages/pipeline/src/lib.rs | 5 +- packages/pipeline/src/worker.rs | 287 +++++++++++++++++++++++- 9 files changed, 701 insertions(+), 34 deletions(-) create mode 100644 docs/src/content/rfcs/rfc-025.md diff --git a/.claude/skills/law-generate/SKILL.md b/.claude/skills/law-generate/SKILL.md index 58703e177..fa9682960 100644 --- a/.claude/skills/law-generate/SKILL.md +++ b/.claude/skills/law-generate/SKILL.md @@ -763,6 +763,44 @@ The JSON payload format (written to the temp file): laws (>20 articles), this limit applies per batch — each batch of ~15 articles gets its own 3-iteration budget +## Phase 4.5: Write the related-legislation result envelope + +After the `machine_readable` sections are final, write a **sibling result +envelope** so the pipeline can auto-harvest the legislation this law depends on +(delegated regelingen and cross-law references the extref-only harvester misses). + +Write it next to the law YAML as `.enrichment-result.yaml` (same directory, +e.g. `corpus/regulation/nl/wet/wet_op_de_zorgtoeslag/.enrichment-result.yaml`). +Use the `Write` tool — no new agent tools are needed. 
+ +```yaml +# .enrichment-result.yaml — result envelope, NOT part of the law schema +law_id: wet_op_de_zorgtoeslag +related_legislation: + - name: Regeling vaststelling standaardpremie en bestuursrechtelijke premie + relation: delegated_regeling # source_regulation | legal_basis | delegated_regeling + bwb_id: BWBR0037841 # optional, best-effort + slug: regeling_standaardpremie # optional, best-effort + open_term: standaardpremie # optional, only for delegations + - name: Algemene wet inkomensafhankelijke regelingen + relation: source_regulation +``` + +Coverage: add one entry for **every** `source.regulation` you bound, every +`legal_basis` you anchored on, and every `open_term` delegation you declared. +Fields: + +- `name` — **required**; the human-readable law/regeling title (used for search + fallback when no id/slug is given). +- `relation` — one of `source_regulation`, `legal_basis`, `delegated_regeling`. +- `bwb_id`, `slug`, `open_term` — **optional**, best-effort. Supply what you know + (a known `bwb_id` resolves fastest); leave the rest out. + +**CRITICAL — this MUST NOT go in the law YAML.** The law file stays strictly +schema-conformant (`just validate` must still pass). The related-legislation list +lives only in the `.enrichment-result.yaml` sidecar, which the pipeline reads +separately. Do not add a `related_legislation:` key anywhere inside the law YAML. 
+ ## Phase 5: Report Report to the user: diff --git a/docs/src/content/rfcs/rfc-025.md b/docs/src/content/rfcs/rfc-025.md new file mode 100644 index 000000000..1279944b5 --- /dev/null +++ b/docs/src/content/rfcs/rfc-025.md @@ -0,0 +1,127 @@ +--- +title: "RFC-025: Related-Legislation Discovery via an Enrichment Result Envelope" +status: Proposed +implementation: Partially implemented +date: '2026-07-02' +authors: +- Tim de Jager +depends_on: +- RFC-007 (Cross-Law Execution Model) +- RFC-010 (Federated Corpus) +short_title: Related-Legislation Discovery +--- + +## Summary + +The recursive harvester follows only the **explicit BWB cross-links present in +the source text** (extrefs). That misses the legislation a machine-readable +model actually depends on but the text does not hyperlink: delegated regelingen +(e.g. `regeling_standaardpremie`, filled in via the `open_term`/`implements` +IoC pattern), cross-law `source.regulation` bindings whose target is named in +prose, and `legal_basis` anchors. The dependency graph therefore never fills +itself in for exactly the laws that matter. + +This RFC introduces a small feedback loop: **after enrichment, the agent +declares the related legislation it just modeled, and the worker enqueues +follow-up harvests for it.** Each harvested law is in turn enriched, which +declares its own dependencies, and so on — the graph completes itself via +recursion. 
+ +## The result envelope (why it lives outside the law schema) + +The enrichment agent returns related legislation in a **result envelope +sidecar**, written next to the enriched law YAML as `.enrichment-result.yaml`: + +```yaml +law_id: wet_op_de_zorgtoeslag +related_legislation: + - name: Regeling vaststelling standaardpremie en bestuursrechtelijke premie + relation: delegated_regeling # source_regulation | legal_basis | delegated_regeling + bwb_id: BWBR0037841 # optional, best-effort + slug: regeling_standaardpremie # optional, best-effort + open_term: standaardpremie # optional, delegations only + - name: Algemene wet inkomensafhankelijke regelingen + relation: source_regulation +``` + +This is deliberately **not** a law-schema change. The law YAML must stay +strictly schema-conformant (`just validate` is the contract), and routing +metadata like "which other laws should the pipeline go fetch" is an artefact of +the enrichment *process*, not of the law itself. Putting it in a sidecar keeps +the two concerns separate: the law model is validated against the schema, and +the envelope is read by the pipeline separately and never validated as a law. +The envelope is staged alongside the law as provenance (it records what the +agent believed the dependencies were at enrichment time), but a malformed or +absent envelope can never fail an otherwise-successful enrichment — it degrades +to "no related legislation". + +## Worker hook, depth, and priority + +When a completed enrichment is committed, the worker (gated by an opt-in flag) +resolves each declared entry to a BWB id and enqueues a follow-up harvest. + +**Depth** propagates so the recursion is bounded and observable: + +- A harvest job carries `depth` (already used by the existing extref follow-up + loop). When that harvest auto-enqueues its enrichment, the enrichment + **inherits** the harvest's depth. +- The follow-up harvests an enrichment spawns get `depth + 1`. 
+- Admin-requested enrichments are roots (`depth = None`, i.e. 0). + +**Priority** decreases one point per nesting level +(`RELATED_HARVEST_BASE - (depth + 1)`, base 40, clamped to `0..=100`), so a +deep, speculative related-harvest always yields to shallower and to +editor-/root-requested harvests. + +## Hybrid resolution + +A declared entry is resolved to a BWB id in order, stopping at the first hit: + +1. an explicit `bwb_id` matching `^BWBR\d{7}$`; +2. a slug lookup against `law_entries` (`slug`, else a slugified `name`); +3. an SRU title search by `name` — accepted **only** when it returns exactly + one law. More than one hit is logged as `needs_confirmation` and skipped (a + human decides); zero hits or an error is skipped. + +The worker emits a single summary log per enrichment with the +total/resolved/enqueued/needs_confirmation/unresolved counts. + +## Guards + +The loop is designed to be safe to leave on: + +- **Opt-in**: off by default, enabled via `HARVEST_RELATED_LEGISLATION=true`. +- **Depth cap**: `RELATED_HARVEST_MAX_DEPTH` (default 2) stops the recursion. +- **Daily enrich cap**: `ENRICH_DAILY_LIMIT` continues to bound how many + enrichments (and therefore how much LLM spend) run per day, independent of how + many harvests the loop enqueues. +- **Dedup**: `create_harvest_job_if_not_exists` prevents duplicate queued jobs; + `harvest_exhausted` laws are skipped; version resolution/shadowing (RFC-010) + keeps re-harvests from multiplying the corpus. + +## Known limitations + +- **Shared depth counter.** The related-harvest recursion reuses + `HarvestPayload.depth`, the same field the extref recursive harvester + increments. A law reached via ≥ `RELATED_HARVEST_MAX_DEPTH` extref hops + therefore arrives at enrichment already at/above the cap, so related-legislation + discovery is skipped for it — even if that is the *first* such opportunity for + that law. 
In practice the feature fires for roots and shallow laws (the intended + case); deep-in-a-chain laws usually already have their references harvested via + extref anyway. A dedicated `related_depth` field would lift this restriction and + is the clean follow-up. +- **Unambiguous-but-loose SRU match.** A single SRU title hit is accepted as-is; + there is no similarity threshold, so a loosely-matching unique result could + enqueue the wrong law. Impact is low — the loop is opt-in, harvest-only, and its + output lands on a reviewable `enrich/{provider}` branch — but it is why `>1` + hits deliberately degrade to `needs_confirmation`/skip rather than guessing. + +## Implementation status + +Implemented: the envelope types and sidecar read, `EnrichPayload.depth` with +propagation across all three enqueue sites, the SRU search extracted into a +client-taking function shared with the API handler, the worker hook with hybrid +resolution and summary logging, the two config flags, and the enrichment-skill +step that writes the envelope. Not yet built: a UI/admin surface for the +`needs_confirmation` cases (currently only logged), and a build-time slug/name +index to make resolution independent of harvest order. diff --git a/packages/admin/src/handlers.rs b/packages/admin/src/handlers.rs index a3a11df9c..14c5bad0b 100644 --- a/packages/admin/src/handlers.rs +++ b/packages/admin/src/handlers.rs @@ -644,6 +644,8 @@ pub async fn create_enrich_jobs( law_id: law_id.clone(), yaml_path: yaml_path.clone(), provider: Some((*provider_name).to_string()), + // Admin-requested enrichments are roots of the related-harvest chain. 
+ depth: None, }; let payload_json = serde_json::to_value(&enrich_payload).map_err(|e| { diff --git a/packages/pipeline/src/api/bwb_search.rs b/packages/pipeline/src/api/bwb_search.rs index fc3bf34b9..8580326f1 100644 --- a/packages/pipeline/src/api/bwb_search.rs +++ b/packages/pipeline/src/api/bwb_search.rs @@ -29,9 +29,24 @@ pub async fn search_bwb( State(state): State, Query(params): Query, ) -> Result>, (StatusCode, String)> { - let q = params.q.trim(); - if q.is_empty() || q.len() < 3 { - return Ok(Json(vec![])); + match search_bwb_by_name(&state.http_client, params.q.trim()).await { + Ok(results) => Ok(Json(results)), + Err(e) => Err((StatusCode::BAD_GATEWAY, e)), + } +} + +/// Search wetten.overheid.nl via the SRU API for laws matching `q`. +/// +/// The client-taking core shared by the axum handler and the enrich worker's +/// related-legislation resolution. Queries shorter than 3 bytes once trimmed +/// (checked before the `sanitized` string is built) return an empty list rather than an error. 
+pub async fn search_bwb_by_name( + client: &reqwest::Client, + q: &str, +) -> Result, String> { + let q = q.trim(); + if q.len() < 3 { + return Ok(vec![]); } let sanitized: String = q @@ -50,35 +65,20 @@ pub async fn search_bwb( ("maximumRecords", &MAX_RESULTS.to_string()), ], ) - .map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("URL build error: {e}"), - ) - })?; - - let response = state - .http_client + .map_err(|e| format!("URL build error: {e}"))?; + + let response = client .get(url) .send() .await - .map_err(|e| (StatusCode::BAD_GATEWAY, format!("BWB search failed: {e}")))?; - - let xml_text = response.text().await.map_err(|e| { - ( - StatusCode::BAD_GATEWAY, - format!("BWB response read failed: {e}"), - ) - })?; - - let results = parse_sru_response(&xml_text).map_err(|e| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("XML parse error: {e}"), - ) - })?; - - Ok(Json(results)) + .map_err(|e| format!("BWB search failed: {e}"))?; + + let xml_text = response + .text() + .await + .map_err(|e| format!("BWB response read failed: {e}"))?; + + parse_sru_response(&xml_text).map_err(|e| format!("XML parse error: {e}")) } /// Parse SRU XML response and extract unique laws (deduplicated by BWBR ID). diff --git a/packages/pipeline/src/api/harvest.rs b/packages/pipeline/src/api/harvest.rs index 4993d000d..32b441eca 100644 --- a/packages/pipeline/src/api/harvest.rs +++ b/packages/pipeline/src/api/harvest.rs @@ -155,7 +155,7 @@ async fn resolve_identifiers( } /// Find a law's BWB ID by its slug in the law_entries table. 
-async fn find_bwb_id_by_slug( +pub async fn find_bwb_id_by_slug( pool: &sqlx::PgPool, slug: &str, ) -> Result, sqlx::Error> { diff --git a/packages/pipeline/src/config.rs b/packages/pipeline/src/config.rs index d848a8cad..fa4939a69 100644 --- a/packages/pipeline/src/config.rs +++ b/packages/pipeline/src/config.rs @@ -97,6 +97,17 @@ pub struct WorkerConfig { /// Off by default; enrichment is otherwise requested explicitly via the admin /// API. Configurable via `ENRICH_AUTO_ENQUEUE`. pub auto_enrich_enqueue: bool, + /// When true, a completed enrichment enqueues follow-up harvests for the + /// related legislation the agent declared in `.enrichment-result.yaml` + /// (delegated regelingen, cross-law sources, legal bases the extref-only + /// harvester misses). Off by default. Configurable via + /// `HARVEST_RELATED_LEGISLATION`. + pub auto_harvest_related: bool, + /// Maximum recursion depth for related-legislation follow-up harvests. + /// A depth-0 enrichment may enqueue harvests at depth 1, whose enrichments + /// may enqueue at depth 2, etc., up to this cap. Default: 2. Configurable + /// via `RELATED_HARVEST_MAX_DEPTH`. + pub related_harvest_max_depth: u32, } impl std::fmt::Debug for WorkerConfig { @@ -118,6 +129,8 @@ impl std::fmt::Debug for WorkerConfig { ) .field("enrich_daily_limit", &self.enrich_daily_limit) .field("auto_enrich_enqueue", &self.auto_enrich_enqueue) + .field("auto_harvest_related", &self.auto_harvest_related) + .field("related_harvest_max_depth", &self.related_harvest_max_depth) .finish() } } @@ -193,6 +206,22 @@ impl WorkerConfig { }) .unwrap_or(false); + // Auto-harvest of related legislation after enrichment is opt-in; + // unset/unrecognized reads as false. 
+ let auto_harvest_related = std::env::var("HARVEST_RELATED_LEGISLATION") + .map(|v| { + matches!( + v.trim().to_ascii_lowercase().as_str(), + "1" | "true" | "yes" | "on" + ) + }) + .unwrap_or(false); + + let related_harvest_max_depth: u32 = std::env::var("RELATED_HARVEST_MAX_DEPTH") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(2); + Ok(Self { database_url, max_connections, @@ -207,6 +236,8 @@ impl WorkerConfig { max_consecutive_resource_failures, enrich_daily_limit, auto_enrich_enqueue, + auto_harvest_related, + related_harvest_max_depth, }) } diff --git a/packages/pipeline/src/enrich.rs b/packages/pipeline/src/enrich.rs index 07e6db170..5105cb9bc 100644 --- a/packages/pipeline/src/enrich.rs +++ b/packages/pipeline/src/enrich.rs @@ -374,12 +374,92 @@ pub struct EnrichPayload { /// When set, overrides the worker's `LLM_PROVIDER` env var. #[serde(default, skip_serializing_if = "Option::is_none")] pub provider: Option, + /// Recursion depth for related-legislation follow-up harvests. Inherited + /// from the harvest job that spawned this enrichment. `None` or `0` means a + /// root enrichment; the child harvests it enqueues get `depth + 1`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub depth: Option, } /// All known provider names. Used to create one enrich job per provider /// after a successful harvest. pub const ENRICH_PROVIDERS: &[&str] = &["opencode", "claude"]; +/// A related-legislation reference returned by the enrichment agent in the +/// `.enrichment-result.yaml` sidecar (the "result envelope"). +/// +/// The extref-only recursive harvester only follows explicit BWB cross-links in +/// the source text, so it misses delegated regelingen and other laws a +/// machine-readable model actually depends on (a `source.regulation`, a +/// `legal_basis`, or an `open_term` delegation). 
The enrichment agent knows +/// these because it just modeled them, so it declares them here and the worker +/// enqueues follow-up harvests — letting the dependency graph fill itself in. +/// +/// This lives OUTSIDE the law schema on purpose: the law YAML stays +/// schema-conformant, and this provenance/routing metadata rides alongside it. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct RelatedLegislation { + /// Human-readable name of the related law/regeling (used for SRU fallback + /// resolution when no `bwb_id`/`slug` is supplied). + pub name: String, + /// How this law relates: `source_regulation`, `legal_basis`, or + /// `delegated_regeling`. Informational; the worker treats all the same. + #[serde(default)] + pub relation: String, + /// Best-effort BWB identifier (e.g. "BWBR0018451"). Preferred resolution. + #[serde(default)] + pub bwb_id: Option, + /// Best-effort corpus slug (e.g. "wet_op_de_zorgtoeslag"). Second-choice + /// resolution, looked up against `law_entries`. + #[serde(default)] + pub slug: Option, + /// The `open_term` id this delegation fills, when `relation` is a delegation. + #[serde(default)] + pub open_term: Option, +} + +/// The `.enrichment-result.yaml` result envelope written next to an enriched +/// law YAML. Deliberately NOT a law-schema change — see [`RelatedLegislation`]. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)] +pub struct EnrichmentResultEnvelope { + #[serde(default)] + pub law_id: Option, + #[serde(default)] + pub related_legislation: Vec, +} + +/// Read the sibling `.enrichment-result.yaml` result envelope for a law YAML. +/// +/// Never errors, so it can never fail an otherwise-successful enrichment: +/// - absent or otherwise unreadable file → empty list (read errors are not logged); +/// - unparseable file → logged at `warn` and empty list. 
+fn read_enrichment_result_envelope(yaml_abs: &Path) -> Vec { + let envelope_path = enrichment_result_path(yaml_abs); + let content = match std::fs::read_to_string(&envelope_path) { + Ok(c) => c, + Err(_) => return Vec::new(), + }; + match serde_yaml_ng::from_str::(&content) { + Ok(envelope) => envelope.related_legislation, + Err(e) => { + tracing::warn!( + path = %envelope_path.display(), + error = %e, + "failed to parse .enrichment-result.yaml; ignoring related legislation" + ); + Vec::new() + } + } +} + +/// Path of the `.enrichment-result.yaml` sidecar next to a law YAML file. +fn enrichment_result_path(yaml_abs: &Path) -> PathBuf { + yaml_abs + .parent() + .unwrap_or(Path::new(".")) + .join(".enrichment-result.yaml") +} + /// Result of a successful enrichment execution. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct EnrichResult { @@ -395,6 +475,11 @@ pub struct EnrichResult { pub coverage_score: f64, pub provider: String, pub branch: String, + /// Related legislation the enrichment agent declared this law depends on, + /// read from the `.enrichment-result.yaml` sidecar. The worker uses these to + /// enqueue follow-up harvests. Empty when no sidecar was written. + #[serde(default)] + pub related_legislation: Vec, } /// Metadata written alongside the enriched law YAML as `.enrichment.yaml`. @@ -1075,9 +1160,19 @@ pub async fn execute_enrich_with_runner( .map_err(|e| PipelineError::Enrich(format!("failed to serialize metadata: {e}")))?; tokio::fs::write(&metadata_path, &metadata_yaml).await?; + // Read the related-legislation result envelope the agent may have written. + // Never fails: absent/malformed → empty (see read_enrichment_result_envelope). + let related_legislation = read_enrichment_result_envelope(&yaml_abs); + // Collect written files for corpus staging let mut written_files = vec![yaml_abs.clone(), metadata_path]; + // Stage the result envelope as provenance when the agent wrote one. 
+ let envelope_path = enrichment_result_path(&yaml_abs); + if envelope_path.exists() { + written_files.push(envelope_path); + } + // Check if a feature file was generated for this specific law. // MvT research creates feature files named after the law slug. // Only include files whose name contains the law slug to avoid @@ -1114,6 +1209,7 @@ pub async fn execute_enrich_with_runner( coverage_score, provider: provider_name, branch, + related_legislation, }; Ok((result, written_files)) @@ -1206,23 +1302,29 @@ mod tests { law_id: "BWBR0018451".to_string(), yaml_path: "regulation/nl/wet/wet_op_de_zorgtoeslag/2025-01-01.yaml".to_string(), provider: Some("claude".to_string()), + depth: Some(2), }; let json = serde_json::to_string(&payload).unwrap(); let deserialized: EnrichPayload = serde_json::from_str(&json).unwrap(); assert_eq!(deserialized.provider.as_deref(), Some("claude")); + assert_eq!(deserialized.depth, Some(2)); - // Verify backward compatibility: provider is optional and skipped when None + // Verify backward compatibility: provider and depth are optional and + // skipped when None (old queued payloads omit them entirely). 
let payload_no_provider = EnrichPayload { law_id: "BWBR0018451".to_string(), yaml_path: "regulation/nl/wet/wet_op_de_zorgtoeslag/2025-01-01.yaml".to_string(), provider: None, + depth: None, }; let json_no_provider = serde_json::to_string(&payload_no_provider).unwrap(); assert!(!json_no_provider.contains("provider")); + assert!(!json_no_provider.contains("depth")); let deserialized_no_provider: EnrichPayload = serde_json::from_str(&json_no_provider).unwrap(); assert!(deserialized_no_provider.provider.is_none()); + assert!(deserialized_no_provider.depth.is_none()); assert_eq!(deserialized.law_id, "BWBR0018451"); assert!(deserialized.yaml_path.contains("zorgtoeslag")); @@ -1238,6 +1340,7 @@ mod tests { coverage_score: 0.7, provider: "opencode".to_string(), branch: "enrich/opencode".to_string(), + related_legislation: Vec::new(), }; let json = serde_json::to_value(&result).unwrap(); @@ -1247,6 +1350,83 @@ mod tests { assert_eq!(json["branch"], "enrich/opencode"); } + #[test] + fn test_envelope_full_deserialization() { + let yaml = r#" +law_id: wet_op_de_zorgtoeslag +related_legislation: + - name: Regeling vaststelling standaardpremie en bestuursrechtelijke premie + relation: delegated_regeling + bwb_id: BWBR0037841 + slug: regeling_standaardpremie + open_term: standaardpremie + - name: Algemene wet inkomensafhankelijke regelingen + relation: source_regulation +"#; + let envelope: EnrichmentResultEnvelope = serde_yaml_ng::from_str(yaml).unwrap(); + assert_eq!(envelope.law_id.as_deref(), Some("wet_op_de_zorgtoeslag")); + assert_eq!(envelope.related_legislation.len(), 2); + let first = &envelope.related_legislation[0]; + assert_eq!(first.relation, "delegated_regeling"); + assert_eq!(first.bwb_id.as_deref(), Some("BWBR0037841")); + assert_eq!(first.slug.as_deref(), Some("regeling_standaardpremie")); + assert_eq!(first.open_term.as_deref(), Some("standaardpremie")); + // Second entry omits every optional field. 
+ let second = &envelope.related_legislation[1]; + assert_eq!(second.relation, "source_regulation"); + assert!(second.bwb_id.is_none()); + assert!(second.slug.is_none()); + assert!(second.open_term.is_none()); + } + + #[test] + fn test_envelope_missing_fields_default() { + // Only `name` is required; everything else defaults. + let yaml = "related_legislation:\n - name: Some Law\n"; + let envelope: EnrichmentResultEnvelope = serde_yaml_ng::from_str(yaml).unwrap(); + assert!(envelope.law_id.is_none()); + assert_eq!(envelope.related_legislation.len(), 1); + let entry = &envelope.related_legislation[0]; + assert_eq!(entry.name, "Some Law"); + assert_eq!(entry.relation, ""); + assert!(entry.bwb_id.is_none()); + } + + #[test] + fn test_read_envelope_absent_file_is_empty() { + let dir = tempfile::tempdir().unwrap(); + let yaml_abs = dir.path().join("2025-01-01.yaml"); + // No sidecar exists next to it. + assert!(read_enrichment_result_envelope(&yaml_abs).is_empty()); + } + + #[test] + fn test_read_envelope_malformed_is_empty() { + let dir = tempfile::tempdir().unwrap(); + let yaml_abs = dir.path().join("2025-01-01.yaml"); + std::fs::write( + enrichment_result_path(&yaml_abs), + "related_legislation: [this is: not valid: yaml", + ) + .unwrap(); + // Malformed sidecar must never error — it degrades to empty. 
+ assert!(read_enrichment_result_envelope(&yaml_abs).is_empty()); + } + + #[test] + fn test_read_envelope_present_parses() { + let dir = tempfile::tempdir().unwrap(); + let yaml_abs = dir.path().join("2025-01-01.yaml"); + std::fs::write( + enrichment_result_path(&yaml_abs), + "related_legislation:\n - name: Delegated Regeling\n bwb_id: BWBR0037841\n", + ) + .unwrap(); + let related = read_enrichment_result_envelope(&yaml_abs); + assert_eq!(related.len(), 1); + assert_eq!(related[0].bwb_id.as_deref(), Some("BWBR0037841")); + } + #[test] fn test_llm_provider_opencode_defaults() { let provider = LlmProvider::OpenCode { @@ -1582,6 +1762,7 @@ articles: law_id: "BWBR0000001".into(), yaml_path: yaml_path.into(), provider: Some("opencode".into()), + depth: None, }; let config = test_config(LlmProvider::OpenCode { @@ -1646,6 +1827,7 @@ articles: law_id: "BWBR0000001".into(), yaml_path: yaml_path.into(), provider: None, + depth: None, }; let config = test_config(LlmProvider::OpenCode { @@ -1693,6 +1875,7 @@ articles: law_id: "BWBR0000001".into(), yaml_path: yaml_path.into(), provider: None, + depth: None, }; let config = test_config(LlmProvider::OpenCode { diff --git a/packages/pipeline/src/lib.rs b/packages/pipeline/src/lib.rs index 1035bc372..f59e282dd 100644 --- a/packages/pipeline/src/lib.rs +++ b/packages/pipeline/src/lib.rs @@ -20,8 +20,9 @@ pub use api_state::ApiState; pub use config::{PipelineConfig, WorkerConfig}; pub use db::{create_pool, ensure_schema, MIGRATION_LOCK_KEY}; pub use enrich::{ - progress_file_path, EnrichConfig, EnrichPayload, EnrichResult, EnrichmentMetadata, LlmProvider, - LlmRunner, ProcessLlmRunner, ENRICH_PROVIDERS, + progress_file_path, EnrichConfig, EnrichPayload, EnrichResult, EnrichmentMetadata, + EnrichmentResultEnvelope, LlmProvider, LlmRunner, ProcessLlmRunner, RelatedLegislation, + ENRICH_PROVIDERS, }; pub use error::PipelineError; pub use harvest::{HarvestPayload, HarvestResult, MAX_HARVEST_DEPTH}; diff --git 
a/packages/pipeline/src/worker.rs b/packages/pipeline/src/worker.rs index 85d784452..6e56e23c7 100644 --- a/packages/pipeline/src/worker.rs +++ b/packages/pipeline/src/worker.rs @@ -10,7 +10,7 @@ use crate::config::WorkerConfig; use crate::db; use crate::enrich::{ create_enrich_corpus, enrich_branch_name, execute_enrich, progress_file_path, EnrichConfig, - EnrichPayload, + EnrichPayload, RelatedLegislation, }; use crate::error::{PipelineError, Result}; use crate::harvest::{execute_harvest, HarvestPayload, HarvestResult, MAX_HARVEST_DEPTH}; @@ -412,6 +412,9 @@ async fn process_next_job( law_id: job.law_id.clone(), yaml_path: result.file_path.clone(), provider: Some((*provider_name).to_string()), + // Inherit the just-completed harvest's depth so a + // related-legislation harvest chain keeps counting. + depth: payload.depth, }; let payload_json = match serde_json::to_value(&enrich_payload) { Ok(json) => json, @@ -700,6 +703,12 @@ pub async fn run_enrich_worker(config: WorkerConfig) -> Result<()> { let enrich_config = EnrichConfig::from_env(); + // One shared HTTP client for related-legislation SRU resolution. Built once + // (connection pooling) and threaded into every enrich job's follow-up hook. + let http_client = regelrecht_harvester::http::create_client().map_err(|e| { + crate::error::PipelineError::Worker(format!("failed to create HTTP client: {e}")) + })?; + // Corpus config is passed per-job so each enrichment creates its own // branch-specific corpus client. We still use the base repo_path as // fallback when corpus is not configured. @@ -807,6 +816,9 @@ pub async fn run_enrich_worker(config: WorkerConfig) -> Result<()> { config.corpus_config.as_ref(), config.job_timeout, config.exhausted_threshold, + &http_client, + config.auto_harvest_related, + config.related_harvest_max_depth, ) .await { @@ -846,6 +858,194 @@ pub async fn run_enrich_worker(config: WorkerConfig) -> Result<()> { Ok(()) } +/// Base priority for related-legislation follow-up harvests. 
One point below +/// this per nesting level (see [`related_harvest_priority`]) so the speculative +/// related-harvest chain always yields to editor- and root-requested harvests +/// (which use higher priorities). +const RELATED_HARVEST_BASE: i32 = 40; + +/// Priority for a related-legislation follow-up harvest spawned by an enrichment +/// at `enrich_depth`. Drops one point per nesting level so deeper (more +/// speculative) harvests yield to shallower ones; [`Priority::new`] clamps the +/// result into the valid `0..=100` range. +fn related_harvest_priority(enrich_depth: u32) -> Priority { + Priority::new(RELATED_HARVEST_BASE - (enrich_depth as i32 + 1)) +} + +/// True when `s` is a syntactically valid BWB regulation id (`^BWBR\d{7}$`). +fn is_valid_bwb_id(s: &str) -> bool { + s.len() == 11 && s.starts_with("BWBR") && s[4..].bytes().all(|b| b.is_ascii_digit()) +} + +/// Turn a law name into a corpus slug: ASCII-lowercase, every run of +/// non-ASCII-alphanumeric characters (accented letters included) collapsed to a single `_`, trimmed of leading and +/// trailing `_`. Best-effort fallback used for slug lookup when the agent didn't +/// supply an explicit `slug`. +fn slugify(name: &str) -> String { + let mut out = String::with_capacity(name.len()); + let mut pending_underscore = false; + for c in name.chars() { + if c.is_ascii_alphanumeric() { + if pending_underscore && !out.is_empty() { + out.push('_'); + } + pending_underscore = false; + out.push(c.to_ascii_lowercase()); + } else { + pending_underscore = true; + } + } + out +} + +/// Outcome of resolving a single [`RelatedLegislation`] entry to a BWB id. +enum RelatedResolution { + /// Resolved to a concrete BWB id. + Resolved(String), + /// SRU search matched more than one law — a human must pick; skip for now. + NeedsConfirmation, + /// No candidate (unknown slug, zero SRU hits, or a lookup error). Skip. 
+ Unresolved, +} + +/// Resolve a related-legislation entry to a BWB id via the hybrid order: +/// (a) explicit valid `bwb_id`, (b) slug lookup (explicit `slug` else +/// `slugify(name)`) against `law_entries`, (c) SRU search by name accepting only +/// an unambiguous single hit. Never errors — lookup failures degrade to skips. +async fn resolve_related_bwb_id( + pool: &PgPool, + http_client: &Client, + entry: &RelatedLegislation, +) -> RelatedResolution { + // (a) explicit bwb_id + if let Some(bwb_id) = entry.bwb_id.as_deref() { + if is_valid_bwb_id(bwb_id) { + return RelatedResolution::Resolved(bwb_id.to_string()); + } + } + + // (b) slug lookup + let slug = entry.slug.clone().unwrap_or_else(|| slugify(&entry.name)); + if !slug.is_empty() { + match crate::api::harvest::find_bwb_id_by_slug(pool, &slug).await { + // The slug may map to a CVDR id (local regulation); only a BWB id is + // harvestable through the `bwb_id` follow-up path, so skip non-BWB + // hits rather than enqueue a malformed harvest. 
+ Ok(Some(id)) if is_valid_bwb_id(&id) => return RelatedResolution::Resolved(id), + Ok(Some(id)) => { + tracing::debug!(slug = %slug, resolved = %id, "slug resolved to a non-BWB id; skipping"); + } + Ok(None) => {} + Err(e) => { + tracing::warn!(slug = %slug, error = %e, "slug lookup failed for related legislation"); + } + } + } + + // (c) SRU search by name — accept only an unambiguous single hit + match crate::api::bwb_search::search_bwb_by_name(http_client, &entry.name).await { + Ok(results) if results.len() == 1 => RelatedResolution::Resolved(results[0].bwb_id.clone()), + Ok(results) if results.len() > 1 => RelatedResolution::NeedsConfirmation, + Ok(_) => RelatedResolution::Unresolved, + Err(e) => { + tracing::warn!(name = %entry.name, error = %e, "SRU search failed for related legislation"); + RelatedResolution::Unresolved + } + } +} + +/// Resolve every related-legislation entry declared by an enrichment and enqueue +/// a follow-up harvest for each resolved BWB id at `enrich_depth + 1`. Emits one +/// summary log with the total/resolved/enqueued/needs_confirmation/unresolved +/// counts. Best-effort throughout: a failure on one entry never blocks the rest, +/// and none of this can fail the already-committed enrichment. 
+async fn harvest_related_legislation( + pool: &PgPool, + http_client: &Client, + parent_law_id: &str, + related: &[RelatedLegislation], + enrich_depth: u32, +) { + if related.is_empty() { + return; + } + + let child_depth = enrich_depth + 1; + let priority = related_harvest_priority(enrich_depth); + let total = related.len(); + let mut resolved = 0u32; + let mut enqueued = 0u32; + let mut needs_confirmation = 0u32; + let mut unresolved = 0u32; + + for entry in related { + let bwb_id = match resolve_related_bwb_id(pool, http_client, entry).await { + RelatedResolution::Resolved(id) => id, + RelatedResolution::NeedsConfirmation => { + needs_confirmation += 1; + tracing::info!( + parent_law_id = %parent_law_id, + name = %entry.name, + "related legislation matched multiple BWB results: needs_confirmation, skipping" + ); + continue; + } + RelatedResolution::Unresolved => { + unresolved += 1; + continue; + } + }; + resolved += 1; + + // Skip harvest for exhausted laws (mirror the follow-up harvest block). + if let Ok(law) = law_status::get_law(pool, &bwb_id).await { + if law.status == LawStatusValue::HarvestExhausted { + tracing::info!(bwb_id = %bwb_id, "skipping related harvest: law is harvest_exhausted"); + continue; + } + } + + // Related harvests always want the latest consolidation (date None). The + // dedup key uses the payload date, which is NULL here — the ON-CONFLICT + // guard still matches existing NULL-date jobs and skips duplicates. 
+ let follow_up_payload = HarvestPayload { + bwb_id: Some(bwb_id.clone()), + cvdr_id: None, + date: None, + max_size_mb: None, + depth: Some(child_depth), + }; + let payload_json = match serde_json::to_value(&follow_up_payload) { + Ok(v) => v, + Err(e) => { + tracing::warn!(bwb_id = %bwb_id, error = %e, "failed to serialize related harvest payload"); + continue; + } + }; + let req = CreateJobRequest::new(JobType::Harvest, bwb_id.as_str()) + .with_priority(priority) + .with_payload(payload_json); + match job_queue::create_harvest_job_if_not_exists(pool, req, "").await { + Ok(Some(_)) => enqueued += 1, + Ok(None) => {} // already queued, skip + Err(e) => { + tracing::warn!(bwb_id = %bwb_id, error = %e, "failed to create related harvest job") + } + } + } + + tracing::info!( + parent_law_id = %parent_law_id, + depth = child_depth, + total, + resolved, + enqueued, + needs_confirmation, + unresolved, + "related-legislation harvest summary" + ); +} + /// Process the next available enrich job. /// /// Returns the [`JobOutcome`]: `Processed` when a job was handled, `Idle` when @@ -855,6 +1055,7 @@ pub async fn run_enrich_worker(config: WorkerConfig) -> Result<()> { /// Each enrichment creates a separate branch (`enrich/{provider}`) /// so results can be reviewed before merging. A dedicated `CorpusClient` is /// created per job pointing at the enrichment branch. +#[allow(clippy::too_many_arguments)] async fn process_next_enrich_job( pool: &PgPool, repo_path: &Path, @@ -862,6 +1063,9 @@ async fn process_next_enrich_job( corpus_config: Option<&CorpusConfig>, job_timeout: Duration, exhausted_threshold: i32, + http_client: &Client, + auto_harvest_related: bool, + related_harvest_max_depth: u32, ) -> Result { let job = match job_queue::claim_job(pool, Some(JobType::Enrich)).await? 
{ Some(job) => job, @@ -1160,6 +1364,33 @@ async fn process_next_enrich_job( "coverage score updated" ); } + + // Enqueue follow-up harvests for the related legislation the + // enrichment agent declared (delegated regelingen, cross-law + // sources, legal bases the extref-only harvester misses). + // Opt-in and depth-capped so the recursion is bounded. + if auto_harvest_related { + let enrich_depth = payload.depth.unwrap_or(0); + if enrich_depth < related_harvest_max_depth { + harvest_related_legislation( + pool, + http_client, + &job.law_id, + &result.related_legislation, + enrich_depth, + ) + .await; + } else if !result.related_legislation.is_empty() { + tracing::info!( + law_id = %job.law_id, + depth = enrich_depth, + max_depth = related_harvest_max_depth, + related = result.related_legislation.len(), + "skipping related-legislation harvest: max depth reached" + ); + } + } + Ok(JobOutcome::Processed) } } @@ -1460,4 +1691,58 @@ mod tests { handle_resource_exhaustion(&mut counter, 3, "test"); assert_eq!(counter, 2); } + + #[test] + fn is_valid_bwb_id_matches_bwbr_seven_digits() { + assert!(is_valid_bwb_id("BWBR0018451")); + // Wrong prefix, wrong digit count, extra chars, or wrong casing all fail. + assert!(!is_valid_bwb_id("BWBR001845")); // 6 digits + assert!(!is_valid_bwb_id("BWBR00184510")); // 8 digits + assert!(!is_valid_bwb_id("CVDR0018451")); + assert!(!is_valid_bwb_id("BWBR001845x")); + assert!(!is_valid_bwb_id("bwbr0018451")); + assert!(!is_valid_bwb_id("")); + } + + #[test] + fn slugify_normalizes_names() { + assert_eq!(slugify("Wet op de zorgtoeslag"), "wet_op_de_zorgtoeslag"); + // Collapses runs of punctuation/whitespace and trims the edges. + assert_eq!( + slugify(" Regeling: standaard-premie!! "), + "regeling_standaard_premie" + ); + assert_eq!(slugify("---"), ""); + } + + #[test] + fn related_harvest_priority_drops_one_per_level_and_clamps() { + // Base is 40; child depth = enrich_depth + 1, priority = 40 - child_depth. 
+ assert_eq!(related_harvest_priority(0).value(), 39); + assert_eq!(related_harvest_priority(1).value(), 38); + assert_eq!(related_harvest_priority(2).value(), 37); + // Deep chains clamp at 0 rather than going negative. + assert_eq!(related_harvest_priority(39).value(), 0); + assert_eq!(related_harvest_priority(100).value(), 0); + } + + #[test] + fn harvest_payload_depth_round_trips_through_serde() { + let payload = HarvestPayload { + bwb_id: Some("BWBR0018451".to_string()), + cvdr_id: None, + date: None, + max_size_mb: None, + depth: Some(2), + }; + let json = serde_json::to_value(&payload).unwrap(); + assert_eq!(json["depth"], 2); + let back: HarvestPayload = serde_json::from_value(json).unwrap(); + assert_eq!(back.depth, Some(2)); + + // depth None is omitted from the wire form (backward compatible). + let root = HarvestPayload::for_law("BWBR0018451", None); + let root_json = serde_json::to_string(&root).unwrap(); + assert!(!root_json.contains("depth")); + } } From 86e2069e57b4fee51306620743f52c12d04b1325 Mon Sep 17 00:00:00 2001 From: Tim de Jager Date: Thu, 2 Jul 2026 19:48:36 +0000 Subject: [PATCH 2/5] refactor(pipeline): make related-legislation harvest always-on Drop the HARVEST_RELATED_LEGISLATION opt-in flag. The follow-up harvest only enqueues harvest jobs (no LLM cost); the expensive re-enrichment of those laws stays gated by ENRICH_AUTO_ENQUEUE + ENRICH_DAILY_LIMIT, and the recursion is bounded by RELATED_HARVEST_MAX_DEPTH. So there is nothing to protect behind a separate flag. --- docs/src/content/rfcs/rfc-025.md | 21 ++++++++------- packages/pipeline/src/config.rs | 19 -------------- packages/pipeline/src/worker.rs | 44 +++++++++++++++----------------- 3 files changed, 33 insertions(+), 51 deletions(-) diff --git a/docs/src/content/rfcs/rfc-025.md b/docs/src/content/rfcs/rfc-025.md index 1279944b5..69e3a1cb3 100644 --- a/docs/src/content/rfcs/rfc-025.md +++ b/docs/src/content/rfcs/rfc-025.md @@ -57,8 +57,8 @@ to "no related legislation". 
## Worker hook, depth, and priority -When a completed enrichment is committed, the worker (gated by an opt-in flag) -resolves each declared entry to a BWB id and enqueues a follow-up harvest. +When a completed enrichment is committed, the worker resolves each declared entry +to a BWB id and enqueues a follow-up harvest. **Depth** propagates so the recursion is bounded and observable: @@ -88,9 +88,12 @@ total/resolved/enqueued/needs_confirmation/unresolved counts. ## Guards -The loop is designed to be safe to leave on: +The loop is always on but designed to be safe to leave on: -- **Opt-in**: off by default, enabled via `HARVEST_RELATED_LEGISLATION=true`. +- **Cheap by itself**: it only enqueues *harvest* jobs (no LLM). The LLM-costly + re-enrichment of a newly harvested law is separately gated by + `ENRICH_AUTO_ENQUEUE` and bounded by `ENRICH_DAILY_LIMIT`, so the expensive part + of the recursion never runs unless enrichment auto-enqueue is explicitly on. - **Depth cap**: `RELATED_HARVEST_MAX_DEPTH` (default 2) stops the recursion. - **Daily enrich cap**: `ENRICH_DAILY_LIMIT` continues to bound how many enrichments (and therefore how much LLM spend) run per day, independent of how @@ -112,16 +115,16 @@ The loop is designed to be safe to leave on: is the clean follow-up. - **Unambiguous-but-loose SRU match.** A single SRU title hit is accepted as-is; there is no similarity threshold, so a loosely-matching unique result could - enqueue the wrong law. Impact is low — the loop is opt-in, harvest-only, and its - output lands on a reviewable `enrich/{provider}` branch — but it is why `>1` + enqueue the wrong law. Impact is low — the follow-up is harvest-only (no LLM), + and its output lands on a reviewable `enrich/{provider}` branch — but it is why `>1` hits deliberately degrade to `needs_confirmation`/skip rather than guessing. 
## Implementation status Implemented: the envelope types and sidecar read, `EnrichPayload.depth` with propagation across all three enqueue sites, the SRU search extracted into a -client-taking function shared with the API handler, the worker hook with hybrid -resolution and summary logging, the two config flags, and the enrichment-skill -step that writes the envelope. Not yet built: a UI/admin surface for the +client-taking function shared with the API handler, the always-on worker hook with +hybrid resolution and summary logging, the `RELATED_HARVEST_MAX_DEPTH` cap, and the +enrichment-skill step that writes the envelope. Not yet built: a UI/admin surface for the `needs_confirmation` cases (currently only logged), and a build-time slug/name index to make resolution independent of harvest order. diff --git a/packages/pipeline/src/config.rs b/packages/pipeline/src/config.rs index fa4939a69..f3f6f6530 100644 --- a/packages/pipeline/src/config.rs +++ b/packages/pipeline/src/config.rs @@ -97,12 +97,6 @@ pub struct WorkerConfig { /// Off by default; enrichment is otherwise requested explicitly via the admin /// API. Configurable via `ENRICH_AUTO_ENQUEUE`. pub auto_enrich_enqueue: bool, - /// When true, a completed enrichment enqueues follow-up harvests for the - /// related legislation the agent declared in `.enrichment-result.yaml` - /// (delegated regelingen, cross-law sources, legal bases the extref-only - /// harvester misses). Off by default. Configurable via - /// `HARVEST_RELATED_LEGISLATION`. - pub auto_harvest_related: bool, /// Maximum recursion depth for related-legislation follow-up harvests. /// A depth-0 enrichment may enqueue harvests at depth 1, whose enrichments /// may enqueue at depth 2, etc., up to this cap. Default: 2. 
Configurable @@ -129,7 +123,6 @@ impl std::fmt::Debug for WorkerConfig { ) .field("enrich_daily_limit", &self.enrich_daily_limit) .field("auto_enrich_enqueue", &self.auto_enrich_enqueue) - .field("auto_harvest_related", &self.auto_harvest_related) .field("related_harvest_max_depth", &self.related_harvest_max_depth) .finish() } @@ -206,17 +199,6 @@ impl WorkerConfig { }) .unwrap_or(false); - // Auto-harvest of related legislation after enrichment is opt-in; - // unset/unrecognized reads as false. - let auto_harvest_related = std::env::var("HARVEST_RELATED_LEGISLATION") - .map(|v| { - matches!( - v.trim().to_ascii_lowercase().as_str(), - "1" | "true" | "yes" | "on" - ) - }) - .unwrap_or(false); - let related_harvest_max_depth: u32 = std::env::var("RELATED_HARVEST_MAX_DEPTH") .ok() .and_then(|v| v.parse().ok()) @@ -236,7 +218,6 @@ impl WorkerConfig { max_consecutive_resource_failures, enrich_daily_limit, auto_enrich_enqueue, - auto_harvest_related, related_harvest_max_depth, }) } diff --git a/packages/pipeline/src/worker.rs b/packages/pipeline/src/worker.rs index 6e56e23c7..885b15aee 100644 --- a/packages/pipeline/src/worker.rs +++ b/packages/pipeline/src/worker.rs @@ -817,7 +817,6 @@ pub async fn run_enrich_worker(config: WorkerConfig) -> Result<()> { config.job_timeout, config.exhausted_threshold, &http_client, - config.auto_harvest_related, config.related_harvest_max_depth, ) .await @@ -1064,7 +1063,6 @@ async fn process_next_enrich_job( job_timeout: Duration, exhausted_threshold: i32, http_client: &Client, - auto_harvest_related: bool, related_harvest_max_depth: u32, ) -> Result { let job = match job_queue::claim_job(pool, Some(JobType::Enrich)).await? { @@ -1368,27 +1366,27 @@ async fn process_next_enrich_job( // Enqueue follow-up harvests for the related legislation the // enrichment agent declared (delegated regelingen, cross-law // sources, legal bases the extref-only harvester misses). - // Opt-in and depth-capped so the recursion is bounded. 
- if auto_harvest_related { - let enrich_depth = payload.depth.unwrap_or(0); - if enrich_depth < related_harvest_max_depth { - harvest_related_legislation( - pool, - http_client, - &job.law_id, - &result.related_legislation, - enrich_depth, - ) - .await; - } else if !result.related_legislation.is_empty() { - tracing::info!( - law_id = %job.law_id, - depth = enrich_depth, - max_depth = related_harvest_max_depth, - related = result.related_legislation.len(), - "skipping related-legislation harvest: max depth reached" - ); - } + // Always on, but depth-capped so the recursion is bounded (and + // the LLM-costly re-enrichment of those harvests is separately + // gated by ENRICH_AUTO_ENQUEUE + ENRICH_DAILY_LIMIT). + let enrich_depth = payload.depth.unwrap_or(0); + if enrich_depth < related_harvest_max_depth { + harvest_related_legislation( + pool, + http_client, + &job.law_id, + &result.related_legislation, + enrich_depth, + ) + .await; + } else if !result.related_legislation.is_empty() { + tracing::info!( + law_id = %job.law_id, + depth = enrich_depth, + max_depth = related_harvest_max_depth, + related = result.related_legislation.len(), + "skipping related-legislation harvest: max depth reached" + ); } Ok(JobOutcome::Processed) From e770061141d7ea846a74f289dd1b44b0fda3b589 Mon Sep 17 00:00:00 2001 From: Tim de Jager Date: Thu, 2 Jul 2026 20:00:15 +0000 Subject: [PATCH 3/5] fix(dev): exclude enrichment sidecars from law YAML validation The enrichment result envelope (.enrichment-result.yaml) and the existing .enrichment.yaml metadata sidecar are written into a law directory but are not law files. `find -name '*.yaml'` matches leading-dot names and the pre-commit `files:` regex matches them too, so validating one fails (missing $id). Skip dot-prefixed sidecars in both script/validate.sh and the validate-law-yaml hook. 
--- .pre-commit-config.yaml | 3 +++ script/validate.sh | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4aaee4c25..718522b8f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -59,6 +59,9 @@ repos: language: system pass_filenames: true files: ^corpus/regulation/.*\.yaml$ + # Skip dot-prefixed sidecars (.enrichment.yaml, .enrichment-result.yaml): + # enrichment metadata/result envelopes, not law files. + exclude: (^|/)\.[^/]*\.yaml$ types: [yaml] - id: skills-no-casus diff --git a/script/validate.sh b/script/validate.sh index ac85ad673..cc422c766 100755 --- a/script/validate.sh +++ b/script/validate.sh @@ -13,7 +13,10 @@ if [ $# -eq 0 ]; then FILES=() while IFS= read -r -d '' f; do FILES+=("$f") - done < <(find "$REPO_ROOT/corpus/regulation" -name '*.yaml' -print0 | sort -z) + # Exclude dot-prefixed sidecars (e.g. .enrichment.yaml, .enrichment-result.yaml) + # — enrichment metadata/result envelopes, not law files. `find -name '*.yaml'` + # matches leading-dot names, so filter them out explicitly. + done < <(find "$REPO_ROOT/corpus/regulation" -name '*.yaml' ! -name '.*' -print0 | sort -z) set -- "${FILES[@]}" fi From 4a24029faf21860f2e7390da4902e66c73721498 Mon Sep 17 00:00:00 2001 From: Tim de Jager Date: Thu, 2 Jul 2026 20:17:40 +0000 Subject: [PATCH 4/5] fix(pipeline): tighten related-legislation resolution; drop RFC-025 Address CI review findings: - A CVDR slug hit no longer falls through to the SRU name search (the slug already identified the law; a title match could resolve a *different* national law). It now returns Unresolved. - The harvest summary log separates already_queued and exhausted skips instead of conflating them in the resolved-but-not-enqueued gap. Also drop RFC-025: the related-legislation harvest loop is an implementation detail, not a cross-cutting design decision that warrants an RFC. 
--- docs/src/content/rfcs/rfc-025.md | 130 ------------------------------- packages/pipeline/src/worker.rs | 13 +++- 2 files changed, 11 insertions(+), 132 deletions(-) delete mode 100644 docs/src/content/rfcs/rfc-025.md diff --git a/docs/src/content/rfcs/rfc-025.md b/docs/src/content/rfcs/rfc-025.md deleted file mode 100644 index 69e3a1cb3..000000000 --- a/docs/src/content/rfcs/rfc-025.md +++ /dev/null @@ -1,130 +0,0 @@ ---- -title: "RFC-025: Related-Legislation Discovery via an Enrichment Result Envelope" -status: Proposed -implementation: Partially implemented -date: '2026-07-02' -authors: -- Tim de Jager -depends_on: -- RFC-007 (Cross-Law Execution Model) -- RFC-010 (Federated Corpus) -short_title: Related-Legislation Discovery ---- - -## Summary - -The recursive harvester follows only the **explicit BWB cross-links present in -the source text** (extrefs). That misses the legislation a machine-readable -model actually depends on but the text does not hyperlink: delegated regelingen -(e.g. `regeling_standaardpremie`, filled in via the `open_term`/`implements` -IoC pattern), cross-law `source.regulation` bindings whose target is named in -prose, and `legal_basis` anchors. The dependency graph therefore never fills -itself in for exactly the laws that matter. - -This RFC introduces a small feedback loop: **after enrichment, the agent -declares the related legislation it just modeled, and the worker enqueues -follow-up harvests for it.** Each harvested law is in turn enriched, which -declares its own dependencies, and so on — the graph completes itself via -recursion. 
- -## The result envelope (why it lives outside the law schema) - -The enrichment agent returns related legislation in a **result envelope -sidecar**, written next to the enriched law YAML as `.enrichment-result.yaml`: - -```yaml -law_id: wet_op_de_zorgtoeslag -related_legislation: - - name: Regeling vaststelling standaardpremie en bestuursrechtelijke premie - relation: delegated_regeling # source_regulation | legal_basis | delegated_regeling - bwb_id: BWBR0037841 # optional, best-effort - slug: regeling_standaardpremie # optional, best-effort - open_term: standaardpremie # optional, delegations only - - name: Algemene wet inkomensafhankelijke regelingen - relation: source_regulation -``` - -This is deliberately **not** a law-schema change. The law YAML must stay -strictly schema-conformant (`just validate` is the contract), and routing -metadata like "which other laws should the pipeline go fetch" is an artefact of -the enrichment *process*, not of the law itself. Putting it in a sidecar keeps -the two concerns separate: the law model is validated against the schema, and -the envelope is read by the pipeline separately and never validated as a law. -The envelope is staged alongside the law as provenance (it records what the -agent believed the dependencies were at enrichment time), but a malformed or -absent envelope can never fail an otherwise-successful enrichment — it degrades -to "no related legislation". - -## Worker hook, depth, and priority - -When a completed enrichment is committed, the worker resolves each declared entry -to a BWB id and enqueues a follow-up harvest. - -**Depth** propagates so the recursion is bounded and observable: - -- A harvest job carries `depth` (already used by the existing extref follow-up - loop). When that harvest auto-enqueues its enrichment, the enrichment - **inherits** the harvest's depth. -- The follow-up harvests an enrichment spawns get `depth + 1`. -- Admin-requested enrichments are roots (`depth = None`, i.e. 0). 
- -**Priority** decreases one point per nesting level -(`RELATED_HARVEST_BASE - (depth + 1)`, base 40, clamped to `0..=100`), so a -deep, speculative related-harvest always yields to shallower and to -editor-/root-requested harvests. - -## Hybrid resolution - -A declared entry is resolved to a BWB id in order, stopping at the first hit: - -1. an explicit `bwb_id` matching `^BWBR\d{7}$`; -2. a slug lookup against `law_entries` (`slug`, else a slugified `name`); -3. an SRU title search by `name` — accepted **only** when it returns exactly - one law. More than one hit is logged as `needs_confirmation` and skipped (a - human decides); zero hits or an error is skipped. - -The worker emits a single summary log per enrichment with the -total/resolved/enqueued/needs_confirmation/unresolved counts. - -## Guards - -The loop is always on but designed to be safe to leave on: - -- **Cheap by itself**: it only enqueues *harvest* jobs (no LLM). The LLM-costly - re-enrichment of a newly harvested law is separately gated by - `ENRICH_AUTO_ENQUEUE` and bounded by `ENRICH_DAILY_LIMIT`, so the expensive part - of the recursion never runs unless enrichment auto-enqueue is explicitly on. -- **Depth cap**: `RELATED_HARVEST_MAX_DEPTH` (default 2) stops the recursion. -- **Daily enrich cap**: `ENRICH_DAILY_LIMIT` continues to bound how many - enrichments (and therefore how much LLM spend) run per day, independent of how - many harvests the loop enqueues. -- **Dedup**: `create_harvest_job_if_not_exists` prevents duplicate queued jobs; - `harvest_exhausted` laws are skipped; version resolution/shadowing (RFC-010) - keeps re-harvests from multiplying the corpus. - -## Known limitations - -- **Shared depth counter.** The related-harvest recursion reuses - `HarvestPayload.depth`, the same field the extref recursive harvester - increments. 
A law reached via ≥ `RELATED_HARVEST_MAX_DEPTH` extref hops - therefore arrives at enrichment already at/above the cap, so related-legislation - discovery is skipped for it — even if that is the *first* such opportunity for - that law. In practice the feature fires for roots and shallow laws (the intended - case); deep-in-a-chain laws usually already have their references harvested via - extref anyway. A dedicated `related_depth` field would lift this restriction and - is the clean follow-up. -- **Unambiguous-but-loose SRU match.** A single SRU title hit is accepted as-is; - there is no similarity threshold, so a loosely-matching unique result could - enqueue the wrong law. Impact is low — the follow-up is harvest-only (no LLM), - and its output lands on a reviewable `enrich/{provider}` branch — but it is why `>1` - hits deliberately degrade to `needs_confirmation`/skip rather than guessing. - -## Implementation status - -Implemented: the envelope types and sidecar read, `EnrichPayload.depth` with -propagation across all three enqueue sites, the SRU search extracted into a -client-taking function shared with the API handler, the always-on worker hook with -hybrid resolution and summary logging, the `RELATED_HARVEST_MAX_DEPTH` cap, and the -enrichment-skill step that writes the envelope. Not yet built: a UI/admin surface for the -`needs_confirmation` cases (currently only logged), and a build-time slug/name -index to make resolution independent of harvest order. diff --git a/packages/pipeline/src/worker.rs b/packages/pipeline/src/worker.rs index 885b15aee..e138712d7 100644 --- a/packages/pipeline/src/worker.rs +++ b/packages/pipeline/src/worker.rs @@ -932,7 +932,11 @@ async fn resolve_related_bwb_id( // hits rather than enqueue a malformed harvest. 
Ok(Some(id)) if is_valid_bwb_id(&id) => return RelatedResolution::Resolved(id), Ok(Some(id)) => { - tracing::debug!(slug = %slug, resolved = %id, "slug resolved to a non-BWB id; skipping"); + // The slug already identified the law (it's just CVDR, not + // BWB-harvestable here). Do NOT fall through to the name search: + // a title match could resolve a *different* national law. + tracing::debug!(slug = %slug, resolved = %id, "slug resolved to a non-BWB id; not harvestable via bwb_id path"); + return RelatedResolution::Unresolved; } Ok(None) => {} Err(e) => { @@ -974,6 +978,8 @@ async fn harvest_related_legislation( let total = related.len(); let mut resolved = 0u32; let mut enqueued = 0u32; + let mut already_queued = 0u32; + let mut exhausted = 0u32; let mut needs_confirmation = 0u32; let mut unresolved = 0u32; @@ -999,6 +1005,7 @@ async fn harvest_related_legislation( // Skip harvest for exhausted laws (mirror the follow-up harvest block). if let Ok(law) = law_status::get_law(pool, &bwb_id).await { if law.status == LawStatusValue::HarvestExhausted { + exhausted += 1; tracing::info!(bwb_id = %bwb_id, "skipping related harvest: law is harvest_exhausted"); continue; } @@ -1026,7 +1033,7 @@ async fn harvest_related_legislation( .with_payload(payload_json); match job_queue::create_harvest_job_if_not_exists(pool, req, "").await { Ok(Some(_)) => enqueued += 1, - Ok(None) => {} // already queued, skip + Ok(None) => already_queued += 1, // an active harvest already exists Err(e) => { tracing::warn!(bwb_id = %bwb_id, error = %e, "failed to create related harvest job") } @@ -1039,6 +1046,8 @@ async fn harvest_related_legislation( total, resolved, enqueued, + already_queued, + exhausted, needs_confirmation, unresolved, "related-legislation harvest summary" From d0ce3b117b7f779541224f105ae4238ed79a799a Mon Sep 17 00:00:00 2001 From: Tim de Jager Date: Thu, 2 Jul 2026 20:33:40 +0000 Subject: [PATCH 5/5] fix(pipeline): address review nits on related-legislation resolution - 
Validate the single-hit SRU result as a BWB id before resolving, so a malformed SRU id can't slip into a harvest payload (paths a/b already validate). - Read the .enrichment-result.yaml sidecar via tokio::fs (was blocking std::fs) for consistency with the rest of execute_enrich_with_runner. - Clarify the depth-inherit comment: the field is the shared extref-recursion counter, so deep-via-extref laws skip related discovery (roots/shallow laws, the intended case, are unaffected). --- packages/pipeline/src/enrich.rs | 24 ++++++++++++------------ packages/pipeline/src/worker.rs | 16 ++++++++++++---- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/packages/pipeline/src/enrich.rs b/packages/pipeline/src/enrich.rs index 5105cb9bc..8fae95c91 100644 --- a/packages/pipeline/src/enrich.rs +++ b/packages/pipeline/src/enrich.rs @@ -433,9 +433,9 @@ pub struct EnrichmentResultEnvelope { /// Never errors, so it can never fail an otherwise-successful enrichment: /// - absent file → empty list; /// - unparseable file → logged at `warn` and empty list. -fn read_enrichment_result_envelope(yaml_abs: &Path) -> Vec { +async fn read_enrichment_result_envelope(yaml_abs: &Path) -> Vec { let envelope_path = enrichment_result_path(yaml_abs); - let content = match std::fs::read_to_string(&envelope_path) { + let content = match tokio::fs::read_to_string(&envelope_path).await { Ok(c) => c, Err(_) => return Vec::new(), }; @@ -1162,7 +1162,7 @@ pub async fn execute_enrich_with_runner( // Read the related-legislation result envelope the agent may have written. // Never fails: absent/malformed → empty (see read_enrichment_result_envelope). 
- let related_legislation = read_enrichment_result_envelope(&yaml_abs); + let related_legislation = read_enrichment_result_envelope(&yaml_abs).await; // Collect written files for corpus staging let mut written_files = vec![yaml_abs.clone(), metadata_path]; @@ -1392,16 +1392,16 @@ related_legislation: assert!(entry.bwb_id.is_none()); } - #[test] - fn test_read_envelope_absent_file_is_empty() { + #[tokio::test] + async fn test_read_envelope_absent_file_is_empty() { let dir = tempfile::tempdir().unwrap(); let yaml_abs = dir.path().join("2025-01-01.yaml"); // No sidecar exists next to it. - assert!(read_enrichment_result_envelope(&yaml_abs).is_empty()); + assert!(read_enrichment_result_envelope(&yaml_abs).await.is_empty()); } - #[test] - fn test_read_envelope_malformed_is_empty() { + #[tokio::test] + async fn test_read_envelope_malformed_is_empty() { let dir = tempfile::tempdir().unwrap(); let yaml_abs = dir.path().join("2025-01-01.yaml"); std::fs::write( @@ -1410,11 +1410,11 @@ related_legislation: ) .unwrap(); // Malformed sidecar must never error — it degrades to empty. 
- assert!(read_enrichment_result_envelope(&yaml_abs).is_empty()); + assert!(read_enrichment_result_envelope(&yaml_abs).await.is_empty()); } - #[test] - fn test_read_envelope_present_parses() { + #[tokio::test] + async fn test_read_envelope_present_parses() { let dir = tempfile::tempdir().unwrap(); let yaml_abs = dir.path().join("2025-01-01.yaml"); std::fs::write( @@ -1422,7 +1422,7 @@ related_legislation: "related_legislation:\n - name: Delegated Regeling\n bwb_id: BWBR0037841\n", ) .unwrap(); - let related = read_enrichment_result_envelope(&yaml_abs); + let related = read_enrichment_result_envelope(&yaml_abs).await; assert_eq!(related.len(), 1); assert_eq!(related[0].bwb_id.as_deref(), Some("BWBR0037841")); } diff --git a/packages/pipeline/src/worker.rs b/packages/pipeline/src/worker.rs index e138712d7..9713b1dd9 100644 --- a/packages/pipeline/src/worker.rs +++ b/packages/pipeline/src/worker.rs @@ -412,8 +412,12 @@ async fn process_next_job( law_id: job.law_id.clone(), yaml_path: result.file_path.clone(), provider: Some((*provider_name).to_string()), - // Inherit the just-completed harvest's depth so a - // related-legislation harvest chain keeps counting. + // Inherit the harvest's depth. NB: this is the shared + // extref-recursion counter, so a law reached via + // >= RELATED_HARVEST_MAX_DEPTH extref hops enriches at a + // depth that skips related-legislation discovery. Roots and + // shallow laws (the intended case) are unaffected; a + // dedicated related-depth counter is the follow-up. depth: payload.depth, }; let payload_json = match serde_json::to_value(&enrich_payload) { @@ -945,9 +949,13 @@ async fn resolve_related_bwb_id( } } - // (c) SRU search by name — accept only an unambiguous single hit + // (c) SRU search by name — accept only an unambiguous single hit, and only + // if it is a well-formed BWB id (paths a/b validate too; don't let a + // malformed SRU id slip into a harvest payload). 
match crate::api::bwb_search::search_bwb_by_name(http_client, &entry.name).await { - Ok(results) if results.len() == 1 => RelatedResolution::Resolved(results[0].bwb_id.clone()), + Ok(results) if results.len() == 1 && is_valid_bwb_id(&results[0].bwb_id) => { + RelatedResolution::Resolved(results[0].bwb_id.clone()) + } Ok(results) if results.len() > 1 => RelatedResolution::NeedsConfirmation, Ok(_) => RelatedResolution::Unresolved, Err(e) => {