SPARKC-706: Add basic support for Cassandra vectors #1366

Merged (6 commits) on Jun 21, 2024
3 changes: 2 additions & 1 deletion doc/14_data_frames.md
@@ -122,7 +122,8 @@ CREATE TABLE casscatalog.ksname.testTable (key_1 Int, key_2 Int, key_3 Int, cc1

Any statements that involve creating a Table are also supported like `CREATE TABLE AS SELECT`

Note that creating columns of Cassandra vector type is not yet supported; such columns must
be created manually with CQL.

#### Altering Tables

49 changes: 45 additions & 4 deletions doc/2_loading.md
@@ -30,7 +30,7 @@ CREATE TABLE test.words (word text PRIMARY KEY, count int);

Load data into the table:

```sql
INSERT INTO test.words (word, count) VALUES ('foo', 20);
INSERT INTO test.words (word, count) VALUES ('bar', 20);
```
@@ -184,7 +184,49 @@ val street = address.getString("street")
val number = address.getInt("number")
```

### Reading vectors
You can read vector columns from a Cassandra table in the same way as lists,
using the `getList` or generic `get` methods of the `CassandraRow` object.

Assuming you set up the test keyspace earlier, create a table with a vector
column and populate it using cqlsh:

```sql
CREATE TABLE test.things (id int PRIMARY KEY, name text, features vector<float, 3>);
INSERT INTO test.things (id, name, features) VALUES (1, 'a', [1.0, 2.0, 3.0]);
INSERT INTO test.things (id, name, features) VALUES (2, 'b', [2.2, 2.1, 2.0]);
INSERT INTO test.things (id, name, features) VALUES (3, 'c', [1.0, 1.5, 4.0]);
```

Then in your application, retrieve the first row:

```scala
val row = sc.cassandraTable("test", "things").first
// row: com.datastax.spark.connector.CassandraRow = CassandraRow{id: 2, features: [2.2, 2.1, 2.0], name: b}
```

Convert the vector value to different Scala collection types:

```scala
row.getList[Float]("features") // Vector[Float] = Vector(2.2, 2.1, 2.0)
row.get[List[Float]]("features") // List[Float] = List(2.2, 2.1, 2.0)
row.get[Seq[Double]]("features") // Seq[Double] = List(2.200000047683716, 2.0999999046325684, 2.0)
row.get[IndexedSeq[Int]]("features") // IndexedSeq[Int] = Vector(2, 2, 2)
row.get[Set[Long]]("features") // Set[Long] = Set(2)
```

It is also possible to convert a vector to CQL `String` representation:

```scala
row.get[String]("features") // String = [2.2, 2.1, 2.0]
```

A `null` vector is equivalent to an empty list. You can also use
`get[Option[List[...]]]` to get `None` in case of `null`.
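The `Option` behavior can be sketched in plain Scala (this is an illustrative helper, not connector code; the hypothetical `vectorAsOption` mirrors how `get[Option[List[Float]]]` maps a `null` column to `None`):

```scala
import scala.jdk.CollectionConverters._

// Illustrative sketch: a null (unset) vector column becomes None,
// a present one becomes Some(...) with the values converted to Scala floats.
def vectorAsOption(raw: java.util.List[java.lang.Float]): Option[List[Float]] =
  Option(raw).map(_.asScala.toList.map(_.floatValue))

val present = vectorAsOption(java.util.Arrays.asList(2.2f, 2.1f, 2.0f))
// present: Some(List(2.2, 2.1, 2.0))
val absent = vectorAsOption(null)
// absent: None
```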

### Data type conversions

@@ -215,11 +257,10 @@ The following table shows recommended Scala types corresponding to Cassandra column types.
| `uuid` | `java.util.UUID`
| `varchar` | `String`
| `varint` | `BigInt`, `java.math.BigInteger`
| `vector` | `Vector`, `List`, `Iterable`, `Seq`, `IndexedSeq`, `java.util.List`
| `frozen<tuple<>>` | `TupleValue`, `scala.Product`, `org.apache.commons.lang3.tuple.Pair`, `org.apache.commons.lang3.tuple.Triple`
| user defined | `UDTValue`


*Since `time` is encoded in nanoseconds from epoch rather than milliseconds, there will be a scale
error with an automatic conversion to `java.util.Date`.*
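To avoid the scale trap above, convert the raw count explicitly rather than feeding it to a millisecond-based `java.util.Date`. A minimal sketch, assuming the raw value counts nanoseconds within a day (the sample value is made up for illustration):

```scala
// 45296000000000 ns = 45296 s = 12 h 34 m 56 s
val nanos = 45296000000000L

// Treat the value explicitly as nanoseconds, not milliseconds.
val localTime = java.time.LocalTime.ofNanoOfDay(nanos)
// localTime: 12:34:56
```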

5 changes: 3 additions & 2 deletions doc/4_mapper.md
@@ -14,6 +14,9 @@ sc.cassandraTable[(String, Int)]("test", "words").select("word", "count").toArray

sc.cassandraTable[(Int, String)]("test", "words").select("count", "word").toArray
// Array((20,bar), (10,foo))

sc.cassandraTable[(String, List[Float])]("test", "things").select("name", "features").collect
// Array[(String, List[Float])] = Array((c,List(1.0, 1.5, 4.0)), (d,List()), (b,List(2.2, 2.1, 2.0)), (a,List(1.0, 2.0, 3.0)))
```

### Mapping rows to (case) objects
@@ -139,6 +142,4 @@ CREATE TYPE ks.address (street text, city text, zip int)
CREATE TABLE $ks.udts(key INT PRIMARY KEY, name text, addr frozen<address>)
```


[Next - Saving data](5_saving.md)
33 changes: 31 additions & 2 deletions doc/5_saving.md
@@ -170,7 +170,35 @@ cqlsh> Select * from ks.collections_mod where key = 1
(1 rows)
```

## Saving Cassandra vectors

Create a table with a vector column in cqlsh:

```sql
CREATE TABLE test.things (
id int PRIMARY KEY,
name text,
features vector<float, 3>
);
```

Then save an RDD of tuples to it:

```scala
val newData = sc.parallelize(Seq((5, "e", List(5, 6, 7)), (6, "f", List(6, 7, 8))))
// newData: org.apache.spark.rdd.RDD[(Int, String, List[Int])] = ParallelCollectionRDD[...]

newData.saveToCassandra("test", "things", SomeColumns("id", "name", "features"))
```

```sql
cqlsh> select * from test.things ;

 id | features  | name
----+-----------+------
  5 | [5, 6, 7] | e
  6 | [6, 7, 8] | f

(2 rows)
```
Note that Cassandra vectors have a fixed size; elements cannot be added to or removed
from them.
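Because the table above declares `vector<float, 3>`, every row must carry exactly three elements. A hypothetical pre-save check (not a connector API) can filter out rows that would not fit the declared dimension:

```scala
// Illustrative sketch: partition rows by whether their feature list
// matches the fixed dimension declared in the table schema.
val dimension = 3
val rows = Seq((5, "e", List(5f, 6f, 7f)), (7, "g", List(1f, 2f)))

val (writable, rejected) = rows.partition(_._3.length == dimension)
// writable: rows matching the declared dimension
// rejected: rows the server would not accept
```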

## Saving objects of Cassandra User Defined Types
To save structures consisting of many fields, use a [Case Class](4_mapper.md#Mapping-User-Defined-Types) or a
@@ -483,7 +511,8 @@ val rddOut = rdd.map(s => outData(s._1, s._2(0), s._2(1), s._3))
rddOut.saveAsCassandraTableEx(table, SomeColumns("col1", "col2", "col3", "col4"))
```

Note that creating columns of Cassandra vector type is not yet supported; to save vectors,
create the table manually with CQL first.

## Deleting Rows and Columns
`RDD.deleteFromCassandra(keyspaceName, tableName)` deletes specific rows
3 changes: 1 addition & 2 deletions doc/6_advanced_mapper.md
@@ -122,10 +122,9 @@ Cassandra column type | Object type to convert from / to
`uuid` | `java.util.UUID`
`varchar` | `java.lang.String`
`varint` | `java.math.BigInteger`
`vector` | `com.datastax.oss.driver.api.core.data.CqlVector`
user defined | `com.datastax.spark.connector.UDTValue`


Custom converters for collections are not supported.

When defining your own `TypeConverter` make sure it is `Serializable` and
2 changes: 0 additions & 2 deletions doc/7_java_api.md
@@ -154,8 +154,6 @@ in [Working with user-defined case classes and tuples](4_mapper.md) and
Since 1.2, it is possible to easily provide custom column name to property
name translation by `select` method.


#### Example: Reading a Cassandra Table into a Bean Class with Differently Named Fields
Say we have a table `people2` with columns `id INT`, `last_name TEXT`, `date_of_birth TIMESTAMP` and
we want to map the rows of this table to objects of `Person` class.