@@ -22,7 +22,7 @@ def self.update_tables
22
22
update_tables = PostgresToRedshift . new
23
23
24
24
update_tables . tables . each do |table |
25
- target_connection . exec ( "CREATE TABLE IF NOT EXISTS public .#{ target_connection . quote_ident ( table . target_table_name ) } (#{ table . columns_for_create } )" )
25
+ target_connection . exec ( "CREATE TABLE IF NOT EXISTS #{ schema } .#{ target_connection . quote_ident ( table . target_table_name ) } (#{ table . columns_for_create } )" )
26
26
27
27
update_tables . copy_table ( table )
28
28
@@ -55,6 +55,10 @@ def self.target_connection
55
55
@target_connection
56
56
end
57
57
58
+ def self . schema
59
+ ENV . fetch ( 'POSTGRES_TO_REDSHIFT_TARGET_SCHEMA' )
60
+ end
61
+
58
62
def source_connection
59
63
self . class . source_connection
60
64
end
@@ -126,15 +130,17 @@ def upload_table(table, buffer, chunk)
126
130
127
131
def import_table ( table )
128
132
puts "Importing #{ table . target_table_name } "
129
- target_connection . exec ( "DROP TABLE IF EXISTS public.#{ table . target_table_name } _updating" )
133
+ schema = self . class . schema
134
+
135
+ target_connection . exec ( "DROP TABLE IF EXISTS #{ schema } .#{ table . target_table_name } _updating" )
130
136
131
137
target_connection . exec ( "BEGIN;" )
132
138
133
- target_connection . exec ( "ALTER TABLE public .#{ target_connection . quote_ident ( table . target_table_name ) } RENAME TO #{ table . target_table_name } _updating" )
139
+ target_connection . exec ( "ALTER TABLE #{ schema } .#{ target_connection . quote_ident ( table . target_table_name ) } RENAME TO #{ table . target_table_name } _updating" )
134
140
135
- target_connection . exec ( "CREATE TABLE public .#{ target_connection . quote_ident ( table . target_table_name ) } (#{ table . columns_for_create } )" )
141
+ target_connection . exec ( "CREATE TABLE #{ schema } .#{ target_connection . quote_ident ( table . target_table_name ) } (#{ table . columns_for_create } )" )
136
142
137
- target_connection . exec ( "COPY public .#{ target_connection . quote_ident ( table . target_table_name ) } FROM 's3://#{ ENV [ 'S3_DATABASE_EXPORT_BUCKET' ] } /export/#{ table . target_table_name } .psv.gz' CREDENTIALS 'aws_access_key_id=#{ ENV [ 'S3_DATABASE_EXPORT_ID' ] } ;aws_secret_access_key=#{ ENV [ 'S3_DATABASE_EXPORT_KEY' ] } ' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';" )
143
+ target_connection . exec ( "COPY #{ schema } .#{ target_connection . quote_ident ( table . target_table_name ) } FROM 's3://#{ ENV [ 'S3_DATABASE_EXPORT_BUCKET' ] } /export/#{ table . target_table_name } .psv.gz' CREDENTIALS 'aws_access_key_id=#{ ENV [ 'S3_DATABASE_EXPORT_ID' ] } ;aws_secret_access_key=#{ ENV [ 'S3_DATABASE_EXPORT_KEY' ] } ' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';" )
138
144
139
145
target_connection . exec ( "COMMIT;" )
140
146
end
0 commit comments