Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions drivers/postgres/internal/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func (p *Postgres) backfill(pool *protocol.WriterPool, stream protocol.Stream) e
defer tx.Rollback()
splitColumn := stream.Self().StreamMetadata.SplitColumn
splitColumn = utils.Ternary(splitColumn == "", "ctid", splitColumn).(string)
stmt := jdbc.PostgresChunkScanQuery(stream, splitColumn, chunk)
splitColType, _ := stream.Schema().GetType(splitColumn)
stmt := jdbc.PostgresChunkScanQuery(stream, splitColumn, chunk, splitColType)

setter := jdbc.NewReader(backfillCtx, stmt, p.config.BatchSize, func(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
return tx.Query(query, args...)
Expand Down Expand Up @@ -163,8 +164,8 @@ func (p *Postgres) splitTableIntoChunks(stream protocol.Stream) ([]types.Chunk,
splitColumn := stream.Self().StreamMetadata.SplitColumn
if splitColumn != "" {
var minValue, maxValue interface{}
minMaxRowCountQuery := jdbc.MinMaxQuery(stream, splitColumn)
// TODO: Fails on UUID type (Good First Issue)
splitColType, _ := stream.Schema().GetType(splitColumn)
minMaxRowCountQuery := jdbc.PostgresMinMaxQuery(stream, splitColumn, splitColType)
err := p.client.QueryRow(minMaxRowCountQuery).Scan(&minValue, &maxValue)
if err != nil {
return nil, fmt.Errorf("failed to fetch table min max: %s", err)
Expand All @@ -180,7 +181,6 @@ func (p *Postgres) splitTableIntoChunks(stream protocol.Stream) ([]types.Chunk,
return nil, fmt.Errorf("provided split column is not a primary key")
}

splitColType, _ := stream.Schema().GetType(splitColumn)
// even distribution is only available for float and int types
if splitColType == types.Int64 || splitColType == types.Float64 {
return splitViaBatchSize(minValue, maxValue, p.config.BatchSize)
Expand All @@ -193,7 +193,8 @@ func (p *Postgres) splitTableIntoChunks(stream protocol.Stream) ([]types.Chunk,

func (p *Postgres) nextChunkEnd(stream protocol.Stream, previousChunkEnd interface{}, splitColumn string) (interface{}, error) {
var chunkEnd interface{}
nextChunkEnd := jdbc.PostgresNextChunkEndQuery(stream, splitColumn, previousChunkEnd, p.config.BatchSize)
splitColType, _ := stream.Schema().GetType(splitColumn)
nextChunkEnd := jdbc.PostgresNextChunkEndQuery(stream, splitColumn, previousChunkEnd, p.config.BatchSize, splitColType)
err := p.client.QueryRow(nextChunkEnd).Scan(&chunkEnd)
if err != nil {
return nil, fmt.Errorf("failed to query[%s] next chunk end: %s", nextChunkEnd, err)
Expand Down
40 changes: 37 additions & 3 deletions pkg/jdbc/jdbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ func buildChunkCondition(filterColumn string, chunk types.Chunk) string {
// PostgreSQL-Specific Queries
// TODO: Rewrite queries for taking vars as arguments while execution.

// PostgresMinMaxQuery returns the query that fetches the MIN and MAX values
// of column in the table backing stream. The result columns are aliased
// min_value and max_value.
//
// When columnType is types.String the column is cast to text before it is
// aggregated (presumably so that non-text types surfaced as strings, such as
// UUID, can be fed to MIN/MAX — confirm against the type mapping).
//
// The schema and table names are double-quoted, matching the other
// PostgreSQL queries in this file, so mixed-case and reserved-word
// identifiers resolve correctly.
func PostgresMinMaxQuery(stream protocol.Stream, column string, columnType types.DataType) string {
	columnExpr := column
	if columnType == types.String {
		columnExpr = column + "::text"
	}
	return fmt.Sprintf(`SELECT MIN(%[1]s) AS min_value, MAX(%[1]s) AS max_value FROM "%[2]s"."%[3]s"`, columnExpr, stream.Namespace(), stream.Name())
}

// PostgresWithoutState returns the query for a simple SELECT without state
func PostgresWithoutState(stream protocol.Stream) string {
return fmt.Sprintf(`SELECT * FROM "%s"."%s" ORDER BY %s`, stream.Namespace(), stream.Name(), stream.Cursor())
Expand All @@ -58,7 +66,10 @@ func PostgresWalLSNQuery() string {
}

Comment thread
rajivharlalka marked this conversation as resolved.
// PostgresNextChunkEndQuery generates a SQL query to fetch the maximum value of a specified column
func PostgresNextChunkEndQuery(stream protocol.Stream, filterColumn string, filterValue interface{}, batchSize int) string {
func PostgresNextChunkEndQuery(stream protocol.Stream, filterColumn string, filterValue interface{}, batchSize int, filterColumnType types.DataType) string {
if filterColumnType == types.String {
Comment thread
rajivharlalka marked this conversation as resolved.
return fmt.Sprintf(`SELECT MAX(%s::text) FROM (SELECT %s FROM "%s"."%s" WHERE %s::text > $$%v$$ ORDER BY %s ASC LIMIT %d) AS T`, filterColumn, filterColumn, stream.Namespace(), stream.Name(), filterColumn, filterValue, filterColumn, batchSize)
}
return fmt.Sprintf(`SELECT MAX(%s) FROM (SELECT %s FROM "%s"."%s" WHERE %s > %v ORDER BY %s ASC LIMIT %d) AS T`, filterColumn, filterColumn, stream.Namespace(), stream.Name(), filterColumn, filterValue, filterColumn, batchSize)
}

Expand All @@ -68,11 +79,33 @@ func PostgresMinQuery(stream protocol.Stream, filterColumn string, filterValue i
}

// PostgresBuildSplitScanQuery builds a chunk scan query for PostgreSQL
func PostgresChunkScanQuery(stream protocol.Stream, filterColumn string, chunk types.Chunk) string {
condition := buildChunkCondition(filterColumn, chunk)
func PostgresChunkScanQuery(stream protocol.Stream, filterColumn string, chunk types.Chunk, filterColumnType types.DataType) string {
condition := buildPostgresChunkCondition(filterColumn, chunk, filterColumnType)
return fmt.Sprintf(`SELECT * FROM "%s"."%s" WHERE %s`, stream.Namespace(), stream.Name(), condition)
}

func buildPostgresChunkCondition(filterColumn string, chunk types.Chunk, filterColumnType types.DataType) string {
Comment thread
rajivharlalka marked this conversation as resolved.
Outdated
formatCondition := func(operator string, value interface{}) string {
if filterColumnType == types.String {
return fmt.Sprintf("%s::text %s $$%v$$", filterColumn, operator, value)
}
return fmt.Sprintf("%s %s %v", filterColumn, operator, value)
}

// Only Min condition
if chunk.Min != nil && chunk.Max == nil {
return formatCondition(">=", chunk.Min)
}

// Only Max condition
if chunk.Min == nil && chunk.Max != nil {
return formatCondition("<=", chunk.Max)
}

// Both Min and Max conditions
return fmt.Sprintf("%s AND %s", formatCondition(">=", chunk.Min), formatCondition("<=", chunk.Max))
}
Comment thread
rajivharlalka marked this conversation as resolved.
Outdated

// MySQL-Specific Queries

// MySQLWithoutState builds a chunk scan query for MySQL
Expand Down Expand Up @@ -149,6 +182,7 @@ func MySQLTableColumnsQuery() string {
ORDER BY ORDINAL_POSITION
`
}

func WithIsolation(ctx context.Context, client *sql.DB, fn func(tx *sql.Tx) error) error {
tx, err := client.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
Expand Down