Skip to content

Commit 1c6d046

Browse files
[BugFix] Fix cancel query for external low cardinality (backport StarRocks#58278) (backport StarRocks#58342) (StarRocks#59068)
Signed-off-by: zihe.liu <[email protected]> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: zihe.liu <[email protected]>
1 parent 8ba4f9e commit 1c6d046

File tree

12 files changed

+643
-7
lines changed

12 files changed

+643
-7
lines changed

be/src/exec/pipeline/fragment_context.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,9 +244,12 @@ void FragmentContext::set_stream_load_contexts(const std::vector<StreamLoadConte
244244
}
245245

246246
// Note: this function should be thread safe
247-
void FragmentContext::cancel(const Status& status) {
247+
void FragmentContext::cancel(const Status& status, bool cancelled_by_fe) {
248248
if (!status.ok() && _runtime_state != nullptr && _runtime_state->query_ctx() != nullptr) {
249249
_runtime_state->query_ctx()->release_workgroup_token_once();
250+
if (cancelled_by_fe) {
251+
_runtime_state->query_ctx()->set_cancelled_by_fe();
252+
}
250253
}
251254

252255
_runtime_state->set_is_cancelled(true);

be/src/exec/pipeline/fragment_context.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class FragmentContext {
9090
return status == nullptr ? Status::OK() : *status;
9191
}
9292

93-
void cancel(const Status& status);
93+
void cancel(const Status& status, bool cancelled_by_fe = false);
9494

9595
// Finishes this fragment by invoking cancel() with an OK status.
void finish() { cancel(Status::OK()); }
9696

be/src/exec/pipeline/fragment_executor.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ Status FragmentExecutor::_prepare_query_ctx(ExecEnv* exec_env, const UnifiedExec
9999
const auto& query_id = params.query_id;
100100
const auto& fragment_instance_id = request.fragment_instance_id();
101101
const auto& query_options = request.common().query_options;
102+
const auto& t_desc_tbl = request.common().desc_tbl;
102103

103104
auto&& existing_query_ctx = exec_env->query_context_mgr()->get(query_id);
104105
if (existing_query_ctx) {
@@ -108,7 +109,8 @@ Status FragmentExecutor::_prepare_query_ctx(ExecEnv* exec_env, const UnifiedExec
108109
}
109110
}
110111

111-
ASSIGN_OR_RETURN(_query_ctx, exec_env->query_context_mgr()->get_or_register(query_id));
112+
const bool query_ctx_should_exist = t_desc_tbl.__isset.is_cached && t_desc_tbl.is_cached;
113+
ASSIGN_OR_RETURN(_query_ctx, exec_env->query_context_mgr()->get_or_register(query_id, query_ctx_should_exist));
112114
_query_ctx->set_exec_env(exec_env);
113115
if (params.__isset.instances_number) {
114116
_query_ctx->set_total_fragments(params.instances_number);

be/src/exec/pipeline/query_context.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ void QueryContext::cancel(const Status& status, bool cancelled_by_fe) {
110110
_is_cancelled = true;
111111
if (cancelled_by_fe) {
112112
// only update when the cancellation is confirmed to come from FE
113-
_cancelled_by_fe = true;
113+
set_cancelled_by_fe();
114114
}
115115
if (_cancelled_status.load() != nullptr) {
116116
return;
@@ -387,7 +387,8 @@ QueryContextManager::~QueryContextManager() {
387387
return query_ctx->get_cancelled_status(); \
388388
}
389389

390-
StatusOr<QueryContext*> QueryContextManager::get_or_register(const TUniqueId& query_id) {
390+
StatusOr<QueryContext*> QueryContextManager::get_or_register(const TUniqueId& query_id,
391+
bool return_error_if_not_exist) {
391392
size_t i = _slot_idx(query_id);
392393
auto& mutex = _mutexes[i];
393394
auto& context_map = _context_maps[i];
@@ -422,6 +423,10 @@ StatusOr<QueryContext*> QueryContextManager::get_or_register(const TUniqueId& qu
422423
}
423424
}
424425

426+
if (return_error_if_not_exist) {
427+
return Status::Cancelled("Query terminates prematurely");
428+
}
429+
425430
// finally, no existing query context was found, so create a new one
426431
auto&& ctx = std::make_shared<QueryContext>();
427432
auto* ctx_raw_ptr = ctx.get();

be/src/exec/pipeline/query_context.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
154154
FragmentContextManager* fragment_mgr();
155155

156156
void cancel(const Status& status, bool cancelled_by_fe);
157+
// Records that this query was cancelled by FE by setting _cancelled_by_fe to true.
void set_cancelled_by_fe() { _cancelled_by_fe = true; }
157158

158159
void set_is_runtime_filter_coordinator(bool flag) { _is_runtime_filter_coordinator = flag; }
159160

@@ -386,7 +387,7 @@ class QueryContextManager {
386387
QueryContextManager(size_t log2_num_slots);
387388
~QueryContextManager();
388389
Status init();
389-
StatusOr<QueryContext*> get_or_register(const TUniqueId& query_id);
390+
StatusOr<QueryContext*> get_or_register(const TUniqueId& query_id, bool return_error_if_not_exist = false);
390391
QueryContextPtr get(const TUniqueId& query_id, bool need_prepared = false);
391392
size_t size();
392393
bool remove(const TUniqueId& query_id);

be/src/formats/parquet/column_reader_factory.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
#include "formats/parquet/scalar_column_reader.h"
1919
#include "formats/parquet/schema.h"
2020
#include "formats/utils.h"
21+
#include "util/failpoint/fail_point.h"
2122

2223
namespace starrocks::parquet {
2324

25+
DEFINE_FAIL_POINT(parquet_reader_returns_global_dict_not_match_status);
26+
2427
StatusOr<ColumnReaderPtr> ColumnReaderFactory::create(const ColumnReaderOptions& opts, const ParquetField* field,
2528
const TypeDescriptor& col_type) {
2629
// We will only set a complex type in ParquetField
@@ -158,6 +161,11 @@ StatusOr<ColumnReaderPtr> ColumnReaderFactory::create(const ColumnReaderOptions&
158161

159162
StatusOr<ColumnReaderPtr> ColumnReaderFactory::create(ColumnReaderPtr ori_reader, const GlobalDictMap* dict,
160163
SlotId slot_id, int64_t num_rows) {
164+
FAIL_POINT_TRIGGER_EXECUTE(parquet_reader_returns_global_dict_not_match_status, {
165+
return Status::GlobalDictNotMatch(
166+
fmt::format("SlotId: {}, Not dict encoded and not low rows on global dict column. ", slot_id));
167+
});
168+
161169
if (ori_reader->get_column_parquet_field()->type == ColumnType::ARRAY) {
162170
ASSIGN_OR_RETURN(ColumnReaderPtr child_reader,
163171
ColumnReaderFactory::create(

be/src/service/internal_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ void PInternalServiceImplBase<T>::_cancel_plan_fragment(google::protobuf::RpcCon
564564
"FragmentContext already destroyed: query_id=$0, fragment_instance_id=$1", print_id(query_id),
565565
print_id(tid));
566566
} else {
567-
fragment_ctx->cancel(Status::Cancelled(reason_string));
567+
fragment_ctx->cancel(Status::Cancelled(reason_string), true);
568568
}
569569
}
570570
} else {

fe/fe-core/src/main/java/com/starrocks/qe/ExecuteExceptionHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.starrocks.sql.optimizer.statistics.IRelaxDictManager;
3939
import com.starrocks.sql.plan.ExecPlan;
4040
import com.starrocks.thrift.TExplainLevel;
41+
import com.starrocks.thrift.TStatusCode;
4142
import org.apache.logging.log4j.LogManager;
4243
import org.apache.logging.log4j.Logger;
4344

@@ -66,6 +67,12 @@ public static void handle(Exception e, RetryContext context) throws Exception {
6667
}
6768
}
6869

70+
/**
 * Reports whether the given status code is one of the codes treated as retryable here:
 * REMOTE_FILE_NOT_FOUND, THRIFT_RPC_ERROR, or GLOBAL_DICT_NOT_MATCH.
 *
 * @param statusCode the status code to classify
 * @return true if statusCode equals one of the three codes above, false otherwise
 */
public static boolean isRetryableStatus(TStatusCode statusCode) {
71+
return statusCode == TStatusCode.REMOTE_FILE_NOT_FOUND
72+
|| statusCode == TStatusCode.THRIFT_RPC_ERROR
73+
|| statusCode == TStatusCode.GLOBAL_DICT_NOT_MATCH;
74+
}
75+
6976
// If modifications are made to the partition files of a Hive table by a user,
7077
// such as through "insert overwrite partition", the Frontend is not aware of these changes.
7178
// As a result, queries may use the file information cached in the FE for execution.

fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Deployer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.starrocks.common.profile.Timer;
2424
import com.starrocks.common.profile.Tracers;
2525
import com.starrocks.qe.ConnectContext;
26+
import com.starrocks.qe.ExecuteExceptionHandler;
2627
import com.starrocks.qe.scheduler.dag.ExecutionDAG;
2728
import com.starrocks.qe.scheduler.dag.ExecutionFragment;
2829
import com.starrocks.qe.scheduler.dag.FragmentInstance;
@@ -262,6 +263,12 @@ private void waitForDeploymentCompletion(List<FragmentInstanceExecState> executi
262263
if (firstErrResult == null) {
263264
firstErrResult = res;
264265
firstErrExecution = execution;
266+
} else if (firstErrResult.getStatusCode() == TStatusCode.CANCELLED &&
267+
ExecuteExceptionHandler.isRetryableStatus(res.getStatusCode())) {
268+
// If the first error is cancelled and the subsequent error is retryable, we store the latter to give a chance
269+
// to retry this query.
270+
firstErrResult = res;
271+
firstErrExecution = execution;
265272
}
266273
if (TStatusCode.TIMEOUT == res.getStatusCode()) {
267274
break;

fe/fe-core/src/test/java/com/starrocks/qe/scheduler/StartSchedulingTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.common.collect.Maps;
1919
import com.starrocks.common.Reference;
2020
import com.starrocks.common.StarRocksException;
21+
import com.starrocks.connector.exception.GlobalDictNotMatchException;
2122
import com.starrocks.proto.PCancelPlanFragmentRequest;
2223
import com.starrocks.proto.PCancelPlanFragmentResult;
2324
import com.starrocks.proto.PExecPlanFragmentResult;
@@ -54,6 +55,7 @@
5455
import java.util.concurrent.TimeUnit;
5556
import java.util.concurrent.TimeoutException;
5657
import java.util.concurrent.atomic.AtomicBoolean;
58+
import java.util.concurrent.atomic.AtomicInteger;
5759

5860
import static com.starrocks.utframe.MockedBackend.MockPBackendService;
5961
import static org.assertj.core.api.Assertions.assertThat;
@@ -212,6 +214,57 @@ public Future<PExecPlanFragmentResult> execPlanFragmentAsync(PExecPlanFragmentRe
212214
Assert.assertTrue(scheduler.isDone());
213215
}
214216

217+
// Verifies that when one mocked backend returns a CANCELLED status and another returns
// GLOBAL_DICT_NOT_MATCH, executing the scheduler throws GlobalDictNotMatchException.
@Test
218+
public void testDeployFutureThrowRetryableException() throws Exception {
219+
// Counts how many times the backend-service factory below has been invoked.
// NOTE(review): presumably one invocation per deployed fragment instance -- confirm against setBackendService.
AtomicInteger numDeployedFragments = new AtomicInteger(0);
220+
221+
setBackendService(address -> {
222+
223+
final int numFragments = numDeployedFragments.incrementAndGet();
224+
// First invocation: hand back the unmodified mock service.
if (numFragments == 1) {
225+
return new MockPBackendService();
226+
}
227+
228+
// Second invocation: the mock's execPlanFragmentAsync reports a CANCELLED status.
if (numFragments == 2) {
229+
return new MockPBackendService() {
230+
@Override
231+
public Future<PExecPlanFragmentResult> execPlanFragmentAsync(PExecPlanFragmentRequest request) {
232+
return submit(() -> {
233+
PExecPlanFragmentResult result = new PExecPlanFragmentResult();
234+
StatusPB pStatus = new StatusPB();
235+
pStatus.statusCode = TStatusCode.CANCELLED.getValue();
236+
pStatus.errorMsgs = Collections.singletonList("test CANCELLED error message");
237+
result.status = pStatus;
238+
return result;
239+
});
240+
}
241+
};
242+
}
243+
244+
// Third invocation: the mock's execPlanFragmentAsync reports a GLOBAL_DICT_NOT_MATCH status.
if (numFragments == 3) {
245+
return new MockPBackendService() {
246+
@Override
247+
public Future<PExecPlanFragmentResult> execPlanFragmentAsync(PExecPlanFragmentRequest request) {
248+
return submit(() -> {
249+
PExecPlanFragmentResult result = new PExecPlanFragmentResult();
250+
StatusPB pStatus = new StatusPB();
251+
pStatus.statusCode = TStatusCode.GLOBAL_DICT_NOT_MATCH.getValue();
252+
pStatus.errorMsgs = Collections.singletonList("test GLOBAL_DICT_NOT_MATCH error message");
253+
result.status = pStatus;
254+
return result;
255+
});
256+
}
257+
};
258+
}
259+
260+
// Any later invocation: unmodified mock service again.
return new MockPBackendService();
261+
});
262+
String sql =
263+
"select count(1) from lineitem UNION ALL select count(1) from lineitem UNION ALL select count(1) from lineitem";
264+
DefaultCoordinator scheduler = getScheduler(sql);
265+
// The GLOBAL_DICT_NOT_MATCH error, not the CANCELLED one, is expected to surface from exec().
Assert.assertThrows(GlobalDictNotMatchException.class, scheduler::exec);
266+
}
267+
215268
@Test
216269
public void testDeployFutureReturnErrorStatus() throws Exception {
217270
final int successDeployedFragmentCount = 3;

0 commit comments

Comments
 (0)