Commit bd72123
Make citus_create_restore_point MX-safe by blocking 2PC commit decisions (#8352)
DESCRIPTION: Make citus_create_restore_point MX-safe by blocking 2PC commit decisions

Problem:
--------
In coordinator-only mode, citus_create_restore_point() creates consistent
restore points by blocking distributed writes at the coordinator level, which
is safe because all distributed transactions are coordinated through the
coordinator.

However, in MX mode (multi-writer), any worker with metadata can initiate
distributed transactions. The existing implementation only blocks writes at
the coordinator, allowing metadata workers to continue making 2PC commit
decisions. This can result in an inconsistent cluster state where restore
points on different nodes represent different transaction visibility.

Solution:
---------
Block distributed transaction commit decisions cluster-wide by acquiring
ExclusiveLock on pg_dist_transaction on all metadata nodes (coordinator and
MX workers). Additionally, on the coordinator only, lock pg_dist_node and
pg_dist_partition to prevent topology and schema changes.

This selective locking strategy is based on the MX mode architecture:

- DDL operations (topology changes, table creation) can ONLY be executed
  through the coordinator node, even in MX mode
- MX workers can only initiate distributed DML transactions
  (INSERT/UPDATE/DELETE) that use 2PC
- Therefore, locking pg_dist_transaction on remote metadata nodes is
  sufficient to block all distributed writes they can perform, while the
  coordinator's additional locks on pg_dist_node and pg_dist_partition
  provide cluster-wide protection against DDL changes

The implementation:
-------------------
1. Opens connections to all nodes (metadata and non-metadata workers)
2. Begins coordinated transactions on all remote connections
3. Acquires ExclusiveLock on pg_dist_node, pg_dist_partition, and
   pg_dist_transaction locally on the coordinator via LockRelationOid()
4. Acquires ExclusiveLock on pg_dist_transaction on all remote metadata
   nodes via the SQL LOCK TABLE command (executed in parallel)
5. Creates restore points on all nodes in parallel (both metadata and
   non-metadata nodes need WAL restore points)
6. Closes remote connections, which releases the locks via implicit ROLLBACK

Key Insight - Why No Transaction Drainage Is Needed:
----------------------------------------------------
The commit decision in Citus 2PC occurs when LogTransactionRecord() writes to
pg_dist_transaction (using RowExclusiveLock for the insert), which happens
BEFORE the writer's local commit (in the PRE_COMMIT callback). By holding
ExclusiveLock on pg_dist_transaction:

- Transactions that have already recorded their commit decision (already
  inserted their row) will complete normally
- Transactions that haven't recorded their commit decision yet will block on
  the ExclusiveLock (which conflicts with the RowExclusiveLock needed for
  inserts), preventing them from proceeding

This creates a clean cut point for consistency without requiring us to drain
in-flight transactions. The restore point captures the exact state of
committed transactions across the cluster.

Recovery Correctness:
---------------------
The maintenance daemon's recovery logic relies on the presence of
pg_dist_transaction records to determine whether to COMMIT PREPARED or
ROLLBACK PREPARED. Our blocking ensures that:

- Prepared transactions WITH commit records will be committed on recovery
- Prepared transactions WITHOUT commit records will be rolled back on recovery

Since we create restore points while holding these locks, all nodes capture
the same set of commit decisions, ensuring cluster-wide consistency.

Backward Compatibility:
-----------------------
- Return type unchanged: still returns coordinator LSN (pg_lsn)
- Coordinator-only mode: unchanged behavior
- MX mode: automatic detection and enhanced safety (transparent)
- No SQL function signature changes required
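The "clean cut point" argument can be illustrated with a small model (illustrative Python, not Citus code; every name here is invented for the sketch). Commit decisions are recorded under a mutex standing in for pg_dist_transaction; a restore point taken while holding that mutex exclusively sees exactly the decisions recorded before the cut, and later writers block until the lock is released:

```python
import threading
import time

# Illustrative model only: a set of recorded commit decisions guarded by a
# lock, standing in for pg_dist_transaction and its table-level locks.
commit_records = set()
records_lock = threading.Lock()

def record_commit_decision(xact_id):
    # A 2PC writer records its decision (cf. LogTransactionRecord); this
    # blocks while the restore-point taker holds the lock exclusively.
    with records_lock:
        commit_records.add(xact_id)

record_commit_decision("xact-1")      # decision recorded before the cut

records_lock.acquire()                # restore point: hold the lock
late = threading.Thread(target=record_commit_decision, args=("xact-2",))
late.start()                          # xact-2 now blocks on the lock
time.sleep(0.1)
snapshot = frozenset(commit_records)  # the cut the restore point captures
records_lock.release()
late.join()                           # xact-2 completes after the cut

print(sorted(snapshot))               # only the pre-cut decision is visible
print(sorted(commit_records))         # both decisions exist once lock drops
```

No in-flight transaction is drained: xact-2 simply completes after the cut, so it is absent from the snapshot but present afterward, mirroring how recovery would roll it back at the restore point yet it commits normally in the live cluster.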
1 parent cf28aad commit bd72123

File tree

9 files changed: +521 −19 lines changed

.github/workflows/build_and_test.yml

Lines changed: 1 addition & 1 deletion
@@ -144,7 +144,7 @@ jobs:
           ${{ needs.params.outputs.pg17_version }},
           ${{ needs.params.outputs.pg18_version }}
         ]
-      make_targets: '["check-split", "check-multi", "check-multi-1", "check-multi-mx", "check-vanilla", "check-isolation", "check-operations", "check-follower-cluster", "check-add-backup-node", "check-columnar", "check-columnar-isolation", "check-enterprise", "check-enterprise-isolation", "check-enterprise-isolation-logicalrep-1", "check-enterprise-isolation-logicalrep-2", "check-enterprise-isolation-logicalrep-3"]'
+      make_targets: '["check-split", "check-multi", "check-multi-1", "check-multi-mx", "check-vanilla", "check-isolation", "check-operations", "check-follower-cluster", "check-add-backup-node", "check-columnar", "check-columnar-isolation", "check-enterprise", "check-enterprise-isolation", "check-enterprise-isolation-logicalrep-1", "check-enterprise-isolation-logicalrep-2", "check-enterprise-isolation-logicalrep-3", "check-tap"]'
       image_suffix: ${{ needs.params.outputs.image_suffix }}
       image_name: ${{ needs.params.outputs.test_image_name }}
     secrets:

.gitignore

Lines changed: 2 additions & 0 deletions
@@ -40,6 +40,8 @@ lib*.pc
 /compile_commands.json
 /src/backend/distributed/cdc/build-cdc-*/*
 /src/test/cdc/tmp_check/*
+/src/test/tap/tmp_check/*
+/src/test/tap/log/*
 
 
 # temporary files vim creates

src/backend/distributed/operations/citus_create_restore_point.c

Lines changed: 165 additions & 17 deletions
@@ -31,22 +31,54 @@
 
 #define CREATE_RESTORE_POINT_COMMAND "SELECT pg_catalog.pg_create_restore_point($1::text)"
 
+/*
+ * BLOCK_DISTRIBUTED_WRITES_COMMAND acquires ExclusiveLock on:
+ * 1. pg_dist_transaction - blocks 2PC commit decisions
+ * 2. pg_dist_partition - blocks DDL operations on distributed tables
+ *
+ * This ensures both DML (via 2PC) and DDL are blocked on metadata nodes.
+ */
+#define BLOCK_DISTRIBUTED_WRITES_COMMAND \
+    "LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE; " \
+    "LOCK TABLE pg_catalog.pg_dist_partition IN EXCLUSIVE MODE"
 
 /* local functions forward declarations */
 static List * OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode);
 static void BlockDistributedTransactions(void);
 static void CreateRemoteRestorePoints(char *restoreName, List *connectionList);
+static void BlockDistributedTransactionsOnAllMetadataNodes(List *connectionList);
 
 
 
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(citus_create_restore_point);
 
 
 /*
- * citus_create_restore_point blocks writes to distributed tables and then
- * runs pg_create_restore_point on all nodes. This creates a consistent
- * restore point under the assumption that there are no other writers
- * than the coordinator.
+ * citus_create_restore_point creates a cluster-consistent restore point
+ * across all nodes in the Citus cluster.
+ *
+ * In coordinator-only mode, this function blocks new distributed writes
+ * at the coordinator and creates restore points on all worker nodes.
+ *
+ * In MX mode (multi-writer), this function blocks both DML and DDL
+ * operations on all metadata nodes by acquiring ExclusiveLock on:
+ * - pg_dist_transaction: blocks 2PC commit decisions (DML)
+ * - pg_dist_partition: blocks DDL on distributed tables
+ *
+ * This prevents new distributed transactions from recording commit decisions
+ * and blocks schema changes, ensuring all restore points represent the same
+ * consistent cluster state.
+ *
+ * The function returns the LSN of the restore point on the coordinator,
+ * maintaining backward compatibility with the original implementation.
+ *
+ * Key insight: We do NOT need to drain in-flight transactions. The commit
+ * decision in Citus 2PC happens when LogTransactionRecord() writes to
+ * pg_dist_transaction, which occurs BEFORE the writer's local commit.
+ * By blocking writes to pg_dist_transaction, we prevent commit decisions
+ * from being made. Transactions that have already recorded their commit
+ * decision will complete normally, while those that haven't will
+ * be blocked. This creates a clean cut point for consistency.
 */
Datum
citus_create_restore_point(PG_FUNCTION_ARGS)
@@ -88,22 +120,56 @@ citus_create_restore_point(PG_FUNCTION_ARGS)
     * ShareLock prevents new nodes being added, rendering connectionList incomplete
     */
    List *connectionList = OpenConnectionsToAllWorkerNodes(ShareLock);
+   XLogRecPtr localRestorePoint = InvalidXLogRecPtr;
 
-   /*
-    * Send a BEGIN to bust through pgbouncer. We won't actually commit since
-    * that takes time. Instead we just close the connections and roll back,
-    * which doesn't undo pg_create_restore_point.
-    */
-   RemoteTransactionListBegin(connectionList);
+   PG_TRY();
+   {
+       /*
+        * Send a BEGIN to bust through pgbouncer. We won't actually commit since
+        * that takes time. Instead we just close the connections and roll back,
+        * which doesn't undo pg_create_restore_point.
+        */
+       RemoteTransactionListBegin(connectionList);
+
+       /* DANGER: finish as quickly as possible after this */
+       BlockDistributedTransactions();
 
-   /* DANGER: finish as quickly as possible after this */
-   BlockDistributedTransactions();
+       BlockDistributedTransactionsOnAllMetadataNodes(connectionList);
 
-   /* do local restore point first to bail out early if something goes wrong */
-   XLogRecPtr localRestorePoint = XLogRestorePoint(restoreNameString);
+       /* do local restore point first to bail out early if something goes wrong */
+       localRestorePoint = XLogRestorePoint(restoreNameString);
 
-   /* run pg_create_restore_point on all nodes */
-   CreateRemoteRestorePoints(restoreNameString, connectionList);
+       /* run pg_create_restore_point on all nodes */
+       CreateRemoteRestorePoints(restoreNameString, connectionList);
+
+       /* close connections to all nodes;
+        * all locks get released as part of the transaction rollback
+        */
+       MultiConnection *conn = NULL;
+       foreach_declared_ptr(conn, connectionList)
+       {
+           ForgetResults(conn);
+           CloseConnection(conn);
+       }
+       connectionList = NIL;
+   }
+   PG_CATCH();
+   {
+       /*
+        * On error, ensure we clean up connections and release locks.
+        * Rolling back the metadata node transactions releases the
+        * ExclusiveLocks on pg_dist_transaction cluster-wide.
+        */
+       MultiConnection *conn = NULL;
+       foreach_declared_ptr(conn, connectionList)
+       {
+           ForgetResults(conn);
+           CloseConnection(conn);
+       }
+       connectionList = NIL;
+       PG_RE_THROW();
+   }
+   PG_END_TRY();
 
    PG_RETURN_LSN(localRestorePoint);
}
@@ -152,6 +218,89 @@ BlockDistributedTransactions(void)
}
 
 
+/*
+ * BlockDistributedTransactionsOnAllMetadataNodes blocks distributed transactions
+ * on all metadata nodes by executing LOCK TABLE remotely.
+ *
+ * This is the MX-mode equivalent of BlockDistributedTransactions(), extended
+ * to all nodes capable of initiating distributed transactions. We must hold
+ * these locks across the cluster to prevent commit decisions from being made
+ * on any node.
+ *
+ * The function expects that connections are already in a transaction block
+ * (BEGIN has been sent). The locks will be held until the transaction is
+ * rolled back or committed.
+ */
+static void
+BlockDistributedTransactionsOnAllMetadataNodes(List *connectionList)
+{
+   /*
+    * Send LOCK TABLE commands to all metadata nodes in parallel. We use
+    * standard SQL LOCK TABLE syntax to acquire ExclusiveLock on catalog
+    * tables, mirroring what BlockDistributedTransactions() does on the
+    * coordinator via LockRelationOid().
+    *
+    * The BLOCK_DISTRIBUTED_WRITES_COMMAND acquires:
+    * 1. ExclusiveLock on pg_dist_transaction (blocks 2PC commit decisions)
+    * 2. ExclusiveLock on pg_dist_partition (blocks DDL on distributed tables)
+    *
+    * Note: Unlike the local coordinator lock which also locks pg_dist_node,
+    * we don't lock pg_dist_node on remote nodes because node management
+    * operations (adding/removing nodes) are still coordinator-only.
+    *
+    * These locks naturally serialize concurrent restore point operations
+    * cluster-wide, so no additional advisory lock is needed.
+    */
+
+   /* build list of remote metadata node connections */
+   List *metadataConnectionList = NIL;
+   MultiConnection *connection = NULL;
+   foreach_declared_ptr(connection, connectionList)
+   {
+       WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port);
+       bool isRemoteMetadataNode = workerNode != NULL &&
+                                   NodeIsPrimaryAndRemote(workerNode);
+
+       if (isRemoteMetadataNode)
+       {
+           metadataConnectionList = lappend(metadataConnectionList, connection);
+       }
+   }
+
+   /* send lock commands in parallel to all remote metadata nodes */
+   foreach_declared_ptr(connection, metadataConnectionList)
+   {
+       /*
+        * We could use ExecuteCriticalRemoteCommand instead, but it would
+        * not allow us to execute the commands in parallel. So for the sake of
+        * performance, we use SendRemoteCommand and send lock commands in parallel
+        * to all metadata nodes, and later wait for all lock acquisitions to complete.
+        */
+       int querySent = SendRemoteCommand(connection, BLOCK_DISTRIBUTED_WRITES_COMMAND);
+       if (querySent == 0)
+       {
+           ReportConnectionError(connection, ERROR);
+       }
+   }
+
+   /*
+    * Wait for all lock acquisitions to complete. If any node fails to
+    * acquire locks (e.g., due to a conflicting lock), this will error out.
+    */
+   foreach_declared_ptr(connection, metadataConnectionList)
+   {
+       PGresult *result = GetRemoteCommandResult(connection, true);
+       if (!IsResponseOK(result))
+       {
+           ReportResultError(connection, result, ERROR);
+       }
+
+       PQclear(result);
+       ForgetResults(connection);
+   }
+}
+
+
 /*
  * CreateRemoteRestorePoints creates a restore point via each of the
  * connections in the list in parallel.
@@ -186,6 +335,5 @@ CreateRemoteRestorePoints(char *restoreName, List *connectionList)
        PQclear(result);
 
        ForgetResults(connection);
-       CloseConnection(connection);
    }
}
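The recovery-correctness argument behind this change reduces to a membership check: the maintenance daemon commits a prepared transaction on recovery iff its commit record exists in pg_dist_transaction, and rolls it back otherwise. A minimal sketch of that rule (illustrative Python with assumed names, not the maintenance daemon's actual code):

```python
# Illustrative sketch of the recovery rule: a prepared transaction is
# committed on recovery iff its commit record reached pg_dist_transaction
# before the restore point's cut; otherwise it is rolled back.
def recovery_action(prepared_xacts, commit_records):
    return {
        xact: ("COMMIT PREPARED" if xact in commit_records
               else "ROLLBACK PREPARED")
        for xact in prepared_xacts
    }

# xact-1 recorded its commit decision before the cut; xact-2 did not.
actions = recovery_action({"xact-1", "xact-2"}, {"xact-1"})
print(actions["xact-1"])
print(actions["xact-2"])
```

Because every node creates its restore point while the ExclusiveLocks are held, every node's WAL captures the same commit_records set, so this rule yields the same decision on every node.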

src/test/regress/Makefile

Lines changed: 4 additions & 1 deletion
@@ -52,7 +52,7 @@ vanilla_diffs_file = $(citus_abs_srcdir)/pg_vanilla_outputs/$(MAJORVERSION)/regr
 # intermediate, for muscle memory backward compatibility.
 check: check-full check-enterprise-full
 # check-full triggers all tests that ought to be run routinely
-check-full: check-multi check-multi-mx check-multi-1 check-operations check-add-backup-node check-follower-cluster check-isolation check-failure check-split check-vanilla check-columnar check-columnar-isolation check-pg-upgrade check-arbitrary-configs check-citus-upgrade check-citus-upgrade-mixed check-citus-upgrade-local check-citus-upgrade-mixed-local check-pytest check-query-generator
+check-full: check-multi check-multi-mx check-multi-1 check-operations check-add-backup-node check-follower-cluster check-isolation check-failure check-split check-vanilla check-columnar check-columnar-isolation check-pg-upgrade check-arbitrary-configs check-citus-upgrade check-citus-upgrade-mixed check-citus-upgrade-local check-citus-upgrade-mixed-local check-pytest check-query-generator check-tap
 # check-enterprise-full triggers all enterprise specific tests
 check-enterprise-full: check-enterprise check-enterprise-isolation check-enterprise-failure check-enterprise-isolation-logicalrep-1 check-enterprise-isolation-logicalrep-2 check-enterprise-isolation-logicalrep-3
 
@@ -221,6 +221,9 @@ check-multi-mx: all
 	$(pg_regress_multi_check) --load-extension=citus \
 	-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_mx_schedule $(EXTRA_TESTS)
 
+check-tap:
+	$(MAKE) -C $(citus_top_srcdir)/src/test/tap installcheck
+
 check-follower-cluster: all
 	$(pg_regress_multi_check) --load-extension=citus --follower-cluster \
 	-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_follower_schedule $(EXTRA_TESTS)

src/test/regress/expected/multi_utilities.out

Lines changed: 4 additions & 0 deletions
@@ -301,6 +301,10 @@ NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE; LOCK TABLE pg_catalog.pg_dist_partition IN EXCLUSIVE MODE
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing LOCK TABLE pg_catalog.pg_dist_transaction IN EXCLUSIVE MODE; LOCK TABLE pg_catalog.pg_dist_partition IN EXCLUSIVE MODE
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE: issuing SELECT pg_catalog.pg_create_restore_point($1::text)
 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 NOTICE: issuing SELECT pg_catalog.pg_create_restore_point($1::text)

src/test/tap/Makefile

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+subdir = src/test/tap
+citus_top_builddir = ../../..
+include $(citus_top_builddir)/Makefile.global
+
+pg_version = $(shell $(PG_CONFIG) --version 2>/dev/null)
+pg_whole_version = $(shell echo "$(pg_version)"| sed -e 's/^PostgreSQL \([0-9]*\)\(\.[0-9]*\)\{0,1\}\(.*\)/\1\2/')
+pg_major_version = $(shell echo "$(pg_whole_version)"| sed -e 's/^\([0-9]\{2\}\)\(.*\)/\1/')
+export pg_major_version
+
+test_path = t/*.pl
+
+ifeq ($(enable_tap_tests),yes)
+
+define citus_prove_installcheck
+rm -rf '$(CURDIR)'/tmp_check
+$(MKDIR_P) '$(CURDIR)'/tmp_check
+cd $(srcdir) && \
+   TESTDIR='$(CURDIR)' \
+   PATH="$(bindir):$$PATH" \
+   PGPORT='6$(DEF_PGPORT)' \
+   top_builddir='$(CURDIR)/$(top_builddir)' \
+   PG_REGRESS='$(pgxsdir)/src/test/regress/pg_regress' \
+   TEMP_CONFIG='$(CURDIR)'/postgresql.conf \
+   $(PROVE) $(PG_PROVE_FLAGS) $(PROVE_FLAGS) $(if $(PROVE_TESTS),$(PROVE_TESTS),$(test_path))
+endef
+
+else
+citus_prove_installcheck = @echo "TAP tests not enabled when postgres was compiled"
+endif
+
+installcheck:
+	$(citus_prove_installcheck)
+
+clean distclean maintainer-clean:
+	rm -rf tmp_check

src/test/tap/citus_helpers.pm

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+use strict;
+use warnings;
+
+my $pg_major_version = int($ENV{'pg_major_version'} || 0);
+if ($pg_major_version >= 15) {
+	eval "use PostgreSQL::Test::Cluster";
+	eval "use PostgreSQL::Test::Utils";
+} else {
+	eval "use PostgresNode";
+}
+
+our $NODE_TYPE_COORDINATOR = 1;
+our $NODE_TYPE_WORKER = 2;
+
+sub create_node {
+	my ($name, $node_type, $host, $port, $config) = @_;
+	my $node;
+	if ($pg_major_version >= 15) {
+		$PostgreSQL::Test::Cluster::use_unix_sockets = 0;
+		$PostgreSQL::Test::Cluster::use_tcp = 1;
+		$PostgreSQL::Test::Cluster::test_pghost = 'localhost';
+		my %params = ( host => 'localhost' );
+		$params{port} = $port if defined $port;
+		$node = PostgreSQL::Test::Cluster->new($name, %params);
+	} else {
+		$PostgresNode::use_tcp = 1;
+		$PostgresNode::test_pghost = '127.0.0.1';
+		my %params = ( host => 'localhost' );
+		$params{port} = $port if defined $port;
+		$node = get_new_node($name, %params);
+	}
+	$port++ if defined $port;
+
+	my $citus_config_options = "
+max_connections = 100
+max_wal_senders = 100
+max_replication_slots = 100
+log_statement = 'all'
+ssl = off
+logging_collector = on
+log_directory = 'log'
+log_filename = 'postgresql.log'
+";
+
+	if ($config) {
+		$citus_config_options .= $config;
+	}
+
+	$node->init(allows_streaming => 'logical');
+	if ($node_type == $NODE_TYPE_COORDINATOR || $node_type == $NODE_TYPE_WORKER) {
+		$node->append_conf("postgresql.conf", "shared_preload_libraries = 'citus'\n" . $citus_config_options);
+		$node->start();
+		if (-f $node->data_dir . '/server.key') {
+			chmod 0600, $node->data_dir . '/server.key';
+		}
+		$node->safe_psql('postgres', "CREATE EXTENSION citus;");
+	}
+
+	return $node;
+}
+
+sub create_citus_cluster {
+	my ($num_workers, $host, $port, $citus_config) = @_;
+	my @workers = ();
+	my $node_coordinator;
+	$node_coordinator = create_node('coordinator', $NODE_TYPE_COORDINATOR, $host, $port, $citus_config);
+	my $coord_host = $node_coordinator->host();
+	my $coord_port = $node_coordinator->port();
+	$node_coordinator->safe_psql('postgres', "SELECT pg_catalog.citus_set_coordinator_host('$coord_host', $coord_port);");
+	for (my $i = 0; $i < $num_workers; $i++) {
+		$port = $port ? $port + 1 : undef;
+		my $node_worker = create_node("worker$i", $NODE_TYPE_WORKER, "localhost", $port, $citus_config);
+		my $node_worker_host = $node_worker->host();
+		my $node_worker_port = $node_worker->port();
+		$node_coordinator->safe_psql('postgres', "SELECT pg_catalog.citus_add_node('$node_worker_host', $node_worker_port);");
+		push @workers, $node_worker;
+	}
+	return $node_coordinator, @workers;
+}
+
+1;

src/test/tap/postgresql.conf

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+# Minimal config for TAP tests
+shared_preload_libraries = 'citus'
+max_wal_senders = 10
+max_replication_slots = 10
+max_connections = 100
+log_statement = 'all'
+ssl = off
