2727#include < vector>
2828
2929#include " db_util.h"
30+ #include " string_util.h"
3031#include " time_util.h"
3132
3233namespace redis {
@@ -751,6 +752,14 @@ rocksdb::Status Stream::DestroyGroup(engine::Context &ctx, const Slice &stream_n
751752 return rocksdb::Status::InvalidArgument (errXGroupSubcommandRequiresKeyExist);
752753 }
753754
755+ std::string group_key = internalKeyFromGroupName (ns_key, metadata, group_name);
756+ std::string val;
757+ s = storage_->Get (ctx, ctx.GetReadOptions (), stream_cf_handle_, group_key, &val);
758+ if (s.IsNotFound ()) {
759+ return rocksdb::Status::OK ();
760+ }
761+ if (!s.ok ()) return s;
762+
754763 auto batch = storage_->GetWriteBatchBase ();
755764 WriteBatchLogData log_data (kRedisStream );
756765 s = batch->PutLogData (log_data.Encode ());
@@ -764,43 +773,22 @@ rocksdb::Status Stream::DestroyGroup(engine::Context &ctx, const Slice &stream_n
764773 PutFixed64 (&sub_key_prefix, group_name.size ());
765774 sub_key_prefix += group_name;
766775
767- std::string next_version_prefix_key =
768- InternalKey (ns_key, sub_key_prefix, metadata.version + 1 , storage_->IsSlotIdEncoded ()).Encode ();
769776 std::string prefix_key =
770777 InternalKey (ns_key, sub_key_prefix, metadata.version , storage_->IsSlotIdEncoded ()).Encode ();
778+ std::string end_key =
779+ InternalKey (ns_key, util::StringNext (sub_key_prefix), metadata.version , storage_->IsSlotIdEncoded ()).Encode ();
771780
772- rocksdb::ReadOptions read_options = ctx.DefaultScanOptions ();
773- rocksdb::Slice upper_bound (next_version_prefix_key);
774- read_options.iterate_upper_bound = &upper_bound;
775- rocksdb::Slice lower_bound (prefix_key);
776- read_options.iterate_lower_bound = &lower_bound;
777-
778- auto iter = util::UniqueIterator (ctx, read_options, stream_cf_handle_);
779- for (iter->SeekToFirst (); iter->Valid (); iter->Next ()) {
780- if (identifySubkeyType (iter->key ()) != type) {
781- continue ;
782- }
783- if (groupNameFromInternalKey (iter->key ()) != group_name) {
784- continue ;
785- }
786- s = batch->Delete (stream_cf_handle_, iter->key ());
787- if (!s.ok ()) return s;
788- *delete_cnt += 1 ;
789- }
790-
791- if (auto s = iter->status (); !s.ok ()) {
792- return s;
793- }
794- }
795-
796- if (*delete_cnt != 0 ) {
797- metadata.group_number -= 1 ;
798- std::string metadata_bytes;
799- metadata.Encode (&metadata_bytes);
800- s = batch->Put (metadata_cf_handle_, ns_key, metadata_bytes);
781+ s = batch->DeleteRange (stream_cf_handle_, prefix_key, end_key);
801782 if (!s.ok ()) return s;
802783 }
803784
785+ *delete_cnt = 1 ;
786+ metadata.group_number -= 1 ;
787+ std::string metadata_bytes;
788+ metadata.Encode (&metadata_bytes);
789+ s = batch->Put (metadata_cf_handle_, ns_key, metadata_bytes);
790+ if (!s.ok ()) return s;
791+
804792 return storage_->Write (ctx, storage_->DefaultWriteOptions (), batch->GetWriteBatch ());
805793}
806794
0 commit comments