Skip to content

Commit 16319a5

Browse files
committed
[#30703] YSQL: Handle non-BEGIN/COMMIT/INSERT/DELETE records in notifications poller process
Summary: Currently (before this fix), the notifications poller process throws an error if it encounters a record with an action type other than BEGIN/COMMIT/INSERT/DELETE. Because of the error, the process crashes, a new process starts, receives the same record (since the previous record was not processed and no ack was sent), and the cycle repeats. Though rare, we can have other record types in future (for instance, DDL record if we add a column to the table). Handle these records by just logging a warning. Backport-through: 2025.2 Test Plan: ./yb_build.sh --java-test 'org.yb.pgsql.TestPgListenNotify#testNotifyWorksAfterUpdateOnNotificationsTable' Reviewers: jason Reviewed By: jason Subscribers: yql Differential Revision: https://phorge.dev.yugabyte.com/D53006
1 parent 7857384 commit 16319a5

2 files changed

Lines changed: 54 additions & 2 deletions

File tree

java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgListenNotify.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,6 +936,58 @@ public void testQueueFullTerminatesSlowestListener() throws Exception {
936936
}
937937
}
938938

939+
/**
940+
* Verifies that LISTEN/NOTIFY works even after a non-INSERT/DELETE record (i.e., an UPDATE)
941+
* appears in the pg_yb_notifications CDC stream. The notifications poller consumes the CDC
942+
* stream for pg_yb_notifications; an UPDATE record is unexpected (users never UPDATE this
943+
* table) and must not break the poller.
944+
*
945+
* Scenario:
946+
* 1. Session starts LISTEN.
947+
* 2. NOTIFY is sent and received by the listener.
948+
* 3. A dummy row is INSERTed into pg_yb_notifications and then UPDATEd (producing a
949+
* non-INSERT/DELETE CDC record).
950+
* 4. Another NOTIFY is sent and received by the listener, proving the poller survived.
951+
*/
952+
@Test
953+
public void testNotifyWorksAfterUpdateOnNotificationsTable() throws Exception {
954+
final String channel = "update_test";
955+
956+
try (Connection listenerConn = getConnectionBuilder().connect();
957+
Connection notifierConn = getConnectionBuilder().connect()) {
958+
try (Statement stmt = listenerConn.createStatement()) {
959+
stmt.execute("LISTEN " + channel);
960+
}
961+
962+
// First NOTIFY: baseline check.
963+
try (Statement stmt = notifierConn.createStatement()) {
964+
stmt.execute("NOTIFY " + channel + ", 'before_update'");
965+
}
966+
waitForNotification(listenerConn, channel, "before_update");
967+
LOG.info("Received notification before UPDATE on pg_yb_notifications");
968+
969+
// INSERT a dummy row into pg_yb_notifications and UPDATE it.
970+
try (Connection ybSystemConn =
971+
getConnectionBuilder().withDatabase("yb_system").connect();
972+
Statement stmt = ybSystemConn.createStatement()) {
973+
stmt.execute("INSERT INTO pg_yb_notifications"
974+
+ " (notif_uuid, sender_node_uuid, sender_pid, db_oid, is_listen, data)"
975+
+ " VALUES ('00000000-0000-0000-0000-000000000001',"
976+
+ " '00000000-0000-0000-0000-000000000002', 0, 0, false, '\\x00')");
977+
stmt.execute("UPDATE pg_yb_notifications SET sender_pid = 1"
978+
+ " WHERE notif_uuid = '00000000-0000-0000-0000-000000000001'");
979+
LOG.info("Inserted and updated dummy row in pg_yb_notifications");
980+
}
981+
982+
// Second NOTIFY: must still work after the UPDATE record hit the CDC stream.
983+
try (Statement stmt = notifierConn.createStatement()) {
984+
stmt.execute("NOTIFY " + channel + ", 'after_update'");
985+
}
986+
waitForNotification(listenerConn, channel, "after_update");
987+
LOG.info("Received notification after UPDATE on pg_yb_notifications");
988+
}
989+
}
990+
939991
private void setFatalAfterNotifsQueueWriteFlag(boolean value) throws Exception {
940992
String v = value ? "true" : "false";
941993
for (HostAndPort tserver : miniCluster.getTabletServers().keySet()) {

src/postgres/src/backend/commands/async.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3243,9 +3243,9 @@ ybNotifsPollerProcessRecord(const YbcPgRowMessage *record)
32433243
break;
32443244

32453245
default:
3246-
ereport(ERROR,
3246+
ereport(WARNING,
32473247
(errcode(ERRCODE_INTERNAL_ERROR),
3248-
errmsg("invalid record found by notification poller process")));
3248+
errmsg("ignoring record with action type %d in notification poller process", record->action)));
32493249
}
32503250
}
32513251

0 commit comments

Comments
 (0)