@@ -35,6 +35,7 @@ constexpr const char *errOldTimestamp = "Timestamp is older than retention";
3535constexpr const char *errDupBlock =
3636 " Error at upsert, update is not supported when DUPLICATE_POLICY is set to BLOCK mode" ;
3737constexpr const char *errTSKeyNotFound = " the key is not a TSDB key" ;
38+ constexpr const char *errTSInvalidAlign = " unknown ALIGN parameter" ;
3839
3940using ChunkType = TimeSeriesMetadata::ChunkType;
4041using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
@@ -70,6 +71,13 @@ std::string FormatAddResultAsRedisReply(TSChunk::AddResultWithTS res) {
7071 return " " ;
7172}
7273
74+ std::string FormatTSSampleAsRedisReply (TSSample sample) {
75+ std::string res = redis::MultiLen (2 );
76+ res += redis::Integer (sample.ts );
77+ res += redis::Double (redis::RESP::v3, sample.v );
78+ return res;
79+ }
80+
7381std::string_view FormatChunkTypeAsRedisReply (ChunkType chunk_type) {
7482 auto it = kChunkTypeMap .find (chunk_type);
7583 if (it == kChunkTypeMap .end ()) {
@@ -285,10 +293,11 @@ class CommandTSInfo : public Commander {
285293 *output += redis::SimpleString (" rules" );
286294 std::vector<std::string> rules_str;
287295 rules_str.reserve (info.downstream_rules .size ());
288- for (const auto &rule : info.downstream_rules ) {
289- auto str = redis::Array ({redis::BulkString (rule.first ), redis::Integer (rule.second .bucket_duration ),
290- redis::SimpleString (FormatAggregatorTypeAsRedisReply (rule.second .aggregator )),
291- redis::Integer (rule.second .alignment )});
296+ for (const auto &[key, rule] : info.downstream_rules ) {
297+ const auto &aggregator = rule.aggregator ;
298+ auto str = redis::Array ({redis::BulkString (key), redis::Integer (aggregator.bucket_duration ),
299+ redis::SimpleString (FormatAggregatorTypeAsRedisReply (aggregator.type )),
300+ redis::Integer (aggregator.alignment )});
292301 rules_str.push_back (str);
293302 }
294303 *output += redis::Array (rules_str);
@@ -421,9 +430,235 @@ class CommandTSMAdd : public Commander {
421430 std::unordered_map<std::string_view, std::vector<size_t >> userkey_indexes_map_;
422431};
423432
433+ class CommandTSRangeBase : public KeywordCommandBase {
434+ public:
435+ CommandTSRangeBase (size_t skip_num, size_t tail_skip_num)
436+ : KeywordCommandBase(skip_num + 2 , tail_skip_num), skip_num_(skip_num) {
437+ registerHandler (" LATEST" , [this ](TSOptionsParser &parser) { return handleLatest (parser); });
438+ registerHandler (" FILTER_BY_TS" , [this ](TSOptionsParser &parser) { return handleFilterByTS (parser); });
439+ registerHandler (" FILTER_BY_VALUE" , [this ](TSOptionsParser &parser) { return handleFilterByValue (parser); });
440+ registerHandler (" COUNT" , [this ](TSOptionsParser &parser) { return handleCount (parser); });
441+ registerHandler (" ALIGN" , [this ](TSOptionsParser &parser) { return handleAlign (parser); });
442+ registerHandler (" AGGREGATION" , [this ](TSOptionsParser &parser) { return handleAggregation (parser); });
443+ registerHandler (" BUCKETTIMESTAMP" , [this ](TSOptionsParser &parser) { return handleBucketTimestamp (parser); });
444+ registerHandler (" EMPTY" , [this ](TSOptionsParser &parser) { return handleEmpty (parser); });
445+ }
446+
447+ Status Parse (const std::vector<std::string> &args) override {
448+ TSOptionsParser parser (std::next (args.begin (), static_cast <std::ptrdiff_t >(skip_num_)), args.end ());
449+ // Parse start timestamp
450+ auto start_ts = parser.TakeInt <uint64_t >();
451+ if (!start_ts.IsOK ()) {
452+ auto start_ts_str = parser.TakeStr ();
453+ if (!start_ts_str.IsOK () || start_ts_str.GetValue () != " -" ) {
454+ return {Status::RedisParseErr, " wrong fromTimestamp" };
455+ }
456+ // "-" means use default start timestamp: 0
457+ } else {
458+ is_start_explicit_set_ = true ;
459+ option_.start_ts = start_ts.GetValue ();
460+ }
461+
462+ // Parse end timestamp
463+ auto end_ts = parser.TakeInt <uint64_t >();
464+ if (!end_ts.IsOK ()) {
465+ auto end_ts_str = parser.TakeStr ();
466+ if (!end_ts_str.IsOK () || end_ts_str.GetValue () != " +" ) {
467+ return {Status::RedisParseErr, " wrong toTimestamp" };
468+ }
469+ // "+" means use default end timestamp: MAX_TIMESTAMP
470+ } else {
471+ is_end_explicit_set_ = true ;
472+ option_.end_ts = end_ts.GetValue ();
473+ }
474+
475+ auto s = KeywordCommandBase::Parse (args);
476+ if (!s.IsOK ()) return s;
477+ if (is_alignment_explicit_set_ && option_.aggregator .type == TSAggregatorType::NONE) {
478+ return {Status::RedisParseErr, " ALIGN parameter can only be used with AGGREGATION" };
479+ }
480+ return s;
481+ }
482+
483+ const TSRangeOption &GetRangeOption () const { return option_; }
484+
485+ private:
486+ TSRangeOption option_;
487+ size_t skip_num_;
488+ bool is_start_explicit_set_ = false ;
489+ bool is_end_explicit_set_ = false ;
490+ bool is_alignment_explicit_set_ = false ;
491+
492+ Status handleLatest ([[maybe_unused]] TSOptionsParser &parser) {
493+ option_.is_return_latest = true ;
494+ return Status::OK ();
495+ }
496+
497+ Status handleFilterByTS (TSOptionsParser &parser) {
498+ option_.filter_by_ts .clear ();
499+ while (parser.Good ()) {
500+ auto ts = parser.TakeInt <uint64_t >();
501+ if (!ts.IsOK ()) break ;
502+ option_.filter_by_ts .insert (ts.GetValue ());
503+ }
504+ return Status::OK ();
505+ }
506+
507+ Status handleFilterByValue (TSOptionsParser &parser) {
508+ auto min = parser.TakeFloat <double >();
509+ auto max = parser.TakeFloat <double >();
510+ if (!min.IsOK () || !max.IsOK ()) {
511+ return {Status::RedisParseErr, " Invalid min or max value" };
512+ }
513+ option_.filter_by_value = std::make_optional (std::make_pair (min.GetValue (), max.GetValue ()));
514+ return Status::OK ();
515+ }
516+
517+ Status handleCount (TSOptionsParser &parser) {
518+ auto count = parser.TakeInt <uint64_t >();
519+ if (!count.IsOK ()) {
520+ return {Status::RedisParseErr, " Couldn't parse COUNT" };
521+ }
522+ option_.count_limit = count.GetValue ();
523+ if (option_.count_limit == 0 ) {
524+ return {Status::RedisParseErr, " Invalid COUNT value" };
525+ }
526+ return Status::OK ();
527+ }
528+
529+ Status handleAlign (TSOptionsParser &parser) {
530+ auto align = parser.TakeInt <uint64_t >();
531+ if (align.IsOK ()) {
532+ is_alignment_explicit_set_ = true ;
533+ option_.aggregator .alignment = align.GetValue ();
534+ return Status::OK ();
535+ }
536+
537+ auto align_str = parser.TakeStr ();
538+ if (!align_str.IsOK ()) {
539+ return {Status::RedisParseErr, errTSInvalidAlign};
540+ }
541+
542+ const auto &value = align_str.GetValue ();
543+ if (value == " -" || value == " +" ) {
544+ bool is_explicit_set = value == " -" ? is_start_explicit_set_ : is_end_explicit_set_;
545+ auto err_msg = value == " -" ? " start alignment can only be used with explicit start timestamp"
546+ : " end alignment can only be used with explicit end timestamp" ;
547+
548+ if (!is_explicit_set) {
549+ return {Status::RedisParseErr, err_msg};
550+ }
551+
552+ option_.aggregator .alignment = value == " -" ? option_.start_ts : option_.end_ts ;
553+ } else {
554+ return {Status::RedisParseErr, errTSInvalidAlign};
555+ }
556+ is_alignment_explicit_set_ = true ;
557+ return Status::OK ();
558+ }
559+
560+ Status handleAggregation (TSOptionsParser &parser) {
561+ auto &type = option_.aggregator .type ;
562+ if (parser.EatEqICase (" AVG" )) {
563+ type = TSAggregatorType::AVG;
564+ } else if (parser.EatEqICase (" SUM" )) {
565+ type = TSAggregatorType::SUM;
566+ } else if (parser.EatEqICase (" MIN" )) {
567+ type = TSAggregatorType::MIN;
568+ } else if (parser.EatEqICase (" MAX" )) {
569+ type = TSAggregatorType::MAX;
570+ } else if (parser.EatEqICase (" RANGE" )) {
571+ type = TSAggregatorType::RANGE;
572+ } else if (parser.EatEqICase (" COUNT" )) {
573+ type = TSAggregatorType::COUNT;
574+ } else if (parser.EatEqICase (" FIRST" )) {
575+ type = TSAggregatorType::FIRST;
576+ } else if (parser.EatEqICase (" LAST" )) {
577+ type = TSAggregatorType::LAST;
578+ } else if (parser.EatEqICase (" STD.P" )) {
579+ type = TSAggregatorType::STD_P;
580+ } else if (parser.EatEqICase (" STD.S" )) {
581+ type = TSAggregatorType::STD_S;
582+ } else if (parser.EatEqICase (" VAR.P" )) {
583+ type = TSAggregatorType::VAR_P;
584+ } else if (parser.EatEqICase (" VAR.S" )) {
585+ type = TSAggregatorType::VAR_S;
586+ } else {
587+ return {Status::RedisParseErr, " Invalid aggregator type" };
588+ }
589+
590+ auto duration = parser.TakeInt <uint64_t >();
591+ if (!duration.IsOK ()) {
592+ return {Status::RedisParseErr, " Couldn't parse AGGREGATION" };
593+ }
594+ option_.aggregator .bucket_duration = duration.GetValue ();
595+ if (option_.aggregator .bucket_duration == 0 ) {
596+ return {Status::RedisParseErr, " bucketDuration must be greater than zero" };
597+ }
598+ return Status::OK ();
599+ }
600+
601+ Status handleBucketTimestamp (TSOptionsParser &parser) {
602+ if (option_.aggregator .type == TSAggregatorType::NONE) {
603+ return {Status::RedisParseErr, " BUCKETTIMESTAMP flag should be the 3rd or 4th flag after AGGREGATION flag" };
604+ }
605+ using BucketTimestampType = TSRangeOption::BucketTimestampType;
606+ if (parser.EatEqICase (" START" )) {
607+ option_.bucket_timestamp_type = BucketTimestampType::Start;
608+ } else if (parser.EatEqICase (" END" )) {
609+ option_.bucket_timestamp_type = BucketTimestampType::End;
610+ } else if (parser.EatEqICase (" MID" )) {
611+ option_.bucket_timestamp_type = BucketTimestampType::Mid;
612+ } else {
613+ return {Status::RedisParseErr, " unknown BUCKETTIMESTAMP parameter" };
614+ }
615+ return Status::OK ();
616+ }
617+
618+ Status handleEmpty ([[maybe_unused]] TSOptionsParser &parser) {
619+ if (option_.aggregator .type == TSAggregatorType::NONE) {
620+ return {Status::RedisParseErr, " EMPTY flag should be the 3rd or 5th flag after AGGREGATION flag" };
621+ }
622+ option_.is_return_empty = true ;
623+ return Status::OK ();
624+ }
625+ };
626+
627+ class CommandTSRange : public CommandTSRangeBase {
628+ public:
629+ CommandTSRange () : CommandTSRangeBase(2 , 0 ) {}
630+ Status Parse (const std::vector<std::string> &args) override {
631+ if (args.size () < 4 ) {
632+ return {Status::RedisParseErr, " wrong number of arguments for 'ts.range' command" };
633+ }
634+
635+ user_key_ = args[1 ];
636+
637+ return CommandTSRangeBase::Parse (args);
638+ }
639+
640+ Status Execute (engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
641+ auto timeseries_db = TimeSeries (srv->storage , conn->GetNamespace ());
642+ std::vector<TSSample> res;
643+ auto s = timeseries_db.Range (ctx, user_key_, GetRangeOption (), &res);
644+ if (!s.ok ()) return {Status::RedisExecErr, errKeyNotFound};
645+ std::vector<std::string> reply;
646+ reply.reserve (res.size ());
647+ for (auto &sample : res) {
648+ reply.push_back (FormatTSSampleAsRedisReply (sample));
649+ }
650+ *output = redis::Array (reply);
651+ return Status::OK ();
652+ }
653+
654+ private:
655+ std::string user_key_;
656+ };
657+
424658REDIS_REGISTER_COMMANDS (Timeseries, MakeCmdAttr<CommandTSCreate>(" ts.create" , -2 , " write" , 1 , 1 , 1 ),
425659 MakeCmdAttr<CommandTSAdd>(" ts.add" , -4 , " write" , 1 , 1 , 1 ),
426660 MakeCmdAttr<CommandTSMAdd>(" ts.madd" , -4 , " write" , 1 , -3 , 1 ),
661+ MakeCmdAttr<CommandTSRange>(" ts.range" , -4 , " read-only" , 1 , 1 , 1 ),
427662 MakeCmdAttr<CommandTSInfo>(" ts.info" , -2 , " read-only" , 1 , 1 , 1 ));
428663
429664} // namespace redis
0 commit comments