Skip to content

Commit b9e5054

Browse files
[Fix][Connector-V2][ClickHouse] Fix ThreadLocal memory leak in ClickhouseCatalogUtil (#10264)
1 parent 316fe87 commit b9e5054

File tree

2 files changed

+80
-1
lines changed

2 files changed

+80
-1
lines changed

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public String getCreateTableSql(
5454
return super.getCreateTableSql(
5555
template, database, table, tableSchema, comment, optionsKey);
5656
} finally {
57-
pkColumns.clear();
57+
PRIMARY_KEY_COLUMNS.remove();
5858
}
5959
}
6060

seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,20 @@
1818
package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
1919

2020
import org.apache.seatunnel.api.table.catalog.Column;
21+
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
22+
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
23+
import org.apache.seatunnel.api.table.catalog.TableSchema;
2124
import org.apache.seatunnel.api.table.type.BasicType;
2225
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
26+
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSinkOptions;
2327

2428
import org.junit.jupiter.api.Test;
2529

30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.Collections;
33+
import java.util.List;
34+
2635
import static org.junit.jupiter.api.Assertions.assertEquals;
2736
import static org.junit.jupiter.api.Assertions.assertThrows;
2837
import static org.mockito.Mockito.mock;
@@ -101,4 +110,74 @@ void throwsExceptionWhenColumnIsNull() {
101110
NullPointerException.class,
102111
() -> ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(null));
103112
}
113+
114+
@Test
115+
void testPrimaryKeyColumnShouldNotBeNullable() {
116+
// Test that ThreadLocal is properly cleared after getCreateTableSql call
117+
Column column = mock(Column.class);
118+
when(column.getName()).thenReturn("pk_column");
119+
when(column.getSinkType()).thenReturn("String");
120+
when(column.isNullable()).thenReturn(true);
121+
when(column.getComment()).thenReturn("");
122+
123+
List<Column> columns = new ArrayList<>();
124+
columns.add(column);
125+
126+
TableSchema tableSchema =
127+
TableSchema.builder()
128+
.primaryKey(PrimaryKey.of("", Collections.singletonList("pk_column")))
129+
.columns(columns)
130+
.build();
131+
132+
ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
133+
"CREATE TABLE `${database}`.`${table}` (${rowtype_fields})",
134+
"test_db",
135+
"test_table",
136+
tableSchema,
137+
null,
138+
ClickhouseSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
139+
140+
// After getCreateTableSql call, ThreadLocal should be cleared
141+
// so columnToConnectorType should treat it as NOT a primary key
142+
String result = ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
143+
assertEquals("`pk_column` Nullable(String) ", result);
144+
}
145+
146+
@Test
147+
void testPrimaryKeyColumnWithNullableShouldNotWrapInNullable() {
148+
// Test the actual scenario: primary key columns should NOT be wrapped in Nullable
149+
// because ClickHouse doesn't allow nullable columns in ORDER BY / PRIMARY KEY
150+
String template =
151+
"CREATE TABLE `${database}`.`${table}` (\n"
152+
+ " ${rowtype_primary_key},\n"
153+
+ " ${rowtype_fields}\n"
154+
+ ") ENGINE = MergeTree()\n"
155+
+ "ORDER BY (${rowtype_primary_key})";
156+
157+
List<Column> columns = new ArrayList<>();
158+
columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, (Long) null, true, null, ""));
159+
columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, (Long) null, true, null, ""));
160+
columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, (Long) null, true, null, ""));
161+
162+
TableSchema tableSchema =
163+
TableSchema.builder()
164+
.primaryKey(PrimaryKey.of("", Arrays.asList("id", "age")))
165+
.columns(columns)
166+
.build();
167+
168+
String sql =
169+
ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
170+
template,
171+
"test_db",
172+
"test_table",
173+
tableSchema,
174+
null,
175+
ClickhouseSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
176+
177+
// Primary key columns (id, age) should NOT be wrapped in Nullable
178+
assertEquals(true, sql.contains("`id` Int64 "));
179+
assertEquals(true, sql.contains("`age` Int32 "));
180+
// Non-primary key column (name) should be wrapped in Nullable
181+
assertEquals(true, sql.contains("`name` Nullable(String) "));
182+
}
104183
}

0 commit comments

Comments
 (0)