Commit 2d0a5aa

osopardo1 authored and committed
Revert "Merge pull request #167 from cugni/spark-3.3.0_delta-2.1.0"
This reverts commit cb19cd4, reversing changes made to d72fea5.
1 parent b9a96b3 commit 2d0a5aa

11 files changed: +50 -75 lines changed

README.md (+23 -12)

````diff
@@ -39,7 +39,7 @@
 4. **Table Tolerance** - Model for sampling fraction and **query accuracy** trade-off.


-## Query example with Qbeast
+### What does it mean? - Let's see an example:

 <div>
 <img src="./docs/images/spark_delta_demo.gif" width="49%" alt="Demo for Delta format GIF" />
@@ -49,13 +49,25 @@

 As you can see above, the Qbeast Spark extension allows **faster** queries with statistically **accurate** sampling.

+
 | Format | Execution Time | Result |
 |--------|:--------------:|:---------:|
-| Delta | ~ 151.3 sec. | 37.869383 |
-| Qbeast | ~ 6.6 sec. | 37.856333 |
+| Delta | ~ 2.5 min. | 37.869383 |
+| Qbeast | ~ 6.6 sec. | 37.856333 |

 In this example, **1% sampling** provides the result **x22 times faster** compared to using Delta format, with an **error of 0,034%**.

+# Getting Started
+
+>### Warning: DO NOT USE IN PRODUCTION!
+> This project is in an early development phase: there are missing functionalities and the API might change drastically.
+>
+> Join ⨝ the community to be a part of this project!
+>
+> See Issues tab to know what is cooking 😎
+
+
+

 # Quickstart
 You can run the qbeast-spark application locally on your computer, or using a Docker image we already prepared with the dependencies.
@@ -67,11 +79,11 @@ Download **Spark 3.1.1 with Hadoop 3.2**, unzip it, and create the `SPARK_HOME`
 >:information_source: **Note**: You can use Hadoop 2.7 if desired, but you could have some troubles with different cloud providers' storage, read more about it [here](docs/CloudStorages.md).

 ```bash
-wget https://www.apache.org/dyn/closer.lua/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
+wget https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

-tar xzvf spark-3.3.2-bin-hadoop3.tgz
+tar xzvf spark-3.1.1-bin-hadoop3.2.tgz

-export SPARK_HOME=$PWD/spark-3.3.2-bin-hadoop3
+export SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2
 ```
 ### 1. Launch a spark-shell

@@ -164,12 +176,11 @@ Go to [QbeastTable documentation](./docs/QbeastTable.md) for more detailed infor
 Use [Python index visualizer](./utils/visualizer/README.md) for your indexed table to visually examine index structure and gather sampling metrics.

 # Dependencies and Version Compatibility
-| Version | Spark | Hadoop | Delta Lake |
-|----------------------|:-----:|:------:|:----------:|
-| 0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
-| 0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
-| 0.3.x | 3.2.x | 3.3.x | 1.2.x |
-| 0.4.x (coming soon!) | 3.3.x | 3.3.x | 2.1.x |
+| Version | Spark | Hadoop | Delta Lake |
+|---------|:-----:|:------:|:----------:|
+| 0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
+| 0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
+| 0.3.x | 3.2.x | 3.3.x | 1.2.x |

 Check [here](https://docs.delta.io/latest/releases.html) for **Delta Lake** and **Apache Spark** version compatibility.
````
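For context on the quickstart restored above, here is a minimal, hedged spark-shell sketch of writing a dataset in the qbeast format and then querying a sample, in the spirit of the README's 1% sampling example. The dataset, path, and column names are placeholders, not the demo behind the numbers in the table:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Placeholder data and path; the README demo uses its own dataset.
val df = spark.range(1000000).selectExpr("id AS user_id", "rand() AS price")
val qbeastPath = "/tmp/qbeast-demo"

// Index on the columns you expect to filter or sample on.
df.write
  .format("qbeast")
  .option("columnsToIndex", "user_id,price")
  .save(qbeastPath)

// A 1% sample over the indexed table; this sampling pushdown is what the
// Delta-vs-Qbeast timing comparison in the README is illustrating.
val avgPrice = spark.read
  .format("qbeast")
  .load(qbeastPath)
  .sample(0.01)
  .agg("price" -> "avg")
  .head()
  .getDouble(0)

println(s"Estimated average price: $avgPrice")
```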

project/Dependencies.scala (+3 -3)

```diff
@@ -4,9 +4,9 @@ import sbt._
  * External libraries used in the project with versions.
  */
 object Dependencies {
-  lazy val sparkVersion: String = sys.props.get("spark.version").getOrElse("3.3.0")
-  lazy val hadoopVersion: String = sys.props.get("hadoop.version").getOrElse("3.3.4")
-  lazy val deltaVersion: String = "2.1.0"
+  lazy val sparkVersion: String = sys.props.get("spark.version").getOrElse("3.2.2")
+  lazy val hadoopVersion: String = sys.props.get("hadoop.version").getOrElse("3.3.1")
+  lazy val deltaVersion: String = "1.2.0"

   val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion
   val sparkSql = "org.apache.spark" %% "spark-sql" % sparkVersion
```
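A side note, not part of the commit: the pinned Spark and Hadoop versions above are only defaults, since both are read from JVM system properties first. A small sketch of that fallback behavior, with an illustrative override value:

```scala
// Sketch of the lookup used in Dependencies.scala: a -D system property wins,
// otherwise the defaults pinned by this revert ("3.2.2" / "3.3.1") are used.
object VersionFallbackSketch extends App {
  val sparkVersion: String = sys.props.get("spark.version").getOrElse("3.2.2")
  val hadoopVersion: String = sys.props.get("hadoop.version").getOrElse("3.3.1")

  // Launching sbt with e.g. -Dspark.version=3.2.1 (illustrative value) would print 3.2.1 here.
  println(s"Building against Spark $sparkVersion and Hadoop $hadoopVersion")
}
```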

project/build.properties (+1 -1)

```diff
@@ -1 +1 @@
-sbt.version = 1.6.2
+sbt.version = 1.5.8
```

src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala (-25)

```diff
@@ -110,10 +110,6 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex with Logging {
   override def partitionSchema: StructType = index.partitionSchema
 }

-/**
- * Companion object for OTreeIndex
- * Builds an OTreeIndex instance from the path to a table
- */
 object OTreeIndex {

   def apply(spark: SparkSession, path: Path): OTreeIndex = {
@@ -123,24 +119,3 @@ object OTreeIndex {
   }

 }
-
-/**
- * Singleton object for EmptyIndex.
- * Used when creating a table with no data added
- */
-
-object EmptyIndex extends FileIndex {
-  override def rootPaths: Seq[Path] = Seq.empty
-
-  override def listFiles(
-      partitionFilters: Seq[Expression],
-      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = Seq.empty
-
-  override def inputFiles: Array[String] = Array.empty
-
-  override def refresh(): Unit = {}
-
-  override def sizeInBytes: Long = 0L
-
-  override def partitionSchema: StructType = StructType(Seq.empty)
-}
```

src/main/scala/io/qbeast/spark/internal/rules/SaveAsTableRule.scala (+9 -7)

```diff
@@ -24,13 +24,15 @@ class SaveAsTableRule(spark: SparkSession) extends Rule[LogicalPlan] with Loggin
     // We need to pass the writeOptions as properties to the creation of the table
     // to make sure columnsToIndex is present
     plan transformDown {
-      case saveAsSelect: CreateTableAsSelect if isQbeastProvider(saveAsSelect.tableSpec) =>
-        val finalProperties = saveAsSelect.writeOptions ++ saveAsSelect.tableSpec.properties
-        saveAsSelect.copy(tableSpec = saveAsSelect.tableSpec.copy(properties = finalProperties))
-      case replaceAsSelect: ReplaceTableAsSelect if isQbeastProvider(replaceAsSelect.tableSpec) =>
-        val finalProperties = replaceAsSelect.tableSpec.properties ++ replaceAsSelect.writeOptions
-        replaceAsSelect.copy(tableSpec =
-          replaceAsSelect.tableSpec.copy(properties = finalProperties))
+      case saveAsSelect: CreateTableAsSelect if isQbeastProvider(saveAsSelect.properties) =>
+        val options = saveAsSelect.writeOptions
+        val finalProperties = saveAsSelect.properties ++ options
+        saveAsSelect.copy(properties = finalProperties)
+      case replaceAsSelect: ReplaceTableAsSelect
+          if isQbeastProvider(replaceAsSelect.properties) =>
+        val options = replaceAsSelect.writeOptions
+        val finalProperties = replaceAsSelect.properties ++ options
+        replaceAsSelect.copy(properties = finalProperties)
     }
   }
 }
```
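For orientation (not part of the diff): this rule rewrites V2 CREATE/REPLACE TABLE AS SELECT plans so that options passed through the DataFrame API survive as table properties. A hedged sketch of a write that would exercise it, where the catalog/table name and column names are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val events = spark.range(1000).selectExpr("id AS user_id", "id % 100 AS price")

// Hypothetical CTAS through DataFrameWriterV2; SaveAsTableRule is meant to copy
// write options such as columnsToIndex into the created table's properties.
events
  .writeTo("my_catalog.analytics.events_qbeast") // hypothetical table name
  .using("qbeast")
  .option("columnsToIndex", "user_id,price")
  .createOrReplace()
```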

src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala (+2 -2)

```diff
@@ -9,7 +9,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
-import io.qbeast.spark.delta.{EmptyIndex, OTreeIndex}
+import io.qbeast.spark.delta.OTreeIndex
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import io.qbeast.spark.table.IndexedTable
 import io.qbeast.context.QbeastContext
@@ -45,7 +45,7 @@ object QbeastBaseRelation {
       // This could happen if we CREATE/REPLACE TABLE without inserting data
       // In this case, we use the options variable
       new HadoopFsRelation(
-        EmptyIndex,
+        OTreeIndex(spark, new Path(tableID.id)),
         partitionSchema = StructType(Seq.empty[StructField]),
         dataSchema = schema,
         bucketSpec = None,
```
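With the revert, a qbeast table that has been created but not yet written to is backed by an OTreeIndex over its (still empty) table path rather than the removed EmptyIndex. A hedged sketch of the scenario the code comment above refers to; the table name, schema, and option value are illustrative, and `spark` is assumed to be an active session with the Qbeast extensions enabled:

```scala
// Hypothetical CREATE TABLE with no data inserted yet.
spark.sql("""
  CREATE TABLE student (id INT, name STRING, age INT)
  USING qbeast
  OPTIONS ('columnsToIndex' = 'id,age')
""")

// Reading it back is expected to yield an empty DataFrame with the declared schema.
val df = spark.table("student")
assert(df.isEmpty)
df.printSchema()
```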

src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala (+3 -10)

```diff
@@ -15,7 +15,6 @@ import org.apache.spark.sql.catalyst.analysis.{
 }
 import org.apache.spark.sql.{SparkCatalogUtils, SparkSession}
 import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.delta.catalog.DeltaCatalog
 import org.apache.spark.sql.types.StructType
@@ -30,7 +29,7 @@ import scala.collection.JavaConverters._
  * QbeastCatalog uses a session catalog of type T
  * to delegate high-level operations
  */
-class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatalog]
+class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
     extends CatalogExtension
     with SupportsNamespaces
     with StagingTableCatalog {
@@ -234,8 +233,8 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
   override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit =
     getSessionCatalog().alterNamespace(namespace, changes.head)

-  override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean =
-    getSessionCatalog().dropNamespace(namespace, cascade)
+  override def dropNamespace(namespace: Array[String]): Boolean =
+    getSessionCatalog().dropNamespace(namespace)

   override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
     // Initialize the catalog with the corresponding name
@@ -255,10 +254,4 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
     } else throw new IllegalArgumentException("Invalid session catalog: " + delegate)
   }

-  override def listFunctions(namespace: Array[String]): Array[Identifier] =
-    getSessionCatalog().listFunctions(namespace)
-
-  override def loadFunction(ident: Identifier): UnboundFunction =
-    getSessionCatalog().loadFunction(ident)
-
 }
```
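As background (also not part of this commit): QbeastCatalog is a CatalogExtension that delegates to the underlying session catalog, which is why restoring the Spark 3.2-era dropNamespace signature and dropping the FunctionCatalog methods is enough here. A hedged sketch of how such a catalog is typically wired into a session; the configuration key is the standard Spark one, but treat the exact setup as illustrative rather than this repository's documented quickstart:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: register QbeastCatalog as the delegating session catalog.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config(
    "spark.sql.catalog.spark_catalog",
    "io.qbeast.spark.internal.sources.catalog.QbeastCatalog")
  .getOrCreate()

// Namespace DDL issued through the session is then forwarded to the delegate,
// using the single-argument dropNamespace restored by this commit.
spark.sql("CREATE NAMESPACE IF NOT EXISTS demo_ns") // hypothetical namespace
spark.sql("DROP NAMESPACE demo_ns")
```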

src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala (-5)

```diff
@@ -11,7 +11,6 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.TableSpec
 import org.apache.spark.sql.connector.catalog.{Identifier, Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.delta.DeltaLog
@@ -47,10 +46,6 @@ object QbeastCatalogUtils {
     provider.isDefined && provider.get == QBEAST_PROVIDER_NAME
   }

-  def isQbeastProvider(tableSpec: TableSpec): Boolean = {
-    tableSpec.provider.contains(QBEAST_PROVIDER_NAME)
-  }
-
   def isQbeastProvider(properties: Map[String, String]): Boolean = isQbeastProvider(
     properties.get("provider"))
```
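With the TableSpec overload removed, callers such as SaveAsTableRule go through the property-map overload instead. A quick hedged illustration of the check it performs, assuming QBEAST_PROVIDER_NAME is the "qbeast" format name used elsewhere in the project:

```scala
import io.qbeast.spark.internal.sources.catalog.QbeastCatalogUtils

// Assuming QBEAST_PROVIDER_NAME == "qbeast" (the name passed to spark.read/write.format):
QbeastCatalogUtils.isQbeastProvider(Map("provider" -> "qbeast")) // true
QbeastCatalogUtils.isQbeastProvider(Map("provider" -> "delta"))  // false
QbeastCatalogUtils.isQbeastProvider(Map.empty[String, String])   // false
```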

src/test/scala/io/qbeast/spark/QbeastIntegrationTestSpec.scala (+3 -2)

```diff
@@ -11,7 +11,7 @@ import io.qbeast.spark.delta.SparkDeltaMetadataManager
 import io.qbeast.spark.delta.writer.{SparkDeltaDataWriter}
 import io.qbeast.spark.index.{SparkOTreeManager, SparkRevisionFactory}
 import io.qbeast.spark.table.IndexedTableFactoryImpl
-import org.apache.log4j.{Level}
+import org.apache.log4j.{Level, Logger}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -32,6 +32,8 @@ import java.nio.file.Files
  * }}}
  */
 trait QbeastIntegrationTestSpec extends AnyFlatSpec with Matchers with DatasetComparer {
+  // This reduce the verbosity of Spark
+  Logger.getLogger("org.apache").setLevel(Level.WARN)

   // Spark Configuration
   // Including Session Extensions and Catalog
@@ -78,7 +80,6 @@ trait QbeastIntegrationTestSpec extends AnyFlatSpec with Matchers with DatasetCo
       .appName("QbeastDataSource")
       .config(sparkConf)
       .getOrCreate()
-    spark.sparkContext.setLogLevel(Level.WARN.toString)
     try {
       testCode(spark)
     } finally {
```

src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala (+2 -4)

```diff
@@ -163,8 +163,7 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
     val qbeastCatalog = createQbeastCatalog(spark)
     qbeastCatalog.loadNamespaceMetadata(defaultNamespace) shouldBe Map(
       "comment" -> "default database",
-      "location" -> ("file:" + tmpLocation),
-      "owner" -> scala.util.Properties.userName).asJava
+      "location" -> ("file:" + tmpLocation)).asJava
   })

   it should "alter namespace" in withQbeastContextSparkAndTmpWarehouse((spark, tmpLocation) => {
@@ -180,7 +179,6 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
     qbeastCatalog.loadNamespaceMetadata(newNamespace) shouldBe Map(
       "comment" -> "",
       "location" -> ("file:" + tmpLocation + "/new_namespace.db"),
-      "owner" -> scala.util.Properties.userName,
       "newPropertie" -> "newValue").asJava

   })
@@ -193,7 +191,7 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
     qbeastCatalog.listNamespaces() shouldBe Array(defaultNamespace, Array("new_namespace"))

     // Drop Namespace
-    qbeastCatalog.dropNamespace(newNamespace, true)
+    qbeastCatalog.dropNamespace(newNamespace)

     qbeastCatalog.listNamespaces() shouldBe Array(defaultNamespace)
```

src/test/scala/io/qbeast/spark/utils/ConvertToQbeastTest.scala (+4 -4)

```diff
@@ -63,7 +63,7 @@ class ConvertToQbeastTest
       val sourceDf = spark.read.format(fileFormat).load(tmpDir)
       val qbeastDf = spark.read.format("qbeast").load(tmpDir)

-      assertLargeDatasetEquality(qbeastDf, sourceDf, orderedComparison = false)
+      assertLargeDatasetEquality(qbeastDf, sourceDf)

       // All non-qbeast files are considered staging files and are placed
       // directly into the staging revision(RevisionID = 0)
@@ -83,7 +83,7 @@ class ConvertToQbeastTest
       val sourceDf = spark.read.format(fileFormat).load(tmpDir)
       val qbeastDf = spark.read.format("qbeast").load(tmpDir)

-      assertLargeDatasetEquality(qbeastDf, sourceDf, orderedComparison = false)
+      assertLargeDatasetEquality(qbeastDf, sourceDf)

       // All non-qbeast files are considered staging files and are placed
       // directly into the staging revision(RevisionID = 0)
@@ -214,7 +214,7 @@ class ConvertToQbeastTest
       // Compare DataFrames
       val sourceDf = spark.read.format(fileFormat).load(tmpDir)
       val qbeastDf = spark.read.format("qbeast").load(tmpDir)
-      assertLargeDatasetEquality(qbeastDf, sourceDf, orderedComparison = false)
+      assertLargeDatasetEquality(qbeastDf, sourceDf)
     })

   "Compacting the staging revision" should "reduce the number of delta AddFiles" in
@@ -233,7 +233,7 @@ class ConvertToQbeastTest
       // Compare DataFrames
       val sourceDf = spark.read.format(fileFormat).load(tmpDir)
       val qbeastDf = spark.read.format("qbeast").load(tmpDir)
-      assertLargeDatasetEquality(qbeastDf, sourceDf, orderedComparison = false)
+      assertLargeDatasetEquality(qbeastDf, sourceDf)

       // Standard staging revision behavior
       val qs = getQbeastSnapshot(spark, tmpDir)
```
