Skip to content

Commit 858d9ee

Browse files
authored
Merge branch 'unstable' into unstable
2 parents 6ab4b3e + 65db00a commit 858d9ee

6 files changed

Lines changed: 122 additions & 117 deletions

File tree

.github/workflows/kvrocks.yaml

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ jobs:
240240
RISCV_PATH: "/opt/riscv"
241241
steps:
242242
- name: Maximize build space
243-
if: ${{ matrix.riscv_toolchain }}
243+
if: ${{ startsWith(matrix.os, 'ubuntu') }}
244244
run: |
245245
# refer to https://github.com/easimon/maximize-build-space,
246246
# we take some ideas from this action but don't use it directly.
@@ -503,10 +503,9 @@ jobs:
503503
- name: ArchLinux
504504
image: archlinux:base
505505
compiler: gcc
506-
# FIXME: disable it due to "No space left on device" issue
507-
# - name: Rocky Linux 8
508-
# image: rockylinux:8
509-
# compiler: gcc
506+
- name: Rocky Linux 8
507+
image: rockylinux:8
508+
compiler: gcc
510509
- name: Rocky Linux 9
511510
image: rockylinux:9
512511
compiler: gcc
@@ -521,7 +520,16 @@ jobs:
521520
runs-on: ubuntu-22.04
522521
container:
523522
image: ${{ matrix.image }}
523+
volumes:
524+
- /usr/local/lib/android:/usr-local-lib-android
525+
- /usr/share/dotnet:/usr-share-dotnet
524526
steps:
527+
- name: Maximize build space
528+
run: |
529+
df -h
530+
rm -rf /usr-local-lib-android/*
531+
rm -rf /usr-share-dotnet/*
532+
df -h
525533
- name: Setup ArchLinux
526534
if: ${{ startsWith(matrix.image, 'archlinux') }}
527535
run: |

src/commands/cmd_tdigest.cc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ class CommandTDigestAdd : public Commander {
176176
std::vector<double> values_;
177177
};
178178

179-
template <bool reverse>
179+
template <bool Reverse>
180180
class TDigestRankCommand : public Commander {
181181
public:
182182
Status Parse(const std::vector<std::string> &args) override {
@@ -202,7 +202,16 @@ class TDigestRankCommand : public Commander {
202202
TDigest tdigest(srv->storage, conn->GetNamespace());
203203
std::vector<int> result;
204204
result.reserve(origin_inputs_.size());
205-
if (const auto s = tdigest.Rank(ctx, key_name_, unique_inputs_, reverse, result); !s.ok()) {
205+
206+
if (const auto s =
207+
[&]() {
208+
if constexpr (Reverse) {
209+
return tdigest.RevRank(ctx, key_name_, unique_inputs_, result);
210+
} else {
211+
return tdigest.Rank(ctx, key_name_, unique_inputs_, result);
212+
}
213+
}();
214+
!s.ok()) {
206215
if (s.IsNotFound()) {
207216
return {Status::RedisExecErr, errKeyNotFound};
208217
}

src/types/redis_tdigest.cc

Lines changed: 80 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -47,114 +47,81 @@
4747

4848
namespace redis {
4949

50+
namespace {
51+
template <bool Reverse, typename Container>
52+
inline decltype(auto) GetCbeginIter(const Container& centroids) {
53+
if constexpr (Reverse) {
54+
return centroids.crbegin();
55+
} else {
56+
return centroids.cbegin();
57+
}
58+
}
59+
60+
template <bool Reverse, typename Container>
61+
inline decltype(auto) GetCendIter(const Container& centroids) {
62+
if constexpr (Reverse) {
63+
return centroids.crend();
64+
} else {
65+
return centroids.cend();
66+
}
67+
}
68+
} // namespace
69+
5070
// TODO: It should be replaced by a iteration of the rocksdb iterator
71+
template <bool Reverse>
5172
class DummyCentroids {
5273
public:
53-
class BaseIterator {
54-
public:
55-
virtual ~BaseIterator() = default;
56-
virtual bool Next() = 0;
57-
virtual bool Prev() = 0;
58-
virtual bool Valid() const = 0;
59-
virtual std::unique_ptr<BaseIterator> Clone() const = 0;
60-
virtual StatusOr<Centroid> GetCentroid() const = 0;
61-
};
62-
6374
DummyCentroids(const TDigestMetadata& meta_data, const std::vector<Centroid>& centroids)
6475
: meta_data_(meta_data), centroids_(centroids) {}
65-
class Iterator : public BaseIterator {
76+
class Iterator {
6677
public:
67-
Iterator(std::vector<Centroid>::const_iterator&& iter, const std::vector<Centroid>& centroids)
68-
: iter_(iter), centroids_(centroids) {}
69-
std::unique_ptr<BaseIterator> Clone() const override {
70-
if (iter_ != centroids_.cend()) {
71-
return std::make_unique<Iterator>(std::next(centroids_.cbegin(), std::distance(centroids_.cbegin(), iter_)),
72-
centroids_);
78+
using IterType = std::conditional_t<Reverse, std::vector<Centroid>::const_reverse_iterator,
79+
std::vector<Centroid>::const_iterator>;
80+
Iterator(IterType iter, const std::vector<Centroid>& centroids) : iter_(iter), centroids_(centroids) {}
81+
std::unique_ptr<Iterator> Clone() const {
82+
if (iter_ != GetCendIter<Reverse>(centroids_)) {
83+
return std::make_unique<Iterator>(
84+
std::next(GetCbeginIter<Reverse>(centroids_), std::distance(GetCbeginIter<Reverse>(centroids_), iter_)),
85+
centroids_);
7386
}
74-
return std::make_unique<Iterator>(centroids_.cend(), centroids_);
87+
return std::make_unique<Iterator>(GetCendIter<Reverse>(centroids_), centroids_);
7588
}
76-
bool Next() override {
89+
bool Next() {
7790
if (Valid()) {
7891
std::advance(iter_, 1);
7992
}
80-
return iter_ != centroids_.cend();
93+
return iter_ != GetCendIter<Reverse>(centroids_);
8194
}
8295

8396
// The Prev function can only be called for item is not cend,
8497
// because we must guarantee the iterator to be inside the valid range before iteration.
85-
bool Prev() override {
86-
if (Valid() && iter_ != centroids_.cbegin()) {
98+
bool Prev() {
99+
if (Valid() && iter_ != GetCendIter<Reverse>(centroids_)) {
87100
std::advance(iter_, -1);
88101
}
89102
return Valid();
90103
}
91-
bool Valid() const override { return iter_ != centroids_.cend(); }
92-
StatusOr<Centroid> GetCentroid() const override {
93-
if (iter_ == centroids_.cend()) {
104+
bool Valid() const { return iter_ != GetCendIter<Reverse>(centroids_); }
105+
StatusOr<Centroid> GetCentroid() const {
106+
if (iter_ == GetCendIter<Reverse>(centroids_)) {
94107
return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
95108
}
96109
return *iter_;
97110
}
98111

99112
private:
100-
std::vector<Centroid>::const_iterator iter_;
113+
IterType iter_;
101114
const std::vector<Centroid>& centroids_;
102115
};
103116

104-
class ReverseIterator final : public BaseIterator {
105-
public:
106-
ReverseIterator(std::vector<Centroid>::const_reverse_iterator&& iter, const std::vector<Centroid>& centroids)
107-
: iter_(iter), centroids_(centroids) {}
108-
std::unique_ptr<BaseIterator> Clone() const override {
109-
if (iter_ != centroids_.crend()) {
110-
return std::make_unique<ReverseIterator>(
111-
std::next(centroids_.crbegin(), std::distance(centroids_.crbegin(), iter_)), centroids_);
112-
}
113-
return std::make_unique<ReverseIterator>(centroids_.crend(), centroids_);
114-
}
115-
bool Next() override {
116-
if (Valid()) {
117-
std::advance(iter_, 1);
118-
}
119-
return iter_ != centroids_.crend();
120-
}
121-
122-
bool Prev() override {
123-
if (Valid() && iter_ != centroids_.crbegin()) {
124-
std::advance(iter_, -1);
125-
}
126-
return Valid();
127-
}
128-
bool Valid() const override { return iter_ != centroids_.crend(); }
129-
StatusOr<Centroid> GetCentroid() const override {
130-
if (iter_ == centroids_.crend()) {
131-
return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
132-
}
133-
return *iter_;
134-
}
135-
136-
private:
137-
std::vector<Centroid>::const_reverse_iterator iter_;
138-
const std::vector<Centroid>& centroids_;
139-
};
140-
141-
std::unique_ptr<BaseIterator> Begin(const bool reverse = false) const {
142-
if (reverse) {
143-
return std::make_unique<ReverseIterator>(centroids_.crbegin(), centroids_);
144-
}
145-
return std::make_unique<Iterator>(centroids_.cbegin(), centroids_);
117+
std::unique_ptr<Iterator> Begin() const {
118+
return std::make_unique<Iterator>(GetCbeginIter<Reverse>(centroids_), centroids_);
146119
}
147-
std::unique_ptr<BaseIterator> End(const bool reverse = false) const {
120+
std::unique_ptr<Iterator> End() const {
148121
if (centroids_.empty()) {
149-
if (reverse) {
150-
return std::make_unique<ReverseIterator>(centroids_.crend(), centroids_);
151-
}
152-
return std::make_unique<Iterator>(centroids_.cend(), centroids_);
153-
}
154-
if (reverse) {
155-
return std::make_unique<ReverseIterator>(std::prev(centroids_.crend()), centroids_);
122+
return std::make_unique<Iterator>(GetCendIter<Reverse>(centroids_), centroids_);
156123
}
157-
return std::make_unique<Iterator>(std::prev(centroids_.cend()), centroids_);
124+
return std::make_unique<Iterator>(std::prev(GetCendIter<Reverse>(centroids_)), centroids_);
158125
}
159126
double TotalWeight() const { return static_cast<double>(meta_data_.total_weight); }
160127
double Min() const { return meta_data_.minimum; }
@@ -273,10 +240,9 @@ rocksdb::Status TDigest::mergeNodes(engine::Context& ctx, const std::string& ns_
273240
return rocksdb::Status::OK();
274241
}
275242

276-
rocksdb::Status TDigest::Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
277-
bool reverse, std::vector<int>& result) {
243+
rocksdb::Status TDigest::prepareRankData(engine::Context& ctx, const Slice& digest_name, TDigestMetadata& metadata,
244+
std::vector<Centroid>& centroids) {
278245
auto ns_key = AppendNamespacePrefix(digest_name);
279-
TDigestMetadata metadata;
280246
{
281247
LockGuard guard(storage_->GetLockManager(), ns_key);
282248

@@ -285,23 +251,51 @@ rocksdb::Status TDigest::Rank(engine::Context& ctx, const Slice& digest_name, co
285251
}
286252

287253
if (metadata.total_observations == 0) {
288-
result.resize(inputs.size(), -2);
289254
return rocksdb::Status::OK();
290255
}
291256

292257
if (auto status = mergeNodes(ctx, ns_key, &metadata); !status.ok()) {
293258
return status;
294259
}
295260
}
261+
return dumpCentroids(ctx, ns_key, metadata, &centroids);
262+
}
296263

264+
rocksdb::Status TDigest::Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
265+
std::vector<int>& result) {
266+
TDigestMetadata metadata;
297267
std::vector<Centroid> centroids;
298-
if (auto status = dumpCentroids(ctx, ns_key, metadata, &centroids); !status.ok()) {
268+
if (auto status = prepareRankData(ctx, digest_name, metadata, centroids); !status.ok()) {
269+
return status;
270+
}
271+
272+
if (metadata.total_observations == 0) {
273+
result.resize(inputs.size(), -2);
274+
return rocksdb::Status::OK();
275+
}
276+
277+
auto dump_centroids = DummyCentroids<false>(metadata, centroids);
278+
if (auto status = TDigestRank<false>(dump_centroids, inputs, result); !status) {
279+
return rocksdb::Status::InvalidArgument(status.Msg());
280+
}
281+
return rocksdb::Status::OK();
282+
}
283+
284+
rocksdb::Status TDigest::RevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
285+
std::vector<int>& result) {
286+
TDigestMetadata metadata;
287+
std::vector<Centroid> centroids;
288+
if (auto status = prepareRankData(ctx, digest_name, metadata, centroids); !status.ok()) {
299289
return status;
300290
}
301291

302-
auto dump_centroids = DummyCentroids(metadata, centroids);
303-
auto status = TDigestRank(dump_centroids, inputs, reverse, result);
304-
if (!status) {
292+
if (metadata.total_observations == 0) {
293+
result.resize(inputs.size(), -2);
294+
return rocksdb::Status::OK();
295+
}
296+
297+
auto dump_centroids = DummyCentroids<true>(metadata, centroids);
298+
if (auto status = TDigestRank<true>(dump_centroids, inputs, result); !status) {
305299
return rocksdb::Status::InvalidArgument(status.Msg());
306300
}
307301
return rocksdb::Status::OK();
@@ -332,7 +326,7 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name
332326
return status;
333327
}
334328

335-
auto dump_centroids = DummyCentroids(metadata, centroids);
329+
auto dump_centroids = DummyCentroids<false>(metadata, centroids);
336330

337331
auto quantile_results = std::vector<double>();
338332
quantile_results.reserve(qs.size());

src/types/redis_tdigest.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,10 @@ class TDigest : public SubKeyScanner {
7777

7878
rocksdb::Status Merge(engine::Context& ctx, const Slice& dest_digest, const std::vector<std::string>& source_digests,
7979
const TDigestMergeOptions& options);
80-
rocksdb::Status Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs, bool reverse,
80+
rocksdb::Status Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
8181
std::vector<int>& result);
82+
rocksdb::Status RevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
83+
std::vector<int>& result);
8284
rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata);
8385

8486
private:
@@ -130,6 +132,7 @@ class TDigest : public SubKeyScanner {
130132
static std::string internalValueFromCentroid(const Centroid& centroid);
131133
rocksdb::Status decodeCentroidFromKeyValue(const rocksdb::Slice& key, const rocksdb::Slice& value,
132134
Centroid* centroid) const;
135+
rocksdb::Status prepareRankData(engine::Context& ctx, const Slice& digest_name, TDigestMetadata& metadata,
136+
std::vector<Centroid>& centroids);
133137
};
134-
135138
} // namespace redis

src/types/tdigest.h

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ struct DoubleComparator {
171171
bool operator()(const double& a, const double& b) const { return DoubleCompare(a, b) == -1; }
172172
};
173173

174-
template <typename TD, bool Reverse>
175-
inline Status TDigestRankImpl(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
174+
template <bool Reverse, typename TD>
175+
inline Status TDigestRank(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
176176
std::map<double, size_t, DoubleComparator> value_to_index;
177177
for (size_t i = 0; i < inputs.size(); ++i) {
178178
value_to_index[inputs[i]] = i;
@@ -211,7 +211,7 @@ inline Status TDigestRankImpl(TD&& td, const std::vector<double>& inputs, std::v
211211
}
212212
}
213213

214-
auto iter = td.Begin(Reverse);
214+
auto iter = td.Begin();
215215
double cumulative_weight = 0;
216216
while (iter->Valid() && !is_end()) {
217217
auto centroid = GET_OR_RET(iter->GetCentroid());
@@ -267,12 +267,3 @@ inline Status TDigestRankImpl(TD&& td, const std::vector<double>& inputs, std::v
267267
}
268268
return Status::OK();
269269
}
270-
271-
template <typename TD>
272-
inline Status TDigestRank(TD&& td, const std::vector<double>& inputs, bool reverse, std::vector<int>& result) {
273-
if (reverse) {
274-
return TDigestRankImpl<TD, true>(std::forward<TD>(td), inputs, result);
275-
} else {
276-
return TDigestRankImpl<TD, false>(std::forward<TD>(td), inputs, result);
277-
}
278-
}

0 commit comments

Comments
 (0)