to_sql_newrows.py
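"""
Helpers for writing only new (non-duplicate) rows from a pandas DataFrame to a
SQL table, plus a small benchmark (see the __main__ block) comparing two
approaches against a local PostgreSQL database:

1. clean_df_db_dups(): drop rows that already exist in the table on the pandas
   side, then append with DataFrame.to_sql().
2. Stage the DataFrame in a temporary table and INSERT only the rows whose key
   is not already present, using a LEFT OUTER JOIN anti-join in SQL.

to_sql_newrows() is a separate helper that threads DataFrame.to_sql() inserts
in fixed-size chunks.
"""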
import os
import sys
import time
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import threading
from timeit import default_timer as timer


def clean_df_db_dups(df, tablename, engine, dup_cols=[],
                     filter_continuous_col=None, filter_categorical_col=None):
    """
    Remove rows from a dataframe that already exist in a database table.
    Required:
        df : dataframe to remove duplicate rows from
        engine : SQLAlchemy engine object
        tablename : table name to check duplicates in
        dup_cols : list or tuple of column names to check for duplicate row values
    Optional:
        filter_continuous_col : the name of the continuous data column for a BETWEEN min/max filter
            can be either a datetime, int, or float data type
            useful for restricting how much of the database table is checked
        filter_categorical_col : the name of the categorical data column for a WHERE ... IN (...) filter
            creates an "IN ()" check on the unique values in this column
    Returns
        dataframe containing only the rows that are not already in the database table
    See the usage sketch in the comment block after this function.
    """
    args = 'SELECT %s FROM %s' % (', '.join(['"{0}"'.format(col) for col in dup_cols]), tablename)
    args_contin_filter, args_cat_filter = None, None
    if filter_continuous_col is not None:
        if df[filter_continuous_col].dtype == 'datetime64[ns]':
            # note: Convert(datetime, ...) is SQL Server syntax; adjust for other backends
            args_contin_filter = """ "%s" BETWEEN Convert(datetime, '%s')
                AND Convert(datetime, '%s')""" % (filter_continuous_col,
                df[filter_continuous_col].min(), df[filter_continuous_col].max())
    if filter_categorical_col is not None:
        args_cat_filter = ' "%s" IN (%s)' % (filter_categorical_col,
            ', '.join(["'{0}'".format(value) for value in df[filter_categorical_col].unique()]))

    if args_contin_filter and args_cat_filter:
        args += ' WHERE ' + args_contin_filter + ' AND' + args_cat_filter
    elif args_contin_filter:
        args += ' WHERE ' + args_contin_filter
    elif args_cat_filter:
        args += ' WHERE ' + args_cat_filter

    # drop duplicates within the dataframe itself, then anti-join against the
    # keys already present in the database table
    df.drop_duplicates(dup_cols, keep='last', inplace=True)
    df = pd.merge(df, pd.read_sql(args, engine), how='left', on=dup_cols, indicator=True)
    df = df[df['_merge'] == 'left_only']
    df.drop(['_merge'], axis=1, inplace=True)
    return df
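

# A minimal usage sketch for clean_df_db_dups(). The engine URI, table name,
# and DataFrame below are only illustrative placeholders, not part of this
# module:
#
#     engine = create_engine('postgresql+psycopg2://user:pass@localhost:5432/mydb')
#     new_df = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
#     new_df = clean_df_db_dups(new_df, 'my_table', engine, dup_cols=['A', 'B'])
#     new_df.to_sql('my_table', engine, if_exists='append', index=False)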


def to_sql_newrows(df, pool_size, *args, **kargs):
    """
    Extend the pandas to_sql() method to thread database insertion.
    Required:
        df : pandas dataframe to insert new rows into a database table
        pool_size : your SQLAlchemy max connection pool size. Set < your db connection limit.
            Example where this matters: your cloud DB has a connection limit.
    *args:
        Pandas to_sql() arguments.
        Required arguments are:
            tablename : Database table name to write results to
            engine : SQLAlchemy engine
        Optional arguments are:
            'if_exists' : 'append' or 'replace'. If table already exists, use append.
            'index' : True or False. True if you want to write index values to the db.
    See the usage sketch in the comment block after this function.
    Credits for initial threading code:
        http://techyoubaji.blogspot.com/2015/10/speed-up-pandas-tosql-with.html
    """
    CHUNKSIZE = 1000
    INITIAL_CHUNK = 100
    if len(df) <= CHUNKSIZE:
        # if df is smaller than chunksize, just write it to the db now
        df.to_sql(*args, **kargs)
        return
    # write the initial chunk to the database synchronously
    df.iloc[:INITIAL_CHUNK, :].to_sql(*args, **kargs)
    # write each full chunk after the initial chunk in its own thread
    workers = []
    n_chunks = (df.shape[0] - INITIAL_CHUNK) // CHUNKSIZE
    for i in range(n_chunks):
        start = INITIAL_CHUNK + i * CHUNKSIZE
        # bind start as a default argument so each thread writes its own slice
        t = threading.Thread(
            target=lambda start=start: df.iloc[start:start + CHUNKSIZE].to_sql(*args, **kargs))
        t.start()
        workers.append(t)
    # write whatever is left over after the last full chunk
    df.iloc[INITIAL_CHUNK + n_chunks * CHUNKSIZE:, :].to_sql(*args, **kargs)
    [t.join() for t in workers]
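

# A minimal usage sketch for to_sql_newrows(); the engine and table name are
# illustrative placeholders. pool_size is documented above but not used inside
# the function: the connection limit is enforced by the engine's pool, so
# create the engine with a matching pool_size (as the __main__ block below does).
#
#     engine = create_engine('postgresql+psycopg2://user:pass@localhost:5432/mydb',
#                            pool_size=50, max_overflow=0)
#     # given a DataFrame df:
#     to_sql_newrows(df, 50, 'my_table', engine, if_exists='append', index=False)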


def setup(engine, tablename):
    # uses the legacy (pre-2.0) SQLAlchemy engine.execute() API
    engine.execute("""DROP TABLE IF EXISTS "%s" """ % (tablename))
    # the composite primary key on ("A", "B") is what makes re-inserting an
    # existing key fail, so only genuinely new rows should be appended
    engine.execute("""CREATE TABLE "%s" (
                          "A" INTEGER,
                          "B" INTEGER,
                          "C" INTEGER,
                          "D" INTEGER,
                          CONSTRAINT pk_A_B PRIMARY KEY ("A","B"))
                   """ % (tablename))


if __name__ == '__main__':
    DB_TYPE = 'postgresql'
    DB_DRIVER = 'psycopg2'
    DB_USER = 'admin'
    DB_PASS = 'password'
    DB_HOST = 'localhost'
    DB_PORT = '5432'
    DB_NAME = 'pandas_upsert'
    POOL_SIZE = 50
    TABLENAME = 'test_upsert'

    SQLALCHEMY_DATABASE_URI = '%s+%s://%s:%s@%s:%s/%s' % (DB_TYPE, DB_DRIVER, DB_USER,
                                                          DB_PASS, DB_HOST, DB_PORT, DB_NAME)
    ENGINE = create_engine(
        SQLALCHEMY_DATABASE_URI, pool_size=POOL_SIZE, max_overflow=0)

    print('setting up db')
    setup(ENGINE, TABLENAME)

    try:
        # approach 1: drop duplicates in pandas against the db, then append
        prev = timer()
        start = timer()
        for i in range(10):
            print('running test %s' % (str(i)))
            df = pd.DataFrame(
                np.random.randint(0, 500, size=(100000, 4)), columns=list('ABCD'))
            df = clean_df_db_dups(df, TABLENAME, ENGINE, dup_cols=['A', 'B'])
            print('row count after drop db duplicates is now : %s' % (df.shape[0]))
            df.to_sql(TABLENAME, ENGINE, if_exists='append', index=False)
            end = timer()
            elapsed_time = end - prev
            prev = timer()
            print('completed loop in %s sec!' % (elapsed_time))
        end = timer()
        elapsed_time = end - start
        print('completed singlethread insert loops in %s sec!' % (elapsed_time))
        inserted = pd.read_sql('SELECT count("A") from %s' % (TABLENAME), ENGINE)
        print('inserted %s new rows into database!' % (inserted.iloc[0]['count']))

        print('\n setting up db')
        setup(ENGINE, TABLENAME)
        print('\n')
        # approach 2: stage the dataframe in a temp table, then insert only
        # the rows that do not already exist via a LEFT OUTER JOIN anti-join
        prev = timer()
        start = timer()
        for i in range(10):
            print('running test %s' % (str(i)))
            df = pd.DataFrame(
                np.random.randint(0, 500, size=(100000, 4)), columns=list('ABCD'))
            df.drop_duplicates(['A', 'B'], keep='last', inplace=True)
            df.to_sql('temp', ENGINE, if_exists='replace', index=False)
            connection = ENGINE.connect()
            # keep only the staged rows whose ("A", "B") key is not already
            # in test_upsert, then insert them
            args1 = """ INSERT INTO "test_upsert"
                        SELECT * FROM
                        (SELECT a.*
                         FROM "temp" a LEFT OUTER JOIN "test_upsert" b
                         ON (a."A" = b."A" and a."B" = b."B")
                         WHERE b."A" is null) b"""
            result = connection.execute(args1)
            args2 = """ DROP TABLE IF EXISTS "temp" """
            connection.execute(args2)
            connection.close()
            end = timer()
            elapsed_time = end - prev
            prev = timer()
            print('completed loop in %s sec!' % (elapsed_time))
        end = timer()
        elapsed_time = end - start
        print('completed staging insert loops in %s sec!' % (elapsed_time))
        inserted = pd.read_sql('SELECT count("A") from %s' % (TABLENAME), ENGINE)
        print('inserted %s new rows into database!' % (inserted.iloc[0]['count']))
    except KeyboardInterrupt:
        print("Interrupted... exiting...")