forked from ni/nisystemlink-clients-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_dataframe_utilities.py
330 lines (269 loc) · 12.5 KB
/
_dataframe_utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
from typing import Any, Callable, Dict, List, Optional
import pandas as pd
from nisystemlink.clients.testmonitor.models import (
Measurement,
Result,
Step,
StepProjection,
)
from nisystemlink.clients.testmonitor.utilities.constants import DataFrameHeaders
def has_name_and_measurement(measurement: Measurement) -> bool:
    """Tell whether a step data parameter carries real measurement data.

    A parameter qualifies when both its ``name`` and its ``measurement`` fields are set
    (that is, neither of them is ``None``).

    Args:
        measurement: A measurement data object.

    Returns:
        bool: True if both ``name`` and ``measurement`` are not None, False otherwise.
    """
    if measurement.name is None:
        return False
    return measurement.measurement is not None
def convert_results_to_dataframe(
    results: List[Result], set_id_as_index: bool = True
) -> pd.DataFrame:
    """Build a Pandas DataFrame with one row per result.

    Args:
        results: The list of results to include in the dataframe.
        set_id_as_index: If true (the default), the result ``id`` column is used as the
            dataframe index, provided that column exists. If false, no index is set.

    Returns:
        A Pandas DataFrame in which each result field has its own column. Nested fields
        are flattened into dot-separated sub-columns:
            - status_type_summary: every entry gets its own column, for example
              status_type_summary.LOOPING, status_type_summary.PASSED, etc.
            - properties: every property gets its own column, for example
              properties.property1, properties.property2, etc.
    """
    rows = []
    for result in results:
        row = result.dict(exclude_none=True)
        # Collapse the nested status object before the dictionary is flattened.
        __normalize_status(row)
        rows.append(row)

    dataframe = __format_results_columns(
        results_dataframe=pd.json_normalize(rows, sep=".")
    )

    if not set_id_as_index or "id" not in dataframe.columns:
        return dataframe
    return dataframe.set_index("id")
def convert_steps_to_dataframe(
    steps: List[Step],
    is_valid_measurement: Optional[
        Callable[[Measurement], bool]
    ] = has_name_and_measurement,
) -> pd.DataFrame:
    """Convert a list of steps into a flat, normalized dataframe.

    Args:
        steps: A list of steps.
        is_valid_measurement: Optional callback deciding whether a step data parameter is a
            valid measurement; only parameters for which it returns True are kept. It
            receives a measurement and returns a boolean.
            By default only parameters that have values for both 'name' and 'measurement'
            are treated as valid measurements.
            If no parameter passes the check, the step data parameters do not appear in
            the dataframe.
            If the callback is None, all step data parameters are included.

    Returns:
        DataFrame:
            - A Pandas DataFrame containing the steps data, with all the fields present
              in the input steps.
            - One column per unique property across all steps, named
              `properties.property_name`.
            - One column per unique input and output across all steps, named
              `inputs.input_name` and `outputs.output_name` respectively.
            - The step data parameter columns are prefixed with `data.parameters.` when
              the callback is None, and with `data.measurement.` when a callback is set.
            - The step data parameters are exploded, so each list element becomes a
              separate row.
    """
    if is_valid_measurement is None:
        parameters_prefix = "data.parameters"
    else:
        parameters_prefix = "data.measurement"

    flattened_steps = pd.json_normalize(
        __convert_steps_to_dict(steps, is_valid_measurement), sep="."
    )
    # The raw list column is always called "data.parameters"; only the prefix of the
    # columns generated from it depends on the callback.
    expanded_steps = __explode_and_normalize(
        flattened_steps, "data.parameters", f"{parameters_prefix}."
    )
    ordered_columns = __group_step_columns(expanded_steps.columns)
    return expanded_steps.reindex(columns=ordered_columns, copy=False)
