-
Notifications
You must be signed in to change notification settings - Fork 234
Expand file tree
/
Copy pathjdbc.go
More file actions
207 lines (180 loc) · 7.82 KB
/
Copy pathjdbc.go
File metadata and controls
207 lines (180 loc) · 7.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package jdbc
import (
	"context"
	"database/sql"
	"errors"
	"fmt"

	"github.com/datazip-inc/olake/protocol"
	"github.com/datazip-inc/olake/types"
)
// MinMaxQuery builds a SELECT statement that returns the smallest and largest
// value of column (aliased min_value and max_value) for the table identified
// by the stream's namespace and name. Identifiers are interpolated as given,
// without quoting.
func MinMaxQuery(stream protocol.Stream, column string) string {
	schema, table := stream.Namespace(), stream.Name()
	return fmt.Sprintf(
		`SELECT MIN(%s) AS min_value, MAX(%s) AS max_value FROM %s.%s`,
		column, column, schema, table,
	)
}
// NextChunkEndQuery builds the statement used to find the upper boundary of
// the next chunk: the MAX of column over the first chunkSize rows (ordered by
// column) whose column value is greater than the single `?` bind parameter.
func NextChunkEndQuery(stream protocol.Stream, column string, chunkSize int) string {
	schema, table := stream.Namespace(), stream.Name()
	return fmt.Sprintf(
		`SELECT MAX(%s) FROM (SELECT %s FROM %s.%s WHERE %s > ? ORDER BY %s LIMIT %d) AS subquery`,
		column, column, schema, table, column, column, chunkSize,
	)
}
// buildChunkCondition renders the WHERE-clause fragment that restricts
// filterColumn to the boundaries of chunk.
//
// formatter turns one (column, operator, value) triple into SQL text; when it
// is nil a plain "column operator value" rendering via %v is used.
//
// A chunk with only Min yields "column >= Min", a chunk with only Max yields
// "column <= Max", and otherwise both comparisons are joined with AND. Note
// that a chunk with neither bound also takes the two-sided path, so the nil
// values are handed to the formatter as-is.
func buildChunkCondition(
	filterColumn string,
	chunk types.Chunk,
	formatter func(column string, operator string, value interface{}) string,
) string {
	render := formatter
	if render == nil {
		// Fallback rendering used when the caller has no dialect-specific needs.
		render = func(column string, operator string, value interface{}) string {
			return fmt.Sprintf("%s %s %v", column, operator, value)
		}
	}

	hasMin, hasMax := chunk.Min != nil, chunk.Max != nil
	switch {
	case hasMin && !hasMax:
		// Open-ended chunk: lower bound only.
		return render(filterColumn, ">=", chunk.Min)
	case hasMax && !hasMin:
		// Open-ended chunk: upper bound only.
		return render(filterColumn, "<=", chunk.Max)
	}

	// Two-sided range (inclusive on both ends).
	lower := render(filterColumn, ">=", chunk.Min)
	upper := render(filterColumn, "<=", chunk.Max)
	return fmt.Sprintf("%s AND %s", lower, upper)
}
// PostgreSQL-Specific Queries
// TODO: Rewrite these queries to take their values as bind arguments at execution time instead of formatting them into the SQL text.
// PostgresMinMaxQuery builds a SELECT statement that returns the smallest and
// largest value of column (aliased min_value and max_value) for the stream's
// table. When columnType is types.String the column is cast to text before
// MIN/MAX are applied.
//
// The schema and table names are double-quoted, matching the other PostgreSQL
// queries in this file, so mixed-case or reserved-word relation names resolve
// correctly. The column expression is interpolated as given.
func PostgresMinMaxQuery(stream protocol.Stream, column string, columnType types.DataType) string {
	expr := column
	if columnType == types.String {
		expr = column + "::text"
	}
	return fmt.Sprintf(
		`SELECT MIN(%[1]s) AS min_value, MAX(%[1]s) AS max_value FROM "%[2]s"."%[3]s"`,
		expr, stream.Namespace(), stream.Name(),
	)
}
// PostgresWithoutState builds a full-table SELECT for the stream, ordered by
// the stream's cursor column. It takes no bind parameters.
//
// The cursor column is double-quoted in the same way PostgresWithState quotes
// it, so both queries resolve the same column even when its name is
// mixed-case or a reserved word.
func PostgresWithoutState(stream protocol.Stream) string {
	return fmt.Sprintf(
		`SELECT * FROM "%s"."%s" ORDER BY "%s"`,
		stream.Namespace(), stream.Name(), stream.Cursor(),
	)
}
// PostgresWithState builds a SELECT for the stream that only returns rows
// whose cursor column is greater than the $1 bind parameter, ordered by the
// cursor column ascending with NULLs first.
func PostgresWithState(stream protocol.Stream) string {
	schema, table, cursor := stream.Namespace(), stream.Name(), stream.Cursor()
	return fmt.Sprintf(
		`SELECT * FROM "%[1]s"."%[2]s" where "%[3]s">$1 ORDER BY "%[3]s" ASC NULLS FIRST`,
		schema, table, cursor,
	)
}
// PostgresRowCountQuery builds the catalog query that reads reltuples (as
// approx_row_count) from pg_class joined with pg_namespace, filtered by the
// stream's table name and schema name. Both names are embedded as SQL string
// literals.
func PostgresRowCountQuery(stream protocol.Stream) string {
	table, schema := stream.Name(), stream.Namespace()
	return fmt.Sprintf(
		`SELECT reltuples::bigint AS approx_row_count FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relname = '%s' AND n.nspname = '%s';`,
		table, schema,
	)
}
// PostgresRelPageCount builds the catalog query that reads relpages from
// pg_class for the stream's table, resolving the schema through a sub-select
// on pg_namespace. Both names are embedded as SQL string literals.
func PostgresRelPageCount(stream protocol.Stream) string {
	table, schema := stream.Name(), stream.Namespace()
	return fmt.Sprintf(
		`SELECT relpages FROM pg_class WHERE relname = '%s' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = '%s')`,
		table, schema,
	)
}
// PostgresWalLSNQuery returns the statement that selects
// pg_current_wal_lsn(), cast through text to pg_lsn.
func PostgresWalLSNQuery() string {
	const query = `SELECT pg_current_wal_lsn()::text::pg_lsn`
	return query
}
// PostgresNextChunkEndQuery builds the statement that finds the upper boundary
// of the next chunk: the MAX of filterColumn over the first batchSize rows
// (ordered ascending by filterColumn) whose filterColumn is greater than
// filterValue.
//
// filterValue is formatted directly into the SQL text. For types.String
// columns the comparison and the MAX are done on the ::text cast and the
// value is wrapped in a $$...$$ dollar-quoted literal; for every other type
// the value is inserted as-is.
func PostgresNextChunkEndQuery(stream protocol.Stream, filterColumn string, filterValue interface{}, batchSize int, filterColumnType types.DataType) string {
	if filterColumnType != types.String {
		return fmt.Sprintf(
			`SELECT MAX(%[1]s) FROM (SELECT %[1]s FROM "%[2]s"."%[3]s" WHERE %[1]s > %[4]v ORDER BY %[1]s ASC LIMIT %[5]d) AS T`,
			filterColumn, stream.Namespace(), stream.Name(), filterValue, batchSize,
		)
	}
	return fmt.Sprintf(
		`SELECT MAX(%[1]s::text) FROM (SELECT %[1]s FROM "%[2]s"."%[3]s" WHERE %[1]s::text > $$%[4]v$$ ORDER BY %[1]s ASC LIMIT %[5]d) AS T`,
		filterColumn, stream.Namespace(), stream.Name(), filterValue, batchSize,
	)
}
// PostgresMinQuery builds the statement that selects the smallest value of
// filterColumn among the rows whose filterColumn is greater than filterValue.
// filterValue is formatted directly into the SQL text with %v.
func PostgresMinQuery(stream protocol.Stream, filterColumn string, filterValue interface{}) string {
	schema, table := stream.Namespace(), stream.Name()
	return fmt.Sprintf(
		`SELECT MIN(%[1]s) FROM "%[2]s"."%[3]s" WHERE %[1]s > %[4]v`,
		filterColumn, schema, table, filterValue,
	)
}
// PostgresChunkScanQuery builds the SELECT that reads every row of the
// stream's table falling inside chunk, using filterColumn for the boundary
// comparisons produced by buildChunkCondition.
//
// For types.String filter columns each comparison is made against the ::text
// cast of the column with the boundary wrapped in a $$...$$ dollar-quoted
// literal; for other types the boundary value is inserted as-is.
func PostgresChunkScanQuery(stream protocol.Stream, filterColumn string, chunk types.Chunk, filterColumnType types.DataType) string {
	textColumn := filterColumnType == types.String
	render := func(column string, operator string, value interface{}) string {
		if !textColumn {
			return fmt.Sprintf("%s %s %v", column, operator, value)
		}
		return fmt.Sprintf("%s::text %s $$%v$$", column, operator, value)
	}

	where := buildChunkCondition(filterColumn, chunk, render)
	schema, table := stream.Namespace(), stream.Name()
	return fmt.Sprintf(`SELECT * FROM "%s"."%s" WHERE %s`, schema, table, where)
}
// MySQL-Specific Queries
// MysqlChunkScanQuery builds the SELECT that reads every row of the stream's
// table falling inside chunk, using filterColumn for the boundary
// comparisons. Schema and table names are wrapped in backticks.
func MysqlChunkScanQuery(stream protocol.Stream, filterColumn string, chunk types.Chunk) string {
	// A nil formatter makes buildChunkCondition use its default rendering.
	where := buildChunkCondition(filterColumn, chunk, nil)
	schema, table := stream.Namespace(), stream.Name()
	return fmt.Sprintf("SELECT * FROM `%s`.`%s` WHERE %s", schema, table, where)
}
// MySQLDiscoverTablesQuery returns the statement that lists TABLE_NAME and
// TABLE_SCHEMA of every base table in INFORMATION_SCHEMA.TABLES for one
// schema. It expects a single bind parameter: the schema name.
func MySQLDiscoverTablesQuery() string {
	const query = `
SELECT
TABLE_NAME,
TABLE_SCHEMA
FROM
INFORMATION_SCHEMA.TABLES
WHERE
TABLE_SCHEMA = ?
AND TABLE_TYPE = 'BASE TABLE'
`
	return query
}
// MySQLTableSchemaQuery returns the statement that reads COLUMN_NAME,
// COLUMN_TYPE, DATA_TYPE, IS_NULLABLE and COLUMN_KEY for every column of one
// table from INFORMATION_SCHEMA.COLUMNS, ordered by ORDINAL_POSITION. It
// expects two bind parameters: the schema name and the table name.
func MySQLTableSchemaQuery() string {
	const query = `
SELECT
COLUMN_NAME,
COLUMN_TYPE,
DATA_TYPE,
IS_NULLABLE,
COLUMN_KEY
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_SCHEMA = ? AND TABLE_NAME = ?
ORDER BY
ORDINAL_POSITION
`
	return query
}
// MySQLPrimaryKeyQuery returns the statement that reads one primary-key
// column name of a table in the current database (DATABASE()) from
// INFORMATION_SCHEMA.KEY_COLUMN_USAGE. It expects a single bind parameter:
// the table name.
//
// Rows are ordered by ORDINAL_POSITION before LIMIT 1 is applied, so for a
// composite primary key the first key column is returned deterministically
// rather than an arbitrary one.
func MySQLPrimaryKeyQuery() string {
	return `
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = ?
AND CONSTRAINT_NAME = 'PRIMARY'
ORDER BY ORDINAL_POSITION
LIMIT 1
`
}
// MySQLTableRowsQuery returns the statement that reads TABLE_ROWS from
// INFORMATION_SCHEMA.TABLES for one table of the current database
// (DATABASE()). It expects a single bind parameter: the table name.
func MySQLTableRowsQuery() string {
	const query = `
SELECT TABLE_ROWS
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = ?
`
	return query
}
// MySQLMasterStatusQuery returns the SHOW MASTER STATUS statement, which is
// used to obtain the current binlog position.
func MySQLMasterStatusQuery() string {
	const query = "SHOW MASTER STATUS"
	return query
}
// MySQLTableColumnsQuery returns the statement that lists the column names of
// one table from INFORMATION_SCHEMA.COLUMNS, ordered by ORDINAL_POSITION. It
// expects two bind parameters: the schema name and the table name.
func MySQLTableColumnsQuery() string {
	const query = `
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
ORDER BY ORDINAL_POSITION
`
	return query
}
// WithIsolation runs fn inside a read-only transaction opened on client with
// the REPEATABLE READ isolation level.
//
// If the transaction cannot be started, the error is returned wrapped. If fn
// returns an error, that error is returned unchanged and the transaction is
// rolled back. If fn succeeds, the transaction is committed and any commit
// failure is returned wrapped.
//
// A rollback is always attempted on the way out, so the transaction is also
// released when fn panics. A rollback failure other than sql.ErrTxDone is
// printed to standard output and does not change the returned error.
func WithIsolation(ctx context.Context, client *sql.DB, fn func(tx *sql.Tx) error) error {
	tx, err := client.BeginTx(ctx, &sql.TxOptions{
		Isolation: sql.LevelRepeatableRead,
		ReadOnly:  true,
	})
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}

	defer func() {
		// Once the transaction has been committed, Rollback reports
		// sql.ErrTxDone; that is the expected outcome on the success path.
		if rerr := tx.Rollback(); rerr != nil && !errors.Is(rerr, sql.ErrTxDone) {
			fmt.Printf("transaction rollback failed: %v\n", rerr)
		}
	}()

	if err := fn(tx); err != nil {
		return err
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}
	return nil
}