-
Notifications
You must be signed in to change notification settings - Fork 25
Working with JDBC API
Important
Using this driver, note that all the queries must be expressed in CQL3 (Cassandra Query Language).
The following code shows how to instantiate a connection to a Cassandra database using the JDBC driver:
/**
 * Minimal example opening a JDBC connection to a Cassandra cluster through {@code DriverManager}.
 */
public class HelloCassandra {

    /**
     * Opens a connection from a JDBC URL and releases it once it is no longer needed.
     *
     * @param args unused command-line arguments
     * @throws SQLException if the connection cannot be opened or closed
     */
    public static void main(final String[] args) throws SQLException {
        // Used driver: com.ing.data.cassandra.jdbc.CassandraDriver
        final String url = "jdbc:cassandra://host1--host2--host3:9042/keyspace?localdatacenter=DC1";
        // try-with-resources closes the connection even when an exception is thrown.
        try (Connection connection = DriverManager.getConnection(url)) {
            // Use the connection here.
        }
    }
}
You can also obtain a connection using a CassandraDataSource
instance:
/**
 * Example obtaining a JDBC connection from a {@code CassandraDataSource}.
 */
public class HelloCassandra {

    /**
     * Builds a data source from a list of contact points and opens a connection from it.
     *
     * @param args unused command-line arguments
     * @throws SQLException if the connection cannot be opened or closed
     */
    public static void main(final String[] args) throws SQLException {
        final List<ContactPoint> contactPoints = Arrays.asList(
            ContactPoint.of("host1", 9042),
            ContactPoint.of("host2", 9042),
            ContactPoint.of("host3", 9042));
        final CassandraDataSource dataSource = new CassandraDataSource(contactPoints, "keyspace");
        dataSource.setLocalDataCenter("DC1");
        // Using setters on dataSource instance to specify additional properties of the connection.

        // try-with-resources closes the connection even when an exception is thrown.
        try (Connection connection = dataSource.getConnection()) {
            // Use the connection here.
        }
    }
}
If you want to use a pre-existing session, you can directly build a new CassandraConnection
using the constructor CassandraConnection(Session, String, ConsistencyLevel, boolean debugMode, OptionSet).
For example:
/**
 * Example wrapping an already opened {@code CqlSession} into a JDBC connection.
 */
public class HelloCassandraWithSession {

    /**
     * Builds a session by hand, then hands it over to a new {@code CassandraConnection}.
     *
     * @param args unused command-line arguments
     */
    public static void main(final String[] args) {
        // The session is created outside of the JDBC layer.
        final InetSocketAddress contactPoint = new InetSocketAddress("localhost", 9042);
        final CqlSession existingSession = CqlSession.builder()
            .addContactPoint(contactPoint)
            .withLocalDatacenter("DC1")
            .build();

        // Arguments: session, keyspace, consistency level, debug mode flag, option set.
        final Connection connection =
            new CassandraConnection(existingSession, "keyspace", ConsistencyLevel.ALL, false, new Default());
    }
}
To issue a simple select query and get data from it, you can follow this example:
/**
 * Example running a simple select query and reading its result set.
 */
public class HelloCassandra {

    /**
     * Selects two columns of {@code test_table} for a fixed key and prints every returned row.
     *
     * @param connection an open connection to the Cassandra keyspace holding {@code test_table}
     * @throws SQLException if the query fails or a column cannot be read
     */
    public void selectValuesFromCassandra(final Connection connection) throws SQLException {
        final String selectCql = "SELECT b_value, i_value FROM test_table WHERE keyname = 'key0';";
        // Both resources are closed automatically (result set first, then statement), even on failure.
        try (Statement statement = connection.createStatement();
             ResultSet result = statement.executeQuery(selectCql)) {
            while (result.next()) {
                System.out.println("b_value = " + result.getBoolean("b_value"));
                System.out.println("i_value = " + result.getInt("i_value"));
            }
        }
    }
}
Considering the following table:
-- Sample table declaring one column per CQL data type, keyed by bigint_col.
CREATE TABLE example_table (
bigint_col bigint PRIMARY KEY,
ascii_col ascii,
blob_col blob,
boolean_col boolean,
decimal_col decimal,
double_col double,
float_col float,
inet_col inet,
int_col int,
smallint_col smallint,
text_col text,
timestamp_col timestamp,
time_col time,
date_col date,
tinyint_col tinyint,
duration_col duration,
uuid_col uuid,
timeuuid_col timeuuid,
varchar_col varchar,
varint_col varint,
-- Collection columns.
string_set_col set<text>,
string_list_col list<text>,
string_map_col map<text, text>,
-- Fixed-size vector of 5 floats.
vector_col vector<float, 5>
);
To insert a record into example_table
using a prepared statement, you can use code similar to the following:
import com.datastax.oss.driver.api.core.data.CqlDuration;
import com.datastax.oss.driver.api.core.data.CqlVector;
import java.io.ByteArrayInputStream;
import java.math.BigDecimal;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.UUID;
/**
 * Example inserting a full record into {@code example_table} with a prepared statement.
 */
