#%%
"""This module handles the ETL processes using PySpark and SQLAlchemy."""

import os
import warnings

from dotenv import load_dotenv
from pyspark.sql import SparkSession
import pyspark.pandas as ps
from sqlalchemy import create_engine

# Fixed typo in the log message (was "PYHTON LOG").
print("PYTHON LOG: Importing Libraries")

# Silence noisy library warnings (Spark / pandas-on-Spark emit many
# FutureWarning/DeprecationWarning messages that clutter the ETL logs).
warnings.filterwarnings("ignore")

# Pull DB credentials and connection settings from a local .env file
# into the process environment before they are read below.
load_dotenv()

print("PYTHON LOG: Imported libraries successfully")
# Database connection settings. Credentials are read from the environment
# (populated from .env by load_dotenv above).
USER = os.getenv('USER')
PASSWORD = os.getenv('PASSWORD')

# NOTE(review): HOST and DATABASE are read in lines elided from this chunk —
# confirm they are defined (via os.getenv) before this point.
jdbc_url = f"jdbc:postgresql://{HOST}/{DATABASE}"
# Local path to the PostgreSQL JDBC driver jar handed to Spark below.
JDBC_DRIVER_PATH = "C:/spark/jars/postgresql-42.7.3.jar"
print('PYTHON LOG: Creating Spark Session')
# Initialize the Spark session with the PostgreSQL JDBC driver on the
# classpath so the DataFrame JDBC writer can load org.postgresql.Driver.
# BUG FIX: the config key was misspelled "sparkk.jars", so the jar was never
# registered via spark.jars and only worked through the (driver-only)
# spark.driver.extraClassPath setting. Corrected to "spark.jars".
spark = SparkSession \
    .builder \
    .config("spark.jars", JDBC_DRIVER_PATH) \
    .config('spark.driver.extraClassPath', JDBC_DRIVER_PATH) \
    .appName("Data ETL") \
    .getOrCreate()
print('PYTHON LOG: Created Spark Session Successfully')
56
59
.option ('dbtable' , 'stage_loan' ) \
57
60
.option ("driver" , "org.postgresql.Driver" ) \
58
61
.save ()
print('PYTHON LOG: Saved to database successfully')
#%%
print("PYTHON LOG: Stopping Spark Session")
# Release all Spark resources now that the ETL write has completed.
spark.stop()
print("PYTHON LOG: Spark Session stopped successfully")
#%%