Commit 965b2dc
SPARKC-706: Add basic support for Cassandra vectors (#1366)
Parent: 6c6ce1b

File tree: 21 files changed (+445, -28 lines)


CHANGES.txt (+3)

@@ -1,3 +1,6 @@
+3.5.1
+* Support for Vector type available in Cassandra 5.0 (SPARKC-706)
+* Upgrade Cassandra Java Driver to 4.18.1, support Cassandra 5.0 in test framework (SPARKC-710)
 
 3.5.0
 * Support for Apache Spark 3.5 (SPARKC-704)

README.md (+14, -7)

@@ -6,11 +6,15 @@
 
 ## Quick Links
 
-| What | Where |
-| ---------- | ----- |
-| Community | Chat with us at [Apache Cassandra](https://cassandra.apache.org/_/community.html#discussions) |
-| Scala Docs | Most Recent Release (3.5.0): [Connector API docs](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.0/connector/com/datastax/spark/connector/index.html), [Connector Driver docs](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.0/driver/com/datastax/spark/connector/index.html) |
-| Latest Production Release | [3.5.0](https://search.maven.org/artifact/com.datastax.spark/spark-cassandra-connector_2.12/3.5.0/jar) |
+| What | Where |
+| ---------- | ----- |
+| Community | Chat with us at [Apache Cassandra](https://cassandra.apache.org/_/community.html#discussions) |
+| Scala Docs | Most Recent Release (3.5.1): [Connector API docs](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.1/connector/com/datastax/spark/connector/index.html), [Connector Driver docs](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.1/driver/com/datastax/spark/connector/index.html) |
+| Latest Production Release | [3.5.1](https://search.maven.org/artifact/com.datastax.spark/spark-cassandra-connector_2.12/3.5.1/jar) |
+
+## News
+### 3.5.1
+- The latest release of the Spark-Cassandra-Connector introduces support for vector types, greatly enhancing its capabilities. This new feature allows developers to seamlessly integrate and work with Cassandra 5.0 and Astra vectors within the Spark ecosystem. By supporting vector types, the connector now provides insights into AI and Retrieval-Augmented Generation (RAG) data, enabling more advanced and efficient data processing and analysis.
 
 ## Features
 
@@ -55,7 +59,7 @@ Currently, the following branches are actively supported:
 
 | Connector | Spark | Cassandra                  | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
 |-----------|-------|----------------------------|-----------------------|----------------------|--------------------------|
-| 3.5.1     | 3.5   | 2.1.5*, 2.2, 3.x, 4.x, 5.0 | 4.18                  | 8                    | 2.12, 2.13               |
+| 3.5.1     | 3.5   | 2.1.5*, 2.2, 3.x, 4.x, 5.0 | 4.18.1                | 8                    | 2.12, 2.13               |
 | 3.5       | 3.5   | 2.1.5*, 2.2, 3.x, 4.x      | 4.13                  | 8                    | 2.12, 2.13               |
 | 3.4       | 3.4   | 2.1.5*, 2.2, 3.x, 4.x      | 4.13                  | 8                    | 2.12, 2.13               |
 | 3.3       | 3.3   | 2.1.5*, 2.2, 3.x, 4.x      | 4.13                  | 8                    | 2.12                     |
@@ -80,6 +84,9 @@ Currently, the following branches are actively supported:
 ## Hosted API Docs
 API documentation for the Scala and Java interfaces are available online:
 
+### 3.5.1
+* [Spark-Cassandra-Connector](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.1/connector/com/datastax/spark/connector/index.html)
+
 ### 3.5.0
 * [Spark-Cassandra-Connector](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.0/connector/com/datastax/spark/connector/index.html)
 
@@ -111,7 +118,7 @@ This project is available on the Maven Central Repository.
 For SBT to download the connector binaries, sources and javadoc, put this in your project
 SBT config:
 
-    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0"
+    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.1"
 
 * The default Scala version for Spark 3.0+ is 2.12 please choose the appropriate build. See the
 [FAQ](doc/FAQ.md) for more information.
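The README's News entry above ties vector support to AI and RAG workloads. Once vector columns are read back into Scala collections, similarity scoring is plain collection code; here is a minimal, connector-independent sketch (the `VectorMath` object and `cosineSimilarity` name are illustrative, not part of the connector API):

```scala
object VectorMath {
  // Cosine similarity between two equal-length embedding vectors.
  // Returns a value in [-1, 1]; 1.0 means the vectors point the same way.
  def cosineSimilarity(a: Seq[Double], b: Seq[Double]): Double = {
    require(a.length == b.length, "vectors must have the same dimension")
    val dot   = a.zip(b).map { case (x, y) => x * y }.sum
    val normA = math.sqrt(a.map(x => x * x).sum)
    val normB = math.sqrt(b.map(x => x * x).sum)
    dot / (normA * normB)
  }
}
```

In a RAG-style pipeline this kind of function would typically run over vectors collected from a Cassandra 5.0 table, ranking rows by similarity to a query embedding.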

connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala (+23, -9)

@@ -98,7 +98,7 @@ trait SparkCassandraITSpecBase
   }
 
   override def withFixture(test: NoArgTest): Outcome = wrapUnserializableExceptions {
-      super.withFixture(test)
+    super.withFixture(test)
   }
 
   def getKsName = {
@@ -145,18 +145,32 @@ trait SparkCassandraITSpecBase
     else report(s"Skipped Because ProtocolVersion $pv < $protocolVersion")
   }
 
-  /** Skips the given test if the Cluster Version is lower or equal to the given `cassandra` Version or `dse` Version
-    * (if this is a DSE cluster) */
-  def from(cassandra: Version, dse: Version)(f: => Unit): Unit = {
+  /** Runs the given test only if the cluster type and version matches.
+    *
+    * @param cassandra run the test if the cluster is Cassandra >= the given version;
+    *                  if `None`, never run the test for Cassandra clusters
+    * @param dse run the test if the cluster is DSE >= the given version;
+    *            if `None`, never run the test for DSE clusters
+    * @param f the test to run
+    */
+  def from(cassandra: Version, dse: Version)(f: => Unit): Unit = from(Option(cassandra), Option(dse))(f)
+
+  def from(cassandra: Option[Version] = None, dse: Option[Version] = None)(f: => Unit): Unit = {
     if (isDse(conn)) {
-      from(dse)(f)
+      dse match {
+        case Some(dseVersion) => from(dseVersion)(f)
+        case None => report(s"Skipped because not DSE")
+      }
     } else {
-      from(cassandra)(f)
+      cassandra match {
+        case Some(cassandraVersion) => from(cassandraVersion)(f)
+        case None => report(s"Skipped because not Cassandra")
+      }
     }
   }
 
-  /** Skips the given test if the Cluster Version is lower or equal to the given version */
-  def from(version: Version)(f: => Unit): Unit = {
+  /** Skips the given test if the Cluster Version is lower than the given version */
+  private def from(version: Version)(f: => Unit): Unit = {
     skip(cluster.getCassandraVersion, version) { f }
   }
 
@@ -172,7 +186,7 @@ trait SparkCassandraITSpecBase
     else f
   }
 
-  /** Skips the given test if the Cluster Version is lower or equal to the given version or the cluster is not DSE */
+  /** Skips the given test if the Cluster Version is lower than the given version or the cluster is not DSE */
   def dseFrom(version: Version)(f: => Any): Unit = {
     dseOnly {
       skip(cluster.getDseVersion.get, version) { f }
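The refactored `from` helper above gates a test on both cluster type and minimum version: a `Some(version)` means "run on this cluster type from that version up", while `None` means "never run on this cluster type". A self-contained sketch of that decision logic, with simplified stand-ins for the driver's `Version` type and the cluster state (the real trait delegates to `skip` and a `report` side effect rather than returning an `Option`):

```scala
object FromGating {
  // Simplified stand-in for the Java driver's Version type.
  case class Version(major: Int, minor: Int) extends Ordered[Version] {
    def compare(that: Version): Int =
      Ordering[(Int, Int)].compare((major, minor), (that.major, that.minor))
  }

  // Mirrors the gating in `from`: pick the requirement for the cluster's
  // type, then run f only if a requirement exists and the cluster meets it.
  // Returns Some(result) when the test ran, None when it was skipped.
  def from[A](isDse: Boolean, clusterVersion: Version)
             (cassandra: Option[Version] = None, dse: Option[Version] = None)
             (f: => A): Option[A] = {
    val required = if (isDse) dse else cassandra
    required match {
      case Some(min) if clusterVersion >= min => Some(f)
      case _                                  => None // wrong cluster type, or version too low
    }
  }
}
```

With this shape, `from(Some(V5_0_0), None) { ... }` in the specs below reads as "Cassandra 5.0+ only, never on DSE".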

connector/src/it/scala/com/datastax/spark/connector/cql/SchemaSpec.scala (+9, -2)

@@ -1,6 +1,7 @@
 package com.datastax.spark.connector.cql
 
 import com.datastax.spark.connector.SparkCassandraITWordSpecBase
+import com.datastax.spark.connector.ccm.CcmConfig
 import com.datastax.spark.connector.cluster.DefaultCluster
 import com.datastax.spark.connector.types._
 import com.datastax.spark.connector.util.schemaFromCassandra
@@ -49,6 +50,9 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {
       s"""CREATE INDEX test_d9_m23423ap_idx ON $ks.test (full(d10_set))""")
     session.execute(
       s"""CREATE INDEX test_d7_int_idx ON $ks.test (d7_int)""")
+    from(Some(CcmConfig.V5_0_0), None) {
+      session.execute(s"ALTER TABLE $ks.test ADD d17_vector frozen<vector<int,3>>")
+    }
 
     for (i <- 0 to 9) {
       session.execute(s"insert into $ks.test (k1,k2,k3,c1,c2,c3,d10_set) " +
@@ -111,8 +115,8 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {
 
     "allow to read regular column definitions" in {
       val columns = table.regularColumns
-      columns.size shouldBe 16
-      columns.map(_.columnName).toSet shouldBe Set(
+      columns.size should be >= 16
+      columns.map(_.columnName).toSet should contain allElementsOf Set(
         "d1_blob", "d2_boolean", "d3_decimal", "d4_double", "d5_float",
         "d6_inet", "d7_int", "d8_list", "d9_map", "d10_set",
         "d11_timestamp", "d12_uuid", "d13_timeuuid", "d14_varchar",
@@ -136,6 +140,9 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {
       table.columnByName("d14_varchar").columnType shouldBe VarCharType
       table.columnByName("d15_varint").columnType shouldBe VarIntType
       table.columnByName("d16_address").columnType shouldBe a [UserDefinedType]
+      from(Some(CcmConfig.V5_0_0), None) {
+        table.columnByName("d17_vector").columnType shouldBe VectorType(IntType, 3)
+      }
     }
 
     "allow to list fields of a user defined type" in {

connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala (+2, -2)

@@ -9,7 +9,7 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption
 import com.datastax.oss.driver.api.core.cql.SimpleStatement
 import com.datastax.oss.driver.api.core.cql.SimpleStatement._
 import com.datastax.spark.connector._
-import com.datastax.spark.connector.ccm.CcmConfig.{DSE_V6_7_0, V3_6_0}
+import com.datastax.spark.connector.ccm.CcmConfig.{DSE_V5_1_0, DSE_V6_7_0, V3_6_0}
 import com.datastax.spark.connector.cluster.DefaultCluster
 import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf}
 import com.datastax.spark.connector.mapper.{DefaultColumnMapper, JavaBeanColumnMapper, JavaTestBean, JavaTestUDTBean}
@@ -794,7 +794,7 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
     results should contain ((KeyGroup(3, 300), (3, 300, "0003")))
   }
 
-  it should "allow the use of PER PARTITION LIMITs " in from(V3_6_0) {
+  it should "allow the use of PER PARTITION LIMITs " in from(cassandra = V3_6_0, dse = DSE_V5_1_0) {
     val result = sc.cassandraTable(ks, "clustering_time").perPartitionLimit(1).collect
     result.length should be (1)
   }
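The version gate above (`cassandra = V3_6_0, dse = DSE_V5_1_0`) reflects where `PER PARTITION LIMIT` became available. Conceptually the clause keeps at most N rows per partition from an ordered scan; a small, database-free sketch of that semantics over pre-grouped `(partitionKey, row)` pairs (the `PerPartitionLimit` object is illustrative, not connector code):

```scala
object PerPartitionLimit {
  // Keeps at most `limit` rows per partition key, preserving input order.
  // Assumes rows arrive grouped by partition key, as in a CQL scan.
  def apply[K, R](rows: Seq[(K, R)], limit: Int): Seq[(K, R)] = {
    val seen = scala.collection.mutable.Map.empty[K, Int].withDefaultValue(0)
    rows.filter { case (key, _) =>
      val kept = seen(key)       // rows already kept for this partition
      seen(key) = kept + 1
      kept < limit               // keep only the first `limit` per key
    }
  }
}
```

This mirrors what the test asserts: `perPartitionLimit(1)` over a table with a single partition yields exactly one row.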

connector/src/it/scala/com/datastax/spark/connector/rdd/RDDSpec.scala (+2, -2)

@@ -5,7 +5,7 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption._
 import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, BoundStatement}
 import com.datastax.oss.driver.api.core.{DefaultConsistencyLevel, DefaultProtocolVersion}
 import com.datastax.spark.connector._
-import com.datastax.spark.connector.ccm.CcmConfig.V3_6_0
+import com.datastax.spark.connector.ccm.CcmConfig.{DSE_V5_1_0, V3_6_0}
 import com.datastax.spark.connector.cluster.DefaultCluster
 import com.datastax.spark.connector.cql.CassandraConnector
 import com.datastax.spark.connector.embedded.SparkTemplate._
@@ -425,7 +425,7 @@ class RDDSpec extends SparkCassandraITFlatSpecBase with DefaultCluster {
 
   }
 
-  it should "should be joinable with a PER PARTITION LIMIT limit" in from(V3_6_0){
+  it should "should be joinable with a PER PARTITION LIMIT limit" in from(cassandra = V3_6_0, dse = DSE_V5_1_0){
     val source = sc.parallelize(keys).map(x => (x, x * 100))
     val someCass = source
       .joinWithCassandraTable(ks, wideTable, joinColumns = SomeColumns("key", "group"))