public class HelloCassandra {

    /**
     * Binds one value per column of {@code example_table} and executes the insertion.
     *
     * <p>The {@code timeuuid_col} value is generated server-side by {@code now()}, so the statement has
     * 23 bind markers for 24 columns.
     *
     * @param connection an open connection to the keyspace holding {@code example_table}
     * @throws SQLException if the statement cannot be prepared, bound or executed
     */
    public void insertRecordToCassandraTable(final Connection connection) throws SQLException {
        final String insertCql = "INSERT INTO example_table (bigint_col, ascii_col, blob_col, boolean_col, decimal_col, "
            + "double_col, float_col, inet_col, int_col, smallint_col, text_col, timestamp_col, time_col, date_col, "
            + "tinyint_col, duration_col, uuid_col, timeuuid_col, varchar_col, varint_col, string_set_col, "
            + "string_list_col, string_map_col, vector_col) "
            + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), ?, ?, ?, ?, ?, ?);";

        // try-with-resources closes the prepared statement even when binding or execution fails.
        try (PreparedStatement preparedStatement = connection.prepareStatement(insertCql)) {
            preparedStatement.setObject(1, 1L); // bigint
            preparedStatement.setObject(2, "test"); // ascii

            final ByteArrayInputStream baInputStream =
                new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8));
            preparedStatement.setObject(3, baInputStream); // blob
            // Alternatively, you can also use byte arrays for blobs (this call replaces the binding above):
            preparedStatement.setObject(3, "test".getBytes(StandardCharsets.UTF_8));

            preparedStatement.setObject(4, true); // boolean
            // Use the String constructor: new BigDecimal(5.1) would store the inexact binary value of 5.1.
            preparedStatement.setObject(5, new BigDecimal("5.1")); // decimal
            preparedStatement.setObject(6, 5.1d); // double
            preparedStatement.setObject(7, 5.1f); // float

            // The loopback address needs no host name lookup and throws no checked exception.
            final InetAddress inet = InetAddress.getLoopbackAddress();
            preparedStatement.setObject(8, inet); // inet
            preparedStatement.setObject(9, 1); // int
            preparedStatement.setObject(10, 1); // smallint
            preparedStatement.setObject(11, "test"); // text

            // Current time in milliseconds, truncated to the second.
            final long now = OffsetDateTime.now().toEpochSecond() * 1_000;
            preparedStatement.setObject(12, new Timestamp(now)); // timestamp
            preparedStatement.setObject(13, new Time(now)); // time
            preparedStatement.setObject(14, new Date(now)); // date
            preparedStatement.setObject(15, 1); // tinyint

            final CqlDuration duration = CqlDuration.from("12h30m15s");
            preparedStatement.setObject(16, duration); // duration

            final UUID uuid = UUID.randomUUID();
            preparedStatement.setObject(17, uuid); // uuid
            preparedStatement.setObject(18, "test"); // varchar
            preparedStatement.setObject(19, 1); // varint

            final HashSet<String> sampleSet = new HashSet<>();
            sampleSet.add("test1");
            sampleSet.add("test2");
            preparedStatement.setObject(20, sampleSet); // set

            final ArrayList<String> sampleList = new ArrayList<>();
            sampleList.add("test1");
            sampleList.add("test2");
            preparedStatement.setObject(21, sampleList); // list
            // Alternatively, you can use setArray method for lists (this call replaces the binding above):
            preparedStatement.setArray(21, new ArrayImpl(sampleList));

            final HashMap<String, String> sampleMap = new HashMap<>();
            sampleMap.put("1", "test1");
            sampleMap.put("2", "test2");
            preparedStatement.setObject(22, sampleMap); // map

            final CqlVector<Float> sampleVector = CqlVector.newInstance(1.0f, 0.0f, 1.0f, 0.5f, 0.2f);
            preparedStatement.setObject(23, sampleVector); // vector

            // Execute the prepared statement.
            preparedStatement.execute();
        }
    }
}
There are two ways to insert/update data using asynchronous queries. The first one is to use JDBC batches (we're not talking about Cassandra atomic batches here).
With simple statements:
/**
 * Example sending several insertions at once through a JDBC batch of simple statements.
 */
public class HelloCassandra {

    /**
     * Queues ten insertions into {@code test_table} and executes them as a single JDBC batch.
     *
     * @param connection an open connection to the keyspace holding {@code test_table}
     * @throws SQLException if a statement cannot be queued or the batch fails
     */
    public void insertUsingJdbcBatches(final Connection connection) throws SQLException {
        // try-with-resources closes the statement even when the batch fails.
        try (Statement statement = connection.createStatement()) {
            for (int i = 0; i < 10; i++) {
                statement.addBatch("INSERT INTO test_table (key_value, l_value) VALUES (" + i + ", [1, 3, 12345])");
            }
            // executeBatch() returns an int[] of update counts; capture it if the caller needs them.
            statement.executeBatch();
        }
    }
}
With prepared statements:
/**
 * Example sending several insertions at once through a JDBC batch of prepared statements.
 */
public class HelloCassandra {

    /**
     * Binds ten rows on a single prepared insertion and executes them as one JDBC batch.
     *
     * @param connection an open connection to the keyspace holding {@code test_table}
     * @throws SQLException if the statement cannot be prepared, bound or executed
     */
    public void insertUsingJdbcBatches(final Connection connection) throws SQLException {
        final String insertCql = "INSERT INTO test_table (key_value, l_value) VALUES (?, ?)";
        // The list value is the same for every row, so it is built once outside the loop.
        final List<Integer> listValue = Arrays.asList(1, 3, 12345);

        // try-with-resources closes the statement even when the batch fails.
        try (PreparedStatement statement = connection.prepareStatement(insertCql)) {
            for (int i = 0; i < 10; i++) {
                statement.setInt(1, i);
                statement.setObject(2, listValue);
                statement.addBatch();
            }
            // executeBatch() returns an int[] of update counts; capture it if the caller needs them.
            statement.executeBatch();
        }
    }
}
The second one is to put all the queries in a single CQL statement, each terminated by a semicolon (;):
/**
 * Example sending several insertions at once as a single multi-query CQL statement.
 */
public class HelloCassandra {

    /**
     * Concatenates ten semicolon-terminated insertions and executes them in one call.
     *
     * @param connection an open connection to the keyspace holding {@code test_table}
     * @throws SQLException if the statement cannot be created or executed
     */
    public void insertUsingSingleCqlStatement(final Connection connection) throws SQLException {
        final StringBuilder queryBuilder = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            queryBuilder.append("INSERT INTO test_table (key_value, l_value) VALUES (")
                .append(i)
                .append(", [1, 3, 12345]);");
        }
        // try-with-resources closes the statement even when the execution fails.
        try (Statement statement = connection.createStatement()) {
            statement.execute(queryBuilder.toString());
        }
    }
}
As JDBC batches do not support returning result sets, there is only one way to send asynchronous select queries through the JDBC driver:
/**
 * Example sending several select queries at once and reading all their rows from one result set.
 */