def __normalize_status(
    data: Dict[str, Any],
) -> None:
    """Collapse the nested ``status`` object of a dictionary into a plain string.

    A custom status (status type ``"CUSTOM"``) is replaced by its ``status_name``; any
    other status is replaced by the value of its ``status_type``. The status type may be
    an enum member (its ``value`` is used) or already a plain string (used as-is).

    The dictionary is modified in place. It is left untouched when it has no status, the
    status is empty, or the status is not a dictionary (for example because it has
    already been normalized).

    Args:
        data: Dictionary containing status information.
    """
    status = data.get("status")
    if not status or not isinstance(status, dict):
        return

    status_type = status.get("status_type")
    # Enum members expose the raw string through ``.value``; a plain string has no such
    # attribute and is used directly instead of being turned into None.
    type_value = getattr(status_type, "value", status_type)

    if type_value == "CUSTOM":
        data["status"] = status.get("status_name")
    else:
        data["status"] = type_value
def __format_results_columns(results_dataframe: pd.DataFrame) -> pd.DataFrame:
    """Reorder the results columns so that properties come last.

    The resulting order is: standard columns, then status type summary columns, then
    property columns. Within each group the original relative order is kept.

    Args:
        results_dataframe: Dataframe of results.

    Returns:
        Dataframe of results with its columns reordered.
    """
    standard_headers = []
    summary_headers = []
    property_headers = []

    for header in results_dataframe.columns.to_list():
        is_summary = __is_status_type_summary_header(header=header)
        is_property = __is_property_header(header=header)
        if is_summary:
            summary_headers.append(header)
        if is_property:
            property_headers.append(header)
        if not (is_summary or is_property):
            standard_headers.append(header)

    ordered_headers = standard_headers + summary_headers + property_headers
    return results_dataframe.reindex(columns=ordered_headers, copy=False)
def __is_standard_column_header(header: str) -> bool:
    """Check that a column header is neither a status type summary nor a property.

    Args:
        header: Column header of the results dataframe.

    Returns:
        True if the header is not a status type summary header and not a property header.
        False otherwise.
    """
    if __is_status_type_summary_header(header=header):
        return False
    return not __is_property_header(header=header)
def __is_status_type_summary_header(header: str) -> bool:
    """Check whether a column header belongs to the status type summary.

    Args:
        header: Column header of the results dataframe.

    Returns:
        True if the header starts with the status type summary prefix
        (``DataFrameHeaders.STATUS_TYPE_SUMMARY_HEADER_PREFIX``). False otherwise.
    """
    summary_prefix = DataFrameHeaders.STATUS_TYPE_SUMMARY_HEADER_PREFIX
    return header.startswith(summary_prefix)
def __is_property_header(header: str) -> bool:
    """Check whether a column header belongs to a property.

    Args:
        header: Column header of the results dataframe.

    Returns:
        True if the header starts with the property prefix
        (``DataFrameHeaders.PROPERTY_COLUMN_HEADER_PREFIX``). False otherwise.
    """
    property_prefix = DataFrameHeaders.PROPERTY_COLUMN_HEADER_PREFIX
    return header.startswith(property_prefix)
def __convert_steps_to_dict(
    steps: List[Step],
    is_valid_measurement: Optional[Callable[[Measurement], bool]],
) -> List[Dict[str, Any]]:
    """Turn each step into a dictionary without None values, ready for flattening.

    For every step the data parameters are filtered with the callback, the inputs and
    outputs are normalized, and the status is normalized.

    Args:
        steps: A list of steps.
        is_valid_measurement: Optional callback that checks whether a step data parameter
            is a valid measurement, so that only those are kept.

    Returns:
        List[Dict[str, Any]]: One dictionary of step information per input step, in the
        same order as the input.
    """

    def _as_normalized_dict(step: Step) -> Dict[str, Any]:
        step_dict = step.dict(exclude_none=True)
        # Each helper below mutates step_dict in place.
        __filter_invalid_measurements(step_dict, step, is_valid_measurement)
        __normalize_inputs_outputs(step_dict, step)
        __normalize_status(step_dict)
        return step_dict

    return [_as_normalized_dict(step) for step in steps]
