-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathBureauOfNormalcy_draft.py
More file actions
160 lines (142 loc) · 6.53 KB
/
BureauOfNormalcy_draft.py
File metadata and controls
160 lines (142 loc) · 6.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/bin/python3
# Title: BureauOfNormalcy.py
# Date Began: 11:12 AM 11/30/2021
# Author: Ilde Senesence
# Keywords: normalizer, csv, example, assignment
# Description: Python-based script to normalize CSV files, repair UTF encoding,
# and validate data against preassigned parameters
# Depends: Python 3.9+, ftfy, dateutil.tz, dateutil.parser, logging
#
# Future State:
# Consider asyncio, aiofiles, pandas for future inputs if this needs to be
# web-based, use large inputs, or both
# Consider charset-normalizer if inputs need to be converted to UTF-8 from
# an unknown encoding
import csv
import datetime
import getopt
import re
import sys

import dateutil.parser
import dateutil.tz
import ftfy
_USAGE = 'bureauofnormalcy.py -i <inputfile> -o <outputfile>'

# Two or more hour digits, two minute digits, two second digits and an
# optional fractional part. Compiled once because it is used for every row.
_DURATION_RE = re.compile(r"(\d{2,}):(\d{2}):(\d{2}(?:\.\d+)?)")


def main(argv):
    """Parse the command-line options and return the selected file paths.

    Args:
        argv: Argument list without the program name (e.g. ``sys.argv[1:]``).

    Returns:
        A ``(inputfile, outputfile)`` tuple; either element is ``''`` when
        the matching option was not supplied.

    Exits with status 2 on an unrecognised option, and with status 0 after
    printing the usage line for ``-h``.
    """
    inputfile = ''
    outputfile = ''
    try:
        opts, _args = getopt.getopt(argv, "hi:o:", ["ifile=", "ofile="])
    except getopt.GetoptError:
        print(_USAGE)
        sys.exit(2)
    for opt, arg in opts:
        if opt == '-h':
            print(_USAGE)
            sys.exit()
        elif opt in ("-i", "--ifile"):
            inputfile = arg
        elif opt in ("-o", "--ofile"):
            outputfile = arg
    return inputfile, outputfile


def get_seconds(duration_str):
    """Convert an ``HH:MM:SS`` or ``HH:MM:SS.MS`` duration to total seconds.

    Args:
        duration_str: Duration text; surrounding whitespace is ignored.

    Returns:
        An ``int`` when the seconds part has no fraction, otherwise a
        ``float``.

    Raises:
        ValueError: If the text is not in ``HH:MM:SS[.MS]`` form.
    """
    # fullmatch anchors both ends so trailing junk cannot slip through.
    match = _DURATION_RE.fullmatch(duration_str.strip())
    if match is None:
        raise ValueError('Expected data in HH:MM:SS.MS, received: ' +
                         duration_str)
    hh, mm, sec = match.groups()
    whole = int(hh) * 3600 + int(mm) * 60
    if '.' in sec:
        return whole + float(sec)
    return whole + int(sec)


def _warn(prefix, error):
    """Write one newline-terminated warning line to stderr."""
    sys.stderr.write(f"{prefix.rstrip()} {error}\n")


def _fixed_field(csvrow, name):
    """Return the ftfy-repaired text of a required field.

    Raises ValueError when the row is too short to contain the field
    (csv.DictReader fills such fields with None).
    """
    value = csvrow[name]
    if value is None:
        raise ValueError("missing value for " + name)
    return ftfy.fix_text(value)


def _normalize_zip(raw_zip):
    """Return the ZIP code left-padded with zeros to five digits.

    The result stays a string: converting to int would drop the padding.
    Raises ValueError for empty, non-digit or over-long input.
    """
    digits = raw_zip.strip()
    if not (digits.isascii() and digits.isdigit()) or len(digits) > 5:
        raise ValueError("expected at most 5 digits, received: " +
                         repr(raw_zip))
    return digits.zfill(5)