public class HelloCassandra {

    /**
     * Sends ten semicolon-terminated select queries in one call and collects the returned keys.
     *
     * @param connection an open connection to the keyspace holding {@code test_table}
     * @throws SQLException if the queries fail or a column cannot be read
     */
    public void multipleSelectQueries(final Connection connection) throws SQLException {
        final StringBuilder queries = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            queries.append("SELECT * FROM test_table WHERE key_value = ").append(i).append(";");
        }

        final ArrayList<Integer> ids = new ArrayList<>();
        // Send all the select queries at once; both resources are closed automatically, even on failure.
        try (Statement statement = connection.createStatement();
             ResultSet result = statement.executeQuery(queries.toString())) {
            // Get all the results from all the select queries in a single result set.
            while (result.next()) {
                ids.add(result.getInt("key_value"));
            }
        }
    }
}
Note
Make sure you send select queries that return the exact same columns, or you might get pretty unpredictable results.
To create a new Tuple
object in Java (see Tuple documentation), use the com.datastax.oss.driver.api.core.type.DataTypes.tupleOf(...).newValue()
method.
Note that the UDT (User-Defined Types) fields cannot be instantiated outside the Java Driver for Apache Cassandra® core. If you want to use prepared statements, you must proceed as in the following example:
/**
 * Example inserting a UDT value and tuple values with a prepared statement.
 */
