-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinspections_etl.py
More file actions
67 lines (53 loc) · 2.19 KB
/
inspections_etl.py
File metadata and controls
67 lines (53 loc) · 2.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/python3.9
# -*- coding: utf-8 -*-
import os
import sys
from utils import constants_sql, helpers
def process_inspections(logger):
    """Run the Fulton inspections ETL from extraction through geocoding.

    The steps run in order:

    1. Obtain a database connection, a regular cursor and a dictionary
       cursor.
    2. Extract and load the 'inspections' data set.
    3. Extract and load the 'violations' data set.
    4. Fetch the records that need geocoding, geocode each one, write the
       collected updates with ``UPDATE_GEOCODING_SQL`` and commit.
    5. Close both cursors and the connection.

    Args:
        logger: Logger used for progress (``info``) and failure (``error``)
            messages; it is also handed to every ``helpers`` call.

    Raises:
        SystemExit: If the database connection cannot be obtained.
    """
    try:
        connection = helpers.get_database_connection()
    except Exception as exc:
        logger.error(f"Getting database connection: {exc}")
        sys.exit("Unable to get database connection")

    cursor = connection.cursor()
    dict_cursor = helpers.get_dictionary_cursor(connection)

    # --- Stage 1: inspections -------------------------------------------
    logger.info("Starting Fulton inspections extraction")
    inspections_url = helpers.build_fulton_extract_url('inspections', logger)
    inspections = helpers.extract_violations(inspections_url, logger)
    logger.info(f"Extracted {len(inspections)} inspection records")
    inspections_status = helpers.load_fulton_inspections(
        connection, cursor, inspections, logger
    )
    logger.info(f"Load status: {inspections_status}")
    if inspections_status != 'success':
        # NOTE(review): presumably exit_out cleans up and terminates the
        # run -- confirm in helpers; otherwise execution continues below.
        helpers.exit_out(cursor, dict_cursor, connection, logger)

    # --- Stage 2: violations --------------------------------------------
    logger.info("Starting Fulton violations extraction")
    violations_url = helpers.build_fulton_extract_url('violations', logger)
    violations = helpers.extract_violations(violations_url, logger)
    logger.info(f"Extracted {len(violations)} violations records")
    violations_status = helpers.load_fulton_violations(
        connection, cursor, violations, logger
    )
    logger.info(f"Load status: {violations_status}")
    if violations_status != 'success':
        helpers.exit_out(cursor, dict_cursor, connection, logger)

    # --- Stage 3: geocoding ---------------------------------------------
    pending = helpers.get_records2geocode(dict_cursor, logger)
    logger.info(f"{len(pending)} records to GeoCode")
    # Keep only the rows for which the geocoder returned a truthy result.
    geocoded = [
        match
        for record in pending
        if (match := helpers.extract_geocodes(record, logger))
    ]
    logger.info('GeoCoding complete')
    logger.info(f'Number records {len(geocoded)} GeoCoded')
    cursor.executemany(constants_sql.UPDATE_GEOCODING_SQL, geocoded)
    connection.commit()

    # Release the cursors first, then the connection.
    for handle in (cursor, dict_cursor, connection):
        handle.close()
if __name__ == '__main__':
    # Make the directory containing this script the current working
    # directory before running the ETL.
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    # Build the stdout logger and hand it straight to the pipeline.
    process_inspections(helpers.setup_logger_stdout('inspections_etl'))