diff --git a/src/Storages/PruneShards.cpp b/src/Storages/PruneShards.cpp index 1c5ecff09e..3f5cd9f068 100644 --- a/src/Storages/PruneShards.cpp +++ b/src/Storages/PruneShards.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -257,6 +258,22 @@ QueryMode getQueryMode(ConstStoragePtr storage, const SelectQueryInfo & query_in if (require_back_fill_from_historical) { + /// For time-based seeks, try resolving via NativeLog first. + /// If the streaming store still has the data, skip the expensive + /// historical MergeTree scan entirely. + if (query_info.seek_to_info->isTimeBased()) + { + if (const auto * stream_storage = dynamic_cast(storage.get())) + { + if (auto resolved = stream_storage->tryResolveTimeSeekViaStreamingStore(query_info.seek_to_info)) + { + query_info.seek_to_info->seek_points = std::move(*resolved); + query_info.seek_to_info->type = SeekToType::SEQUENCE_NUMBER; + return QueryMode::Streaming; + } + } + } + /// By default, we will seek to earliest for backfill concat if (query_info.seek_to_info->getSeekTo().empty()) query_info.seek_to_info->seek_points = {cluster::Constants::EarliestSN}; diff --git a/src/Storages/Stream/StorageStream.cpp b/src/Storages/Stream/StorageStream.cpp index 2dd4f8404f..2ec30a3fd2 100644 --- a/src/Storages/Stream/StorageStream.cpp +++ b/src/Storages/Stream/StorageStream.cpp @@ -1682,4 +1682,52 @@ IStorage::SnapshotDataWithExpiration StorageStream::readSnapshot(const Names & r return {std::move(result), std::move(snapshot_expired)}; } + +std::optional> StorageStream::tryResolveTimeSeekViaStreamingStore(const SeekToInfoPtr & seek_to_info) const +{ + if (!seek_to_info || !seek_to_info->isTimeBased()) + return std::nullopt; + + auto local_shards = stream_shards; + if (local_shards.empty()) + return std::nullopt; + + try + { + for (const auto & shard : local_shards) + { + if (shard->isVirtualReplica() || shard->isInmemory()) + continue; + + auto seek_copy = std::make_shared(*seek_to_info); + seek_copy->replicateForShards(shards); + + auto resolved_sns = shard->sequencesForTimestamps(seek_copy->getSeekPoints()); + + /// Check all resolved SNs are still available in NativeLog + bool all_available = true; + for (UInt32 i = 0; i < shards && all_available; ++i) + { + auto range = local_shards[i]->sequenceRange(); + if (range.first < 0 || resolved_sns[i] < range.first) + all_available = false; + } + + if (all_available) + { + LOG_INFO(log, "Time-based seek resolved via streaming store, skipping historical backfill"); + return resolved_sns; + } + + LOG_DEBUG(log, "Time-based seek data partially compacted, falling back to historical backfill"); + return std::nullopt; + } + } + catch (...) + { + LOG_DEBUG(log, "Failed to resolve time-based seek via streaming store, falling back to historical backfill"); + } + + return std::nullopt; +} } diff --git a/src/Storages/Stream/StorageStream.h b/src/Storages/Stream/StorageStream.h index 08677d8373..552e42dffc 100644 --- a/src/Storages/Stream/StorageStream.h +++ b/src/Storages/Stream/StorageStream.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -269,6 +270,11 @@ class StorageStream final : public shared_ptr_helper, public Merg std::vector getLastSNs() const; + /// For time-based seek_to, probe NativeLog to check if the streaming store + /// still has the requested data. Returns resolved sequence numbers per shard + /// if available, std::nullopt if the data has been compacted away. + std::optional> tryResolveTimeSeekViaStreamingStore(const SeekToInfoPtr & seek_to_info) const; + bool supportsStreamingQuery() const override { return true; } friend class StreamSink;