public class HelloCassandra {

    /**
     * Creates the {@code fieldmap} type and the {@code t_udt} table, then inserts one row.
     *
     * <p>The UDT column is populated field by field through bind markers ({@code {key : ?, value : ?}}).
     * The first tuple is populated element by element, the second one with a whole {@code TupleValue}.
     *
     * @param connection an open connection to the target keyspace
     * @throws SQLException if a statement cannot be created, prepared, bound or executed
     */
    public void insertTuples(final Connection connection) throws SQLException {
        final String createUDT = "CREATE TYPE IF NOT EXISTS fieldmap (key text, value text)";
        final String createTbl = "CREATE TABLE t_udt (id bigint PRIMARY KEY, field_values frozen<fieldmap>, "
            + "the_tuple frozen<tuple<int, text, float>>, "
            + "the_other_tuple frozen<tuple<int, text, float>>);";
        // try-with-resources closes the statement even when one of the DDL queries fails.
        try (Statement statement = connection.createStatement()) {
            statement.execute(createUDT);
            statement.execute(createTbl);
        }

        final String insertCql = "INSERT INTO t_udt (id, field_values, the_tuple, the_other_tuple) "
            + "VALUES (?, {key : ?, value : ?}, (?, ?, ?), ?);";
        // Keep the instance returned by the setters rather than relying on in-place mutation.
        final TupleValue tuple = DataTypes.tupleOf(DataTypes.INT, DataTypes.TEXT, DataTypes.FLOAT).newValue()
            .setInt(0, 1)
            .setString(1, "midVal")
            .setFloat(2, 2.0f);

        try (PreparedStatement preparedStatement = connection.prepareStatement(insertCql)) {
            preparedStatement.setLong(1, 1L);
            // Fields of the UDT.
            preparedStatement.setString(2, "key1");
            preparedStatement.setString(3, "value1");
            // Elements of the first tuple.
            preparedStatement.setInt(4, 1);
            preparedStatement.setString(5, "midVal");
            preparedStatement.setFloat(6, 2.0f);
            // Second tuple, bound as a whole.
            preparedStatement.setObject(7, tuple);
            // Execute the prepared statement.
            preparedStatement.execute();
        }
    }
}
When working on collections of UDTs, it is not possible to use prepared statements. You then have to use simple statements as follows:
/**
 * Example inserting collections of UDTs and tuples with simple statements.
 */
public class HelloCassandra {

    /**
     * Creates the {@code fieldmap} type and the {@code t_udt_tuple_coll} table, then inserts one row
     * whose values are written as CQL literals.
     *
     * @param connection an open connection to the target keyspace
     * @throws SQLException if a statement cannot be created or executed
     */
    public void insertCollectionsOfUDT(final Connection connection) throws SQLException {
        final String createUDT = "CREATE TYPE IF NOT EXISTS fieldmap (key text, value text)";
        final String createTbl = "CREATE TABLE t_udt_tuple_coll (id bigint PRIMARY KEY, "
            + "field_values set<frozen<fieldmap>>, "
            + "the_tuple list<frozen<tuple<int, text, float>>>, "
            + "field_values_map map<text,frozen<fieldmap>>, "
            + "tuple_map map<text,frozen<tuple<int,int>>>);";
        // try-with-resources closes the statement even when one of the DDL queries fails.
        try (Statement statement = connection.createStatement()) {
            statement.execute(createUDT);
            statement.execute(createTbl);
        }

        final String insertCql = "INSERT INTO t_udt_tuple_coll "
            + "(id, field_values, the_tuple, field_values_map, tuple_map) "
            + "VALUES (1, {{key : 'key1', value : 'value1'}, {key : 'key2', value : 'value2'}}, "
            + "[(1, 'midVal1', 1.0), (2, 'midVal2', 2.0)], "
            + "{'map_key1' : {key : 'key1', value : 'value1'},"
            + "'map_key2' : {key : 'key2', value : 'value2'}}, "
            + "{'tuple1' : (1, 2), 'tuple2' : (2, 3)});";
        try (Statement insertStatement = connection.createStatement()) {
            // Execute the statement.
            insertStatement.execute(insertCql);
        }
    }
}
In order to ease the usage of JSON features included in Cassandra, some non-JDBC standard functions are included in this wrapper:
- on the one hand, to deserialize the JSON objects returned by Cassandra in ResultSets into Java objects;
- on the other hand, to serialize Java objects into JSON objects usable in PreparedStatements sent to Cassandra.
Considering the following Java classes:
/**
 * Java representation of a row of the table {@code t_using_json}, mapped from/to JSON with Jackson.
 */
