Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test](test) #48828

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
if (!p._has_window_start &&
p._window.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW) {
_executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_unbounded_rows;
_streaming_mode = true;
} else {
_executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_sliding_rows;
}
Expand Down Expand Up @@ -218,7 +219,9 @@ bool AnalyticSinkLocalState::_get_next_for_unbounded_rows(int64_t batch_rows,
// going on calculate, add up data, no need to reset state
_execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
_current_row_position, _current_row_position + 1);
_insert_result_info(1);
int64_t pos = _current_row_position + _have_removed_rows -
_input_block_first_row_positions[_output_block_index];
_insert_result_info(1, pos);
_current_row_position++;
if (_current_row_position - current_block_base_pos >= batch_rows) {
return true;
Expand Down Expand Up @@ -309,7 +312,7 @@ Status AnalyticSinkLocalState::_execute_impl() {
while (_output_block_index < _input_blocks.size()) {
{
_get_partition_by_end();
if (!_partition_by_pose.is_ended) {
if (!_partition_by_pose.is_ended && !_streaming_mode) {
break;
}
_init_result_columns();
Expand Down Expand Up @@ -359,7 +362,7 @@ void AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, int6
}
}

void AnalyticSinkLocalState::_insert_result_info(int64_t real_deal_with_width) {
void AnalyticSinkLocalState::_insert_result_info(int64_t real_deal_with_width, int64_t pos) {
// here is the core function, should not add timer
for (size_t i = 0; i < _agg_functions_size; ++i) {
for (size_t j = 0; j < real_deal_with_width; ++j) {
Expand All @@ -375,9 +378,10 @@ void AnalyticSinkLocalState::_insert_result_info(int64_t real_deal_with_width) {
&dst->get_nested_column());
}
} else {
_agg_functions[i]->insert_result_info(
_fn_place_ptr + _offsets_of_aggregate_states[i],
_result_window_columns[i].get());
// _agg_functions[i]->function()->insert_result_into(
_agg_functions[i]->function()->insert_result_into_pos(
_fn_place_ptr + _offsets_of_aggregate_states[i], *_result_window_columns[i],
pos);
}
}
}
Expand Down Expand Up @@ -410,6 +414,8 @@ void AnalyticSinkLocalState::_init_result_columns() {
// return type create result column
for (size_t i = 0; i < _agg_functions_size; ++i) {
_result_window_columns[i] = _agg_functions[i]->data_type()->create_column();
_result_window_columns[i]->reserve(_input_blocks[_output_block_index].rows());
// _result_window_columns[i]->resize(_input_blocks[_output_block_index].rows());
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
void _init_result_columns();
void _execute_for_function(int64_t partition_start, int64_t partition_end, int64_t frame_start,
int64_t frame_end);
void _insert_result_info(int64_t real_deal_with_width);
void _insert_result_info(int64_t real_deal_with_width, int64_t pos = 0);
void _output_current_block(vectorized::Block* block);
void _reset_state_for_next_partition();
void _refresh_buffer_and_dependency_state(vectorized::Block* block);
Expand Down Expand Up @@ -137,6 +137,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
executor _executor;

bool _current_window_empty = false;
bool _streaming_mode = false;
int64_t _current_row_position = 0;
int64_t _output_block_index = 0;
std::vector<vectorized::MutableColumnPtr> _result_window_columns;
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/aggregate_functions/aggregate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ class IAggregateFunction {
/// Inserts results into a column.
virtual void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const = 0;

virtual void insert_result_into_pos(ConstAggregateDataPtr __restrict place, IColumn& to,
size_t pos) const {
insert_result_into(place, to);
}

virtual void insert_result_into_vec(const std::vector<AggregateDataPtr>& places,
const size_t offset, IColumn& to,
const size_t num_rows) const = 0;
Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ struct AggregateJavaUdafData {
env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT);
env->DeleteLocalRef(ctor_params_bytes);
}
RETURN_ERROR_IF_EXC(env);
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
RETURN_ERROR_IF_EXC(env);
return Status::OK();
}

Expand Down Expand Up @@ -207,9 +207,8 @@ struct AggregateJavaUdafData {
jobject output_map = JniUtil::convert_to_java_map(env, output_params);
long output_address =
env->CallLongMethod(executor_obj, executor_get_value_id, place, output_map);
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
env->DeleteLocalRef(output_map);

RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
return JniConnector::fill_block(&output_block, {0}, output_address);
}

Expand Down
10 changes: 9 additions & 1 deletion be/src/vec/aggregate_functions/aggregate_function_window.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,15 @@ class WindowFunctionRowNumber final
}

void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const override {
assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).count);
assert_cast<ColumnInt64&, TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
doris::vectorized::WindowFunctionRowNumber::data(place).count);
}

void insert_result_into_pos(ConstAggregateDataPtr __restrict place, IColumn& to,
size_t pos) const override {
auto& column = assert_cast<ColumnInt64&, TypeCheckOnRelease::DISABLE>(to);
// column.get_data()[pos] = (doris::vectorized::WindowFunctionRowNumber::data(place).count);
column.get_data().push_back(doris::vectorized::WindowFunctionRowNumber::data(place).count);
}

void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*) const override {}
Expand Down
7 changes: 3 additions & 4 deletions be/src/vec/exprs/table_function/udf_table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,11 @@ Status UDFTableFunction::process_init(Block* block, RuntimeState* state) {
jobject output_map = JniUtil::convert_to_java_map(env, output_params);
DCHECK(_jni_ctx != nullptr);
DCHECK(_jni_ctx->executor != nullptr);
JNI_CALL_METHOD_CHECK_EXCEPTION(
long, output_address, env,
CallLongMethod(_jni_ctx->executor, _jni_ctx->executor_evaluate_id, input_map,
output_map));
long output_address = env->CallLongMethod(_jni_ctx->executor, _jni_ctx->executor_evaluate_id,
input_map, output_map);
env->DeleteLocalRef(input_map);
env->DeleteLocalRef(output_map);
RETURN_ERROR_IF_EXC(env);
RETURN_IF_ERROR(JniConnector::fill_block(block, {_result_column_idx}, output_address));
block->erase(_result_column_idx);
if (!extract_column_array_info(*_array_result_column, _array_column_detail)) {
Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/functions/function_java_udf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT);
env->DeleteLocalRef(ctor_params_bytes);
}
RETURN_ERROR_IF_EXC(env);
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor));
RETURN_ERROR_IF_EXC(env);
jni_ctx->open_successes = true;
}
return Status::OK();
Expand Down Expand Up @@ -115,10 +115,9 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
jobject output_map = JniUtil::convert_to_java_map(env, output_params);
long output_address = env->CallLongMethod(jni_ctx->executor, jni_ctx->executor_evaluate_id,
input_map, output_map);
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
env->DeleteLocalRef(input_map);
env->DeleteLocalRef(output_map);

RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
return JniConnector::fill_block(&block, {result}, output_address);
}

Expand Down
Loading