-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathcheck_subdis_outlier.py
More file actions
73 lines (56 loc) · 2.07 KB
/
check_subdis_outlier.py
File metadata and controls
73 lines (56 loc) · 2.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import csv
import numpy as np
# Zero-based index of the subdistrict-code field within a station row.
SUBDIS_CODE_COL = 4
# Zero-based indices of the latitude and longitude fields within a station row.
LATITUDE_COL = 8
LONGITUDE_COL = 9
# Number of standard deviations above the mean centroid distance beyond which
# a station is reported as an outlier.
STDEV_THRESHOLD = 2.5
def read_rows(file) -> list[list[str]]:
    """Read every row of a UTF-8 encoded CSV file.

    Args:
        file: Path of the CSV file to read.

    Returns:
        All rows in file order, each as a list of its field strings. No row
        is skipped, so a header line (if the file has one) is included.

    Raises:
        OSError: If the file cannot be opened.
    """
    # newline="" hands newline handling to the csv module, so line breaks
    # embedded in quoted fields are preserved exactly as written instead of
    # being rewritten by universal-newline translation.
    with open(file, "r", encoding="utf-8", newline="") as buffer:
        return list(csv.reader(buffer))
def dict_by_subdis_code(rows) -> dict[str, list[list[str]]]:
    """Group station rows by the value in their subdistrict-code column.

    Args:
        rows: Iterable of station rows; each row is indexed at
            ``SUBDIS_CODE_COL`` to obtain its grouping key.

    Returns:
        A dict mapping each code to the list of rows carrying it. Keys appear
        in first-seen order and rows keep their input order within a group.
    """
    grouped = {}
    for row in rows:
        # setdefault creates the bucket the first time a code is seen.
        grouped.setdefault(row[SUBDIS_CODE_COL], []).append(row)
    return grouped
def warn_subdistrict(rows: list[list[str]], stdev: float) -> list[list[str]]:
    """Return the stations lying unusually far from their group's centroid.

    Rows whose latitude or longitude field is an empty string are ignored.
    The remaining coordinates are averaged into a centroid, and each
    station's Euclidean distance from it is measured in raw coordinate
    units. A station is reported when its distance exceeds
    ``mean(distances) + stdev * std(distances)``.

    Args:
        rows: Station rows, indexed at ``LATITUDE_COL`` and ``LONGITUDE_COL``.
        stdev: Multiplier applied to the standard deviation of the distances
            when computing the outlier cutoff.

    Returns:
        The outlier rows, in their original relative order. The list is empty
        when fewer than two rows carry both coordinates.

    Raises:
        ValueError: If a non-empty coordinate field cannot be parsed as a
            float.
    """
    located = [
        row
        for row in rows
        if not (row[LATITUDE_COL] == "" or row[LONGITUDE_COL] == "")
    ]
    # With fewer than two usable points there is no spread to compare against.
    if len(located) < 2:
        return []

    coords = np.array(
        [[float(row[LATITUDE_COL]), float(row[LONGITUDE_COL])] for row in located]
    )
    # Centroid is the per-column mean of the (latitude, longitude) pairs.
    center = coords.mean(axis=0)
    offsets = np.linalg.norm(coords - center, axis=1)
    # Cutoff: mean distance plus `stdev` standard deviations of the distances.
    cutoff = offsets.mean() + stdev * offsets.std()
    return [row for row, offset in zip(located, offsets) if offset > cutoff]
def main(csv_path: str = "station66_distinct_clean.csv") -> None:
    """Print every station flagged as a positional outlier in its subdistrict.

    Reads the station CSV, groups its rows by subdistrict code, and prints
    each row that ``warn_subdistrict`` reports using ``STDEV_THRESHOLD``.

    Args:
        csv_path: Path of the station CSV to check. Defaults to the file name
            the script has always used, so calling ``main()`` is unchanged.
    """
    rows = read_rows(csv_path)
    for stations in dict_by_subdis_code(rows).values():
        # Plain loop rather than a comprehension: printing is a side effect,
        # so there is no reason to build a throwaway list of None values.
        for warn in warn_subdistrict(stations, stdev=STDEV_THRESHOLD):
            print(warn)
# Run the outlier check only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()