-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Expand file tree
/
Copy pathrowset_writer_context.h
More file actions
250 lines (205 loc) · 9.46 KB
/
rowset_writer_context.h
File metadata and controls
250 lines (205 loc) · 9.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <functional>
#include <optional>
#include <string_view>
#include <unordered_map>
#include "cloud/config.h"
#include "common/status.h"
#include "io/fs/encrypted_fs_factory.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/packed_file_system.h"
#include "olap/olap_define.h"
#include "olap/partial_update_info.h"
#include "olap/storage_policy.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
namespace doris {
class RowsetWriterContextBuilder;
using RowsetWriterContextBuilderSharedPtr = std::shared_ptr<RowsetWriterContextBuilder>;
class DataDir;
class Tablet;
class FileWriterCreator;
class SegmentCollector;
struct RowsetWriterContext {
RowsetWriterContext() : schema_lock(new std::mutex) {
load_id.set_hi(0);
load_id.set_lo(0);
}
RowsetId rowset_id;
int64_t tablet_id {0};
int32_t tablet_schema_hash {0};
int64_t index_id {0};
int64_t partition_id {0};
RowsetTypePB rowset_type {BETA_ROWSET};
TabletSchemaSPtr tablet_schema;
// PREPARED/COMMITTED for pending rowset
// VISIBLE for non-pending rowset
RowsetStatePB rowset_state {PREPARED};
// properties for non-pending rowset
Version version {0, 0};
// properties for pending rowset
int64_t txn_id {0};
int64_t txn_expiration {0}; // For cloud mode
PUniqueId load_id;
TabletUid tablet_uid {0, 0};
// indicate whether the data among segments is overlapping.
// default is OVERLAP_UNKNOWN.
SegmentsOverlapPB segments_overlap {OVERLAP_UNKNOWN};
// segment file use uint32 to represent row number, therefore the maximum is UINT32_MAX.
// the default is set to INT32_MAX to avoid overflow issue when casting from uint32_t to int.
// test cases can change this value to control flush timing
uint32_t max_rows_per_segment = INT32_MAX;
// not owned, point to the data dir of this rowset
// for checking disk capacity when write data to disk.
// ATTN: not support for RowsetConvertor.
// (because it hard to refactor, and RowsetConvertor will be deprecated in future)
DataDir* data_dir = nullptr;
int64_t newest_write_timestamp = -1;
bool enable_unique_key_merge_on_write = false;
// store column_unique_id to do index compaction
std::set<int32_t> columns_to_do_index_compaction;
DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
// need to figure out the sub type of compaction
ReaderType compaction_type = ReaderType::UNKNOWN;
BaseTabletSPtr tablet = nullptr;
std::shared_ptr<MowContext> mow_context;
std::shared_ptr<FileWriterCreator> file_writer_creator;
std::shared_ptr<SegmentCollector> segment_collector;
// memtable_on_sink_support_index_v2 = true, we will create SinkFileWriter to send inverted index file
bool memtable_on_sink_support_index_v2 = false;
/// begin file cache opts
bool write_file_cache = false;
bool is_hot_data = false;
uint64_t file_cache_ttl_sec = 0;
uint64_t approximate_bytes_to_write = 0;
/// end file cache opts
// segcompaction for this RowsetWriter, only enabled when importing data
bool enable_segcompaction = false;
std::shared_ptr<PartialUpdateInfo> partial_update_info;
bool is_transient_rowset_writer = false;
// Intent flag: caller can actively turn merge-file feature on/off for this rowset.
// This describes whether we *want* to try small-file merging.
bool allow_packed_file = true;
// Effective flag: whether this context actually ends up using MergeFileSystem for writes.
// This is decided inside fs() based on enable_merge_file plus other conditions
// (cloud mode, S3 filesystem, V1 inverted index, global config, etc.), and once
// set to true it remains stable even if config::enable_merge_file changes later.
mutable bool packed_file_active = false;
// Cached FileSystem instance to ensure consistency across multiple fs() calls.
// This prevents creating multiple MergeFileSystem instances and ensures
// packed_file_active flag remains consistent.
mutable io::FileSystemSPtr _cached_fs = nullptr;
// For collect segment statistics for compaction
std::vector<RowsetReaderSharedPtr> input_rs_readers;
// TODO(lihangyu) remove this lock
// In semi-structure senario tablet_schema will be updated concurrently,
// this lock need to be held when update.Use shared_ptr to avoid delete copy contructor
std::shared_ptr<std::mutex> schema_lock;
int64_t compaction_level = 0;
// For local rowset
std::string tablet_path;
// For remote rowset
std::optional<StorageResource> storage_resource;
std::optional<EncryptionAlgorithmPB> encrypt_algorithm;
std::string job_id;
bool is_local_rowset() const { return !storage_resource; }
std::string segment_path(int seg_id) const {
if (is_local_rowset()) {
return local_segment_path(tablet_path, rowset_id.to_string(), seg_id);
} else {
return storage_resource->remote_segment_path(tablet_id, rowset_id.to_string(), seg_id);
}
}
io::FileSystemSPtr fs() const {
// Return cached instance if available to ensure consistency across multiple calls
if (_cached_fs != nullptr) {
return _cached_fs;
}
auto fs = [this]() -> io::FileSystemSPtr {
if (is_local_rowset()) {
return io::global_local_filesystem();
} else {
return storage_resource->fs;
}
}();
bool is_s3_fs = fs->type() == io::FileSystemType::S3;
auto algorithm = encrypt_algorithm;
if (!algorithm.has_value()) {
#ifndef BE_TEST
constexpr std::string_view msg =
"RowsetWriterContext::determine_encryption is not called when creating this "
"RowsetWriterContext, it will result in encrypted rowsets left unencrypted";
auto st = Status::InternalError(msg);
LOG(WARNING) << st;
DCHECK(false) << st;
#else
algorithm = EncryptionAlgorithmPB::PLAINTEXT;
#endif
}
// Apply packed file system first for write path if enabled
// Create empty index_map for write path
// Index information will be populated after write completes
bool has_v1_inverted_index = tablet_schema != nullptr &&
tablet_schema->has_inverted_index() &&
tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1;
if (has_v1_inverted_index && allow_packed_file && config::enable_packed_file) {
static constexpr std::string_view kMsg =
"Disable packed file for V1 inverted index tablet to avoid missing index "
"metadata (temporary workaround)";
LOG(INFO) << kMsg << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id;
}
// Only enable merge file for S3 file system, not for HDFS or other remote file systems
packed_file_active = allow_packed_file && config::is_cloud_mode() &&
config::enable_packed_file && !has_v1_inverted_index && is_s3_fs;
if (packed_file_active) {
io::PackedAppendContext append_info;
append_info.tablet_id = tablet_id;
append_info.rowset_id = rowset_id.to_string();
append_info.txn_id = txn_id;
append_info.expiration_time = file_cache_ttl_sec > 0 && newest_write_timestamp > 0
? newest_write_timestamp + file_cache_ttl_sec
: 0;
fs = std::make_shared<io::PackedFileSystem>(fs, append_info);
}
// Then apply encryption on top
if (algorithm.has_value()) {
fs = io::make_file_system(fs, algorithm.value());
}
// Cache the result to ensure consistency across multiple calls
_cached_fs = fs;
return fs;
}
io::FileSystem& fs_ref() const { return *fs(); }
io::FileWriterOptions get_file_writer_options() {
io::FileWriterOptions opts {
.write_file_cache = write_file_cache,
.is_cold_data = is_hot_data,
.file_cache_expiration = file_cache_ttl_sec > 0 && newest_write_timestamp > 0
? newest_write_timestamp + file_cache_ttl_sec
: 0,
.approximate_bytes_to_write = approximate_bytes_to_write};
return opts;
}
};
} // namespace doris