public class JsonEntity {

    /** Value of the column {@code col_int}. */
    @JsonProperty("col_int")
    private int colInt;

    /** Value of the column {@code col_text}. */
    @JsonProperty("col_text")
    private String colText;

    /** Value of the column {@code col_udt}. */
    @JsonProperty("col_udt")
    private JsonSubEntity colUdt;

    /**
     * Default constructor, needed so that the class can be instantiated when deserializing JSON.
     */
    public JsonEntity() {
        // Fields are populated afterwards from the JSON properties.
    }

    /**
     * Builds a fully initialized entity.
     *
     * @param colInt  value of the column {@code col_int}
     * @param colText value of the column {@code col_text}
     * @param colUdt  value of the column {@code col_udt}
     */
    public JsonEntity(final int colInt, final String colText, final JsonSubEntity colUdt) {
        this.colInt = colInt;
        this.colText = colText;
        this.colUdt = colUdt;
    }
}
/**
 * Java representation of the UDT {@code subtype}, mapped from/to JSON with Jackson.
 */
public class JsonSubEntity {

    /** Value of the field {@code text_val}. */
    @JsonProperty("text_val")
    private String textVal;

    /** Value of the field {@code bool_val}. */
    @JsonProperty("bool_val")
    private boolean boolVal;

    /**
     * Default constructor, needed so that the class can be instantiated when deserializing JSON.
     */
    public JsonSubEntity() {
        // Fields are populated afterwards from the JSON properties.
    }

    /**
     * Builds a fully initialized sub-entity.
     *
     * @param textVal value of the field {@code text_val}
     * @param boolVal value of the field {@code bool_val}
     */
    public JsonSubEntity(final String textVal, final boolean boolVal) {
        this.textVal = textVal;
        this.boolVal = boolVal;
    }
}
The class JsonSubEntity
above corresponds to the UDT subtype
in our Cassandra keyspace and the class JsonEntity
matches the columns of the table t_using_json
:
-- UDT matching the Java class JsonSubEntity.
CREATE TYPE IF NOT EXISTS subtype (text_val text, bool_val boolean);
-- Table whose columns match the Java class JsonEntity.
CREATE TABLE t_using_json (col_int int PRIMARY KEY, col_text text, col_udt frozen<subtype>);
In the CassandraResultSet
returned by select queries, we can use the JSON support of Cassandra and the JDBC wrapper to directly map the returned JSON to a Java object as shown below:
/**
 * Example mapping JSON returned by Cassandra to Java objects.
 */
public class HelloCassandra {

    /**
     * Reads a whole row with {@code SELECT JSON} and a single column with {@code toJson()}, and maps
     * each returned JSON value to a Java object.
     *
     * @param connection an open connection to the keyspace holding {@code t_using_json}
     * @throws SQLException if a query fails or the JSON cannot be mapped
     */
    public void selectJson(final Connection connection) throws SQLException {
        // Using SELECT JSON syntax
        try (Statement selectStatement1 = connection.createStatement();
             ResultSet resultSet1 = selectStatement1.executeQuery(
                 "SELECT JSON * FROM t_using_json WHERE col_int = 1;")) {
            // Only read the row when the query actually returned one.
            if (resultSet1.next()) {
                final CassandraResultSet cassandraResultSet1 = (CassandraResultSet) resultSet1;
                final JsonEntity jsonEntity = cassandraResultSet1.getObjectFromJson(JsonEntity.class);
            }
        }

        // Using toJson() function
        try (Statement selectStatement2 = connection.createStatement();
             ResultSet resultSet2 = selectStatement2.executeQuery(
                 "SELECT toJson(col_udt) AS json_udt FROM t_using_json WHERE col_int = 1;")) {
            if (resultSet2.next()) {
                final CassandraResultSet cassandraResultSet2 = (CassandraResultSet) resultSet2;
                final JsonSubEntity jsonSubEntity =
                    cassandraResultSet2.getObjectFromJson("json_udt", JsonSubEntity.class);
            }
        }
    }
}
In the CassandraPreparedStatement
, we can use the JSON support of Cassandra and the JDBC wrapper to serialize a Java object into JSON to pass to Cassandra as shown below:
/**
 * Example serializing Java objects to JSON values passed to Cassandra.
 */