def _normalize_row(csvrow, fieldnames, default_date, target_tz):
    """Normalize one input row.

    Returns a dict keyed by ``fieldnames``, or None when a field could not
    be normalized (a warning has then been written to stderr and the row
    must be dropped).
    """
    # Convert to Eastern, assume Pacific (default_date carries the Pacific
    # zone, which dateutil applies when the text has no zone of its own).
    try:
        normal_timestamp = dateutil.parser.parse(
            _fixed_field(csvrow, 'Timestamp'),
            default=default_date).astimezone(target_tz)
    except (ValueError, OverflowError) as errorvalue:
        # dateutil's ParserError is a ValueError subclass.
        _warn("Error parsing Timestamp:", errorvalue)
        return None

    # Unicode validation only, preserve quotes
    raw_address = csvrow['Address']
    normal_address = (raw_address if raw_address is None
                      else ftfy.fix_text(raw_address))

    # Pad from beginning with 0s
    try:
        normal_zip = _normalize_zip(_fixed_field(csvrow, 'ZIP'))
    except ValueError as errorvalue:
        _warn("Wrong value type for ZIP code:", errorvalue)
        return None

    # To Uppercase (beware CJK characters)
    try:
        normal_full_name = _fixed_field(csvrow, 'FullName').upper()
    except (TypeError, ValueError) as errorvalue:
        _warn("Wrong value for FullName (expected String): ", errorvalue)
        return None

    # HH:MM:SS.MS to TotalSeconds
    try:
        normal_foo = get_seconds(_fixed_field(csvrow, 'FooDuration'))
        normal_bar = get_seconds(_fixed_field(csvrow, 'BarDuration'))
    except UnicodeError as errorvalue:
        _warn("Error decoding Duration from UTF-8:", errorvalue)
        return None
    except ValueError as errorvalue:
        _warn("Error parsing Duration:", errorvalue)
        return None

    normalized = {
        "timestamp": normal_timestamp,
        "address": normal_address,
        "zip": normal_zip,
        "fullname": normal_full_name,
        "fooduration": normal_foo,
        "barduration": normal_bar,
        # Sum of FooDuration and BarDuration
        "totalduration": normal_foo + normal_bar,
        # Unmodified; invalid UTF-8 was already replaced with U+FFFD when
        # the input file was decoded.
        "notes": csvrow['Notes'],
    }
    # Known columns (matched case-insensitively) get their normalized value;
    # every unknown column is carried through untouched.
    return {key: normalized.get(key.lower(), csvrow[key])
            for key in fieldnames}


def normalize_file(inputfile, outputfile):
    """Normalize the CSV at ``inputfile`` into a new file ``outputfile``.

    Rows that fail validation are reported on stderr and skipped.

    Args:
        inputfile: Path of the UTF-8 CSV to read.
        outputfile: Path of the CSV to create; it must not exist yet
            (the file is opened in exclusive-creation mode).

    Returns:
        The number of data rows written.

    Raises:
        RuntimeError: If the required time zone data cannot be loaded.
        FileExistsError: If ``outputfile`` already exists.
    """
    source_tz = dateutil.tz.gettz("America/Los_Angeles")
    target_tz = dateutil.tz.gettz("America/New_York")
    if source_tz is None or target_tz is None:
        # gettz returns None on a missing tz database; astimezone(None)
        # would then silently convert to the machine's local zone.
        raise RuntimeError("Time zone data for America/Los_Angeles or "
                           "America/New_York is unavailable")
    default_date = datetime.datetime.now(tz=source_tz).replace(
        hour=0, minute=0, second=0, microsecond=0)

    written = 0
    # errors='replace' turns invalid UTF-8 bytes into U+FFFD at decode time;
    # a field damaged by that then fails its own validation below.
    with open(inputfile, newline='', encoding='utf-8',
              errors='replace') as csvfile:
        oddity = csv.DictReader(csvfile)
        odd_fields = oddity.fieldnames
        if odd_fields is None:
            _warn("Input file has no header row:", inputfile)
            return written
        # newline='' lets the csv module control line endings itself.
        with open(outputfile, mode='x', newline='', encoding='utf-8',
                  errors='strict') as normalcsvfile:
            # Explicitly pulling along "unsafely assumed" fields
            normality = csv.DictWriter(normalcsvfile, fieldnames=odd_fields)
            normality.writeheader()
            for csvrow in oddity:
                normal_row = _normalize_row(csvrow, odd_fields,
                                            default_date, target_tz)
                if normal_row is not None:
                    normality.writerow(normal_row)
                    written += 1
    return written


if __name__ == "__main__":
    _inputfile, _outputfile = main(sys.argv[1:])
    if not _inputfile or not _outputfile:
        print(_USAGE)
        sys.exit(2)
    normalize_file(_inputfile, _outputfile)