def __filter_invalid_measurements(
    step_dict: Dict[str, Any],
    step: Step,
    is_valid_measurement: Optional[Callable[[Measurement], bool]],
) -> None:
    """Replace the step dictionary's data parameters with only the valid measurements.

    Nothing is changed when no callback is given, or when the step has no data or no
    data parameters.

    Args:
        step_dict: A dictionary with step information.
        step: A Step object containing data parameters.
        is_valid_measurement: Optional callback that checks whether a measurement is
            valid. It receives a Measurement and returns a boolean value.

    Returns:
        None: The step dictionary is modified in place with the filtered data parameters.
    """
    if is_valid_measurement is None:
        return
    if not (step.data and step.data.parameters):
        return

    # Falsy entries are dropped before the callback is consulted.
    step_dict["data"]["parameters"] = [
        parameter.dict(exclude_none=True)
        for parameter in step.data.parameters
        if parameter and is_valid_measurement(parameter)
    ]
def __normalize_inputs_outputs(
    step_dict: Dict[str, Any],
    step: Step,
) -> None:
    """Rewrite the inputs and outputs entries of a step dictionary as name-to-value maps.

    Only entries already present in the step dictionary are rewritten. An entry whose
    corresponding step attribute is empty becomes an empty dictionary.

    Args:
        step_dict: A dictionary with step information.
        step: A Step object containing inputs and outputs.

    Returns:
        None: step_dict is modified in place with the normalized inputs and outputs.
    """
    named_value_fields = (
        (StepProjection.INPUTS.lower(), step.inputs),
        (StepProjection.OUTPUTS.lower(), step.outputs),
    )
    for field_name, items in named_value_fields:
        if field_name not in step_dict:
            continue
        step_dict[field_name] = {item.name: item.value for item in items or ()}
def __explode_and_normalize(
    dataframe: pd.DataFrame, column: str, prefix: str
) -> pd.DataFrame:
    """Explode a list-like column into rows and flatten its nested data into columns.

    Each element of the list-like column becomes its own row (with a fresh index). The
    exploded column is then removed and replaced by the flat columns produced from its
    nested values, each named with the given prefix.

    Args:
        dataframe: The input DataFrame containing the column to explode and normalize.
        column: The name of the column that holds the list-like data to explode.
        prefix: The prefix added to every column created during normalization.

    Returns:
        DataFrame:
            - A new DataFrame with the exploded rows, followed by the normalized columns.
            - If the column is not found, the original dataframe is returned unchanged.
    """
    if column not in dataframe:
        return dataframe

    exploded = dataframe.explode(column, ignore_index=True)
    # pop() removes the nested column from `exploded`, so it is not duplicated below.
    nested_values = exploded.pop(column)
    flattened = pd.json_normalize(nested_values).add_prefix(prefix)
    return pd.concat([exploded, flattened], axis=1, copy=False)
def __group_step_columns(dataframe_columns: List[str]) -> List[str]:
    """Order dataframe columns by category so the column layout stays consistent.

    When steps are normalized into a dataframe, new input, output, or property fields
    may be appended at the end and break the expected column order. This function puts
    the columns back into their category groups.

    Args:
        dataframe_columns: All the columns of the normalized dataframe.

    Returns:
        List[str]: The columns grouped by category, in the order of
        ``DataFrameHeaders.CATEGORY_COLUMN_HEADERS``. Within a category the original
        relative order is kept.
    """
    categories = DataFrameHeaders.CATEGORY_COLUMN_HEADERS
    data_model_column = StepProjection.DATA_MODEL.lower()
    buckets: Dict[str, List[str]] = {category: [] for category in categories}

    for column in dataframe_columns:
        lowered = column.lower()
        # Columns matching no category prefix fall back to the "general" bucket.
        # The first category is skipped during matching; presumably it is "general"
        # itself - confirm against DataFrameHeaders.
        bucket = "general"
        # The data model column is always kept in the general bucket.
        # NOTE(review): this comparison uses the original-case name while the prefix
        # match below uses the lower-cased name - confirm this is intended.
        if column != data_model_column:
            for category in categories[1:]:
                if lowered.startswith(category):
                    bucket = category
                    break
        buckets[bucket].append(column)

    ordered_columns = []
    for category in categories:
        ordered_columns.extend(buckets[category])
    return ordered_columns