Skip to content

Commit a301056

Browse files
authored
Merge pull request #2868 from ClickHouse/06/08/26/kill_query_with_session
[jdbc-v2] Fix `Statement.cancel()` for query run with `session_id` parameters
2 parents 4574dec + df1a530 commit a301056

8 files changed

Lines changed: 212 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
## 0.10.9
1+
## 0.10.0
22

33
### Breaking Changes
44

@@ -39,6 +39,8 @@
3939

4040
### Bug Fixes
4141

42+
- **[jdbc-v2]** Fixed `Statement.cancel()` throwing `SESSION_IS_LOCKED` when the statement was running inside a ClickHouse session (e.g. via `clickhouse_setting_session_id`). The `KILL QUERY` request issued by `cancel()` now runs outside the session, so it no longer contends with the running query for the session lock. (https://github.com/ClickHouse/clickhouse-java/issues/2690)
43+
4244
- **[client-v2]** Fixed inconsistent use of `executionTimeout` parameter in `Client` component. The timeout was previously set in milliseconds but mistakenly retrieved and used in seconds in some places. Now it correctly uses milliseconds consistently. (https://github.com/ClickHouse/clickhouse-java/issues/2358)
4345

4446
## 0.9.8

client-v2/src/main/java/com/clickhouse/client/api/command/CommandSettings.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,10 @@ public CommandSettings use(Session session) {
3333
super.use(session);
3434
return this;
3535
}
36+
37+
@Override
38+
public CommandSettings clearSession() {
39+
super.clearSession();
40+
return this;
41+
}
3642
}

client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ public InsertSettings use(Session session) {
145145
return this;
146146
}
147147

148+
public InsertSettings clearSession() {
149+
settings.clearSession();
150+
return this;
151+
}
152+
148153
public int getInputStreamCopyBufferSize() {
149154
return this.inputStreamCopyBufferSize;
150155
}

client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ public CommonSettings use(Session session) {
139139
return this;
140140
}
141141

142+
public void clearSession() {
143+
resetOption(ClientConfigProperties.serverSetting(ClickHouseHttpProto.QPARAM_SESSION_ID));
144+
resetOption(ClientConfigProperties.serverSetting(ClickHouseHttpProto.QPARAM_SESSION_CHECK));
145+
resetOption(ClientConfigProperties.serverSetting(ClickHouseHttpProto.QPARAM_SESSION_TIMEOUT));
146+
// Do not clean `session_timezone` setting because it is not related to session management and used to
147+
// set timezone for consequent queries in some multi-user applications.
148+
}
149+
142150
/**
143151
* Operation id. Used internally to register new operation.
144152
* Should not be called directly.

client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ public QuerySettings use(Session session) {
139139
return this;
140140
}
141141

142+
public QuerySettings clearSession() {
143+
settings.clearSession();
144+
return this;
145+
}
146+
142147
/**
143148
* Read buffer is used for reading data from a server. Size is in bytes.
144149
* Minimal value is {@value MINIMAL_READ_BUFFER_SIZE} bytes.

client-v2/src/test/java/com/clickhouse/client/SettingsTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,32 @@ void testQuerySettingsSpecific() throws Exception {
143143
Assert.assertEquals(settings.getSessionTimeout().intValue(), 45);
144144
Assert.assertEquals(settings.getSessionTimezone(), "Europe/Berlin");
145145
}
146+
147+
{
148+
final QuerySettings settings = new QuerySettings();
149+
settings.setSessionId("session-clear-1");
150+
settings.setSessionCheck(true);
151+
settings.setSessionTimeout(60);
152+
settings.setSessionTimezone("America/New_York");
153+
Assert.assertNotNull(settings.getSessionId());
154+
Assert.assertNotNull(settings.getSessionCheck());
155+
Assert.assertNotNull(settings.getSessionTimeout());
156+
Assert.assertNotNull(settings.getSessionTimezone());
157+
158+
settings.clearSession();
159+
160+
Assert.assertNull(settings.getSessionId(), "clearSession() must remove session_id");
161+
Assert.assertNull(settings.getSessionCheck(), "clearSession() must remove session_check");
162+
Assert.assertNull(settings.getSessionTimeout(), "clearSession() must remove session_timeout");
163+
// session_timezone is not session-management state; it is preserved across clearSession().
164+
Assert.assertEquals(settings.getSessionTimezone(), "America/New_York",
165+
"clearSession() must not remove session_timezone");
166+
167+
// Non-session settings are unaffected.
168+
settings.setDatabase("db1");
169+
settings.clearSession();
170+
Assert.assertEquals(settings.getDatabase(), "db1");
171+
}
146172
}
147173

148174
@Test
@@ -232,5 +258,31 @@ public void testInsertSettingsSpecific() throws Exception {
232258
Assert.assertEquals(settings.getSessionTimeout().intValue(), 50);
233259
Assert.assertEquals(settings.getSessionTimezone(), "Europe/Paris");
234260
}
261+
262+
{
263+
final InsertSettings settings = new InsertSettings();
264+
settings.setSessionId("session-clear-2");
265+
settings.setSessionCheck(true);
266+
settings.setSessionTimeout(90);
267+
settings.setSessionTimezone("Asia/Tokyo");
268+
Assert.assertNotNull(settings.getSessionId());
269+
Assert.assertNotNull(settings.getSessionCheck());
270+
Assert.assertNotNull(settings.getSessionTimeout());
271+
Assert.assertNotNull(settings.getSessionTimezone());
272+
273+
settings.clearSession();
274+
275+
Assert.assertNull(settings.getSessionId(), "clearSession() must remove session_id");
276+
Assert.assertNull(settings.getSessionCheck(), "clearSession() must remove session_check");
277+
Assert.assertNull(settings.getSessionTimeout(), "clearSession() must remove session_timeout");
278+
// session_timezone is not session-management state; it is preserved across clearSession().
279+
Assert.assertEquals(settings.getSessionTimezone(), "Asia/Tokyo",
280+
"clearSession() must not remove session_timezone");
281+
282+
// Non-session settings are unaffected.
283+
settings.setDatabase("db2");
284+
settings.clearSession();
285+
Assert.assertEquals(settings.getDatabase(), "db2");
286+
}
235287
}
236288
}

jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,9 +333,12 @@ public void cancel() throws SQLException {
333333
return;
334334
}
335335

336+
// KILL QUERY must not run inside the same session as the query being canceled otherwise it will
337+
// cause "Session is locked by a concurrent client" (SESSION_IS_LOCKED) error.
338+
QuerySettings cancelSettings = QuerySettings.merge(getLocalSettings(), new QuerySettings()).clearSession();
336339
try (QueryResponse response = connection.getClient().query(String.format("KILL QUERY%sWHERE query_id = '%s'",
337340
connection.onCluster ? " ON CLUSTER " + SQLUtils.enquoteIdentifier(connection.cluster, true) + ' ' : ' ',
338-
lastQueryId), connection.getDefaultQuerySettings()).get()){
341+
lastQueryId), cancelSettings).get()){
339342
LOG.debug("Query {} was killed by {}", lastQueryId, response.getQueryId());
340343
} catch (Exception e) {
341344
throw new SQLException(e);

jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.Properties;
3030
import java.util.UUID;
3131
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicReference;
3234

3335
import static org.testng.Assert.assertEquals;
3436
import static org.testng.Assert.assertFalse;
@@ -711,6 +713,133 @@ public void testConcurrentCancel() throws Exception {
711713
}
712714
}
713715

716+
/**
717+
* Waits until the given query id appears in {@code system.processes}, so we know the operation has actually
718+
* started on the server before attempting to cancel it. Uses a dedicated connection (no session) to observe.
719+
*/
720+
private boolean waitForQueryToStart(String queryId, int timeoutSeconds) throws Exception {
721+
if (queryId == null) {
722+
return false;
723+
}
724+
long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds);
725+
while (System.currentTimeMillis() < deadline) {
726+
try (Connection conn = getJdbcConnection();
727+
Statement stmt = conn.createStatement();
728+
ResultSet rs = stmt.executeQuery("SELECT count() FROM system.processes WHERE query_id = '" + queryId + "'")) {
729+
730+
if (rs.next() && rs.getLong(1) > 0) {
731+
return true;
732+
}
733+
}
734+
Thread.sleep(200);
735+
}
736+
return false;
737+
}
738+
739+
private String waitForQueryId(StatementImpl stmt, int timeoutSeconds) throws Exception {
740+
long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds);
741+
while (System.currentTimeMillis() < deadline) {
742+
String queryId = stmt.getLastQueryId();
743+
if (queryId != null && !queryId.isEmpty()) {
744+
return queryId;
745+
}
746+
Thread.sleep(50);
747+
}
748+
return null;
749+
}
750+
751+
@Test(groups = {"integration"})
752+
public void testCancelQueryWithSession() throws Exception {
753+
if (isCloud()) {
754+
throw new SkipException("Cloud + HTTP doesn't work well. Enough to test locally");
755+
}
756+
757+
// Regression test for #2690: cancelling a query that runs inside a session must not fail with
758+
// "Session is locked by a concurrent client" (SESSION_IS_LOCKED). The KILL QUERY request issued by
759+
// cancel() must not carry the session id of the query being cancelled.
760+
String sessionId = "test-session-" + UUID.randomUUID();
761+
try (Connection conn = getJdbcConnection()) {
762+
try (StatementImpl stmt = (StatementImpl) conn.createStatement()) {
763+
stmt.getLocalSettings().setSessionId(sessionId);
764+
stmt.setQueryTimeout(30); // safety net so a failed cancel cannot hang the test
765+
766+
final AtomicReference<Throwable> threadError = new AtomicReference<>();
767+
final CountDownLatch started = new CountDownLatch(1);
768+
Thread worker = new Thread(() -> {
769+
started.countDown();
770+
// Long-running query that only completes when killed.
771+
try (ResultSet rs = stmt.executeQuery("SELECT count() FROM system.numbers_mt")) {
772+
rs.next();
773+
} catch (Throwable t) {
774+
System.out.println("Error: " + t.getMessage());
775+
threadError.set(t);
776+
}
777+
});
778+
worker.start();
779+
started.await();
780+
781+
String queryId = waitForQueryId(stmt, 15);
782+
assertNotNull(queryId, "Query id was not assigned in time");
783+
assertTrue(waitForQueryToStart(queryId, 15), "Query did not start on the server in time");
784+
785+
// Cancel from the main thread - must not throw SESSION_IS_LOCKED.
786+
stmt.cancel();
787+
788+
worker.join(TimeUnit.SECONDS.toMillis(20));
789+
assertFalse(worker.isAlive(), "Query was not cancelled and is still running");
790+
}
791+
}
792+
}
793+
794+
@Test(groups = {"integration"})
795+
public void testCancelInsertWithSession() throws Exception {
796+
if (isCloud()) {
797+
throw new SkipException("Cloud + HTTP doesn't work well. Enough to test locally");
798+
}
799+
// Regression test for #2690 covering a long-running INSERT executed inside a session.
800+
String tableName = getDatabase() + ".cancel_insert_with_session";
801+
String sessionId = "test-session-" + UUID.randomUUID();
802+
try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) {
803+
try (Statement setup = conn.createStatement()) {
804+
setup.execute("DROP TABLE IF EXISTS " + tableName);
805+
setup.execute("CREATE TABLE " + tableName + " (num UInt64) ENGINE = MergeTree ORDER BY ()");
806+
}
807+
808+
try (StatementImpl stmt = (StatementImpl) conn.createStatement()) {
809+
stmt.getLocalSettings().setSessionId(sessionId);
810+
stmt.setQueryTimeout(30); // safety net so a failed cancel cannot hang the test
811+
812+
final AtomicReference<Throwable> threadError = new AtomicReference<>();
813+
final CountDownLatch started = new CountDownLatch(1);
814+
Thread worker = new Thread(() -> {
815+
started.countDown();
816+
// Long-running insert that only completes when killed.
817+
try {
818+
stmt.executeUpdate("INSERT INTO " + tableName + " SELECT number FROM system.numbers_mt");
819+
} catch (Throwable t) {
820+
threadError.set(t);
821+
}
822+
});
823+
worker.start();
824+
started.await();
825+
826+
String queryId = waitForQueryId(stmt, 15);
827+
assertNotNull(queryId, "Query id was not assigned in time");
828+
assertTrue(waitForQueryToStart(queryId, 15), "Insert did not start on the server in time");
829+
830+
// Cancel from the main thread - must not throw SESSION_IS_LOCKED.
831+
stmt.cancel();
832+
833+
worker.join(TimeUnit.SECONDS.toMillis(20));
834+
assertFalse(worker.isAlive(), "Insert was not cancelled and is still running");
835+
} finally {
836+
try (Statement cleanup = conn.createStatement()) {
837+
cleanup.execute("DROP TABLE IF EXISTS " + tableName);
838+
}
839+
}
840+
}
841+
}
842+
714843
@Test(groups = {"integration"})
715844
public void testTextFormatInResponse() throws Exception {
716845
try (Connection conn = getJdbcConnection();

0 commit comments

Comments
 (0)