-
Notifications
You must be signed in to change notification settings - Fork 241
Expand file tree
/
Copy pathob_ivf_async_task.cpp
More file actions
159 lines (154 loc) · 6.54 KB
/
ob_ivf_async_task.cpp
File metadata and controls
159 lines (154 loc) · 6.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/*
* Copyright (c) 2025 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define USING_LOG_PREFIX SERVER
#include "ob_ivf_async_task.h"
#include "share/vector_index/ob_plugin_vector_index_service.h"
#include "share/ob_ls_id.h"
#include "share/vector_index/ob_vector_index_ivf_cache_util.h"
namespace oceanbase
{
namespace share
{
int ObIvfAsyncTask::delete_deprecated_cache(ObPluginVectorIndexService &vector_index_service)
{
int ret = OB_SUCCESS;
ObPluginVectorIndexMgr *ls_index_mgr = nullptr;
if (OB_FAIL(vector_index_service.get_ls_index_mgr_map().get_refactored(ls_id_, ls_index_mgr))) {
if (ret == OB_HASH_NOT_EXIST) {
// do not need delete
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to get vector index mgr for ls", KR(ret), K(ls_id_));
}
} else if (OB_FAIL(ls_index_mgr->erase_ivf_cache_mgr(ctx_->task_status_.tablet_id_))) {
if (ret != OB_HASH_NOT_EXIST) {
LOG_WARN("failed to erase vector index ivf cache mgr",
K(ls_id_),
K(ctx_->task_status_.tablet_id_),
KR(ret));
} else { // already removed
ret = OB_SUCCESS;
}
}
return ret;
}
int ObIvfAsyncTask::write_cache(ObPluginVectorIndexService &vector_index_service)
{
int ret = OB_SUCCESS;
ObIvfCacheMgrGuard cache_guard;
ObIvfCacheMgr *cache_mgr = nullptr;
ObVectorIndexParam vec_param;
ObIvfCentCache *cent_cache = nullptr;
ObIvfAuxTableInfo *aux_table_info = nullptr;
ObSchemaGetterGuard schema_guard;
if (OB_ISNULL(ctx_)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("invalid null ctx_", K(ret), KP(ctx_));
} else if (OB_ISNULL(aux_table_info = reinterpret_cast<ObIvfAuxTableInfo *>(ctx_->extra_data_))) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("invalid null aux_table_info", K(ret), KP(ctx_->extra_data_));
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(
tenant_id_, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id_));
} else if (OB_FAIL(ObVectorIndexUtil::get_vector_index_param_with_dim(
schema_guard,
tenant_id_,
ctx_->task_status_.table_id_,
aux_table_info->data_table_id_,
ObVectorIndexType::VIT_IVF_INDEX,
vec_param))) {
LOG_WARN("fail to get vector index param with dim",
K(ret),
K(tenant_id_),
K(ctx_->task_status_.table_id_),
KPC(aux_table_info));
} else if (OB_FAIL(vector_index_service.acquire_ivf_cache_mgr_guard(ls_id_,
ctx_->task_status_.tablet_id_,
vec_param,
vec_param.dim_,
ctx_->task_status_.table_id_,
cache_guard))) {
LOG_WARN("fail to acquire ivf cache mgr with vec param",
K(ret),
K(ls_id_),
K(ctx_->task_status_),
K(vec_param));
} else if (OB_ISNULL(cache_mgr = cache_guard.get_ivf_cache_mgr())) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("invalid null cache mgr", K(ret));
} else if (OB_FAIL(cache_mgr->get_or_create_cache_node(IvfCacheType::IVF_CENTROID_CACHE,
cent_cache))) {
LOG_WARN("fail to get or create cache node", K(ret));
} else if (OB_FAIL(ObIvfCacheUtil::scan_and_write_ivf_cent_cache(
vector_index_service,
aux_table_info->centroid_table_id_,
aux_table_info->centroid_tablet_ids_[0],
*cent_cache,
false /* is_pq_centroid */))) {
LOG_WARN("fail to scan and write ivf cent cache", K(ret), K(aux_table_info));
} else if (aux_table_info->type_ == VIAT_IVF_PQ) {
ObIvfCentCache *pq_cent_cache = nullptr;
if (OB_FAIL(cache_mgr->get_or_create_cache_node(IvfCacheType::IVF_PQ_CENTROID_CACHE,
pq_cent_cache))) {
LOG_WARN("fail to get or create cache node", K(ret));
} else if (OB_FAIL(ObIvfCacheUtil::scan_and_write_ivf_cent_cache(
vector_index_service,
aux_table_info->pq_centroid_table_id_,
aux_table_info->pq_centroid_tablet_ids_[0],
*pq_cent_cache,
true /* is_pq_centroid */))) {
LOG_WARN("fail to scan and write ivf cent cache", K(ret), K(aux_table_info));
}
}
return ret;
}
int ObIvfAsyncTask::do_work()
{
int ret = OB_SUCCESS;
bool is_deprecated = false;
ObPluginVectorIndexService *vector_index_service = MTL(ObPluginVectorIndexService *);
DEBUG_SYNC(HANDLE_VECTOR_INDEX_ASYNC_TASK);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObVecIndexAsyncTask is not init", KR(ret));
} else if (OB_ISNULL(ctx_) || OB_ISNULL(ctx_->ls_) || OB_ISNULL(vector_index_service)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("unexpected nullptr", K(ret), KP(ctx_), KP(vector_index_service));
} else if (OB_ISNULL(vec_idx_mgr_)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("get invalid vector index ls mgr", KR(ret), K(tenant_id_), K(ls_id_));
} else if (ctx_->task_status_.task_type_ == OB_VECTOR_ASYNC_INDEX_IVF_CLEAN) {
if (OB_FAIL(delete_deprecated_cache(*vector_index_service))) {
LOG_WARN("fail to delete deprecated cache", K(ret));
}
} else if (ctx_->task_status_.task_type_ == OB_VECTOR_ASYNC_INDEX_IVF_LOAD) {
if (OB_FAIL(write_cache(*vector_index_service))) {
LOG_WARN("fail to write cache", K(ret));
}
} else {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid task type", K(ret), KPC(ctx_));
}
if (OB_NOT_NULL(ctx_)) {
common::ObSpinLockGuard ctx_guard(ctx_->lock_);
ctx_->task_status_.ret_code_ = ret;
ctx_->in_thread_pool_ = false;
}
LOG_INFO("end ivf do_work", K(ret), K(ctx_->task_status_.tablet_id_));
return ret;
}
} // namespace share
} // namespace oceanbase