-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvisualize_results.py
More file actions
270 lines (224 loc) · 11.5 KB
/
visualize_results.py
File metadata and controls
270 lines (224 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
import json
import streamlit as st
import os
from const import *
import pandas as pd
import plotly.graph_objects as go
from const import METRIC_COVERAGE, BINARY_METRICS_METADATA
def get_score_color(score):
    """Map a quality score to the colour name used when rendering it.

    Args:
        score: Anything ``float()`` accepts (number or numeric string).

    Returns:
        ``"green"`` for scores >= 0.95, ``"orange"`` for scores >= 0.7,
        ``"red"`` for lower scores, and ``"black"`` when the score is not
        numeric (unparsable value, ``None`` or NaN).
    """
    try:
        score = float(score)
    except (TypeError, ValueError, OverflowError):
        return "black"  # fallback for non-numeric scores
    # NaN is the only float that differs from itself; without this check it
    # would fail both comparisons below and be reported as "red".
    if score != score:
        return "black"
    if score >= 0.95:
        return "green"
    if score >= 0.7:
        return "orange"
    return "red"
def _format_score(score):
    """Round *score* to 5 decimals; return it unchanged if it cannot be rounded."""
    try:
        return round(score, 5)
    except TypeError:
        return score


def _decode_escapes(text):
    """Interpret backslash escape sequences in *text* without mangling non-ASCII.

    Encoding to UTF-8 before ``unicode_escape`` decoding turns every non-ASCII
    character into mojibake, because that codec reads raw bytes as Latin-1.
    Encoding to Latin-1 with ``backslashreplace`` keeps such characters intact.
    If the text cannot be decoded, it is returned unchanged.
    """
    try:
        return text.encode('latin-1', 'backslashreplace').decode('unicode_escape')
    except UnicodeDecodeError:
        return text


def _render_violations(violations, max_violations):
    """Show the ';'-separated *violations* in a scrollable, HTML-escaped box."""
    import html

    entries = [v.strip() for v in str(violations).split(';') if v.strip()]
    shown = entries[:max_violations]
    # The box is emitted as raw HTML, so every entry is escaped first. The
    # join happens outside the f-string because a backslash inside an
    # f-string expression is a syntax error before Python 3.12.
    listing = ";\n".join(html.escape(v) for v in shown)
    box_html = "\n".join([
        '<div style="max-height: 200px; max-width: 1000px; overflow-y: auto; '
        'border: 1px solid #ddd; padding: 8px; background: #f9f9f9; border-radius: 10px;">',
        f'<pre style="margin: 0;">{listing}</pre>',
        '</div>',
    ])
    st.markdown(box_html, unsafe_allow_html=True)
    if len(entries) > max_violations:
        st.info(f"Showing first {max_violations} of {len(entries)} violations.")


def show_dq_assessment_results(df, max_violations=100):
    """Render the per-metric assessment results for one selected dimension.

    A sidebar select box chooses the dimension. Each ``metric_id`` group of
    that dimension gets its own expander, and every row of the group shows its
    description, coloured score, calculation, message or violations, and shape
    template.

    Args:
        df: Results table. The columns ``dimension``, ``metric_id``,
            ``metric``, ``metric_description``, ``score``,
            ``metric_calculation``, ``shape_name`` and ``shape_template`` are
            read unconditionally. ``message`` and ``num_violations`` are read
            only in the branches that display them. ``vocab``, ``violations``,
            ``violation_text`` and ``meta_metric_calculation`` are optional.
        max_violations: Maximum number of violation entries listed per row.
    """
    dimensions = sorted(df['dimension'].dropna().unique().tolist())
    selected_dimension = st.sidebar.selectbox("Select Dimension", dimensions)
    st.markdown("## DQ Assessment Results")
    dim_group = df[df['dimension'] == selected_dimension]
    st.markdown(f"**Dimension:** {selected_dimension}")

    for _, group in dim_group.groupby('metric_id'):
        metric_name = group.iloc[0]['metric']
        last_vocab = ''  # vocabulary headings are de-duplicated per metric
        num_rows = len(group)
        with st.expander(f"**Metric:** {metric_name}"):
            for position, (_, row) in enumerate(group.iterrows()):
                # Groups with several rows open with a separator.
                if position == 0 and num_rows != 1:
                    st.markdown("---")

                if 'vocab' in row and row['vocab'] != '' and last_vocab != row['vocab']:
                    last_vocab = row['vocab']
                    st.markdown(f"### Vocab: {row['vocab']}")

                st.markdown(f"**Description:** {row['metric_description']}")
                score = row['score']
                color = get_score_color(score)
                st.markdown(
                    f"**Score:** <span style='color:{color}'>{_format_score(score)}</span>",
                    unsafe_allow_html=True,
                )

                # Computed once so both uses share the same column guard.
                has_aggregation = (
                    'meta_metric_calculation' in row
                    and pd.notna(row['meta_metric_calculation'])
                )
                if has_aggregation:
                    st.markdown(f"**Aggregation:** {row['meta_metric_calculation']}")
                else:
                    st.markdown(f"**Metric calculation:** {row['metric_calculation']}")

                # str() keeps a missing (NaN) shape name from raising here.
                shape_name = str(row['shape_name'])
                is_binary_metric = (
                    shape_name.removesuffix("Shape") in BINARY_METRICS_METADATA
                    or shape_name.startswith('Authenticity')
                )
                if is_binary_metric and row['score'] < 1:
                    st.markdown(f"**Message:** {row['message']} ")
                elif 'violations' in row and pd.notna(row['violations']) and str(row['violations']).strip():
                    if 'violation_text' in row and pd.notna(row['violation_text']):
                        st.markdown(f"**Violations ({row['num_violations']}):** {row['violation_text']}")
                    else:
                        st.markdown(f"**Violations ({row['num_violations']}):**")
                    if has_aggregation:
                        st.markdown(f"*Individual score:* {row['metric_calculation']}")
                    _render_violations(row['violations'], max_violations)

                st.markdown("**Shape template:**")
                if pd.notnull(row['shape_template']):
                    st.code(_decode_escapes(str(row['shape_template'])))

                # Separator between consecutive rows, but not after the last.
                if position + 1 < num_rows:
                    st.markdown("---")
def show_dq_assessment_statistics(run_info, dataset_name, df):
    """
    Displays DQA statistics

    Two tables are rendered: dataset statistics taken from the
    ``graph_profile`` entry of ``run_info[dataset_name]`` (missing counts
    default to 0), and assessment statistics (validation times, number of
    distinct metrics in ``df``, number of shapes).
    """
    profile = run_info[dataset_name].get('graph_profile', {})

    # (label, graph_profile key) pairs that are always listed.
    core_rows = [
        ("Number of triples", 'num_triples'),
        ("Number of entities", 'num_entities'),
        ("Number of entities with labels", 'num_entities_label_property'),
        ("Number of entities with descriptions", 'num_entities_description_property'),
        ("Number of entities with interlinking property", 'num_entities_with_interlinking'),
        ("Number of classes used", 'num_classes'),
        ("Number of properties used", 'num_properties'),
    ]
    # (label, graph_profile key) pairs listed only when the count is positive.
    optional_rows = [
        ("Number of owl:DatatypeProperty used", 'count_owl_datatype_properties'),
        ("Number of owl:ObjectProperty used", 'count_owl_object_properties'),
        ("Number of properties used with a defined range", 'count_range_props'),
        ("Number of properties used that have a defined domain", 'num_properties_domain'),
        ("Inverse-functional properties used", 'count_inverse_functional_props'),
        ("Functional properties used", 'count_functional_props'),
        ("Asymmetric properties used", 'count_asymmetric_props'),
        ("Irreflexive properties used", 'count_irreflexive_props'),
        ("Properties defined in vocabularies (includes deprecated ones)", 'num_properties_vocabularies'),
        ("Classes defined in vocabularies (includes deprecated ones)", 'num_classes_vocabularies'),
    ]
    # Read every count before anything is rendered.
    counts = {key: profile.get(key, 0) for _, key in core_rows + optional_rows}

    st.markdown("## Statistics")
    st.markdown(f"**Dataset:** {dataset_name}")
    st.markdown("**Dataset Statistics**")

    stats = {label: counts[key] for label, key in core_rows}
    for label, key in optional_rows:
        if counts[key] > 0:
            stats[label] = counts[key]

    stats_frame = pd.DataFrame(list(stats.items()), columns=["Statistic", "Value"])
    st.table(stats_frame.set_index('Statistic'))

    num_metrics = int(len(df['metric'].unique()))

    def format_elapsed_time(seconds):
        # Below one minute report seconds, otherwise minutes; 3 decimals each.
        if seconds >= 60:
            return f"{round(seconds / 60, 3)}", "(m)"
        return f"{round(seconds, 3)}", "(s)"

    dataset_run = run_info[dataset_name]
    total_text, total_unit = format_elapsed_time(dataset_run['total_elapsed_time'])
    vocab_text, vocab_unit = format_elapsed_time(dataset_run['vocab_shapes_elapsed_time'])
    data_text, data_unit = format_elapsed_time(dataset_run['data_shapes_elapsed_time'])
    metadata_text, metadata_unit = format_elapsed_time(dataset_run['metadata_shapes_elapsed_time'])

    dq_stats = {
        f"Total validation time {total_unit}": total_text,
        f"Metadata shapes validation time {metadata_unit}": metadata_text,
        f"Data shapes validation time {data_unit}": data_text,
        f"Vocabulary shapes validation time {vocab_unit}": vocab_text,
        "Number of metrics": num_metrics,
        "Number of shapes": dataset_run["num_inst_shapes"],
    }

    st.markdown("**Data Quality Assessment**")
    # Values are stringified so the column mixes texts and counts uniformly.
    dq_rows = [(label, str(value)) for label, value in dq_stats.items()]
    st.table(pd.DataFrame(dq_rows, columns=["Statistic", "Value"]).set_index("Statistic"))
def show_metric_coverage():
    """Render the metric-coverage view.

    Shows a donut chart of SHACL core coverage, the detailed coverage table
    built from ``METRIC_COVERAGE``, and the measure definitions read from
    ``dq_assessment/measure_definition.csv``.
    """
    st.markdown("### Metric coverage & DQ measure definition")
    st.markdown("**Total number of metrics:** 69")

    # Donut chart: one slice per coverage category.
    coverage_labels = ["Covered fully", "Covered partial", "Not covered"]
    shacl_core_values = [24, 19, 26]
    donut = go.Pie(
        labels=coverage_labels,
        values=shacl_core_values,
        name="SHACL core",
        hole=0.5,
        domain={'x': [0, 0.48]},
        hoverinfo="label+percent+value",
        textinfo='percent+label',
        marker={'colors': ["#76C97D", "#E9D362", "#CB614E"]},
    )
    fig = go.Figure()
    fig.add_trace(donut)
    # The annotation puts the chart's name at the given paper coordinates.
    center_label = {'text': 'SHACL core', 'x': 0.20, 'y': 0.5, 'font_size': 14, 'showarrow': False}
    fig.update_layout(title_text="", annotations=[center_label], showlegend=False)
    st.plotly_chart(fig)

    st.markdown("### Detailed coverage of DQ metrics using SHACL core")
    coverage_columns = ["Dimension", "Metric Id", "Metric", "SHACL core"]
    df_dq = pd.DataFrame(METRIC_COVERAGE, columns=coverage_columns).set_index('Dimension')
    st.dataframe(df_dq, use_container_width=True)

    st.markdown("### DQ measure definition")
    df_measures = pd.read_csv('dq_assessment/measure_definition.csv').set_index('Group')
    # Missing cells in these columns are replaced with empty strings.
    for column in ('Individual score', 'Aggregation'):
        df_measures[column] = df_measures[column].fillna('')
    st.dataframe(df_measures, use_container_width=True)
def create_results_visualization(run_info):
    """Build the Streamlit page: sidebar view switch plus the selected view.

    Args:
        run_info: Per-dataset run information, passed through to
            ``show_dq_assessment_statistics``.

    In the "DQA Results" view, the sidebar lists every sub-folder of
    ``datasets`` (except ``vocabularies``) that contains a ``results``
    directory. The chosen dataset's results CSV is loaded and rendered. An
    error is shown instead when no dataset or no CSV is available.
    """
    st.set_page_config(layout='wide')

    # ---------------------------
    # Sidebar panel
    # ---------------------------
    st.sidebar.title("")
    view_option = st.sidebar.selectbox("View", ["DQA Results", "Metric Coverage & DQ measure definition"])

    if view_option != "DQA Results":
        show_metric_coverage()
        return

    DATASETS_FOLDER_PATH = "datasets"
    # A missing datasets folder is reported like an empty one instead of
    # letting os.listdir raise FileNotFoundError.
    if not os.path.isdir(DATASETS_FOLDER_PATH):
        st.error("No datasets found.")
        return

    # Sorted so the select box order does not depend on the filesystem.
    datasets = sorted(
        d for d in os.listdir(DATASETS_FOLDER_PATH)
        if d != "vocabularies"
        and os.path.isdir(os.path.join(DATASETS_FOLDER_PATH, d, "results"))
    )
    if not datasets:
        st.error("No datasets found.")
        return

    dataset_name = st.sidebar.selectbox("Select Dataset", datasets)
    csv_path = f'{DQ_ASSESSMENT_RESULTS_FOLDER_PATH.format(dataset_name=dataset_name)}dq_assessment_{dataset_name}.csv'
    if not os.path.exists(csv_path):
        st.error(f"No results CSV found for dataset '{dataset_name}'.")
        return

    df = pd.read_csv(csv_path)
    # The results view treats these columns as optional, so only the ones
    # present are normalised; missing cells become empty strings.
    for column in ('vocab', 'violation_text', 'violations'):
        if column in df.columns:
            df[column] = df[column].fillna('')

    show_dq_assessment_results(df)
    show_dq_assessment_statistics(run_info, dataset_name, df)
def visualize_results():
    """Load ``run_info.json`` from the working directory and render the app."""
    with open("run_info.json", "r", encoding="utf-8") as info_file:
        run_info = json.loads(info_file.read())
    create_results_visualization(run_info)
# Run the app only when this file is executed as a script. The previous
# condition was the non-empty string literal "__main__", which is always
# truthy, so the app also started whenever the module was imported.
if __name__ == "__main__":
    visualize_results()