Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,36 @@ var truncateFormat = regexp.MustCompile(`(?i)\sFORMAT\s+[^\s]+`)
var truncateValues = regexp.MustCompile(`\sVALUES\s.*$`)
var extractInsertColumnsMatch = regexp.MustCompile(`(?si)INSERT INTO .+\s\((?P<Columns>.+)\)$`)

func splitColumnsRespectingQuotes(columnsStr string) []string {
var columns []string
var current strings.Builder
inBacktick := false
inDoubleQuote := false

for i := 0; i < len(columnsStr); i++ {
c := columnsStr[i]

if c == '`' && !inDoubleQuote {
inBacktick = !inBacktick
current.WriteByte(c)
} else if c == '"' && !inBacktick {
inDoubleQuote = !inDoubleQuote
current.WriteByte(c)
} else if c == ',' && !inBacktick && !inDoubleQuote {
columns = append(columns, strings.TrimSpace(current.String()))
current.Reset()
} else {
current.WriteByte(c)
}
}

if current.Len() > 0 {
columns = append(columns, strings.TrimSpace(current.String()))
}

return columns
}

func extractNormalizedInsertQueryAndColumns(query string) (normalizedQuery string, tableName string, columns []string, err error) {
query = truncateFormat.ReplaceAllString(query, "")
query = truncateValues.ReplaceAllString(query, "")
Expand All @@ -27,11 +57,9 @@ func extractNormalizedInsertQueryAndColumns(query string) (normalizedQuery strin
columns = make([]string, 0)
matches = extractInsertColumnsMatch.FindStringSubmatch(matches[1])
if len(matches) == 2 {
columns = strings.Split(matches[1], ",")
for i := range columns {
// refers to https://clickhouse.com/docs/en/sql-reference/syntax#identifiers
// we can use identifiers with double quotes or backticks, for example: "id", `id`, but not both, like `"id"`.
columns[i] = strings.ReplaceAll(strings.Trim(strings.TrimSpace(columns[i]), "\""), "`", "")
rawColumns := splitColumnsRespectingQuotes(matches[1])
for _, col := range rawColumns {
columns = append(columns, strings.ReplaceAll(strings.Trim(strings.TrimSpace(col), "\""), "`", ""))
}
}

Expand Down
16 changes: 16 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,22 @@ INSERT INTO ` + "`_test_1345# $.ДБ`.`2. Таблица №2`" + ` (col1, col2)
expectedColumns: []string{"col1", "col2"},
expectedError: false,
},
{
name: "Insert with comma in backtick-quoted column name",
query: "INSERT INTO my_table (col1, `my_weird,col2`, col3) VALUES (1, 2, 3)",
expectedNormalizedQuery: "INSERT INTO my_table (col1, `my_weird,col2`, col3) FORMAT Native",
expectedTableName: "my_table",
expectedColumns: []string{"col1", "my_weird,col2", "col3"},
expectedError: false,
},
{
name: "Insert with comma in double-quoted column name",
query: `INSERT INTO my_table (col1, "my_weird,col2", col3) VALUES (1, 2, 3)`,
expectedNormalizedQuery: `INSERT INTO my_table (col1, "my_weird,col2", col3) FORMAT Native`,
expectedTableName: "my_table",
expectedColumns: []string{"col1", "my_weird,col2", "col3"},
expectedError: false,
},
}

for _, tc := range testCases {
Expand Down