public class HelloCassandra {

    /**
     * Inserts one row with {@code INSERT ... JSON} and another one using {@code fromJson()} for the
     * UDT column only.
     *
     * @param connection an open connection to the keyspace holding {@code t_using_json}
     * @throws SQLException if a statement cannot be prepared, bound or executed
     */
    public void insertJson(final Connection connection) throws SQLException {
        // Using INSERT INTO ... JSON syntax
        // prepareStatement() is declared to return a PreparedStatement, hence the explicit cast.
        try (CassandraPreparedStatement insertStatement1 = (CassandraPreparedStatement)
                 connection.prepareStatement("INSERT INTO t_using_json JSON ?;")) {
            insertStatement1.setJson(1, new JsonEntity(1, "a text value", new JsonSubEntity("1.1", false)));
            insertStatement1.execute();
        }

        // Using fromJson() function
        try (CassandraPreparedStatement insertStatement2 = (CassandraPreparedStatement)
                 connection.prepareStatement(
                     "INSERT INTO t_using_json (col_int, col_text, col_udt) VALUES (?, ?, fromJson(?));")) {
            insertStatement2.setInt(1, 2);
            insertStatement2.setString(2, "another text value");
            // The third marker is wrapped in fromJson(): bind the object as JSON, not as an int.
            insertStatement2.setJson(3, new JsonSubEntity("2.1", true));
            insertStatement2.execute();
        }
    }
}
- The serialization/deserialization uses the Jackson library.
- Ensure the target Java class has a default constructor to avoid deserialization issues.
- The JSON keys returned by Cassandra are fully lower case. So, if necessary, use the @JsonProperty Jackson annotation in the target Java class.
- Each field of the target Java class must use a Java type supported for deserialization of the corresponding CQL type, as listed in the Javadoc of the interface CassandraResultSetJsonSupport, and for serialization to the corresponding CQL type, as listed in the Javadoc of the interface CassandraStatementJsonSupport.
Note
This section is only applicable to the versions ≥4.13.0.
Some of the special CQL commands available through the cqlsh
CLI can be executed using methods execute()
or executeQuery()
of a CassandraStatement
instance.
Warning
Special CQL commands are available neither via the executeBatch() method nor in PreparedStatement instances.
You can modify the consistency level at any moment using the special CQL command CONSISTENCY [level]
.
If there are multiple statements in the same query, the execution will become synchronous and sequential to support consistency level changes correctly.
Example:
// Use consistency level 'LOCAL_ONE' for the next queries.
// try-with-resources closes each statement once it is no longer needed.
try (Statement changeLevelStatement = connection.createStatement()) {
    changeLevelStatement.execute("CONSISTENCY LOCAL_ONE");
}
// Check the current consistency level.
try (Statement statement = connection.createStatement()) {
    statement.execute("CONSISTENCY");
    ResultSet resultSet = statement.getResultSet();
    // Only read the value when a row was actually returned.
    if (resultSet.next()) {
        String currentConsistencyLevel = resultSet.getString("consistency_level");
        System.out.println(currentConsistencyLevel); // This will print: LOCAL_ONE
    }
}
Note
This section is only applicable to the versions ≥4.14.0.
You can modify the serial consistency level at any moment using the special CQL command SERIAL CONSISTENCY [level]
.
If there are multiple statements in the same query, the execution will become synchronous and sequential to support consistency level changes correctly.
Example:
// Use serial consistency level 'LOCAL_SERIAL' for the next queries.
// try-with-resources closes each statement once it is no longer needed.
try (Statement changeLevelStatement = connection.createStatement()) {
    changeLevelStatement.execute("SERIAL CONSISTENCY LOCAL_SERIAL");
}
// Check the current serial consistency level.
try (Statement statement = connection.createStatement()) {
    statement.execute("SERIAL CONSISTENCY");
    ResultSet resultSet = statement.getResultSet();
    // Only read the value when a row was actually returned.
    if (resultSet.next()) {
        String currentSerialConsistencyLevel = resultSet.getString("serial_consistency_level");
        System.out.println(currentSerialConsistencyLevel); // This will print: LOCAL_SERIAL
    }
}