diff --git a/src/java/org/apache/cassandra/rocksdb/RocksDBCF.java b/src/java/org/apache/cassandra/rocksdb/RocksDBCF.java
index 900188ef936c..c24792989271 100644
--- a/src/java/org/apache/cassandra/rocksdb/RocksDBCF.java
+++ b/src/java/org/apache/cassandra/rocksdb/RocksDBCF.java
@@ -57,6 +57,7 @@ import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
+
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
 import org.rocksdb.CassandraCompactionFilter;
@@ -64,7 +65,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionPriority;
-import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
 import org.rocksdb.Env;
 import org.rocksdb.FlushOptions;
@@ -75,6 +75,7 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.Statistics;
 import org.rocksdb.StatsLevel;
+import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 import static org.apache.cassandra.rocksdb.RocksDBConfigs.MERGE_OPERANDS_LIMIT;
@@ -98,7 +99,6 @@ public class RocksDBCF implements RocksDBCFMBean
     private final CassandraValueMergeOperator mergeOperator;
     private final ReadOptions readOptions;
-    private final WriteOptions disableWAL;
     private final FlushOptions flushOptions;
     private final int gcGraceSeconds;
@@ -152,7 +152,6 @@ public RocksDBCF(ColumnFamilyStore cfs) throws RocksDBException
         // until compaction happens. However in our case, range deletion is only used to remove ranges
         // no longer owned by this node. In such case, stale keys would never be quried.
         readOptions = new ReadOptions().setIgnoreRangeDeletions(true);
-        disableWAL = new WriteOptions().setDisableWAL(true);
         flushOptions = new FlushOptions().setWaitForFlush(true);
         // Register the mbean.
@@ -309,17 +308,14 @@ public RocksDBIteratorAdapter newShardIterator(int shardId, ReadOptions options)
         return new RocksDBIteratorAdapter(rocksDB.newIterator(options), rocksMetrics);
     }

-    public void merge(DecoratedKey partitionKey, byte[] key, byte[] value) throws RocksDBException
+    public void write(DecoratedKey partitionKey, WriteBatch batch, boolean writeCommitLog) throws RocksDBException
     {
-        RocksDB rocksDB = getRocksDBFromKey(partitionKey);
-        if (RocksDBConfigs.DISABLE_WRITE_TO_COMMITLOG)
-        {
-            rocksDB.merge(disableWAL, key, value);
-        }
-        else
-        {
-            rocksDB.merge(key, value);
+        WriteOptions options = new WriteOptions();
+        if(!writeCommitLog) {
+            options.setDisableWAL(true);
         }
+
+        getRocksDBFromKey(partitionKey).write(options, batch);
     }

     public void deleteRange(byte[] start, byte[] end) throws RocksDBException
diff --git a/src/java/org/apache/cassandra/rocksdb/RocksDBEngine.java b/src/java/org/apache/cassandra/rocksdb/RocksDBEngine.java
index 2a0d43e0c930..5aa035b7e578 100644
--- a/src/java/org/apache/cassandra/rocksdb/RocksDBEngine.java
+++ b/src/java/org/apache/cassandra/rocksdb/RocksDBEngine.java
@@ -33,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ListenableFutureTask;
+import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,20 +115,64 @@ public void openColumnFamilyStore(ColumnFamilyStore cfs)
     public void apply(ColumnFamilyStore cfs, PartitionUpdate update, UpdateTransaction indexer, boolean writeCommitLog)
     {
+        //TODO: WriteBatch takes a hint for how many bytes to reserve that we might want to use.
+        WriteBatch batch = new WriteBatch();
         DecoratedKey partitionKey = update.partitionKey();
         for (Row row : update)
         {
-            applyRowToRocksDB(cfs, writeCommitLog, partitionKey, indexer, row);
+            addWriteToBatch(batch, cfs, partitionKey, row, indexer);
         }
         Row staticRow = update.staticRow();
         if (!staticRow.isEmpty())
         {
-            applyRowToRocksDB(cfs, writeCommitLog, partitionKey, indexer, staticRow);
+            addWriteToBatch(batch, cfs, partitionKey, staticRow, indexer);
+        }
+
+        RocksDBCF dbcf = rocksDBFamily.get(cfs.metadata.cfId);
+        try {
+            dbcf.write(partitionKey, batch, writeCommitLog);
+        }
+        catch (RocksDBException e)
+        {
+            logger.error(e.toString(), e);
+        } finally {
+            indexer.commit();
+        }
+    }
+
+    private void addWriteToBatch(WriteBatch b,
+                                 ColumnFamilyStore cfs,
+                                 DecoratedKey partitionKey,
+                                 Row row,
+                                 UpdateTransaction indexer)
+    {
+
+        Clustering clustering = row.clustering();
+
+        byte[] rocksDBKey = RowKeyEncoder.encode(partitionKey, clustering, cfs.metadata);
+        byte[] rocksDBValue = RowValueEncoder.encode(cfs.metadata, row);
+
+        b.merge(rocksDBKey, rocksDBValue);
+
+        if (indexer != UpdateTransaction.NO_OP)
+        {
+            try
+            {
+                secondaryIndexMetrics.rsiTotalInsertions.inc();
+                indexer.onInserted(row);
+            }
+            catch (RuntimeException e)
+            {
+                secondaryIndexMetrics.rsiInsertionFailures.inc();
+                logger.error(e.toString(), e);
+                throw new StorageEngineException("Index update failed", e);
+            }
         }
     }

+
     public UnfilteredRowIterator queryStorage(ColumnFamilyStore cfs, SinglePartitionReadCommand readCommand)
     {
         Partition partition = new RocksDBPartition(rocksDBFamily.get(cfs.metadata.cfId),
@@ -221,49 +266,6 @@ public AbstractStreamReceiveTask getStreamReceiveTask(StreamSession session, Str
         return new RocksDBStreamReceiveTask(session, summary.cfId, summary.files, summary.totalSize);
     }

-    private void applyRowToRocksDB(ColumnFamilyStore cfs,
-                                   boolean writeCommitLog,
-                                   DecoratedKey partitionKey,
-                                   UpdateTransaction indexer,
-                                   Row row)
-    {
-
-        Clustering clustering = row.clustering();
-
-        byte[] rocksDBKey = RowKeyEncoder.encode(partitionKey, clustering, cfs.metadata);
-        byte[] rocksDBValue = RowValueEncoder.encode(cfs.metadata, row);
-
-        try
-        {
-            indexer.start();
-            rocksDBFamily.get(cfs.metadata.cfId).merge(partitionKey, rocksDBKey, rocksDBValue);
-            if (indexer != UpdateTransaction.NO_OP)
-            {
-                try
-                {
-                    secondaryIndexMetrics.rsiTotalInsertions.inc();
-                    indexer.onInserted(row);
-                }
-                catch (RuntimeException e)
-                {
-                    secondaryIndexMetrics.rsiInsertionFailures.inc();
-                    logger.error(e.toString(), e);
-                    throw new StorageEngineException("Index update failed", e);
-                }
-
-            }
-        }
-        catch (RocksDBException e)
-        {
-            logger.error(e.toString(), e);
-            throw new StorageEngineException("Row merge failed", e);
-        }
-        finally
-        {
-            indexer.commit();
-        }
-    }
-
     public static RocksDBCF getRocksDBCF(UUID cfId)
     {
         ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(cfId);
diff --git a/test/unit/org/apache/cassandra/rocksdb/RocksDBCFTest.java b/test/unit/org/apache/cassandra/rocksdb/RocksDBCFTest.java
index 1aa03d00c134..6b7fe85181a1 100644
--- a/test/unit/org/apache/cassandra/rocksdb/RocksDBCFTest.java
+++ b/test/unit/org/apache/cassandra/rocksdb/RocksDBCFTest.java
@@ -35,7 +35,9 @@ import org.apache.cassandra.rocksdb.encoding.value.RowValueEncoder;
 import org.apache.cassandra.utils.Hex;
 import org.rocksdb.IndexType;
+import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -47,6 +49,13 @@ public class RocksDBCFTest extends RocksDBTestBase
 {
     final DecoratedKey dk = Util.dk("test_key");

+    public void writeKeyValue(RocksDBCF cf, DecoratedKey partitionKey, byte[] key, byte[] value) throws RocksDBException
+    {
+        WriteBatch batch = new WriteBatch();
+        batch.merge(key, value);
+        cf.write(partitionKey, batch, true);
+    }
+
     @Test
     public void testMerge() throws RocksDBException
     {
@@ -57,7 +66,7 @@ public void testMerge() throws RocksDBException
         RocksDBCF rocksDBCF = RocksDBEngine.getRocksDBCF(cfs.metadata.cfId);
         byte[] key = "test_key".getBytes();
         byte[] value = encodeValue(cfs, "test_value");
-        rocksDBCF.merge(dk, key, value);
+        writeKeyValue(rocksDBCF, dk, key, value);
         assertArrayEquals(value, rocksDBCF.get(dk, key));
     }
@@ -84,10 +93,10 @@ public void testDeleteRange() throws RocksDBException
         byte[] d = "d".getBytes();
         byte[] value = encodeValue(cfs, "test_value");
-        rocksDBCF.merge(dk, a, value);
-        rocksDBCF.merge(dk, b, value);
-        rocksDBCF.merge(dk, c, value);
-        rocksDBCF.merge(dk, d, value);
+        writeKeyValue(rocksDBCF, dk, a, value);
+        writeKeyValue(rocksDBCF, dk, b, value);
+        writeKeyValue(rocksDBCF, dk, c, value);
+        writeKeyValue(rocksDBCF, dk, d, value);
         rocksDBCF.deleteRange(b, d);
         rocksDBCF.compactRange();
@@ -109,7 +118,7 @@ public void testTruncate() throws RocksDBException
         byte[] key = "test_key".getBytes();
         byte[] value = encodeValue(cfs, "test_value");
-        rocksDBCF.merge(dk, key, value);
+        writeKeyValue(rocksDBCF, dk, key, value);
         assertArrayEquals(value, rocksDBCF.get(dk, key));
         rocksDBCF.truncate();
@@ -130,7 +139,7 @@ public void testClose() throws RocksDBException
         byte[] key = "test_key".getBytes();
         byte[] value = encodeValue(cfs, "test_value");
-        rocksDBCF.merge(dk, key, value);
+        writeKeyValue(rocksDBCF, dk, key, value);
         assertArrayEquals(value, rocksDBCF.get(dk, key));
@@ -157,9 +166,9 @@ public void testDumpPrefix() throws Exception
         ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
         RocksDBCF rocksDBCF = RocksDBEngine.getRocksDBCF(cfs.metadata.cfId);
-        rocksDBCF.merge(dk, "test_key1".getBytes(), "test_value11".getBytes());
-        rocksDBCF.merge(dk, "test_key1".getBytes(), "test_value12".getBytes());
-        rocksDBCF.merge(dk, "test_key2".getBytes(), "test_value2".getBytes());
+        writeKeyValue(rocksDBCF, dk, "test_key1".getBytes(), "test_value11".getBytes());
+        writeKeyValue(rocksDBCF, dk, "test_key1".getBytes(), "test_value12".getBytes());
+        writeKeyValue(rocksDBCF, dk, "test_key2".getBytes(), "test_value2".getBytes());
         String dump = rocksDBCF.dumpPrefix(dk, "test_key".getBytes(), Integer.MAX_VALUE);
         assertEquals(2, dump.split("\n").length);
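For reference, the batched write path adopted above can be exercised against a plain RocksDB instance roughly as follows. This is a minimal standalone sketch of the WriteBatch-plus-WriteOptions pattern, not the RocksDBCF wrapper itself; the database path and the "stringappend" merge operator are illustrative placeholders chosen only so that merge() has an operator to resolve against (RocksDBCF configures CassandraValueMergeOperator instead).

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class WriteBatchSketch
{
    public static void main(String[] args) throws RocksDBException
    {
        RocksDB.loadLibrary();

        try (Options options = new Options()
                               .setCreateIfMissing(true)
                               .setMergeOperatorName("stringappend");
             RocksDB db = RocksDB.open(options, "/tmp/writebatch-sketch"))
        {
            // Collect every mutation of a logical update into one batch, mirroring
            // how apply() now folds all rows of a PartitionUpdate into a single
            // WriteBatch before touching the database. WriteBatch also has an
            // overload taking a reserved-bytes hint, as noted in the TODO above.
            try (WriteBatch batch = new WriteBatch();
                 WriteOptions writeOptions = new WriteOptions().setDisableWAL(true))
            {
                batch.merge("row1".getBytes(), "v1".getBytes());
                batch.merge("row2".getBytes(), "v2".getBytes());

                // A single write() applies the whole batch atomically; with
                // setDisableWAL(true) it skips the RocksDB WAL, which is what
                // RocksDBCF.write() does when writeCommitLog is false.
                db.write(writeOptions, batch);
            }
        }
    }
}

Closing the batch and the options via try-with-resources releases their native handles once the write has been applied.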