Skip to content

Commit 9446fbd

Browse files
authored
feat(ts): Add TS.GET command (#3142)
Part of #3048
1 parent 2e160e5 commit 9446fbd

7 files changed

Lines changed: 314 additions & 130 deletions

File tree

src/commands/cmd_timeseries.cc

Lines changed: 210 additions & 130 deletions
Large diffs are not rendered by default.

src/types/redis_timeseries.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -782,4 +782,41 @@ rocksdb::Status TimeSeries::Range(engine::Context &ctx, const Slice &user_key, c
782782
return rocksdb::Status::OK();
783783
}
784784

785+
rocksdb::Status TimeSeries::Get(engine::Context &ctx, const Slice &user_key, bool is_return_latest,
786+
std::vector<TSSample> *res) {
787+
res->clear();
788+
std::string ns_key = AppendNamespacePrefix(user_key);
789+
790+
TimeSeriesMetadata metadata(false);
791+
rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
792+
if (!s.ok()) {
793+
return s;
794+
}
795+
796+
// In the emun `TSSubkeyType`, `LABEL` is the next of `CHUNK`
797+
std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, "");
798+
std::string end_key = internalKeyFromChunkID(ns_key, metadata, TSSample::MAX_TIMESTAMP);
799+
std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
800+
801+
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
802+
rocksdb::Slice upper_bound(chunk_upper_bound);
803+
read_options.iterate_upper_bound = &upper_bound;
804+
rocksdb::Slice lower_bound(prefix);
805+
read_options.iterate_lower_bound = &lower_bound;
806+
807+
// Get the latest chunk
808+
auto iter = util::UniqueIterator(ctx, read_options);
809+
iter->SeekForPrev(end_key);
810+
if (!iter->Valid() || !iter->key().starts_with(prefix)) {
811+
return rocksdb::Status::OK();
812+
}
813+
auto chunk = CreateTSChunkFromData(iter->value());
814+
815+
if (is_return_latest) {
816+
// TODO: need process `latest` option
817+
}
818+
res->push_back(chunk->GetLatestSample(0));
819+
return rocksdb::Status::OK();
820+
}
821+
785822
} // namespace redis

src/types/redis_timeseries.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ class TimeSeries : public SubKeyScanner {
168168
rocksdb::Status Info(engine::Context &ctx, const Slice &user_key, TSInfoResult *res);
169169
rocksdb::Status Range(engine::Context &ctx, const Slice &user_key, const TSRangeOption &option,
170170
std::vector<TSSample> *res);
171+
rocksdb::Status Get(engine::Context &ctx, const Slice &user_key, bool is_return_latest, std::vector<TSSample> *res);
171172

172173
private:
173174
rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata *metadata);

src/types/timeseries.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,3 +527,10 @@ std::string UncompTSChunk::UpdateSampleValue(uint64_t ts, double value, bool is_
527527

528528
return new_buffer;
529529
}
530+
531+
TSSample UncompTSChunk::GetLatestSample(uint32_t idx) const {
532+
if (metadata_.count == 0 || idx >= metadata_.count) {
533+
unreachable();
534+
}
535+
return samples_[metadata_.count - 1 - idx];
536+
}

src/types/timeseries.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@ class TSChunk {
184184
// Returns empty string if no changes
185185
virtual std::string UpdateSampleValue(uint64_t ts, double value, bool is_add_on) const = 0;
186186

187+
// Get idx-th latest sample, idx=0 means latest sample
188+
virtual TSSample GetLatestSample(uint32_t idx) const = 0;
189+
187190
protected:
188191
nonstd::span<const char> data_;
189192
MetaData metadata_;
@@ -202,6 +205,7 @@ class UncompTSChunk : public TSChunk {
202205
bool is_fix_split_mode) const override;
203206
std::string RemoveSamplesBetween(uint64_t from, uint64_t to) const override;
204207
std::string UpdateSampleValue(uint64_t ts, double value, bool is_add_on) const override;
208+
TSSample GetLatestSample(uint32_t idx) const override;
205209

206210
private:
207211
nonstd::span<const TSSample> samples_;

tests/cppunit/types/timeseries_test.cc

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,3 +332,35 @@ TEST_F(TimeSeriesTest, Range) {
332332
EXPECT_TRUE(s.ok());
333333
EXPECT_EQ(res.size(), 1);
334334
}
335+
336+
TEST_F(TimeSeriesTest, Get) {
337+
redis::TSCreateOption option;
338+
auto s = ts_db_->Create(*ctx_, key_, option);
339+
EXPECT_TRUE(s.ok());
340+
341+
std::vector<TSSample> res;
342+
// Test empty timeseries
343+
s = ts_db_->Get(*ctx_, key_, false, &res);
344+
EXPECT_TRUE(s.ok());
345+
EXPECT_EQ(res.size(), 0);
346+
347+
// Add multiple samples
348+
std::vector<TSSample> samples = {{1, 10}, {2, 20}, {3, 30}};
349+
std::vector<TSChunk::AddResultWithTS> results;
350+
results.resize(samples.size());
351+
352+
s = ts_db_->MAdd(*ctx_, key_, samples, &results);
353+
EXPECT_TRUE(s.ok());
354+
355+
// Test basic GET (returns latest sample)
356+
s = ts_db_->Get(*ctx_, key_, false, &res);
357+
EXPECT_TRUE(s.ok());
358+
EXPECT_EQ(res.size(), 1);
359+
EXPECT_EQ(res[0].ts, 3);
360+
EXPECT_EQ(res[0].v, 30);
361+
362+
// Test GET with empty timeseries
363+
std::vector<TSSample> empty_res;
364+
s = ts_db_->Get(*ctx_, "nonexistent_key", false, &empty_res);
365+
EXPECT_FALSE(s.ok());
366+
}

tests/gocase/unit/type/timeseries/timeseries_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,4 +408,27 @@ func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) {
408408
res = rdb.Do(ctx, "ts.range", key, "-", "+", "AGGREGATION", "MIN", 20, "COUNT", 1).Val().([]interface{})
409409
assert.Equal(t, 1, len(res))
410410
})
411+
412+
t.Run("TS.GET Basic", func(t *testing.T) {
413+
key := "test_get_key"
414+
require.NoError(t, rdb.Del(ctx, key).Err())
415+
require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())
416+
// Test GET on empty timeseries
417+
res := rdb.Do(ctx, "ts.get", key).Val().([]interface{})
418+
require.Equal(t, 0, len(res))
419+
420+
// Add samples
421+
require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", key, "1000", "12.3").Val())
422+
require.Equal(t, int64(2000), rdb.Do(ctx, "ts.add", key, "2000", "15.6").Val())
423+
424+
// Test basic GET
425+
res = rdb.Do(ctx, "ts.get", key).Val().([]interface{})
426+
require.Equal(t, 1, len(res))
427+
require.Equal(t, int64(2000), res[0].([]interface{})[0])
428+
require.Equal(t, 15.6, res[0].([]interface{})[1])
429+
430+
// Test GET on non-existent key
431+
_, err := rdb.Do(ctx, "ts.get", "nonexistent_key").Result()
432+
require.ErrorContains(t, err, "key does not exist")
433+
})
411434
}

0 commit comments

Comments
 (0)