feat(engine): expose per-node attribute schemas via 'schema' command#2134

Open
pyshx wants to merge 30 commits into main from worktree-static-attr-schema

@pyshx pyshx commented Jun 3, 2026

Overview

Adds a static schema CLI command to the engine that exposes the feature attributes available at each node of a workflow, so that the UI and users can see which attributes exist and target them in expressions (e.g. attributes["myAttribute"]). Source readers are sampled (a light, bounded read of their dataset) to discover real attributes, and downstream transforms (AttributeManager, etc.) are then propagated on top of the sampled schemas. The output is JSON for a server or UI to consume.

This branch was repurposed from an earlier static build/check validator (which provided no standalone user value); that surface has been removed and the reusable engine kept. See "What I haven't done".

What I've done

  • Schema model (reearth-flow-types): AttrSchema (ordered fields + open flag), AttrType, AttrField (ty + Presence::{Always,Maybe}), and a join lattice for multi-edge fan-in. Plus a serde SchemaReport JSON DTO (ordered fields array, version, per-node note).
  • Source sampler (runtime/runtime/src/schema_sample.rs): runs a source reader briefly against a bounded channel, unions the first N features (default 10; --sample-size 0 = all) into a closed, typed AttrSchema. Per-source failures degrade to open + a note; never panics. No processors/sinks run, no sink writes.
  • Propagation (schema_infer.rs): infer_with_sampling seeds source nodes from samples, then propagates per-port schemas through the DAG in topological order (cycle-detected). Transfer functions for AttributeManager, AttributeMapper, StatisticsCalculator, DateTimeConverter, FeatureFilter; unknown processors pass through.
  • schema CLI command: reearth-flow schema --workflow <path|-> [--var K=V] [--sample-size N] → prints { version, sampleSize, nodes: { id: { name, ports: { port: { open, fields[] } }, note? } } } to stdout (logs to stderr). Supports !include expansion and --var.
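
The join used for multi-edge fan-in can be pictured with a minimal sketch. The types below are a simplified, hypothetical mirror of AttrSchema, AttrField and Presence (a Vec stands in for the ordered map), and the join rules themselves are an assumption made for illustration: a field stays Always only when both inputs guarantee it, conflicting types widen to Unknown, and the result is open if either input is open. The PR's actual lattice may differ.

```rust
// Hypothetical, simplified mirror of the schema model; not the PR's real types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AttrType { String, Number, Bool, Unknown }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Presence { Always, Maybe }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct AttrField { ty: AttrType, presence: Presence }

#[derive(Debug, Clone, PartialEq, Eq)]
struct AttrSchema {
    fields: Vec<(String, AttrField)>, // insertion-ordered
    open: bool,
}

impl AttrSchema {
    fn get(&self, name: &str) -> Option<AttrField> {
        self.fields.iter().find(|(n, _)| n.as_str() == name).map(|(_, f)| *f)
    }
}

/// Assumed join rules: keep `a`'s order, then append fields only in `b`.
fn join(a: &AttrSchema, b: &AttrSchema) -> AttrSchema {
    let mut fields = Vec::new();
    for (name, fa) in &a.fields {
        let joined = match b.get(name) {
            // Present on both edges: widen the type on conflict, and keep
            // Always only if both sides guarantee the field.
            Some(fb) => AttrField {
                ty: if fa.ty == fb.ty { fa.ty } else { AttrType::Unknown },
                presence: if fa.presence == Presence::Always
                    && fb.presence == Presence::Always
                {
                    Presence::Always
                } else {
                    Presence::Maybe
                },
            },
            // Present on one edge only: it may or may not arrive.
            None => AttrField { ty: fa.ty, presence: Presence::Maybe },
        };
        fields.push((name.clone(), joined));
    }
    for (name, fb) in &b.fields {
        if a.get(name).is_none() {
            fields.push((name.clone(), AttrField { ty: fb.ty, presence: Presence::Maybe }));
        }
    }
    AttrSchema { fields, open: a.open || b.open }
}

fn main() {
    let always = |ty| AttrField { ty, presence: Presence::Always };
    let left = AttrSchema {
        fields: vec![
            ("id".to_string(), always(AttrType::String)),
            ("height".to_string(), always(AttrType::Number)),
        ],
        open: false,
    };
    let right = AttrSchema {
        fields: vec![
            ("id".to_string(), always(AttrType::String)),
            ("flag".to_string(), always(AttrType::Bool)),
        ],
        open: false,
    };
    println!("{:#?}", join(&left, &right));
}
```

Under these assumed rules the join is commutative up to field order, which is what makes it usable regardless of the order in which incoming edges are visited.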

What I haven't done

  • Removed the previous static build/check validator (command, referenced_input_attributes, Diagnostic/Severity, AttrRef) — it wasn't independently useful; the reusable schema engine was retained.
  • Server GraphQL endpoint and UI display/autocomplete — out of scope; this is the engine foundation. The schema JSON is their contract (separate specs to follow).
  • Typed source seeds for non-feature sources (DB catalog) and Rhai value-type inference — expression-derived values are Unknown for now (Rhai is being replaced). Sampling bounds features processed, not always bytes read (whole-file readers still read the file).

How I tested

  • Unit tests: AttrSchema join lattice, the serde DTO JSON shape, union_features edge cases (Maybe presence, type-conflict→Unknown, first-seen ordering), each transfer function.
  • Integration: schema_sample against a real GeoJSON read (tempfile); the schema command end-to-end on GeoJsonReader → AttributeManager(remove) — asserts the reader exposes real keys and the removed key is absent downstream while others survive.
  • Manual e2e on the real PLATEAU quality-check/bldg workflow (87 nodes, !includes): emits valid JSON, no panic.
  • Full local CI parity: cargo make check, format --check, clippy -D warnings, format-taplo --check, check-schema (0 drift), check-generate-examples-cms-workflow (0 drift), test-rs — all green. Engine version bumped to 0.0.377.
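
The union_features edge cases listed above (Maybe presence, type-conflict→Unknown, first-seen ordering) can be illustrated with a small stand-alone sketch. The Value, Feature and tuple-based result types here are hypothetical simplifications, not the engine's real types, and the sketch assumes each feature carries a given key at most once.

```rust
// Hypothetical stand-ins for the engine's feature and schema types.
#[derive(Debug, Clone, PartialEq)]
enum Value { Str(String), Num(f64) }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AttrType { String, Number, Unknown }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Presence { Always, Maybe }

type Feature = Vec<(String, Value)>;

fn type_of(value: &Value) -> AttrType {
    match value {
        Value::Str(_) => AttrType::String,
        Value::Num(_) => AttrType::Number,
    }
}

/// Unions sampled features into an ordered list of (name, type, presence).
/// Assumes a key appears at most once per feature.
fn union_features(features: &[Feature]) -> Vec<(String, AttrType, Presence)> {
    // (name, type so far, number of features that carried the key)
    let mut seen: Vec<(String, AttrType, usize)> = Vec::new();
    for feature in features {
        for (name, value) in feature {
            let ty = type_of(value);
            if let Some(i) = seen.iter().position(|(n, _, _)| n == name) {
                if seen[i].1 != ty {
                    seen[i].1 = AttrType::Unknown; // conflicting types widen
                }
                seen[i].2 += 1;
            } else {
                seen.push((name.clone(), ty, 1)); // first-seen ordering
            }
        }
    }
    seen.into_iter()
        .map(|(name, ty, count)| {
            let presence = if count == features.len() {
                Presence::Always
            } else {
                Presence::Maybe
            };
            (name, ty, presence)
        })
        .collect()
}

fn feature(pairs: &[(&str, Value)]) -> Feature {
    pairs.iter().map(|(k, v)| (k.to_string(), v.clone())).collect()
}

fn main() {
    let sample = vec![
        feature(&[
            ("myAttribute", Value::Str("a".into())),
            ("photoURL", Value::Str("https://example.invalid/1".into())),
            ("address", Value::Str("somewhere".into())),
        ]),
        feature(&[
            ("myAttribute", Value::Str("b".into())),
            ("photoURL", Value::Num(2.0)),
        ]),
    ];
    for (name, ty, presence) in union_features(&sample) {
        println!("{name}: {ty:?} ({presence:?})");
    }
}
```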

Screenshot

$ reearth-flow schema --workflow demo.yml
"GeoJsonReader":     { default: [ myAttribute:String(always), photoURL:String(always), address:String(maybe) ] }
"RemoveMyAttribute": { default: [ photoURL:String(always), address:String(maybe) ] }   # myAttribute removed

Which point I want you to review particularly

  • Sampling mechanism (schema_sample.rs): running a source reader on a current-thread tokio runtime over a bounded mpsc channel, then dropping the receiver to stop it. No-panic + no-deadlock are the key invariants (reviewed).
  • open + note fallback semantics: sources without a resolvable dataset (or expression-driven sources) report open: true + a note rather than failing — so the editor flow degrades gracefully.
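
The stop-by-dropping-the-receiver idea can be sketched without the engine. The example below deliberately swaps in std threads and std::sync::mpsc::sync_channel for the current-thread tokio runtime and tokio mpsc channel named above, so it only illustrates the shape of the mechanism; sample_source and its integer "features" are hypothetical.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Runs a fake source that would emit `total` items, but keeps only the
/// first `sample_size` (0 = all). Returns the sample and how many items the
/// producer managed to send before it was stopped.
fn sample_source(total: usize, sample_size: usize) -> (Vec<usize>, usize) {
    // Small bounded buffer: the producer blocks once it is full.
    let (tx, rx) = sync_channel::<usize>(4);

    let producer = thread::spawn(move || {
        let mut sent = 0;
        for item in 0..total {
            // `send` fails once the receiver is gone, which also wakes a
            // producer that was blocked on a full buffer.
            if tx.send(item).is_err() {
                break;
            }
            sent += 1;
        }
        sent
    });

    let mut sample = Vec::new();
    if sample_size == 0 {
        // Read everything; the iterator ends when the producer finishes.
        sample.extend(rx.iter());
    } else {
        sample.extend(rx.iter().take(sample_size));
    }

    // Dropping the receiver is the stop signal for the producer.
    drop(rx);
    let sent = producer.join().expect("producer thread panicked");
    (sample, sent)
}

fn main() {
    let (sample, sent) = sample_source(1_000, 10);
    println!("kept {} items, producer sent {} before stopping", sample.len(), sent);
}
```

The design point this illustrates is that the producer needs no explicit cancellation flag: a failed send is the only stop condition, so the consumer cannot leave the producer blocked as long as it drops the receiver before joining.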

Memo

Engine foundation only. schema is additive and does not affect run/dot. Branch based on origin/main.

pyshx added 30 commits June 1, 2026 14:14
Copilot AI review requested due to automatic review settings June 3, 2026 20:06
@pyshx pyshx requested review from asrcpq, n4to4 and shunski as code owners June 3, 2026 20:06

Copilot AI left a comment


Pull request overview

Adds an engine-level schema CLI command that reports per-node (and per-port) attribute schemas for a workflow, seeded by light source sampling and propagated through the DAG via per-action transfer functions.

Changes:

  • Introduces a new attribute-schema model (AttrSchema, AttrType, presence lattice, JSON DTOs) in reearth-flow-types.
  • Adds runtime source sampling (schema_sample) plus DAG propagation/inference (schema_infer), including cycle detection.
  • Adds reearth-flow schema CLI subcommand and implements schema transfer functions + tests for several processors.
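
As a rough illustration of DAG propagation with cycle detection, the sketch below orders nodes with Kahn's algorithm and then pushes attribute names along the edges. Everything in it is hypothetical: nodes are plain indices, a schema is reduced to a list of attribute names, and the Op enum only imitates a source, a remove-style transform and a pass-through processor. Kahn's algorithm is one common choice here; the PR does not say which algorithm schema_infer uses.

```rust
use std::collections::VecDeque;

/// Kahn's algorithm: returns a topological order, or an error on a cycle.
fn topo_order(node_count: usize, edges: &[(usize, usize)]) -> Result<Vec<usize>, String> {
    let mut indegree = vec![0usize; node_count];
    let mut successors: Vec<Vec<usize>> = vec![Vec::new(); node_count];
    for &(from, to) in edges {
        successors[from].push(to);
        indegree[to] += 1;
    }
    let mut ready: VecDeque<usize> = (0..node_count).filter(|&n| indegree[n] == 0).collect();
    let mut order = Vec::with_capacity(node_count);
    while let Some(node) = ready.pop_front() {
        order.push(node);
        for &next in &successors[node] {
            indegree[next] -= 1;
            if indegree[next] == 0 {
                ready.push_back(next);
            }
        }
    }
    if order.len() == node_count {
        Ok(order)
    } else {
        // Nodes on a cycle never reach indegree 0.
        Err("schema inference cycle detected".to_string())
    }
}

// Hypothetical node behaviours, far simpler than real transfer functions.
enum Op {
    Source(Vec<&'static str>),
    Remove(&'static str),
    PassThrough,
}

/// Computes each node's output attribute names in topological order.
fn propagate(ops: &[Op], edges: &[(usize, usize)]) -> Result<Vec<Vec<&'static str>>, String> {
    let order = topo_order(ops.len(), edges)?;
    let mut out: Vec<Vec<&'static str>> = vec![Vec::new(); ops.len()];
    for node in order {
        // Union of all predecessor outputs, in first-seen order.
        let mut input: Vec<&'static str> = Vec::new();
        for &(from, to) in edges {
            if to == node {
                for &name in &out[from] {
                    if !input.contains(&name) {
                        input.push(name);
                    }
                }
            }
        }
        out[node] = match &ops[node] {
            Op::Source(names) => names.clone(),
            Op::Remove(name) => input.into_iter().filter(|a| a != name).collect(),
            Op::PassThrough => input,
        };
    }
    Ok(out)
}

fn main() {
    let ops = [
        Op::Source(vec!["myAttribute", "photoURL"]),
        Op::Remove("myAttribute"),
        Op::PassThrough,
    ];
    println!("{:?}", propagate(&ops, &[(0, 1), (1, 2)]));
    println!("{:?}", topo_order(2, &[(0, 1), (1, 0)]));
}
```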

Reviewed changes

Copilot reviewed 22 out of 23 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| engine/testing/workflow-tests/Cargo.toml | Version bump for workflow test crate. |
| engine/testing/plateau-tiles-test/Cargo.toml | Version bump for plateau tiles test crate. |
| engine/runtime/types/src/lib.rs | Exposes the new attr_schema module. |
| engine/runtime/types/src/attr_schema.rs | Adds schema data model + JSON report DTOs + unit tests. |
| engine/runtime/runtime/tests/schema_sample.rs | Adds integration tests for source sampling via a real source factory. |
| engine/runtime/runtime/src/schema_sample.rs | Implements bounded source execution + attribute unioning into a schema. |
| engine/runtime/runtime/src/schema_infer.rs | Implements static propagation + sampling-seeded inference across the DAG. |
| engine/runtime/runtime/src/node.rs | Adds infer_output_schema hooks to factory traits + small default behavior test. |
| engine/runtime/runtime/src/lib.rs | Exports schema_infer and schema_sample modules. |
| engine/runtime/runtime/src/errors.rs | Adds SchemaInferenceCycle execution error. |
| engine/runtime/runtime/Cargo.toml | Adds dependencies needed by schema sampling/tests (e.g. indexmap, tempfile). |
| engine/runtime/action-processor/src/feature/filter.rs | Adds schema inference for FeatureFilter. |
| engine/runtime/action-processor/src/attribute/statistics_calculator.rs | Adds schema inference for StatisticsCalculator + tests. |
| engine/runtime/action-processor/src/attribute/mapper.rs | Adds schema inference for AttributeMapper + tests. |
| engine/runtime/action-processor/src/attribute/manager.rs | Adds schema inference for AttributeManager + tests. |
| engine/runtime/action-processor/src/attribute/datetime_converter.rs | Adds schema inference for DateTimeConverter + tests. |
| engine/plateau-gis-quality-checker/src-tauri/Cargo.toml | Version bump for Tauri app crate. |
| engine/cli/src/schema.rs | Adds the schema CLI command implementation + end-to-end test. |
| engine/cli/src/main.rs | Wires the new schema module. |
| engine/cli/src/cli.rs | Registers the schema subcommand and dispatch. |
| engine/cli/Cargo.toml | Adds needed deps/dev-deps for the new command/tests. |
| engine/Cargo.toml | Engine version bump. |
| engine/Cargo.lock | Lockfile updates reflecting new versions/dependencies. |


Comment on lines +109 to +130
fn infer_output_schema(
    &self,
    inputs: &HashMap<Port, reearth_flow_types::attr_schema::AttrSchema>,
    _with: &Option<HashMap<String, Value>>,
) -> Option<HashMap<Port, reearth_flow_types::attr_schema::AttrSchema>> {
    use reearth_flow_types::attr_schema::AttrSchema;

    // FeatureFilter routes whole features by expression; it never modifies
    // attributes. So each statically-declared output port carries the input
    // schema unchanged (identity).
    let input = inputs
        .get(&DEFAULT_PORT.clone())
        .cloned()
        .unwrap_or_else(AttrSchema::open);

    let map = self
        .get_output_ports()
        .into_iter()
        .map(|port| (port, input.clone()))
        .collect();
    Some(map)
}
Comment on lines +93 to +108
match op.method {
    // Create/Convert both set the attribute to an expression-derived value,
    // whose type we can't analyze statically -> Unknown, Always present.
    Method::Create | Method::Convert => {
        out.insert(attr, AttrField::always(AttrType::Unknown));
    }
    // Rename's destination name is an expression -> not statically knowable.
    // Drop the source key and mark the schema open (an unknown-named attr appears).
    Method::Rename => {
        out.fields.shift_remove(&attr);
        out.open = true;
    }
    Method::Remove => {
        out.fields.shift_remove(&attr);
    }
}
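
For readers without the surrounding file, the behaviour of the match above can be reproduced in a stand-alone sketch. The Schema, Operation and apply names below are hypothetical, and a Vec replaces the ordered map used in the PR; only the per-method rules are taken from the quoted snippet. Overwriting an existing key in place for Create/Convert is an assumption about how the real insert behaves.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AttrType { String, Unknown }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Presence { Always, Maybe }

// Hypothetical simplified schema: ordered (name, type, presence) entries.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Schema {
    fields: Vec<(String, AttrType, Presence)>,
    open: bool,
}

enum Method { Create, Convert, Rename, Remove }

struct Operation {
    attribute: String,
    method: Method,
}

/// Applies the operations in order, following the rules in the quoted match.
fn apply(input: &Schema, ops: &[Operation]) -> Schema {
    let mut out = input.clone();
    for op in ops {
        match op.method {
            // Expression-derived value: type Unknown, always present.
            Method::Create | Method::Convert => {
                let entry = (op.attribute.clone(), AttrType::Unknown, Presence::Always);
                match out.fields.iter().position(|f| f.0 == op.attribute) {
                    Some(i) => out.fields[i] = entry, // assumed: keep position
                    None => out.fields.push(entry),
                }
            }
            // Destination name is not statically known: drop the source key
            // and mark the schema open.
            Method::Rename => {
                out.fields.retain(|f| f.0 != op.attribute);
                out.open = true;
            }
            Method::Remove => {
                out.fields.retain(|f| f.0 != op.attribute);
            }
        }
    }
    out
}

fn main() {
    let reader = Schema {
        fields: vec![
            ("myAttribute".to_string(), AttrType::String, Presence::Always),
            ("photoURL".to_string(), AttrType::String, Presence::Always),
            ("address".to_string(), AttrType::String, Presence::Maybe),
        ],
        open: false,
    };
    let ops = [Operation { attribute: "myAttribute".to_string(), method: Method::Remove }];
    println!("{:#?}", apply(&reader, &ops));
}
```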
Comment on lines +360 to +363
assert_eq!(
    schema.fields.get(&Attribute::new("foo".to_string())),
    Some(&AttrField::always(AttrType::Unknown))
);
Comment on lines +423 to +425
    assert!(!schema.fields.contains_key(&Attribute::new("a".to_string())));
    assert!(schema.open);
}
Comment on lines +86 to +92
    let joined = join_all_inputs(&inputs);
    factory
        .output_ports()
        .into_iter()
        .map(|p| (p, joined.clone()))
        .collect()
}
Comment on lines +153 to +159
    let joined = join_all_inputs(&inputs);
    factory
        .output_ports()
        .into_iter()
        .map(|p| (p, joined.clone()))
        .collect()
}
Comment on lines +102 to +106
async fn read_features(
    mut source: Box<dyn crate::node::Source>,
    ctx: NodeContext,
    sample_size: usize,
) -> Result<Vec<Feature>, String> {