diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index fb3117c800f6f5..6e3be567afcc2b 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -72,6 +72,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf if (!p._has_window_start && p._window.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW) { _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_unbounded_rows; + _streaming_mode = true; } else { _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_sliding_rows; } @@ -218,7 +219,9 @@ bool AnalyticSinkLocalState::_get_next_for_unbounded_rows(int64_t batch_rows, // going on calculate, add up data, no need to reset state _execute_for_function(_partition_by_pose.start, _partition_by_pose.end, _current_row_position, _current_row_position + 1); - _insert_result_info(1); + int64_t pos = _current_row_position + _have_removed_rows - + _input_block_first_row_positions[_output_block_index]; + _insert_result_info(1, pos); _current_row_position++; if (_current_row_position - current_block_base_pos >= batch_rows) { return true; @@ -309,7 +312,7 @@ Status AnalyticSinkLocalState::_execute_impl() { while (_output_block_index < _input_blocks.size()) { { _get_partition_by_end(); - if (!_partition_by_pose.is_ended) { + if (!_partition_by_pose.is_ended && !_streaming_mode) { break; } _init_result_columns(); @@ -359,7 +362,7 @@ void AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, int6 } } -void AnalyticSinkLocalState::_insert_result_info(int64_t real_deal_with_width) { +void AnalyticSinkLocalState::_insert_result_info(int64_t real_deal_with_width, int64_t pos) { // here is the core function, should not add timer for (size_t i = 0; i < _agg_functions_size; ++i) { for (size_t j = 0; j < real_deal_with_width; ++j) { @@ -375,9 +378,10 @@ void AnalyticSinkLocalState::_insert_result_info(int64_t real_deal_with_width) { &dst->get_nested_column()); } } else { - _agg_functions[i]->insert_result_info( - _fn_place_ptr + _offsets_of_aggregate_states[i], - _result_window_columns[i].get()); + // _agg_functions[i]->function()->insert_result_into( + _agg_functions[i]->function()->insert_result_into_pos( + _fn_place_ptr + _offsets_of_aggregate_states[i], *_result_window_columns[i], + pos); } } } @@ -410,6 +414,8 @@ void AnalyticSinkLocalState::_init_result_columns() { // return type create result column for (size_t i = 0; i < _agg_functions_size; ++i) { _result_window_columns[i] = _agg_functions[i]->data_type()->create_column(); + _result_window_columns[i]->reserve(_input_blocks[_output_block_index].rows()); + // _result_window_columns[i]->resize(_input_blocks[_output_block_index].rows()); } } } diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 99e09372302fd3..b7fb2bccd22e50 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -87,7 +87,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState _result_window_columns; diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index d761d40c4c932c..e5aca22d6e946b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -184,6 +184,11 @@ class IAggregateFunction { /// Inserts results into a column. virtual void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const = 0; + virtual void insert_result_into_pos(ConstAggregateDataPtr __restrict place, IColumn& to, + size_t pos) const { + insert_result_into(place, to); + } + virtual void insert_result_into_vec(const std::vector& places, const size_t offset, IColumn& to, const size_t num_rows) const = 0; diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h index c4c050f807954a..344e312df7777f 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h +++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h @@ -111,8 +111,8 @@ struct AggregateJavaUdafData { env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT); env->DeleteLocalRef(ctor_params_bytes); } - RETURN_ERROR_IF_EXC(env); RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj)); + RETURN_ERROR_IF_EXC(env); return Status::OK(); } @@ -207,9 +207,8 @@ struct AggregateJavaUdafData { jobject output_map = JniUtil::convert_to_java_map(env, output_params); long output_address = env->CallLongMethod(executor_obj, executor_get_value_id, place, output_map); - RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); env->DeleteLocalRef(output_map); - + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); return JniConnector::fill_block(&output_block, {0}, output_address); } diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h b/be/src/vec/aggregate_functions/aggregate_function_window.h index ca5b3bb07652bb..8b24910a2305eb 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window.h +++ b/be/src/vec/aggregate_functions/aggregate_function_window.h @@ -76,7 +76,15 @@ class WindowFunctionRowNumber final } void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const override { - assert_cast(to).get_data().push_back(data(place).count); + assert_cast(to).get_data().push_back( + doris::vectorized::WindowFunctionRowNumber::data(place).count); + } + + void insert_result_into_pos(ConstAggregateDataPtr __restrict place, IColumn& to, + size_t pos) const override { + auto& column = assert_cast(to); + // column.get_data()[pos] = (doris::vectorized::WindowFunctionRowNumber::data(place).count); + column.get_data().push_back(doris::vectorized::WindowFunctionRowNumber::data(place).count); } void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*) const override {} diff --git a/be/src/vec/exprs/table_function/udf_table_function.cpp b/be/src/vec/exprs/table_function/udf_table_function.cpp index 237b6806da675c..fc8db46a75888e 100644 --- a/be/src/vec/exprs/table_function/udf_table_function.cpp +++ b/be/src/vec/exprs/table_function/udf_table_function.cpp @@ -131,12 +131,11 @@ Status UDFTableFunction::process_init(Block* block, RuntimeState* state) { jobject output_map = JniUtil::convert_to_java_map(env, output_params); DCHECK(_jni_ctx != nullptr); DCHECK(_jni_ctx->executor != nullptr); - JNI_CALL_METHOD_CHECK_EXCEPTION( - long, output_address, env, - CallLongMethod(_jni_ctx->executor, _jni_ctx->executor_evaluate_id, input_map, - output_map)); + long output_address = env->CallLongMethod(_jni_ctx->executor, _jni_ctx->executor_evaluate_id, + input_map, output_map); env->DeleteLocalRef(input_map); env->DeleteLocalRef(output_map); + RETURN_ERROR_IF_EXC(env); RETURN_IF_ERROR(JniConnector::fill_block(block, {_result_column_idx}, output_address)); block->erase(_result_column_idx); if (!extract_column_array_info(*_array_result_column, _array_column_detail)) { diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp index bea8543d6f062d..738270b60e3c15 100644 --- a/be/src/vec/functions/function_java_udf.cpp +++ b/be/src/vec/functions/function_java_udf.cpp @@ -83,8 +83,8 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT); env->DeleteLocalRef(ctor_params_bytes); } - RETURN_ERROR_IF_EXC(env); RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor)); + RETURN_ERROR_IF_EXC(env); jni_ctx->open_successes = true; } return Status::OK(); @@ -115,10 +115,9 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block, jobject output_map = JniUtil::convert_to_java_map(env, output_params); long output_address = env->CallLongMethod(jni_ctx->executor, jni_ctx->executor_evaluate_id, input_map, output_map); - RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); env->DeleteLocalRef(input_map); env->DeleteLocalRef(output_map); - + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); return JniConnector::fill_block(&block, {result}, output_address); }