Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions axiom/cli/Console.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ DEFINE_string(

DEFINE_bool(debug, false, "Enable debug mode");

DEFINE_bool(sample_joins, false, "Enable join sampling");
DEFINE_bool(sample_filters, false, "Enable filter sampling");

using namespace facebook::velox;

namespace axiom::sql {
Expand Down Expand Up @@ -222,6 +225,8 @@ void Console::runNoThrow(std::string_view sql, bool isInteractive) {
.splitTargetBytes = FLAGS_split_target_bytes,
.optimizerTraceFlags = FLAGS_optimizer_trace,
.debugMode = FLAGS_debug,
.sampleJoins = FLAGS_sample_joins,
.sampleFilters = FLAGS_sample_filters,
};

decltype(runner_.parseMultiple(sql, options)) statements;
Expand Down
9 changes: 8 additions & 1 deletion axiom/cli/SqlQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ std::string SqlQueryRunner::runExplain(
case presto::ExplainStatement::Type::kExecutable:
return optimize(logicalPlan, newQuery(options), options).toString();
}
VELOX_UNREACHABLE();
}

namespace {
Expand Down Expand Up @@ -317,14 +318,20 @@ optimizer::PlanAndStats SqlQueryRunner::optimize(

auto session = std::make_shared<Session>(queryCtx->queryId());

optimizer::OptimizerOptions optimizerOptions{
.sampleJoins = options.sampleJoins,
.sampleFilters = options.sampleFilters,
.traceFlags = options.optimizerTraceFlags,
};

optimizer::Optimization optimization(
session,
*logicalPlan,
*schema_,
*history_,
queryCtx,
evaluator,
{.traceFlags = options.optimizerTraceFlags},
optimizerOptions,
opts);

if (checkDerivedTable && !checkDerivedTable(*optimization.rootDt())) {
Expand Down
6 changes: 6 additions & 0 deletions axiom/cli/SqlQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ class SqlQueryRunner {

/// If true, EXPLAIN ANALYZE output includes custom operator stats.
bool debugMode{false};

/// If true, enable join sampling.
bool sampleJoins{false};

/// If true, enable filter sampling.
bool sampleFilters{false};
};

/// Runs a single SQL statement and returns the result.
Expand Down
62 changes: 62 additions & 0 deletions axiom/connectors/ConnectorMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

#include "axiom/connectors/ConnectorMetadata.h"

#include <fmt/format.h>
#include <folly/String.h>
#include <sstream>

namespace facebook::axiom::connector {
namespace {

Expand Down Expand Up @@ -226,4 +230,62 @@ void ConnectorMetadata::unregisterMetadata(std::string_view connectorId) {
metadataRegistry().erase(connectorId);
}

std::string ColumnStatistics::toString() const {
std::vector<std::string> parts;

if (!name.empty()) {
parts.push_back(fmt::format("name={}", name));
}

if (nonNull) {
parts.push_back("nonNull=true");
}

if (nullPct != 0) {
parts.push_back(fmt::format("nullPct={}", nullPct));
}

if (min.has_value()) {
std::ostringstream oss;
oss << min.value();
parts.push_back("min=" + oss.str());
}

if (max.has_value()) {
std::ostringstream oss;
oss << max.value();
parts.push_back("max=" + oss.str());
}

if (maxLength.has_value()) {
parts.push_back(fmt::format("maxLength={}", maxLength.value()));
}

if (ascendingPct.has_value()) {
parts.push_back(fmt::format("ascendingPct={}", ascendingPct.value()));
}

if (descendingPct.has_value()) {
parts.push_back(fmt::format("descendingPct={}", descendingPct.value()));
}

if (avgLength.has_value()) {
parts.push_back(fmt::format("avgLength={}", avgLength.value()));
}

if (numDistinct.has_value()) {
parts.push_back(fmt::format("numDistinct={}", numDistinct.value()));
}

if (numValues != 0) {
parts.push_back(fmt::format("numValues={}", numValues));
}

if (!children.empty()) {
parts.push_back(fmt::format("children={}", children.size()));
}

return fmt::format("<{}>", folly::join(", ", parts));
}

} // namespace facebook::axiom::connector
16 changes: 16 additions & 0 deletions axiom/connectors/ConnectorMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ struct ColumnStatistics {
/// map, may have one element for each key. In all cases, stats may be
/// missing.
std::vector<ColumnStatistics> children;

/// Returns a string representation of the statistics in the form
/// <field1=value1,...> containing only fields that have values.
std::string toString() const;
};

/// Abstract representation of statistics per data covered by a PartitionHandle.
struct PartitionStatistics {
int32_t numRows{0};
int64_t numFiles{0};

/// Column names.
std::vector<std::string> columns;

/// Column statistics, 1:1 to column names.
std::vector<ColumnStatistics> columnStatistics;
};

/// Base class for column. The column's name and type are immutable but the
Expand Down
25 changes: 25 additions & 0 deletions axiom/connectors/ConnectorSplitManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,25 @@ struct SplitOptions {
class PartitionHandle {
public:
virtual ~PartitionHandle() = default;

virtual std::string toString() const {
return "PartitionHandle";
}

/// Returns the partition path string in Hive format (e.g.,
/// "ds=2023-01-01/product=p1"). Returns an empty string for unpartitioned
/// tables.
virtual const std::string& partition() const {
static std::string empty;
return empty;
}
};

using PartitionHandlePtr = std::shared_ptr<const PartitionHandle>;

struct PartitionStatistics;
using PartitionStatisticsPtr = std::shared_ptr<PartitionStatistics>;

class ConnectorSplitManager {
public:
virtual ~ConnectorSplitManager() = default;
Expand All @@ -73,6 +88,16 @@ class ConnectorSplitManager {
const ConnectorSessionPtr& session,
const velox::connector::ConnectorTableHandlePtr& tableHandle) = 0;

/// Returns per-partition statistics for the 'partitions' and
/// 'columns'. This is a separate function because split enumeration
/// for reading files transfers less data. Typically use only a
/// subset of the partitions to get stats.
virtual std::vector<PartitionStatisticsPtr> getPartitionStatistics(
std::span<const PartitionHandlePtr> partitions,
const std::vector<std::string>& columns) {
return {};
}

/// Returns a SplitSource that covers the contents of 'partitions'. The set of
/// partitions is exposed separately so that the caller may process the
/// partitions in a specific order or distribute them to specific nodes in a
Expand Down
35 changes: 35 additions & 0 deletions axiom/connectors/hive/HiveConnectorMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,29 @@
#include "velox/exec/TableWriter.h"
#include "velox/expression/ExprConstants.h"

#include <fmt/format.h>
#include <folly/String.h>
#include <algorithm>

namespace facebook::axiom::connector::hive {

std::string HivePartitionHandle::makePartitionString(
const folly::F14FastMap<std::string, std::optional<std::string>>& keys) {
if (keys.empty()) {
return "";
}
std::vector<std::string> parts;
parts.reserve(keys.size());
for (const auto& [key, value] : keys) {
if (value.has_value()) {
parts.push_back(fmt::format("{}={}", key, value.value()));
} else {
parts.push_back(fmt::format("{}=__HIVE_DEFAULT_PARTITION__", key));
}
}
return folly::join("/", parts);
}

const PartitionType* HivePartitionType::copartition(
const PartitionType& other) const {
if (const auto* otherPartitionType = other.as<HivePartitionType>()) {
Expand Down Expand Up @@ -376,4 +397,18 @@ void HiveConnectorMetadata::validateOptions(
}
}

std::string HivePartitionHandle::toString() const {
std::string result = partition();

// Append bucket number if present
if (tableBucketNumber.has_value()) {
if (!result.empty()) {
result += ", ";
}
result += fmt::format("buckets={}", tableBucketNumber.value());
}

return fmt::format("<hive partition: {}>", result);
}

} // namespace facebook::axiom::connector::hive
25 changes: 24 additions & 1 deletion axiom/connectors/hive/HiveConnectorMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include <fmt/format.h>
#include <folly/String.h>
#include "axiom/connectors/ConnectorMetadata.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSink.h"
Expand All @@ -33,11 +35,24 @@ struct HivePartitionHandle : public PartitionHandle {
folly::F14FastMap<std::string, std::optional<std::string>> partitionKeys,
std::optional<int32_t> tableBucketNumber)
: partitionKeys(std::move(partitionKeys)),
tableBucketNumber(tableBucketNumber) {}
tableBucketNumber(tableBucketNumber),
partition_(makePartitionString(this->partitionKeys)) {}

std::string toString() const override;

const folly::F14FastMap<std::string, std::optional<std::string>>
partitionKeys;
const std::optional<int32_t> tableBucketNumber;

const std::string& partition() const override {
return partition_;
}

private:
static std::string makePartitionString(
const folly::F14FastMap<std::string, std::optional<std::string>>& keys);

const std::string partition_;
};

/// For Hive, 'partition' means 'bucket'.
Expand All @@ -61,6 +76,10 @@ class HivePartitionType : public connector::PartitionType {

std::string toString() const override;

int32_t numPartitions() const {
return numPartitions_;
}

private:
const int32_t numPartitions_;
const std::vector<velox::TypePtr> partitionKeyTypes_;
Expand Down Expand Up @@ -124,6 +143,10 @@ class HiveTableLayout : public TableLayout {
return partitionType_.has_value() ? &partitionType_.value() : nullptr;
}

std::span<const Column* const> discretePredicateColumns() const override {
return hivePartitionColumns_;
}

/// Returns SerDe parameters for this layout. Default implementation returns
/// empty map. Derived classes can override to provide actual parameters.
virtual const std::unordered_map<std::string, std::string>& serdeParameters()
Expand Down
Loading
Loading