Skip to content

Commit dcc96b2

Browse files
authored
[FSTORE-1412] Feature monitoring improvements (#980)
1 parent 66b86bf commit dcc96b2

78 files changed

Lines changed: 14785 additions & 3106 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

java/hsfs/src/main/java/com/logicalclocks/hsfs/StatisticsConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ public class StatisticsConfig {
5050
@Setter
5151
private List<String> columns = new ArrayList<>();
5252

53+
@Getter
54+
@Setter
55+
private Boolean kll = false;
56+
57+
@Getter
58+
@Setter
59+
private Integer histogramBins = null;
60+
5361
public StatisticsConfig(Boolean enabled, Boolean histograms, Boolean correlations,
5462
Boolean exactUniqueness) {
5563
this.enabled = enabled;

java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/FeatureDescriptiveStatistics.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import lombok.Setter;
2525
import org.json.JSONObject;
2626

27-
import java.util.Map;
27+
import java.util.List;
2828

2929
@JsonIgnoreProperties(ignoreUnknown = true)
3030
@NoArgsConstructor
@@ -48,8 +48,8 @@ public class FeatureDescriptiveStatistics extends RestDto<FeatureDescriptiveStat
4848
private Double max;
4949
private Double sum;
5050
private Double mean;
51-
private Double stddev;
52-
private Map<String, Double> percentiles;
51+
private Double stdDev;
52+
private List<Double> percentiles;
5353

5454
// with exact uniqueness
5555
private Double distinctness;
@@ -122,7 +122,7 @@ public static FeatureDescriptiveStatistics fromDeequStatisticsJson(JSONObject st
122122
fds.setMean(statsJson.getDouble("mean"));
123123
}
124124
if (statsJson.has("stdDev")) {
125-
fds.setStddev(statsJson.getDouble("stdDev"));
125+
fds.setStdDev(statsJson.getDouble("stdDev"));
126126
}
127127

128128
JSONObject extendedStatistics = new JSONObject();

java/pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
<lombok.version>1.18.36</lombok.version>
2828
<lombok.plugin.version>1.18.20.0</lombok.plugin.version>
2929
<fasterxml.jackson.databind.version>2.10.0</fasterxml.jackson.databind.version>
30-
<deequ.version>1.1.0.6-SNAPSHOT</deequ.version>
30+
<datasketches.version>6.2.0</datasketches.version>
3131
<spark.version>3.5.5.1</spark.version>
3232
<hudi.version>1.0.2.1</hudi.version>
3333
<awssdk.version>2.10.40</awssdk.version>
@@ -295,7 +295,6 @@
295295
<profile>
296296
<id>spark-3.3</id>
297297
<properties>
298-
<deequ.version>2.0.4.0-spark-3.3</deequ.version>
299298
<artifact.spark.version>spark3.3</artifact.spark.version>
300299
</properties>
301300
</profile>

java/spark/pom.xml

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,20 +52,12 @@
5252
</exclusions>
5353
</dependency>
5454

55+
<!-- datasketches-java: KllDoublesSketch for mergeable percentile sketches.
56+
Replaces the Deequ-based profiling path as of Phase 1.5 (FSTORE-1412). -->
5557
<dependency>
56-
<groupId>com.logicalclocks</groupId>
57-
<artifactId>deequ_${scala-short.version}</artifactId>
58-
<version>${deequ.version}</version>
59-
<exclusions>
60-
<exclusion>
61-
<groupId>org.apache.spark</groupId>
62-
<artifactId>*</artifactId>
63-
</exclusion>
64-
<exclusion>
65-
<groupId>org.scala-lang</groupId>
66-
<artifactId>*</artifactId>
67-
</exclusion>
68-
</exclusions>
58+
<groupId>org.apache.datasketches</groupId>
59+
<artifactId>datasketches-java</artifactId>
60+
<version>${datasketches.version}</version>
6961
</dependency>
7062

7163
<dependency>

java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,6 @@
8484
import org.apache.spark.sql.types.TimestampType;
8585
import org.json.JSONObject;
8686

87-
import com.amazon.deequ.profiles.ColumnProfilerRunBuilder;
88-
import com.amazon.deequ.profiles.ColumnProfilerRunner;
89-
import com.amazon.deequ.profiles.ColumnProfiles;
9087
import com.google.common.base.Strings;
9188
import com.google.common.collect.Lists;
9289
import com.google.common.collect.Maps;
@@ -115,6 +112,7 @@
115112
import com.logicalclocks.hsfs.spark.TrainingDataset;
116113
import com.logicalclocks.hsfs.spark.constructor.Query;
117114
import com.logicalclocks.hsfs.spark.engine.hudi.HudiEngine;
115+
import com.logicalclocks.hsfs.spark.engine.profile.ColumnProfiler;
118116
import com.logicalclocks.hsfs.spark.util.StorageConnectorUtils;
119117
import com.logicalclocks.hsfs.util.Constants;
120118

@@ -666,28 +664,22 @@ private void writeSparkDataset(FeatureGroupBase featureGroup, Dataset<Row> datas
666664
.saveAsTable(featureGroupUtils.getTableName(featureGroup));
667665
}
668666

667+
public String profile(Dataset<Row> df, List<String> restrictToColumns, Boolean correlation,
668+
Boolean histogram, Boolean exactUniqueness, Boolean kll, Integer histogramBins) {
669+
// defaults aligned with the prior Deequ-backed implementation; preserved for training-dataset
670+
// callers where the backend doesn't set them.
671+
boolean correlationFlag = correlation == null ? true : correlation;
672+
boolean histogramFlag = histogram == null ? true : histogram;
673+
boolean exactUniquenessFlag = exactUniqueness == null ? true : exactUniqueness;
674+
boolean kllFlag = kll != null && kll;
675+
int binCount = histogramBins != null ? histogramBins : 20;
676+
return new ColumnProfiler().profile(df, restrictToColumns, correlationFlag, histogramFlag,
677+
binCount, exactUniquenessFlag, kllFlag);
678+
}
679+
669680
public String profile(Dataset<Row> df, List<String> restrictToColumns, Boolean correlation,
670681
Boolean histogram, Boolean exactUniqueness) {
671-
// only needed for training datasets, as the backend is not setting the defaults
672-
if (correlation == null) {
673-
correlation = true;
674-
}
675-
if (histogram == null) {
676-
histogram = true;
677-
}
678-
if (exactUniqueness == null) {
679-
exactUniqueness = true;
680-
}
681-
ColumnProfilerRunBuilder runner = new ColumnProfilerRunner()
682-
.onData(df)
683-
.withCorrelation(correlation, 100)
684-
.withHistogram(histogram, 20)
685-
.withExactUniqueness(exactUniqueness);
686-
if (restrictToColumns != null && !restrictToColumns.isEmpty()) {
687-
runner.restrictToColumns(JavaConverters.asScalaIteratorConverter(restrictToColumns.iterator()).asScala().toSeq());
688-
}
689-
ColumnProfiles result = runner.run();
690-
return ColumnProfiles.toJson(result.profiles().values().toSeq(), result.numRecords());
682+
return profile(df, restrictToColumns, correlation, histogram, exactUniqueness, null, null);
691683
}
692684

693685
public String profile(Dataset<Row> df, List<String> restrictToColumns, Boolean correlation, Boolean histogram) {

0 commit comments

Comments
 (0)