Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions axiom/connectors/ConnectorMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

#include "axiom/connectors/ConnectorMetadata.h"

#include <fmt/format.h>
#include <folly/String.h>
#include <sstream>

namespace facebook::axiom::connector {
namespace {

Expand Down Expand Up @@ -226,4 +230,62 @@ void ConnectorMetadata::unregisterMetadata(std::string_view connectorId) {
metadataRegistry().erase(connectorId);
}

/// Renders the statistics as "<field=value, field=value, ...>". A field is
/// emitted only when it carries information: 'name' when non-empty, 'nonNull'
/// when true, 'nullPct' and 'numValues' when non-zero, optional fields when
/// set, and 'children' (as a count) when non-empty.
std::string ColumnStatistics::toString() const {
  // Formats 'value' through its operator<< and returns the resulting text.
  const auto streamed = [](const auto& value) {
    std::ostringstream out;
    out << value;
    return out.str();
  };

  std::vector<std::string> fields;

  if (!name.empty()) {
    fields.emplace_back(fmt::format("name={}", name));
  }
  if (nonNull) {
    fields.emplace_back("nonNull=true");
  }
  if (nullPct != 0) {
    fields.emplace_back(fmt::format("nullPct={}", nullPct));
  }

  // 'min' and 'max' are rendered via operator<< rather than fmt.
  if (min.has_value()) {
    fields.emplace_back("min=" + streamed(min.value()));
  }
  if (max.has_value()) {
    fields.emplace_back("max=" + streamed(max.value()));
  }

  if (maxLength.has_value()) {
    fields.emplace_back(fmt::format("maxLength={}", maxLength.value()));
  }
  if (ascendingPct.has_value()) {
    fields.emplace_back(fmt::format("ascendingPct={}", ascendingPct.value()));
  }
  if (descendingPct.has_value()) {
    fields.emplace_back(
        fmt::format("descendingPct={}", descendingPct.value()));
  }
  if (avgLength.has_value()) {
    fields.emplace_back(fmt::format("avgLength={}", avgLength.value()));
  }
  if (numDistinct.has_value()) {
    fields.emplace_back(fmt::format("numDistinct={}", numDistinct.value()));
  }
  if (numValues != 0) {
    fields.emplace_back(fmt::format("numValues={}", numValues));
  }

  // Child statistics are summarized by their count only.
  if (!children.empty()) {
    fields.emplace_back(fmt::format("children={}", children.size()));
  }

  const std::string joined = folly::join(", ", fields);
  return fmt::format("<{}>", joined);
}

} // namespace facebook::axiom::connector
16 changes: 16 additions & 0 deletions axiom/connectors/ConnectorMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ struct ColumnStatistics {
/// map, may have one element for each key. In all cases, stats may be
/// missing.
std::vector<ColumnStatistics> children;

/// Returns a string representation of the statistics in the form
/// <field1=value1, field2=value2, ...>. Fields that are unset, empty, zero
/// or false are omitted; 'children' is reported as a count.
std::string toString() const;
};

/// Abstract representation of statistics for the data covered by a
/// PartitionHandle.
struct PartitionStatistics {
/// NOTE(review): presumably the number of rows covered by the partition.
/// This is 32-bit while 'numFiles' is 64-bit; confirm int32_t is wide enough.
int32_t numRows{0};
/// NOTE(review): presumably the number of files backing the partition —
/// confirm.
int64_t numFiles{0};

/// Column names.
std::vector<std::string> columns;

/// Column statistics, 1:1 to column names.
std::vector<ColumnStatistics> columnStatistics;
};

/// Base class for column. The column's name and type are immutable but the
Expand Down
24 changes: 24 additions & 0 deletions axiom/connectors/ConnectorSplitManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,24 @@ struct SplitOptions {
/// Polymorphic handle identifying a partition. Subclasses override the
/// virtual accessors to expose their own description and partition string.
class PartitionHandle {
 public:
  virtual ~PartitionHandle() = default;

  /// Returns a human-readable description of the handle. The base
  /// implementation returns the fixed string "PartitionHandle".
  virtual std::string toString() const {
    return "PartitionHandle";
  }

  /// Returns the partition path string in Hive format (e.g.,
  /// "ds=2023-01-01/product=p1"). Returns an empty string for unpartitioned
  /// tables.
  virtual const std::string& partition() const {
    // Function-local static so the returned reference stays valid after the
    // call; const so the shared empty string cannot be modified.
    static const std::string kEmpty;
    return kEmpty;
  }
};

using PartitionHandlePtr = std::shared_ptr<const PartitionHandle>;

struct PartitionStatistics;
using PartitionStatisticsPtr = std::shared_ptr<PartitionStatistics>;

class ConnectorSplitManager {
public:
virtual ~ConnectorSplitManager() = default;
Expand All @@ -73,6 +87,16 @@ class ConnectorSplitManager {
const ConnectorSessionPtr& session,
const velox::connector::ConnectorTableHandlePtr& tableHandle) = 0;

/// Returns per-partition statistics for the given 'partitions' and
/// 'columns'. This is a separate function from split enumeration because
/// enumerating splits for reading files transfers less data. Callers
/// typically request statistics for only a subset of the partitions.
virtual std::vector<PartitionStatisticsPtr> getPartitionStatistics(
    std::span<const PartitionHandlePtr> partitions,
    const std::vector<std::string>& columns) {
  // Default implementation: ignores both arguments and reports no statistics.
  return std::vector<PartitionStatisticsPtr>{};
}

/// Returns a SplitSource that covers the contents of 'partitions'. The set of
/// partitions is exposed separately so that the caller may process the
/// partitions in a specific order or distribute them to specific nodes in a
Expand Down
17 changes: 17 additions & 0 deletions axiom/connectors/hive/HiveConnectorMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@

namespace facebook::axiom::connector::hive {

/// Builds the Hive-style partition string for 'keys': "key=value" entries
/// joined with '/'. A key with no value is rendered as
/// "key=__HIVE_DEFAULT_PARTITION__". Returns an empty string when 'keys' is
/// empty.
///
/// Entries are emitted in ascending key order so that equal key sets always
/// produce the same string; iterating the hash map directly would make the
/// order depend on the map's internal layout.
/// NOTE(review): Hive lays out partition directories in the table's declared
/// partition-column order, which is not available here. Sorting by key only
/// guarantees determinism — confirm whether callers need the declared order.
std::string HivePartitionHandle::makePartitionString(
    const folly::F14FastMap<std::string, std::optional<std::string>>& keys) {
  if (keys.empty()) {
    return "";
  }

  using KeyMap = folly::F14FastMap<std::string, std::optional<std::string>>;

  // Sort pointers to the entries rather than copying the strings. The
  // pointers stay valid because 'keys' is not modified here.
  std::vector<const KeyMap::value_type*> entries;
  entries.reserve(keys.size());
  for (const auto& entry : keys) {
    entries.push_back(&entry);
  }
  std::sort(
      entries.begin(),
      entries.end(),
      [](const KeyMap::value_type* left, const KeyMap::value_type* right) {
        return left->first < right->first;
      });

  std::vector<std::string> parts;
  parts.reserve(entries.size());
  for (const auto* entry : entries) {
    const auto& [key, value] = *entry;
    if (value.has_value()) {
      parts.push_back(fmt::format("{}={}", key, value.value()));
    } else {
      parts.push_back(fmt::format("{}=__HIVE_DEFAULT_PARTITION__", key));
    }
  }
  return folly::join("/", parts);
}

const PartitionType* HivePartitionType::copartition(
const PartitionType& other) const {
if (const auto* otherPartitionType = other.as<HivePartitionType>()) {
Expand Down
19 changes: 18 additions & 1 deletion axiom/connectors/hive/HiveConnectorMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include <algorithm>
#include <fmt/format.h>
#include <folly/String.h>
#include "axiom/connectors/ConnectorMetadata.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSink.h"
Expand All @@ -33,11 +35,22 @@ struct HivePartitionHandle : public PartitionHandle {
folly::F14FastMap<std::string, std::optional<std::string>> partitionKeys,
std::optional<int32_t> tableBucketNumber)
: partitionKeys(std::move(partitionKeys)),
tableBucketNumber(tableBucketNumber) {}
tableBucketNumber(tableBucketNumber),
partition_(makePartitionString(this->partitionKeys)) {}

const folly::F14FastMap<std::string, std::optional<std::string>>
partitionKeys;
const std::optional<int32_t> tableBucketNumber;

/// Returns the partition string that was computed from 'partitionKeys' by
/// makePartitionString() when this handle was constructed.
const std::string& partition() const override {
return partition_;
}

private:
/// Builds the partition string for 'keys': "key=value" entries joined with
/// '/'. A key without a value is rendered as
/// "key=__HIVE_DEFAULT_PARTITION__". Returns an empty string if 'keys' is
/// empty.
static std::string makePartitionString(
const folly::F14FastMap<std::string, std::optional<std::string>>& keys);

/// Partition string returned by partition(). Declared after 'partitionKeys'
/// because its constructor initializer reads that member.
const std::string partition_;
};

/// For Hive, 'partition' means 'bucket'.
Expand Down Expand Up @@ -124,6 +137,10 @@ class HiveTableLayout : public TableLayout {
return partitionType_.has_value() ? &partitionType_.value() : nullptr;
}

/// Returns 'hivePartitionColumns_' as the discrete predicate columns of this
/// layout.
/// NOTE(review): presumably these are the table's Hive partition columns —
/// confirm against where 'hivePartitionColumns_' is populated.
std::span<const Column* const> discretePredicateColumns() const override {
return hivePartitionColumns_;
}

/// Returns SerDe parameters for this layout. Default implementation returns
/// empty map. Derived classes can override to provide actual parameters.
virtual const std::unordered_map<std::string, std::string>& serdeParameters()
Expand Down
Loading
Loading