Skip to content

Commit 348a22a

Browse files
committed
Merge pull request #447 from datastax/wip-446-npe-when-saving-nulls
Fix NPE when saving CassandraRows containing null values. Fixes #446.
2 parents 16dffbc + 2b51235 commit 348a22a

File tree

7 files changed

+51
-24
lines changed

7 files changed

+51
-24
lines changed

CHANGES.txt

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fix NPE when saving CassandraRows containing null values (#446)
2+
13
1.1.0 rc 2
24
* Added JavaTypeConverter to make it easy to implement custom TypeConverter in Java (#429)
35
* Fix SparkSQL failures caused by presence of non-selected columns of UDT type in the table.

spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/TableWriterSpec.scala

+17
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class TableWriterSpec extends FlatSpec with Matchers with BeforeAndAfter with Sh
3737
session.execute("CREATE TABLE IF NOT EXISTS write_test.key_value_9 (key INT, group BIGINT, value TEXT, PRIMARY KEY (key, group))")
3838
session.execute("CREATE TABLE IF NOT EXISTS write_test.key_value_10 (key INT, group BIGINT, value TEXT, PRIMARY KEY (key, group))")
3939

40+
session.execute("CREATE TABLE IF NOT EXISTS write_test.nulls (key INT PRIMARY KEY, text_value TEXT, int_value INT)")
4041
session.execute("CREATE TABLE IF NOT EXISTS write_test.collections (key INT PRIMARY KEY, l list<text>, s set<text>, m map<text, text>)")
4142
session.execute("CREATE TABLE IF NOT EXISTS write_test.blobs (key INT PRIMARY KEY, b blob)")
4243
session.execute("CREATE TABLE IF NOT EXISTS write_test.counters (pkey INT, ckey INT, c1 counter, c2 counter, PRIMARY KEY (pkey, ckey))")
@@ -130,6 +131,22 @@ class TableWriterSpec extends FlatSpec with Matchers with BeforeAndAfter with Sh
130131
}
131132
}
132133

