utils.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


def quality_check(spark, data, columns, count, table_name):
    """
    Runs quality checks on a Spark DataFrame: verifies that the
    DataFrame has at least one row, that it has the expected number
    of columns, and that the given columns contain no null values.

    Args:
        spark: Spark session
        data: Spark DataFrame to check
        columns: names of columns that must not contain null values
            (list(str))
        count: expected number of columns in the DataFrame (int)
        table_name: name of the table the DataFrame corresponds to

    Returns:
        True if all checks pass; otherwise False
    """
    total_checks = 2 + len(columns)
    fail_count = 0
    pass_count = 0
    col_count = len(data.columns)
    row_count = data.count()

    # Row check: the DataFrame must contain at least one row.
    if row_count == 0:
        print(f"Row check failed: {table_name} has no rows")
        fail_count += 1
    else:
        print(f"Row check passed: {table_name} has {row_count} rows")
        pass_count += 1

    # Column check: the DataFrame must have the expected number of columns.
    if col_count != count:
        print(
            f"Column check failed: {table_name} has {col_count} instead of {count} columns"
        )
        fail_count += 1
    else:
        print(f"Column check passed: {table_name} has {col_count} columns")
        pass_count += 1

    # Null checks: each listed column must contain no null values.
    for check in columns:
        check_null = data.filter(col(check).isNull()).count()
        if check_null > 0:
            print(
                f"Null check failed for {check} column in {table_name}, there are {check_null} null values"
            )
            fail_count += 1
        else:
            print(f"Null check passed for {check} column in {table_name}")
            pass_count += 1

    print(f"Passed checks out of total = {pass_count} / {total_checks} in {table_name}")
    print(f"Failed checks out of total = {fail_count} / {total_checks} in {table_name}")
    return fail_count == 0
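

# Hypothetical usage sketch (not part of the original module): builds a small
# in-memory DataFrame and runs quality_check against it. The column names and
# table name below are illustrative assumptions, not values from this repo.
if __name__ == "__main__":
    spark = SparkSession.builder.appName("quality_check_demo").getOrCreate()

    # Example DataFrame with 3 columns and one null value in "city".
    demo_df = spark.createDataFrame(
        [(1, "Alice", "Austin"), (2, "Bob", None)],
        ["id", "name", "city"],
    )

    # Expect 3 columns; "id" and "name" are null-free, so all checks pass.
    # Adding "city" to the null-check list would make one check fail.
    passed = quality_check(spark, demo_df, ["id", "name"], 3, "demo_table")
    print(f"All checks passed: {passed}")

    spark.stop()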