-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Expand file tree
/
Copy pathbeta_rowset_writer_v2.cpp
More file actions
117 lines (103 loc) · 4.34 KB
/
beta_rowset_writer_v2.cpp
File metadata and controls
117 lines (103 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/beta_rowset_writer_v2.h"
#include <assert.h>
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <stdio.h>
#include <ctime> // time
#include <filesystem>
#include <memory>
#include <sstream>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/stream_sink_file_writer.h"
#include "olap/data_dir.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
#include "util/slice.h"
#include "util/time.h"
#include "vec/core/block.h"
#include "vec/sink/load_stream_stub.h"
namespace doris {
using namespace ErrorCode;
BetaRowsetWriterV2::BetaRowsetWriterV2(const std::vector<std::shared_ptr<LoadStreamStub>>& streams)
: _segment_creator(_context, _seg_files, _idx_files), _streams(streams) {}
BetaRowsetWriterV2::~BetaRowsetWriterV2() = default;
Status BetaRowsetWriterV2::init(const RowsetWriterContext& rowset_writer_context) {
_context = rowset_writer_context;
_context.segment_collector = std::make_shared<SegmentCollectorT<BetaRowsetWriterV2>>(this);
_context.file_writer_creator = std::make_shared<FileWriterCreatorT<BetaRowsetWriterV2>>(this);
return Status::OK();
}
Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer,
FileType file_type) {
auto partition_id = _context.partition_id;
auto index_id = _context.index_id;
auto tablet_id = _context.tablet_id;
auto load_id = _context.load_id;
auto stream_writer = std::make_unique<io::StreamSinkFileWriter>(_streams);
stream_writer->init(load_id, partition_id, index_id, tablet_id, segment_id, file_type);
file_writer = std::move(stream_writer);
return Status::OK();
}
Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
bool ok = false;
for (const auto& stream : _streams) {
auto st = stream->add_segment(_context.partition_id, _context.index_id, _context.tablet_id,
segment_id, segstat);
if (!st.ok()) {
LOG(WARNING) << "failed to add segment " << segment_id << " to stream "
<< stream->stream_id();
}
ok = ok || st.ok();
}
if (!ok) {
return Status::InternalError("failed to add segment {} of tablet {} to any replicas",
segment_id, _context.tablet_id);
}
return Status::OK();
}
Status BetaRowsetWriterV2::flush_memtable(vectorized::Block* block, int32_t segment_id,
int64_t* flush_size) {
if (block->rows() == 0) {
return Status::OK();
}
{
SCOPED_RAW_TIMER(&_segment_writer_ns);
RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size));
}
// delete bitmap and seg compaction are done on the destination BE.
return Status::OK();
}
Status BetaRowsetWriterV2::flush_single_block(const vectorized::Block* block) {
return _segment_creator.flush_single_block(block);
}
} // namespace doris