Skip to content
144 changes: 143 additions & 1 deletion src/check_datapackage/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,17 @@ def _check_keys(properties: dict[str, Any], issues: list[Issue]) -> list[Issue]:
key_issues = _flat_map(resources_with_pk, _check_primary_key)

# Foreign keys

resources_with_fk = _get_fields_at_jsonpath(
"$.resources[?(length(@.schema.foreignKeys) > 0)]",
properties,
)
resources_with_fk = _keep_resources_with_no_issue_at_property(
resources_with_fk, issues, "schema.foreignKeys"
)
key_issues += _flat_map(
resources_with_fk,
lambda resource: _check_foreign_keys(resource, properties),
)
return key_issues


Expand Down Expand Up @@ -197,6 +207,35 @@ def _check_primary_key(resource: PropertyField) -> list[Issue]:
]


def _check_foreign_keys(
resource: PropertyField, properties: dict[str, Any]
) -> list[Issue]:
"""Check that foreign key source and destination fields exist."""
# Safe, as only FKs of the correct type here
foreign_keys = cast(
list[dict[str, Any]], resolve("/schema/foreignKeys", resource.value)
)
foreign_keys_diff_resource = _filter(
foreign_keys,
lambda fk: "resource" in fk["reference"] and fk["reference"]["resource"] != "",
)
foreign_keys_same_resource = _filter(
foreign_keys, lambda fk: fk not in foreign_keys_diff_resource
)

issues = _flat_map(foreign_keys, lambda fk: _check_fk_source_fields(fk, resource))
issues += _flat_map(
foreign_keys_same_resource,
lambda fk: _check_fk_dest_fields_same_resource(fk, resource),
)
issues += _flat_map(
foreign_keys_diff_resource,
lambda fk: _check_fk_dest_fields_diff_resource(fk, resource, properties),
)

return issues


def _key_fields_as_str_list(key_fields: Any) -> list[str]:
"""Returns the list representation of primary and foreign key fields.

Expand All @@ -220,6 +259,109 @@ def _get_unknown_key_fields(
return ", ".join(unknown_fields)


def _check_fk_source_fields(
foreign_key: dict[str, Any], resource: PropertyField
) -> list[Issue]:
"""Check that foreign key source fields exist and have the correct number."""
issues = []
source_fields = resolve("/fields", foreign_key)
source_field_list = _key_fields_as_str_list(source_fields)
unknown_fields = _get_unknown_key_fields(source_field_list, resource.value)
if unknown_fields:
issues.append(
Issue(
jsonpath=f"{resource.jsonpath}.schema.foreignKeys.fields",
type="foreign-key-source-fields",
message=(
"No fields found in resource for foreign key source fields: "
f"{unknown_fields}."
),
instance=source_fields,
)
)

dest_fields = _key_fields_as_str_list(resolve("/reference/fields", foreign_key))
if len(source_field_list) != len(dest_fields):
issues.append(
Issue(
jsonpath=f"{resource.jsonpath}.schema.foreignKeys.fields",
type="foreign-key-source-fields",
message=(
"The number of foreign key source fields must be the same as "
"the number of foreign key destination fields."
),
instance=source_fields,
)
)
return issues


def _check_fk_dest_fields_same_resource(
foreign_key: dict[str, Any],
resource: PropertyField,
) -> list[Issue]:
"""Check that foreign key destination fields exist on the same resource."""
dest_fields = resolve("/reference/fields", foreign_key)
dest_field_list = _key_fields_as_str_list(dest_fields)
unknown_fields = _get_unknown_key_fields(dest_field_list, resource.value)
if not unknown_fields:
return []

return [
Issue(
jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.fields",
type="foreign-key-destination-fields",
message=(
"No fields found in resource for foreign key "
f"destination fields: {unknown_fields}."
),
instance=dest_fields,
)
]


def _check_fk_dest_fields_diff_resource(
foreign_key: dict[str, Any], resource: PropertyField, properties: dict[str, Any]
) -> list[Issue]:
"""Check that foreign key destination fields exist on the destination resource."""
dest_fields = resolve("/reference/fields", foreign_key)
dest_field_list = _key_fields_as_str_list(dest_fields)
# Safe, as only keys of the correct type here
dest_resource_name = cast(str, resolve("/reference/resource", foreign_key))

dest_resource_path = f"resources[?(@.name == '{dest_resource_name}')]"
if not findall(dest_resource_path, properties):
return [
Issue(
jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.resource",
type="foreign-key-destination-resource",
message=(
f"The destination resource {dest_resource_name!r} of this foreign "
"key doesn't exist in the package."
),
instance=dest_resource_name,
)
]

unknown_fields = _get_unknown_key_fields(
dest_field_list, properties, f"{dest_resource_path}."
)
if not unknown_fields:
return []

return [
Issue(
jsonpath=f"{resource.jsonpath}.schema.foreignKeys.reference.fields",
type="foreign-key-destination-fields",
message=(
f"No fields found in destination resource {dest_resource_name!r} "
f"for foreign key destination fields: {unknown_fields}."
),
instance=dest_fields,
)
]


def _set_should_fields_to_required(schema: dict[str, Any]) -> dict[str, Any]:
"""Set 'SHOULD' fields to 'REQUIRED' in the schema."""
should_fields = ("name", "id", "licenses")
Expand Down
Loading