improve yaml schema validation error message #34480

Open: wants to merge 1 commit into master

1 change: 1 addition & 0 deletions CHANGES.md
@@ -130,6 +130,7 @@
 * (Java) Fix BigQuery Storage Write compatibility with Avro 1.8 ([#34281](https://github.com/apache/beam/pull/34281)).
 * Fixed checkpoint recovery and streaming behavior in Spark Classic and Portable runner's Flatten transform by replacing queueStream with SingleEmitInputDStream ([#34080](https://github.com/apache/beam/pull/34080), [#18144](https://github.com/apache/beam/issues/18144), [#20426](https://github.com/apache/beam/issues/20426))
 * (Java) Fixed Read caching of UnboundedReader objects to effectively cache across multiple DoFns and avoid checkpointing unstarted reader. [#34146](https://github.com/apache/beam/pull/34146) [#33901](https://github.com/apache/beam/pull/33901)
+* (YAML) Improved YAML schema validation error message to include the key name ([#32870](https://github.com/apache/beam/issues/32870)).
 
 ## Known Issues
 
11 changes: 8 additions & 3 deletions sdks/python/apache_beam/yaml/yaml_transform.py
@@ -70,21 +70,26 @@ def pipeline_schema(strictness):
   return pipeline_schema
 
 
-def _closest_line(o, path):
+def _closest_line_and_key(o, path):
+  best_key = '<root>'
   best_line = SafeLineLoader.get_line(o)
   for step in path:
     o = o[step]
     maybe_line = SafeLineLoader.get_line(o)
+    best_key = step
     if maybe_line != 'unknown':
       best_line = maybe_line
-  return best_line
+  return best_line, best_key
 
 
 def validate_against_schema(pipeline, strictness):
   try:
     jsonschema.validate(pipeline, pipeline_schema(strictness))
   except jsonschema.ValidationError as exn:
-    exn.message += f" around line {_closest_line(pipeline, exn.path)}"
+    line, key = _closest_line_and_key(pipeline, exn.path)
+    exn.message = (
+        f"Error found on key '{key}' around line {line}. "
+        f"Cause: {exn.message}.")
     raise exn
 
 
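The behavior of `_closest_line_and_key` in the `yaml_transform.py` change can be tried out without Beam using the minimal sketch below. It assumes that line numbers live on mappings under a `__line__` key. `get_line` is a hypothetical stand-in for `SafeLineLoader.get_line` that returns `'unknown'` for anything without a line, matching the comparison in the diff. The example pipeline, the path, and the jsonschema-style message are made up for illustration.

```python
from typing import Any, Sequence, Tuple, Union


def get_line(node: Any) -> Union[int, str]:
    # Hypothetical stand-in for SafeLineLoader.get_line (an assumption):
    # mappings carry their source line under '__line__'; anything else
    # reports 'unknown'.
    if isinstance(node, dict):
        return node.get('__line__', 'unknown')
    return 'unknown'


def closest_line_and_key(o: Any, path: Sequence[Any]) -> Tuple[Any, Any]:
    """Follows `path` into `o`, returning (deepest known line, last key)."""
    best_key = '<root>'
    best_line = get_line(o)
    for step in path:
        o = o[step]
        maybe_line = get_line(o)
        best_key = step  # updated on every step, even when the line is unknown
        if maybe_line != 'unknown':
            best_line = maybe_line
    return best_line, best_key


def format_error(message: str, pipeline: Any, path: Sequence[Any]) -> str:
    # Mirrors the message layout introduced in validate_against_schema.
    line, key = closest_line_and_key(pipeline, path)
    return f"Error found on key '{key}' around line {line}. Cause: {message}."


# Made-up pipeline, shaped as a line-annotating loader might produce it.
example = {
    '__line__': 1,
    'pipeline': {
        '__line__': 2,
        'transforms': [
            {'__line__': 3, 'type': 'Create',
             'config': {'__line__': 5, 'elements': 42}},
        ],
    },
}
path = ['pipeline', 'transforms', 0, 'config', 'elements']

print(format_error("42 is not of type 'array'", example, path))
# Error found on key 'elements' around line 5. Cause: 42 is not of type 'array'.
```

Because `best_key` advances on every path step while `best_line` only advances when a line is known, the reported key can be more specific than the reported line. In this example the message names `elements` but points at line 5, the line of the enclosing `config` mapping. A list index in the path (such as `0`) would likewise be reported as the key.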
3 changes: 2 additions & 1 deletion website/www/site/content/en/documentation/sdks/yaml.md
@@ -277,7 +277,8 @@ pipeline:
 
 As syntactic sugar, you can name the first and last transforms in your pipeline
 as `source` and `sink`. This convention does not change the resulting pipeline,
-but it signals the intent of the source and sink transforms.
+but it signals the intent of the source and sink transforms. Note that `source`
+and `sink` each require a single transform definition (as a YAML object).
 
 ```
 pipeline: