|
#include <cctype>
#include <cmath>
#include <random>
#include <system_error>
#include <thread>
#include <utility>

#include "db_util.h"
#include "logging.h"
#include "parse_util.h"
#include "sample_helper.h"
34 | 36 |
|
@@ -66,6 +68,7 @@ rocksdb::Status Hash::Get(engine::Context &ctx, const Slice &user_key, const Sli |
66 | 68 | return rocksdb::Status::Corruption("failed to decode hash field value"); |
67 | 69 | } |
68 | 70 | if (field_value.IsExpired()) { |
| 71 | + AsyncRepairHash(ns_key, field, metadata); |
69 | 72 | return rocksdb::Status::NotFound(); |
70 | 73 | } |
71 | 74 | *value = field_value.value.ToString(); |
@@ -245,6 +248,7 @@ rocksdb::Status Hash::MGet(engine::Context &ctx, const Slice &user_key, const st |
245 | 248 | return rocksdb::Status::Corruption("failed to decode hash field value"); |
246 | 249 | } |
247 | 250 | if (field_value.IsExpired()) { |
| 251 | + AsyncRepairHash(ns_key, fields[i], metadata); |
248 | 252 | values->emplace_back(""); |
249 | 253 | statuses->emplace_back(rocksdb::Status::NotFound()); |
250 | 254 | } else { |
@@ -394,16 +398,17 @@ rocksdb::Status Hash::RangeByLex(engine::Context &ctx, const Slice &user_key, co |
394 | 398 | } |
395 | 399 | int64_t pos = 0; |
396 | 400 | for (; iter->Valid() && iter->key().starts_with(prefix_key); (!spec.reversed ? iter->Next() : iter->Prev())) { |
| 401 | + InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); |
397 | 402 | // Decode and check expiration |
398 | 403 | HashFieldValue field_value; |
399 | 404 | if (!HashFieldValue::Decode(iter->value(), &field_value)) { |
400 | 405 | continue; // Skip corrupted values |
401 | 406 | } |
402 | 407 | if (field_value.IsExpired()) { |
| 408 | + AsyncRepairHash(ns_key, ikey.GetSubKey(), metadata); |
403 | 409 | continue; // Skip expired fields |
404 | 410 | } |
405 | 411 |
|
406 | | - InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); |
407 | 412 | if (spec.reversed) { |
408 | 413 | if (ikey.GetSubKey().ToString() < spec.min || (spec.minex && ikey.GetSubKey().ToString() == spec.min)) { |
409 | 414 | break; |
@@ -445,22 +450,22 @@ rocksdb::Status Hash::GetAll(engine::Context &ctx, const Slice &user_key, std::v |
445 | 450 |
|
446 | 451 | auto iter = util::UniqueIterator(ctx, read_options); |
447 | 452 | for (iter->Seek(prefix_key); iter->Valid() && iter->key().starts_with(prefix_key); iter->Next()) { |
| 453 | + InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); |
448 | 454 | // Decode and check expiration for all fetch types |
449 | 455 | HashFieldValue field_value; |
450 | 456 | if (!HashFieldValue::Decode(iter->value(), &field_value)) { |
451 | 457 | continue; // Skip corrupted values |
452 | 458 | } |
453 | 459 | if (field_value.IsExpired()) { |
| 460 | + AsyncRepairHash(ns_key, ikey.GetSubKey(), metadata); |
454 | 461 | continue; // Skip expired fields |
455 | 462 | } |
456 | 463 |
|
457 | 464 | if (type == HashFetchType::kOnlyKey) { |
458 | | - InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); |
459 | 465 | field_values->emplace_back(ikey.GetSubKey().ToString(), ""); |
460 | 466 | } else if (type == HashFetchType::kOnlyValue) { |
461 | 467 | field_values->emplace_back("", field_value.value.ToString()); |
462 | 468 | } else { |
463 | | - InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); |
464 | 469 | field_values->emplace_back(ikey.GetSubKey().ToString(), field_value.value.ToString()); |
465 | 470 | } |
466 | 471 | } |
@@ -674,4 +679,33 @@ rocksdb::Status Hash::PersistFields(engine::Context &ctx, const Slice &user_key, |
674 | 679 | return rocksdb::Status::OK(); |
675 | 680 | } |
676 | 681 |
|
| 682 | +void Hash::AsyncRepairHash(const std::string &ns_key, const Slice &field, const HashMetadata &metadata) const { |
| 683 | + auto repair_task = [storage = storage_, ns_key, field_str = field.ToString(), version = metadata.version, |
| 684 | + size = metadata.size]() { |
| 685 | + engine::Context ctx(storage); |
| 686 | + auto batch = storage->GetWriteBatchBase(); |
| 687 | + std::string sub_key = InternalKey(ns_key, field_str, version, storage->IsSlotIdEncoded()).Encode(); |
| 688 | + batch->Delete(sub_key); |
| 689 | + |
| 690 | + // Use Merge to decrement the size atomically. This may temporarily result in inconsistent |
| 691 | + // metadata if multiple repairs run concurrently or if the size reaches 0, leaving an empty |
| 692 | + // hash. However, this is acceptable as: |
| 693 | + // 1. The inconsistency is temporary and will be corrected by subsequent operations |
| 694 | + // 2. Empty hashes with size=0 will be cleaned up during the next compaction |
| 695 | + // 3. Re-reading metadata would add overhead and complexity without eliminating all race conditions |
| 696 | + HashMetadata new_metadata(false); |
| 697 | + new_metadata.size = -1; |
| 698 | + std::string bytes; |
| 699 | + new_metadata.Encode(&bytes); |
| 700 | + batch->Merge(storage->GetCFHandle(ColumnFamilyID::Metadata), ns_key, bytes); |
| 701 | + |
| 702 | + auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); |
| 703 | + if (!s.ok()) { |
| 704 | + error("Failed to async repair hash field: {}", s.ToString()); |
| 705 | + } |
| 706 | + }; |
| 707 | + |
| 708 | + std::thread(repair_task).detach(); |
| 709 | +} |
| 710 | + |
677 | 711 | } // namespace redis |
0 commit comments