
Commit 029b019

public: Fix PostgreSQL transaction panic.
This corrects a panic triggered by a particular PostgreSQL transaction: the transaction first added a new record to a table and then immediately deleted all records from that same table.

The panic was induced by the remove operation on RocksDB, which checks whether a record exists in RocksDB before queuing the removal in the RocksDB WriteBatch object. Because the insertion and deletion of the record were part of the same transaction, the record had never actually been written to RocksDB.

This change fixes the panic by removing pairs of records from the recordset that negate each other. That way, a record that was never written to RocksDB is never scheduled for removal.

Change-Id: I1270a3da00d352243116d62c237a4787c4d850f8
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5801
Tested-by: Buildkite CI
Reviewed-by: Tamas Juhasz <[email protected]>
Reviewed-by: Luke Osborne <[email protected]>
Parent: ec5ee70

File tree: 3 files changed, 175 insertions(+), 41 deletions(-)

- dataflow-state/src/persistent_state.rs
- readyset-common/src/records.rs
- readyset-psql/tests/fallback.rs
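To make the shape of the fix concrete before the diffs: a Positive record that is immediately negated by a matching Negative in the same batch is now dropped before anything is queued for RocksDB. A minimal sketch, assuming the `Records`, `Record`, and `DfValue` types behave as the new tests below demonstrate (this mirrors the test cases, not the exact call path):

```rust
// A transaction that inserts a row and then deletes it produces a
// Positive/Negative pair for the same values in one batch of Records.
let mut records: Records = vec![
    Record::Positive(vec![1.into()]), // INSERT INTO t VALUES (1)
    Record::Negative(vec![1.into()]), // DELETE FROM t
]
.into();

// remove_deleted() cancels the pair, so no removal of a never-written
// record is ever queued against RocksDB (the source of the panic).
records.remove_deleted();
assert!(records.is_empty());
```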

dataflow-state/src/persistent_state.rs

Lines changed: 70 additions & 41 deletions
@@ -643,6 +643,10 @@ impl State for PersistentState {
         replication_offset: Option<ReplicationOffset>,
     ) -> ReadySetResult<()> {
         invariant!(partial_tag.is_none(), "PersistentState can't be partial");
+
+        // Streamline the records by eliminating pairs that would negate each other.
+        records.remove_deleted();
+
         if records.len() == 0 && replication_offset.is_none() {
             return Ok(());
         }
@@ -666,47 +670,7 @@
                 }
             }
         }
-
-        let mut opts = rocksdb::WriteOptions::default();
-        if self.snapshot_mode.is_enabled()
-            // if we're setting the replication offset, that means we've snapshot the full table, so
-            // set sync to true there even if snapshot_mode is enabled, to make sure that makes it
-            // onto disk (not doing this *will* cause the write to get lost if the server restarts!)
-            && replication_offset.is_none()
-        {
-            opts.disable_wal(true);
-        } else {
-            let db = &self.db.handle();
-            if self.snapshot_mode.is_enabled() && replication_offset.is_some() {
-                // We are setting the replication offset, which is great, but all of our previous
-                // writes are not guaranteed to flush to disk even if the next write is synced. We
-                // therefore perform a flush before handling the next write.
-                //
-                // See: https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
-                // Q: After a write following option.disableWAL=true, I write another record with
-                // options.sync=true, will it persist the previous write too?
-                // A: No. After the program crashes, writes with option.disableWAL=true will be
-                // lost, if they are not flushed to SST files.
-                for index in self.db.inner().indices.iter() {
-                    db.flush_cf(db.cf_handle(&index.column_family).unwrap())
-                        .map_err(|e| internal_err!("Flush to disk failed: {e}"))?;
-                }
-
-                db.flush()
-                    .map_err(|e| internal_err!("Flush to disk failed: {e}"))?;
-            }
-            opts.set_sync(true);
-        }
-
-        if let Some(offset) = replication_offset {
-            self.set_replication_offset(&mut batch, offset);
-        }
-
-        self.db
-            .handle()
-            .write_opt(batch, &opts)
-            .map_err(|e| internal_err!("Write failed: {e}"))?;
-
+        self.write_to_db(batch, &replication_offset)?;
         Ok(())
     }
@@ -1964,6 +1928,57 @@ impl PersistentState {
     ) -> Vec<RecordResult<'a>> {
         self.db.lookup_multi(columns, keys)
    }
+
+    /// Takes the provided batch and optionally a replication offset and writes to the RocksDB
+    /// database.
+    fn write_to_db(
+        &mut self,
+        batch: WriteBatch,
+        replication_offset: &Option<ReplicationOffset>,
+    ) -> ReadySetResult<()> {
+        let mut batch = batch;
+        let mut write_options = rocksdb::WriteOptions::default();
+        if self.snapshot_mode.is_enabled()
+            // if we're setting the replication offset, that means we've snapshot the full table, so
+            // set sync to true there even if snapshot_mode is enabled, to make sure that makes it
+            // onto disk (not doing this *will* cause the write to get lost if the server restarts!)
+            && replication_offset.is_none()
+        {
+            write_options.disable_wal(true);
+        } else {
+            let db = &self.db.handle();
+            if self.snapshot_mode.is_enabled() && replication_offset.is_some() {
+                // We are setting the replication offset, which is great, but all of our previous
+                // writes are not guaranteed to flush to disk even if the next write is synced. We
+                // therefore perform a flush before handling the next write.
+                //
+                // See: https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
+                // Q: After a write following option.disableWAL=true, I write another record with
+                // options.sync=true, will it persist the previous write too?
+                // A: No. After the program crashes, writes with option.disableWAL=true will be
+                // lost, if they are not flushed to SST files.
+                for index in self.db.inner().indices.iter() {
+                    db.flush_cf(db.cf_handle(&index.column_family).unwrap())
+                        .map_err(|e| internal_err!("Flush to disk failed: {e}"))?;
+                }
+
+                db.flush()
+                    .map_err(|e| internal_err!("Flush to disk failed: {e}"))?;
+            }
+            write_options.set_sync(true);
+        }
+
+        if let Some(offset) = replication_offset {
+            self.set_replication_offset(&mut batch, offset.clone());
+        }
+
+        self.db
+            .handle()
+            .write_opt(batch, &write_options)
+            .map_err(|e| internal_err!("Write failed: {e}"))?;
+
+        Ok(())
+    }
 }

 /// Checks if the given index is unique for this base table.
@@ -2264,6 +2279,20 @@ mod tests {
         }
     }

+    #[test]
+    fn persistent_state_add_remove_same_record() {
+        let mut state = setup_persistent("persistent_state_multiple_indices", None);
+        let first: Vec<DfValue> = vec![10.into(), "Cat".into(), 1.into()];
+        let second: Vec<DfValue> = vec![10.into(), "Cat".into(), 1.into()];
+        let mut records: Records = Default::default();
+        records.push(Record::Positive(first));
+        records.push(Record::Negative(second));
+
+        state.add_key(Index::new(IndexType::HashMap, vec![0]), None);
+        state.add_key(Index::new(IndexType::HashMap, vec![1, 2]), None);
+        state.process_records(&mut records, None, None).unwrap();
+    }
+
     #[test]
     fn empty_column_set() {
         let mut state = setup_persistent("empty_column_set", None);
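The extracted `write_to_db` preserves a subtle RocksDB durability rule: a later write with `sync=true` does not persist earlier writes made with `disableWAL=true` unless the memtables are flushed first. A standalone sketch of that interplay, using the `rocksdb` crate directly (the database path and keys here are made up for illustration):

```rust
use rocksdb::{Options, WriteBatch, WriteOptions, DB};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/wal_sync_demo")?;

    // Snapshot-mode style write: skip the WAL for throughput. On a crash,
    // this write is lost unless it has been flushed to SST files.
    let mut batch = WriteBatch::default();
    batch.put(b"row:1", b"cat");
    let mut bulk = WriteOptions::default();
    bulk.disable_wal(true);
    db.write_opt(batch, &bulk)?;

    // Flush memtables before the durable write; per the RocksDB FAQ, the
    // sync=true write below would NOT persist the un-WAL'd write above.
    db.flush()?;

    // Durable write, e.g. the one carrying the replication offset.
    let mut batch = WriteBatch::default();
    batch.put(b"replication_offset", b"42");
    let mut durable = WriteOptions::default();
    durable.set_sync(true);
    db.write_opt(batch, &durable)?;

    Ok(())
}
```

This is the reason `write_to_db` flushes every column family before the write that records the replication offset.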

readyset-common/src/records.rs

Lines changed: 73 additions & 0 deletions
@@ -148,6 +148,28 @@ impl Records {
    {
        self.has(q, false)
    }
+
+    // This function checks every Negative record and ensures that there isn't a Positive record
+    // before it that matches its content. If there is, then both the Negative and Positive
+    // records are removed. This will prevent unnecessary writes to RocksDB.
+    pub fn remove_deleted(&mut self) {
+        let mut i = 0;
+        while i < self.0.len() {
+            if let Record::Negative(val) = &self.0[i] {
+                for j in (0..i).rev() {
+                    if let Record::Positive(pos_val) = &self.0[j] {
+                        if pos_val == val {
+                            self.0.remove(j);
+                            i -= 1;
+                            self.0.remove(i); // index decreased due to previous removal
+                            break;
+                        }
+                    }
+                }
+            }
+            i += 1;
+        }
+    }
 }

 impl Deref for Records {
@@ -200,4 +222,55 @@ mod tests {
            Record::Positive(vec![1.into(), 2.into()]) < Record::Negative(vec![1.into(), 2.into()])
        )
    }
+
+    // Transactions sometimes include records that negate each other. The following test
+    // ensures that remove_deleted handles them correctly.
+    #[test]
+    fn test_simplify() {
+        let mut records: Records = vec![
+            Record::Positive(vec![1.into(), "2".into(), 3.into()]),
+            Record::Negative(vec![1.into(), "2".into(), 3.into()]),
+            Record::Positive(vec![4.into(), "5".into(), 6.into()]),
+            Record::Negative(vec![4.into(), "5".into(), 6.into()]),
+            Record::Positive(vec!["last".into(), 8.into(), 9.into()]),
+        ]
+        .into();
+
+        records.remove_deleted();
+
+        let mut result: Records =
+            vec![Record::Positive(vec!["last".into(), 8.into(), 9.into()])].into();
+
+        assert_eq!(records, result);
+
+        records = vec![
+            Record::Positive(vec![1.into(), "2".into(), 3.into()]),
+            Record::Negative(vec![9.into(), "2".into(), 3.into()]),
+            Record::Positive(vec![7.into(), "5".into(), 6.into()]),
+            Record::Negative(vec![1.into(), "2".into(), 3.into()]),
+            Record::Positive(vec!["last".into(), 8.into(), 9.into()]),
+        ]
+        .into();
+
+        records.remove_deleted();
+
+        result = vec![
+            Record::Negative(vec![9.into(), "2".into(), 3.into()]),
+            Record::Positive(vec![7.into(), "5".into(), 6.into()]),
+            Record::Positive(vec!["last".into(), 8.into(), 9.into()]),
+        ]
+        .into();
+
+        assert_eq!(records, result);
+
+        records = vec![
+            Record::Positive(vec![1.into(), "2".into(), 3.into()]),
+            Record::Negative(vec![1.into(), "2".into(), 3.into()]),
+        ]
+        .into();
+
+        records.remove_deleted();
+
+        assert!(records.is_empty());
+    }
 }
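One subtlety in `remove_deleted` worth noting: a Negative only cancels against a Positive that appears earlier in the batch (the inner loop scans `(0..i).rev()`). A Negative that comes first is kept, since the row it deletes may already exist in RocksDB. A hypothetical illustration:

```rust
// Order matters: the leading Negative refers to a row that may already be
// persisted, so neither record is cancelled here.
let mut records: Records = vec![
    Record::Negative(vec![1.into()]), // delete of a possibly pre-existing row
    Record::Positive(vec![1.into()]), // re-insert of the same values
]
.into();

records.remove_deleted();
assert_eq!(records.len(), 2);
```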

readyset-psql/tests/fallback.rs

Lines changed: 32 additions & 0 deletions
@@ -1406,3 +1406,35 @@ async fn show_proxied_queries_show_caches_query_text_matches() {

     shutdown_tx.shutdown().await;
 }
+
+#[tokio::test(flavor = "multi_thread")]
+#[serial]
+async fn insert_delete_a_record_in_the_same_transaction() {
+    readyset_tracing::init_test_logging();
+    let (config, _handle, shutdown_tx) = setup().await;
+    let mut client = connect(config).await;
+    client.simple_query("create table t(a int)").await.unwrap();
+    {
+        let transaction = client.transaction().await.unwrap();
+        // Begin transaction
+        transaction.batch_execute("BEGIN").await.unwrap();
+
+        // Value to be inserted
+        let val = 1;
+
+        transaction
+            .execute("INSERT INTO t VALUES($1)", &[&val])
+            .await
+            .unwrap();
+        transaction.execute("delete from t", &[]).await.unwrap();
+
+        // Commit the transaction
+        transaction.batch_execute("COMMIT").await.unwrap();
+    }
+
+    // Check if all the records have been deleted
+    let rows = client.query("SELECT COUNT(*) FROM t", &[]).await.unwrap();
+    let count: i64 = rows[0].get(0);
+    assert_eq!(count, 0);
+    shutdown_tx.shutdown().await;
+}
