@@ -19,14 +19,34 @@ func NextChunkEndQuery(stream protocol.Stream, column string, chunkSize int) str
1919 return fmt .Sprintf (`SELECT MAX(%[1]s) FROM (SELECT %[1]s FROM %[2]s.%[3]s WHERE %[1]s > ? ORDER BY %[1]s LIMIT %[4]d) AS subquery` , column , stream .Namespace (), stream .Name (), chunkSize )
2020}
2121
22- // buildChunkCondition builds the condition for a chunk
23- func buildChunkCondition (filterColumn string , chunk types.Chunk ) string {
24- if chunk .Min != nil && chunk .Max != nil {
25- return fmt .Sprintf ("%s >= %v AND %s <= %v" , filterColumn , chunk .Min , filterColumn , chunk .Max )
26- } else if chunk .Min != nil {
27- return fmt .Sprintf ("%s >= %v" , filterColumn , chunk .Min )
22+ // buildChunkCondition creates SQL conditions for filtering based on chunk boundaries
23+ // with formatting determined by the provided formatter function
24+ func buildChunkCondition (
25+ filterColumn string ,
26+ chunk types.Chunk ,
27+ formatter func (column string , operator string , value interface {}) string ,
28+ ) string {
29+ // If formatter is nil, use default formatting
30+ if formatter == nil {
31+ formatter = func (column string , operator string , value interface {}) string {
32+ return fmt .Sprintf ("%s %s %v" , column , operator , value )
33+ }
2834 }
29- return fmt .Sprintf ("%s <= %v" , filterColumn , chunk .Max )
35+
36+ // Only Min condition
37+ if chunk .Min != nil && chunk .Max == nil {
38+ return formatter (filterColumn , ">=" , chunk .Min )
39+ }
40+
41+ // Only Max condition
42+ if chunk .Min == nil && chunk .Max != nil {
43+ return formatter (filterColumn , "<=" , chunk .Max )
44+ }
45+
46+ // Both Min and Max conditions
47+ return fmt .Sprintf ("%s AND %s" ,
48+ formatter (filterColumn , ">=" , chunk .Min ),
49+ formatter (filterColumn , "<=" , chunk .Max ))
3050}
3151
3252// PostgreSQL-Specific Queries
@@ -80,37 +100,21 @@ func PostgresMinQuery(stream protocol.Stream, filterColumn string, filterValue i
80100
81101// PostgresBuildSplitScanQuery builds a chunk scan query for PostgreSQL
82102func PostgresChunkScanQuery (stream protocol.Stream , filterColumn string , chunk types.Chunk , filterColumnType types.DataType ) string {
83- condition := buildPostgresChunkCondition (filterColumn , chunk , filterColumnType )
84- return fmt .Sprintf (`SELECT * FROM "%s"."%s" WHERE %s` , stream .Namespace (), stream .Name (), condition )
85- }
86-
87- func buildPostgresChunkCondition (filterColumn string , chunk types.Chunk , filterColumnType types.DataType ) string {
88- formatCondition := func (operator string , value interface {}) string {
103+ postgresFormatter := func (column string , operator string , value interface {}) string {
89104 if filterColumnType == types .String {
90- return fmt .Sprintf ("%s::text %s $$%v$$" , filterColumn , operator , value )
105+ return fmt .Sprintf ("%s::text %s $$%v$$" , column , operator , value )
91106 }
92- return fmt .Sprintf ("%s %s %v" , filterColumn , operator , value )
93- }
94-
95- // Only Min condition
96- if chunk .Min != nil && chunk .Max == nil {
97- return formatCondition (">=" , chunk .Min )
98- }
99-
100- // Only Max condition
101- if chunk .Min == nil && chunk .Max != nil {
102- return formatCondition ("<=" , chunk .Max )
107+ return fmt .Sprintf ("%s %s %v" , column , operator , value )
103108 }
104-
105- // Both Min and Max conditions
106- return fmt .Sprintf ("%s AND %s" , formatCondition (">=" , chunk .Min ), formatCondition ("<=" , chunk .Max ))
109+ condition := buildChunkCondition (filterColumn , chunk , postgresFormatter )
110+ return fmt .Sprintf (`SELECT * FROM "%s"."%s" WHERE %s` , stream .Namespace (), stream .Name (), condition )
107111}
108112
109113// MySQL-Specific Queries
110114
111115// MySQLWithoutState builds a chunk scan query for MySql
112116func MysqlChunkScanQuery (stream protocol.Stream , filterColumn string , chunk types.Chunk ) string {
113- condition := buildChunkCondition (filterColumn , chunk )
117+ condition := buildChunkCondition (filterColumn , chunk , nil )
114118 return fmt .Sprintf ("SELECT * FROM `%s`.`%s` WHERE %s" , stream .Namespace (), stream .Name (), condition )
115119}
116120
0 commit comments