1515#include " publish_version_manager.h"
1616
1717#include " agent/finish_task.h"
18- #include " agent/master_info.h"
1918#include " agent/task_signatures_manager.h"
2019#include " common/config.h"
21- #include " runtime/client_cache.h"
2220#include " storage/storage_engine.h"
2321#include " storage/tablet.h"
2422#include " storage/tablet_manager.h"
2523#include " util/cpu_info.h"
26- #include " util/thrift_rpc_helper.h"
2724
2825namespace starrocks {
2926const int MIN_FINISH_PUBLISH_WORKER_COUNT = 8 ;
@@ -85,8 +82,8 @@ bool PublishVersionManager::_all_task_applied(const TFinishTaskRequest& finish_t
8582 return all_task_applied;
8683}
8784
88- size_t PublishVersionManager::_left_task_applied (const TFinishTaskRequest& finish_task_request) {
89- size_t unapplied_tablet_num = 0 ;
85+ bool PublishVersionManager::_left_task_applied (const TFinishTaskRequest& finish_task_request) {
86+ bool applied = true ;
9087 int64_t signature = finish_task_request.signature ;
9188 std::set<std::pair<int64_t , int64_t >> unapplied_tablet;
9289 auto iter = _unapplied_tablet_by_txn.find (signature);
@@ -103,19 +100,19 @@ size_t PublishVersionManager::_left_task_applied(const TFinishTaskRequest& finis
103100 continue ;
104101 }
105102 if (tablet->max_readable_version () < request_version) {
106- unapplied_tablet_num++ ;
103+ applied = false ;
107104 unapplied_tablet.insert (std::make_pair (tablet_id, request_version));
108105 }
109106 VLOG (2 ) << " tablet: " << tablet->tablet_id () << " max_readable_version is "
110107 << tablet->max_readable_version () << " , request_version is " << request_version;
111108 }
112109 }
113- if (unapplied_tablet_num > 0 ) {
110+ if (!applied ) {
114111 iter->second .swap (unapplied_tablet);
115112 } else {
116113 _unapplied_tablet_by_txn.erase (signature);
117114 }
118- return unapplied_tablet_num ;
115+ return applied ;
119116}
120117
121118void PublishVersionManager::wait_publish_task_apply_finish (std::vector<TFinishTaskRequest> finish_task_requests) {
@@ -124,17 +121,14 @@ void PublishVersionManager::wait_publish_task_apply_finish(std::vector<TFinishTa
124121 if (_all_task_applied (finish_task_requests[i])) {
125122 _finish_task_requests[finish_task_requests[i].signature ] = std::move (finish_task_requests[i]);
126123 } else {
127- FinishTaskInfo info;
128- info.last_report_time = MonotonicMillis ();
129- info.not_report_tablet_num = finish_task_requests[i].tablet_publish_versions .size ();
130- info.request = std::move (finish_task_requests[i]);
131- _waitting_finish_task_requests[finish_task_requests[i].signature ] = std::move (info);
124+ _waitting_finish_task_requests[finish_task_requests[i].signature ] = std::move (finish_task_requests[i]);
132125 }
133126 }
134127 DCHECK (has_pending_task ());
135128}
136129
137- void PublishVersionManager::update_tablet_version (std::vector<TTabletVersionPair>& tablet_versions) {
130+ void PublishVersionManager::update_tablet_version (TFinishTaskRequest& finish_task_request) {
131+ auto & tablet_versions = finish_task_request.tablet_versions ;
138132 for (int32_t i = 0 ; i < tablet_versions.size (); i++) {
139133 int64_t tablet_id = tablet_versions[i].tablet_id ;
140134 TabletSharedPtr tablet = StorageEngine::instance ()->tablet_manager ()->get_tablet (tablet_id);
@@ -154,7 +148,7 @@ void PublishVersionManager::finish_publish_version_task() {
154148 // submit finish task
155149 st = _finish_publish_version_thread_pool->submit_func (
156150 [this , finish_request = std::move (finish_task_request)]() mutable {
157- update_tablet_version (finish_request. tablet_versions );
151+ update_tablet_version (finish_request);
158152#ifndef BE_TEST
159153 finish_task (finish_request);
160154#endif
@@ -166,12 +160,11 @@ void PublishVersionManager::finish_publish_version_task() {
166160 }
167161
168162 std::vector<int64_t > clear_txn;
169- for (auto & [signature, finish_task_info] : _waitting_finish_task_requests) {
170- size_t unapplied_tablet_num = _left_task_applied (finish_task_info.request );
171- if (unapplied_tablet_num == 0 ) {
163+ for (auto & [signature, finish_task_request] : _waitting_finish_task_requests) {
164+ if (_left_task_applied (finish_task_request)) {
172165 st = _finish_publish_version_thread_pool->submit_func (
173- [this , finish_request = std::move (finish_task_info. request )]() mutable {
174- update_tablet_version (finish_request. tablet_versions );
166+ [this , finish_request = std::move (finish_task_request )]() mutable {
167+ update_tablet_version (finish_request);
175168#ifndef BE_TEST
176169 finish_task (finish_request);
177170#endif
@@ -180,41 +173,6 @@ void PublishVersionManager::finish_publish_version_task() {
180173 if (st.ok ()) {
181174 erase_waitting_finish_task_signature.emplace_back (signature);
182175 }
183- } else {
184- size_t not_report_tablet_num = finish_task_info.not_report_tablet_num ;
185- if (unapplied_tablet_num < not_report_tablet_num &&
186- MonotonicMillis () - finish_task_info.last_report_time >
187- config::max_update_tablet_version_internal_ms) {
188- VLOG (2 ) << " unapplied_tablet_num: " << unapplied_tablet_num
189- << " , not_report_tablet_num: " << not_report_tablet_num
190- << " , report_internal_ms: " << MonotonicMillis () - finish_task_info.last_report_time
191- << " , allow_internla_ms: " << config::max_update_tablet_version_internal_ms;
192-
193- finish_task_info.not_report_tablet_num = unapplied_tablet_num;
194- finish_task_info.last_report_time = MonotonicMillis ();
195- TUpdateTabletVersionRequest update_request;
196- update_request.__set_backend (finish_task_info.request .backend );
197- update_request.__set_signature (signature);
198- update_request.__set_tablet_versions (finish_task_info.request .tablet_versions );
199- st = _finish_publish_version_thread_pool->submit_func (
200- [this , request = std::move (update_request)]() mutable {
201- update_tablet_version (request.tablet_versions );
202- TNetworkAddress master_addr = get_master_address ();
203- TUpdateTabletVersionResult result;
204- auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
205- master_addr.hostname , master_addr.port ,
206- [&request, &result](FrontendServiceConnection& client) {
207- client->updateTabletVersion (result, request);
208- });
209- if (!st.ok ()) {
210- LOG (WARNING) << " updateTabletVersion failed: " << st
211- << " , signature: " << request.signature ;
212- }
213- });
214- if (!st.ok ()) {
215- LOG (WARNING) << " submit report tablet version task failed" ;
216- }
217- }
218176 }
219177 }
220178 for (auto & signature : erase_finish_task_signature) {
0 commit comments