InteroperabilityEnabler/utils/annotation_dataset.py (95 changes: 37 additions & 58 deletions)
@@ -9,78 +9,57 @@


 def add_quality_annotations_to_df(
-    data, entity_type, assessed_attrs=None, type=None, context_value=None
+    context_df, time_series_df, sep="__", assessed_attrs=None
 ):
     """
-    Add quality annotations to a DataFrame for either
-    instance-level or attribute-level annotations (but not both).
+    Add NGSI-LD quality annotations to either the context (instance-level)
+    or the time series (attribute-level).

     Args:
-        data (DataFrame): The flattened NGSI-LD data.
-        entity_type (str): The NGSI-LD entity type for quality annotations.
-        assessed_attrs (list of str): To annotate with quality information (if None, annotate entire instance).
-        type (str): The default `type` for the DataFrame rows if not already exist.
-        context_value (str or list): The value to assign to the `@context` column if it does not exist.
+        context_df (pd.DataFrame): Single-row DataFrame with 'id' and 'type'.
+        time_series_df (pd.DataFrame): Flattened time series DataFrame.
+        sep (str): Separator used in flattened column names (default: "__").
+        assessed_attrs (list of str, optional): List of attributes to annotate.
+            If None, annotate the context (instance-level).

     Returns:
-        Pandas DataFrame with additional quality annotation columns.
+        Tuple[pd.DataFrame, pd.DataFrame]: (updated context_df, updated time_series_df)
     """
-    annotated_data = data.copy()
-    new_columns = {}  # Dictionary to store new columns
+    # Copy inputs to avoid mutation
+    context_df = context_df.copy()
+    time_series_df = time_series_df.copy()

-    # Ensure the 'type' column exists; if not, create it
-    if "type" not in annotated_data.columns:
-        new_columns["type"] = type
-
-    # Ensure the 'id' column exists; if not, create it
-    if "id" not in annotated_data.columns:
-        new_columns["id"] = annotated_data.apply(
-            lambda row: f"urn:ngsi-ld:{row['type']}:{row.name}", axis=1
-        )
-
-    # Handle @context column (optional)
-    if context_value is not None:  # Only add @context if context_value is provided
-        if "@context" not in annotated_data.columns:
-            if isinstance(context_value, list):
-                # Apply the list across all rows
-                new_columns["@context"] = [context_value] * len(annotated_data)
-            elif isinstance(context_value, str):
-                # Apply the string across all rows
-                new_columns["@context"] = context_value
+    entity_id = context_df.loc[0, "id"]
+    entity_type = context_df.loc[0, "type"]

     if assessed_attrs is None:
-        # Annotate the entire instance (data point)
-        new_columns["hasQuality.type"] = "Relationship"
-        new_columns["hasQuality.object"] = annotated_data.apply(
-            lambda row: f"urn:ngsi-ld:DataQualityAssessment:{entity_type}:{row['id']}",
-            axis=1,
+        # Instance-level annotation → attach to context
+        context_df[f"hasQuality{sep}type"] = "Relationship"
+        context_df[f"hasQuality{sep}object"] = (
+            f"urn:ngsi-ld:DataQualityAssessment:{entity_type}:{entity_id}"
         )
     else:
-        # Annotate specific attributes
+        # Attribute-level annotation → apply per-attribute, per-row
         for attr in assessed_attrs:
-            # Identify columns that start with the attribute name
-            matching_columns = [col for col in data.columns if col.startswith(attr)]
-            if not matching_columns:
-                raise ValueError(f"Attribute '{attr}' not found in DataFrame columns.")
+            attr_cols = [
+                col for col in time_series_df.columns if col.startswith(f"{attr}{sep}")
+            ]
+            if not attr_cols:
+                raise ValueError(f"Attribute '{attr}' not found in DataFrame.")

+            rows_to_annotate = time_series_df[attr_cols].notna().any(axis=1)

-            # Add quality annotation for each matching attribute column
-            for col in matching_columns:
-                base_attr = col.split(".")[0]  # Extract the base attribute name
-                quality_type_col = f"{base_attr}.hasQuality.type"
-                quality_object_col = f"{base_attr}.hasQuality.object"
+            quality_type_col = f"{attr}{sep}hasQuality{sep}type"
+            quality_obj_col = f"{attr}{sep}hasQuality{sep}object"

-                # Collect new columns in the dictionary
-                new_columns[quality_type_col] = "Relationship"
-                new_columns[quality_object_col] = annotated_data.apply(
-                    lambda row: f"urn:ngsi-ld:DataQualityAssessment:{entity_type}:{row['id']}:{base_attr}".split("[")[0],
-                    axis=1,
-                )
+            # Initialize empty columns with None
+            time_series_df[quality_type_col] = None
+            time_series_df[quality_obj_col] = None

-    # Update DataFrame in one go using `pd.concat` to avoid fragmentation
-    annotated_data = pd.concat([annotated_data, pd.DataFrame(new_columns)], axis=1)
+            # Apply values only to relevant rows
+            time_series_df.loc[rows_to_annotate, quality_type_col] = "Relationship"
+            time_series_df.loc[rows_to_annotate, quality_obj_col] = (
+                f"urn:ngsi-ld:DataQualityAssessment:{entity_type}:{entity_id}:{attr}"
+            )

-    return annotated_data
+    return context_df, time_series_df
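
For illustration, here is a minimal usage sketch of the reworked function (not part of the PR): the sample entity, attribute names, and values are invented, and the import path assumes the repository is installed as a package with the layout shown above.

```python
import pandas as pd

# Hypothetical import path, assuming the package layout shown in this PR
from InteroperabilityEnabler.utils.annotation_dataset import add_quality_annotations_to_df

context_df = pd.DataFrame(
    [{"id": "urn:ngsi-ld:WeatherObserved:001", "type": "WeatherObserved"}]
)
time_series_df = pd.DataFrame(
    {
        "observedAt": [1700000000, 1700000060],
        "temperature__value": [21.5, None],  # second row has no observation
        "temperature__unitCode": ["CEL", None],
    }
)

# Instance-level: hasQuality__* columns land on the context DataFrame
ctx, ts = add_quality_annotations_to_df(context_df, time_series_df)

# Attribute-level: only rows with a non-null 'temperature' observation are annotated
ctx, ts = add_quality_annotations_to_df(
    context_df, time_series_df, assessed_attrs=["temperature"]
)
```

After the attribute-level call, `ts["temperature__hasQuality__object"]` holds the assessment URN for the first row and `None` for the second, since `rows_to_annotate` masks out rows where all `temperature__*` columns are null.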
InteroperabilityEnabler/utils/data_formatter.py (127 changes: 68 additions & 59 deletions)
@@ -10,91 +10,100 @@

 import json
 import pandas as pd
-from io import StringIO


-def data_to_dataframe(data):
+def data_formatter(data, sep="__"):
     """
-    Convert data from file path or raw JSON/JSON-LD into a flattened pandas DataFrame.
+    Formats input data (in JSON) into a structured DataFrame via the data_to_dataframe
+    function. The function accepts either a file path to a JSON file, a raw JSON-like
+    string, or a Python object (dictionary or list). It processes the input and leverages
+    the provided separator to convert the data into a DataFrame format.
+
+    If the input is not in a supported format or type, an error is raised or caught.

     Args:
-        data (str | dict | list): Path to a data file or a JSON/JSON-LD object.
+        data: The input data to be formatted. Can be a file path (ending in .json), a
+            JSON-like string, or a Python object like a dictionary or list.
+        sep: The separator string used in formatting. Defaults to "__".

     Returns:
-        pd.DataFrame: Flattened data as a DataFrame.
+        A DataFrame-like object containing the structured representation of the input
+        data, or a tuple of `None, None` if processing fails.
     """
-    df = None
     try:
         if isinstance(data, str):
-            # Handle file path
-            if data.endswith(".xls") or data.endswith(".xlsx"):
-                df = pd.read_excel(data)
-            elif data.endswith(".csv"):
-                df = pd.read_csv(data)
-            elif data.endswith(".json") or data.endswith(".jsonld"):
+            if data.endswith(".json"):
                 with open(data, "r", encoding="utf-8") as file:
-                    json_data = json.load(file)
-                entities = json_data if isinstance(json_data, list) else json_data.get("@graph", [json_data])
-                df = pd.DataFrame([flatten_dict(e) for e in entities])
-                df.reset_index(drop=True, inplace=True)
+                    raw_data = json.load(file)
+                return data_to_dataframe(raw_data, sep=sep)
             else:
-                # Check if it's raw CSV content (contains commas and newlines)
-                if '\n' in data and (',' in data or ';' in data):
-                    try:
-                        df = pd.read_csv(StringIO(data))
-                    except pd.errors.ParserError:
-                        df = pd.read_csv(StringIO(data), sep=';')
-                else:
-                    raise ValueError("Unsupported file format or content. Must be .xls, .xlsx, .csv, .json, .jsonld, or raw CSV content")
+                raise ValueError(
+                    "Unsupported file format or content. Must be .json or raw JSON-like content"
+                )
         elif isinstance(data, (dict, list)):
-            # Handle raw JSON or JSON-LD object directly
-            entities = data if isinstance(data, list) else data.get("@graph", [data])
-            df = pd.DataFrame([flatten_dict(e) for e in entities])
-            df.reset_index(drop=True, inplace=True)
+            return data_to_dataframe(data, sep=sep)
         else:
-            raise ValueError("Unsupported input type. Must be file path or JSON object.")
+            raise ValueError(
+                "Unsupported input type. Must be a file path or JSON object."
+            )
     except Exception as e:
         print(f"Error processing data: {e}")
-    return df
+        return None, None


-def flatten_dict(d, parent_key="", sep=".", preserve_keys=None):
+def data_to_dataframe(raw_data, sep):
     """
-    Recursively flattens a nested dictionary into a flat dictionary.
+    Converts time-series data (in JSON) into two structured DataFrames: a context
+    DataFrame and a time series DataFrame.
+    The context DataFrame contains the id and type of the entity. It is a single-row
+    DataFrame. The time series DataFrame contains flattened and chronologically sorted
+    time-based data (data points).

     Args:
-        d (dict): The dictionary to flatten.
-        parent_key (str): Prefix for keys during recursion.
-        sep (str): Separator used for key hierarchy.
-        preserve_keys (list): Keys whose values should not be flattened.
+        raw_data: Dict containing time-series data.
+        sep: String separator used to create flat column names by combining keys
+            and attributes from the hierarchical structure.

     Returns:
-        dict: A flattened dictionary.
+        Tuple[pd.DataFrame, pd.DataFrame]: A tuple containing two pandas DataFrames:
+            - A context DataFrame.
+            - A time series DataFrame where each row corresponds to a single observed
+              timestamp, flattened with attributes prefixed with their associated key.
     """
-    if preserve_keys is None:
-        preserve_keys = ["coordinates", "@context"]
+    # Build context DataFrame
+    context_keys = ["id", "type"]
+    context = {k: raw_data[k] for k in context_keys}
+    context_df = pd.DataFrame([context])

-    items = []
-    for k, v in d.items():
-        new_key = f"{parent_key}{sep}{k}" if parent_key else k
+    # Build dynamic flat rows
+    rows = []

-        if isinstance(v, dict):
-            if k in preserve_keys:
-                items.append((new_key, v))
-            else:
-                items.extend(flatten_dict(v, new_key, sep=sep, preserve_keys=preserve_keys).items())
+    for key, val_list in raw_data.items():
+        if (
+            isinstance(val_list, list)
+            and val_list
+            and isinstance(val_list[0], dict)
+            and "observedAt" in val_list[0]
+        ):
+            for entry in val_list:
+                timestamp = entry["observedAt"]
+                row = {"observedAt": timestamp}
+                for attr_key, attr_val in entry.items():
+                    if attr_key == "observedAt":
+                        continue
+                    row[f"{key}{sep}{attr_key}"] = attr_val
+                rows.append(row)

-        elif isinstance(v, list):
-            if k in preserve_keys:
-                items.append((new_key, v))
-            else:
-                for i, item in enumerate(v):
-                    if isinstance(item, dict):
-                        items.extend(flatten_dict(item, f"{new_key}[{i}]", sep=sep, preserve_keys=preserve_keys).items())
-                    else:
-                        items.append((f"{new_key}[{i}]", item))
+    # Combine and reshape
+    time_series_df = pd.DataFrame(rows)

-        else:
-            items.append((new_key, v))
+    # Handle potential duplicates by grouping
+    time_series_df = time_series_df.groupby("observedAt").first().reset_index()
+
+    # Sort chronologically
+    time_series_df["observedAt"] = pd.to_datetime(time_series_df["observedAt"])
+    time_series_df = time_series_df.sort_values("observedAt").reset_index(drop=True)
+
+    # Convert to UNIX timestamp
+    time_series_df["observedAt"] = time_series_df["observedAt"].astype(int) // 10**9

-    return dict(items)
+    return context_df, time_series_df
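
A rough end-to-end sketch of the new formatter (not part of the PR): the payload below is a hypothetical NGSI-LD-style temporal entity invented for illustration, and the import path again assumes the package layout shown in this PR.

```python
# Hypothetical import path, assuming the package layout shown in this PR
from InteroperabilityEnabler.utils.data_formatter import data_formatter

payload = {
    "id": "urn:ngsi-ld:WeatherObserved:001",
    "type": "WeatherObserved",
    "temperature": [
        {"observedAt": "2024-01-01T00:01:00Z", "value": 21.7, "unitCode": "CEL"},
        {"observedAt": "2024-01-01T00:00:00Z", "value": 21.5, "unitCode": "CEL"},
    ],
}

context_df, time_series_df = data_formatter(payload, sep="__")

# context_df: a single row holding the entity's 'id' and 'type'.
# time_series_df: one row per timestamp, sorted chronologically, with
# 'observedAt' converted to UNIX seconds and flattened columns such as
# 'temperature__value' and 'temperature__unitCode'.
```

Note that only top-level keys whose value is a list of dicts carrying `observedAt` are treated as time series; any other key besides `id` and `type` (for instance a `@context` entry) is silently ignored by `data_to_dataframe`.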