Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/Storages/PruneShards.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <Storages/IStorage.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/Stream/StorageStream.h>
#include <Storages/parseShards.h>
#include <Common/assert_cast.h>
#include <Common/logger_useful.h>
Expand Down Expand Up @@ -257,6 +258,22 @@ QueryMode getQueryMode(ConstStoragePtr storage, const SelectQueryInfo & query_in

if (require_back_fill_from_historical)
{
/// For time-based seeks, try resolving via NativeLog first.
/// If the streaming store still has the data, skip the expensive
/// historical MergeTree scan entirely.
if (query_info.seek_to_info->isTimeBased())
{
if (const auto * stream_storage = dynamic_cast<const StorageStream *>(storage.get()))
{
if (auto resolved = stream_storage->tryResolveTimeSeekViaStreamingStore(query_info.seek_to_info))
{
query_info.seek_to_info->seek_points = std::move(*resolved);
query_info.seek_to_info->type = SeekToType::SEQUENCE_NUMBER;
return QueryMode::Streaming;
}
}
}

/// By default, we will seek to earliest for backfill concat
if (query_info.seek_to_info->getSeekTo().empty())
query_info.seek_to_info->seek_points = {cluster::Constants::EarliestSN};
Expand Down
48 changes: 48 additions & 0 deletions src/Storages/Stream/StorageStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1682,4 +1682,52 @@ IStorage::SnapshotDataWithExpiration StorageStream::readSnapshot(const Names & r

return {std::move(result), std::move(snapshot_expired)};
}

std::optional<std::vector<Int64>> StorageStream::tryResolveTimeSeekViaStreamingStore(const SeekToInfoPtr & seek_to_info) const
{
if (!seek_to_info || !seek_to_info->isTimeBased())
return std::nullopt;

auto local_shards = stream_shards;
if (local_shards.empty())
return std::nullopt;

try
{
for (const auto & shard : local_shards)
{
if (shard->isVirtualReplica() || shard->isInmemory())
continue;

auto seek_copy = std::make_shared<SeekToInfo>(*seek_to_info);
seek_copy->replicateForShards(shards);

auto resolved_sns = shard->sequencesForTimestamps(seek_copy->getSeekPoints());

/// Check all resolved SNs are still available in NativeLog
bool all_available = true;
for (UInt32 i = 0; i < shards && all_available; ++i)
{
auto range = local_shards[i]->sequenceRange();
if (range.first < 0 || resolved_sns[i] < range.first)
all_available = false;
}

if (all_available)
{
LOG_INFO(log, "Time-based seek resolved via streaming store, skipping historical backfill");
return resolved_sns;
}

LOG_DEBUG(log, "Time-based seek data partially compacted, falling back to historical backfill");
return std::nullopt;
}
}
catch (...)
{
LOG_DEBUG(log, "Failed to resolve time-based seek via streaming store, falling back to historical backfill");
}

return std::nullopt;
}
}
6 changes: 6 additions & 0 deletions src/Storages/Stream/StorageStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/SeekToInfo.h>
#include <Storages/QueryShard.h>
#include <Storages/ShardAppliedSequence.h>
#include <Storages/ShardCommittedSequence.h>
Expand Down Expand Up @@ -269,6 +270,11 @@ class StorageStream final : public shared_ptr_helper<StorageStream>, public Merg

std::vector<int64_t> getLastSNs() const;

/// For time-based seek_to, probe NativeLog to check if the streaming store
/// still has the requested data. Returns resolved sequence numbers per shard
/// if available, std::nullopt if the data has been compacted away.
std::optional<std::vector<Int64>> tryResolveTimeSeekViaStreamingStore(const SeekToInfoPtr & seek_to_info) const;

bool supportsStreamingQuery() const override { return true; }

friend class StreamSink;
Expand Down