134+
it should "write null values" in {
135+
val key = 1.asInstanceOf[AnyRef]
136+
val row = new CassandraRow(IndexedSeq(key, null, null), IndexedSeq("key", "text_value", "int_value"))
137+
138+
sc.parallelize(Seq(row)).saveToCassandra("write_test", "nulls")
139+
conn.withSessionDo { session =>
140+
val result = session.execute("SELECT * FROM write_test.nulls").all()
141+
result should have size 1
142+
for (r <- result) {
143+
r.getInt(0) shouldBe key
144+
r.isNull(1) shouldBe true
145+
r.isNull(2) shouldBe true
146+
}
147+
}
148+
}
149+
133150
it should "write only specific column data if ColumnNames is passed as 'columnNames'" in {
134151
val col = Seq((1, 1L, None))
135152
sc.parallelize(col).saveToCassandra("write_test", "key_value_8", SomeColumns("key", "group"))

spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/CassandraRow.scala

+27-18
Original file line numberDiff line numberDiff line change
@@ -99,34 +99,43 @@ final class CassandraRow(data: IndexedSeq[AnyRef], columnNames: IndexedSeq[Strin
9999
* Looks the column up by column name. Column names are case-sensitive.*/
100100
def get[T](name: String)(implicit c: TypeConverter[T]): T =
101101
get[T](_indexOfOrThrow(name))
102-
103-
/** Equivalent to `getAny` */
104-
def apply(index: Int): Any = getAny(index)
105-
def apply(name: String): Any = getAny(name)
106-
107-
def get(index: Int): AnyRef = getAnyRef(index)
108-
def get(name: String): AnyRef = getAnyRef(name)
109102

110103
/** Returns a column value without applying any conversion.
111104
* The underlying type is the same as the type returned by the low-level Cassandra driver.
112105
* May return Java null. */
113-
def getAny(index: Int) = get[Any](index)
114-
def getAny(name: String) = get[Any](name)
106+
@deprecated("Use getRaw instead", "1.1")
107+
def getAny(index: Int) = getRaw(index)
108+
@deprecated("Use getRaw instead", "1.1")
109+
def getAny(name: String) = getRaw(name)
115110

116111
/** Returns a column value without applying any conversion, besides converting a null to a None.
117112
* The underlying type is the same as the type returned by the low-level Cassandra driver.*/
118-
def getAnyOption(index: Int) = get[Option[Any]](index)
119-
def getAnyOption(name: String) = get[Option[Any]](name)
113+
@deprecated("Use getRaw and wrap the result in an Option instead", "1.1")
114+
def getAnyOption(index: Int) = Option(getRaw(index))
115+
@deprecated("Use getRaw and wrap the result in an Option instead", "1.1")
116+
def getAnyOption(name: String) = Option(getRaw(name))
120117

121-
/** Returns a column value by index without applying any conversion.
122-
* The underlying type is the same as the type returned by the low-level Cassandra driver. */
123-
def getAnyRef(index: Int) = get[AnyRef](index)
124-
def getAnyRef(name: String) = get[AnyRef](name)
118+
/** Returns a column value without applying any conversion.
119+
* The underlying type is the same as the type returned by the low-level Cassandra driver.
120+
* May return Java null. */
121+
@deprecated("Use getRaw instead", "1.1")
122+
def getAnyRef(index: Int) = getRaw(index)
123+
@deprecated("Use getRaw instead", "1.1")
124+
def getAnyRef(name: String) = getRaw(name)
125125

126126
/** Returns a column value without applying any conversion, besides converting a null to a None.
127-
* The underlying type is the same as the type returned by the low-level Cassandra driver. */
128-
def getAnyRefOption(index: Int) = get[Option[AnyRef]](index)
129-
def getAnyRefOption(name: String) = get[Option[AnyRef]](name)
127+
* The underlying type is the same as the type returned by the low-level Cassandra driver.*/
128+
@deprecated("Use getRaw and wrap the result in an Option instead", "1.1")
129+
def getAnyRefOption(index: Int) = Option(getRaw(index))
130+
@deprecated("Use getRaw and wrap the result in an Option instead", "1.1")
131+
def getAnyRefOption(name: String) = Option(getRaw(name))
132+
133+
/** Returns a column value by index without applying any conversion.
134+
* The underlying type is the same as the type returned by the low-level Cassandra driver,
135+
* is implementation defined and may change in the future.
136+
* Cassandra nulls are returned as Scala nulls. */
137+
def getRaw(index: Int): AnyRef = data(index)
138+
def getRaw(name: String): AnyRef = data(_indexOfOrThrow(name))
130139

131140
/** Returns a `bool` column value. Besides working with `bool` Cassandra type, it can also read
132141
* numbers and strings. Non-zero numbers are converted to `true`, zero is converted to `false`.

spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ case class SetType[T](elemType: ColumnType[T]) extends CollectionColumnType[Set[
3535

3636
case class MapType[K, V](keyType: ColumnType[K], valueType: ColumnType[V]) extends CollectionColumnType[Map[K, V]] {
3737
@transient
38-
lazy val converterToCassandra: TypeConverter[_] =
38+
lazy val converterToCassandra =
3939
new OptionToNullConverter(
4040
TypeConverter.javaHashMapConverter(keyType.converterToCassandra, valueType.converterToCassandra))
4141

spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/ColumnType.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ trait ColumnType[T] extends Serializable {
1111

1212
/** Returns a converter that converts values to the type of this column expected by the
1313
* Cassandra Java driver when saving the row.*/
14-
def converterToCassandra: TypeConverter[_]
14+
def converterToCassandra: TypeConverter[_ <: AnyRef]
1515

1616
/** Returns a converter that converts values to the Scala type associated with this column. */
1717
lazy val converterToScala: TypeConverter[T] =

spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -631,14 +631,13 @@ object TypeConverter {
631631
new JavaHashMapConverter[K, V]
632632

633633
/** Converts Scala Options to Java nullable references. Used when saving data to Cassandra. */
634-
class OptionToNullConverter(nestedConverter: TypeConverter[_]) extends TypeConverter[AnyRef] {
634+
class OptionToNullConverter(nestedConverter: TypeConverter[_]) extends NullableTypeConverter[AnyRef] {
635635

636636
def targetTypeTag = implicitly[TypeTag[AnyRef]]
637637

638638
def convertPF = {
639639
case Some(x) => nestedConverter.convert(x).asInstanceOf[AnyRef]
640640
case None => null
641-
case null => null
642641
case x => nestedConverter.convert(x).asInstanceOf[AnyRef]
643642
}
644643
}

spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/GenericRowWriter.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ class GenericRowWriter(table: TableDef, selectedColumns: Seq[String])
1111
val index = data.indexOf(columnName)
1212
if (index >= 0) {
1313
val converter = table.columnByName(columnName).columnType.converterToCassandra
14-
val value = data.get[AnyRef](index)
15-
converter.convert(value).asInstanceOf[AnyRef]
14+
val value = data.getRaw(index)
15+
converter.convert(value)
1616
}
1717
else
1818
null

0 commit comments

Comments
 (0)