Skip to content

Commit 803f0ac

Browse files
authored
Disallow colocating tables when collations don't match (#8257)
DESCRIPTION: Disallows creating a distributed table or altering it to be colocated with another table if distribution key collations don't match. Fixes #7683.
2 parents bd72123 + 489a31a commit 803f0ac

29 files changed

+983
-135
lines changed

src/backend/distributed/commands/alter_table.c

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2111,7 +2111,7 @@ CheckAlterDistributedTableConversionParameters(TableConversionState *con)
21112111
{
21122112
ereport(ERROR, (errmsg("cannot colocate with %s and change distribution "
21132113
"column to %s because data type of column %s is "
2114-
"different then the distribution column of the %s",
2114+
"different than the distribution column of the %s",
21152115
con->colocateWith, con->distributionColumn,
21162116
con->distributionColumn, con->colocateWith)));
21172117
}
@@ -2122,6 +2122,23 @@ CheckAlterDistributedTableConversionParameters(TableConversionState *con)
21222122
"distribution column is different than %s",
21232123
con->colocateWith, con->relationName)));
21242124
}
2125+
else if (con->distributionColumn &&
2126+
colocateWithPartKey->varcollid != con->distributionKey->varcollid)
2127+
{
2128+
ereport(ERROR, (errmsg("cannot colocate with %s and change distribution "
2129+
"column to %s because collation of column %s is "
2130+
"different than the distribution column of the %s",
2131+
con->colocateWith, con->distributionColumn,
2132+
con->distributionColumn, con->colocateWith)));
2133+
}
2134+
else if (!con->distributionColumn &&
2135+
colocateWithPartKey->varcollid != con->originalDistributionKey->varcollid
2136+
)
2137+
{
2138+
ereport(ERROR, (errmsg("cannot colocate with %s because collation of its "
2139+
"distribution column is different than %s",
2140+
con->colocateWith, con->relationName)));
2141+
}
21252142
}
21262143

21272144
if (!con->suppressNoticeMessages)

src/backend/distributed/commands/create_distributed_table.c

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -531,8 +531,6 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
531531
Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
532532
distributionColumnName,
533533
NoLock);
534-
Oid distributionColumnType = distributionColumn->vartype;
535-
Oid distributionColumnCollation = distributionColumn->varcollid;
536534

537535
/* get an advisory lock to serialize concurrent default group creations */
538536
if (IsColocateWithDefault(colocateWithTableName))
@@ -547,8 +545,7 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
547545
*/
548546
uint32 colocationId = FindColocateWithColocationId(relationId,
549547
replicationModel,
550-
distributionColumnType,
551-
distributionColumnCollation,
548+
distributionColumn,
552549
shardCount,
553550
shardCountIsStrict,
554551
colocateWithTableName);
@@ -697,19 +694,14 @@ EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod,
697694
char replicationModel = DecideDistTableReplicationModel(distributionMethod,
698695
colocateWithTableName);
699696

700-
/*
701-
* we fail transaction before local table conversion if the table could not be colocated with
702-
* given table. We should make those checks after local table conversion by acquiring locks to
703-
* the relation because the distribution column can be modified in that period.
704-
*/
705-
Oid distributionColumnType = ColumnTypeIdForRelationColumnName(
706-
relationId,
707-
distributionColumnName);
708-
709697
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
710698
Oid colocateWithTableId = ResolveRelationId(colocateWithTableNameText, false);
699+
700+
Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
701+
distributionColumnName,
702+
NoLock);
711703
EnsureTableCanBeColocatedWith(relationId, replicationModel,
712-
distributionColumnType, colocateWithTableId);
704+
distributionColumn, colocateWithTableId);
713705
}
714706

715707

@@ -1993,10 +1985,11 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
19931985
* until this transaction is committed.
19941986
*/
19951987

1988+
/* distributionColumn can only be null for single-shard tables */
19961989
Oid distributionColumnType =
19971990
distributionColumn ? distributionColumn->vartype : InvalidOid;
19981991
Oid distributionColumnCollation =
1999-
distributionColumn ? get_typcollation(distributionColumnType) : InvalidOid;
1992+
distributionColumn ? distributionColumn->varcollid : InvalidOid;
20001993

20011994
Assert(distributedTableParams->colocationParam.colocationParamType ==
20021995
COLOCATE_WITH_TABLE_LIKE_OPT);
@@ -2011,8 +2004,7 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
20112004

20122005
colocationId = FindColocateWithColocationId(relationId,
20132006
citusTableParams.replicationModel,
2014-
distributionColumnType,
2015-
distributionColumnCollation,
2007+
distributionColumn,
20162008
distributedTableParams->shardCount,
20172009
distributedTableParams->
20182010
shardCountIsStrict,

src/backend/distributed/sql/citus--13.2-1--14.0-1.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,6 @@
99
#include "udfs/coord_binary_combine_agg_ffunc/14.0-1.sql"
1010
#include "udfs/worker_binary_partial_agg/14.0-1.sql"
1111
#include "udfs/coord_binary_combine_agg/14.0-1.sql"
12+
13+
#include "udfs/fix_pre_citus14_colocation_group_collation_mismatches/14.0-1.sql"
14+
#include "udfs/citus_finish_citus_upgrade/14.0-1.sql"

src/backend/distributed/sql/downgrades/citus--14.0-1--13.2-1.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,6 @@ DROP AGGREGATE IF EXISTS pg_catalog.coord_binary_combine_agg(oid, bytea, anyelem
99
DROP FUNCTION IF EXISTS pg_catalog.worker_binary_partial_agg_ffunc(internal);
1010
DROP FUNCTION IF EXISTS pg_catalog.coord_binary_combine_agg_sfunc(internal, oid, bytea, anyelement);
1111
DROP FUNCTION IF EXISTS pg_catalog.coord_binary_combine_agg_ffunc(internal, oid, bytea, anyelement);
12+
13+
#include "../udfs/citus_finish_citus_upgrade/11.0-2.sql"
14+
DROP FUNCTION IF EXISTS pg_catalog.fix_pre_citus14_colocation_group_collation_mismatches();

src/backend/distributed/sql/udfs/citus_finish_citus_upgrade/14.0-1.sql

Lines changed: 54 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/backend/distributed/sql/udfs/citus_finish_citus_upgrade/latest.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ BEGIN
3333
performed_upgrade := true;
3434
END IF;
3535

36+
IF last_upgrade_major_version < 14 THEN
37+
PERFORM fix_pre_citus14_colocation_group_collation_mismatches();
38+
performed_upgrade := true;
39+
END IF;
40+
3641
-- add new upgrade steps here
3742

3843
IF NOT performed_upgrade THEN

src/backend/distributed/sql/udfs/fix_pre_citus14_colocation_group_collation_mismatches/14.0-1.sql

Lines changed: 83 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
-- fix_pre_citus14_colocation_group_collation_mismatches()
--
-- Finds distributed tables whose distribution column collation differs from
-- the collation recorded for their colocation group
-- (pg_dist_colocation.distributioncolumncollation) and moves them out of that
-- group. Within one colocation group, the mismatching tables that share the
-- same distribution column collation are kept together: the first one is
-- moved to a brand new colocation group (colocate_with => 'none') and the
-- remaining ones are colocated with that first table.
--
-- Takes no arguments and returns nothing; progress is reported via
-- RAISE DEBUG.
--
-- search_path is pinned to pg_catalog through the function-level SET clause
-- (rather than SET LOCAL inside the body) so that the setting is restored
-- when the function returns instead of lasting until the end of the caller's
-- transaction. With pg_catalog as the only schema in the search_path,
-- regclass::text yields a schema-qualified name for user tables, which is
-- what is passed as colocate_with below.
CREATE OR REPLACE FUNCTION pg_catalog.fix_pre_citus14_colocation_group_collation_mismatches()
RETURNS VOID AS $func$
DECLARE
    v_colocationid oid;
    -- colocation group handled by the previous iteration, used to emit the
    -- per-group debug message only once
    v_last_colocationid oid;
    v_collationid oid;
    v_tables_to_move_out oid[];
    v_table_to_move_out oid;
    v_first_table oid;
BEGIN
    -- One row per (colocation group, mismatching collation) pair, carrying
    -- the tables of that group whose distribution column has that collation.
    -- Rows and array elements are ordered so that the processing order, and
    -- hence the table picked as the "first table", is deterministic.
    FOR v_colocationid, v_collationid, v_tables_to_move_out
    IN
        SELECT pdc.colocationid,
               pa.attcollation,
               array_agg(pdp.logicalrelid::oid ORDER BY pdp.logicalrelid::oid)
        FROM pg_dist_colocation pdc
        JOIN pg_dist_partition pdp
            ON pdc.colocationid = pdp.colocationid
        JOIN pg_attribute pa
            ON pa.attrelid = pdp.logicalrelid
            AND pa.attname = column_to_column_name(pdp.logicalrelid, pdp.partkey)
        -- column_to_column_name() returns NULL if partkey is NULL, so tables
        -- without a distribution column (such as reference tables) are
        -- already dropped by the join above; still discard them explicitly.
        WHERE pdp.partkey IS NOT NULL
        -- ignore the table if its distribution column collation matches the
        -- collation saved for the colocation group
            AND pdc.distributioncolumncollation != pa.attcollation
        GROUP BY pdc.colocationid, pa.attcollation
        ORDER BY pdc.colocationid, pa.attcollation
    LOOP
        -- rows are ordered by colocation id, so a change in the id means we
        -- started processing a new colocation group
        IF v_last_colocationid IS DISTINCT FROM v_colocationid THEN
            RAISE DEBUG 'Processing colocation group with id %', v_colocationid;
            v_last_colocationid := v_colocationid;
        END IF;

        RAISE DEBUG 'Moving out tables with collation id % from colocation group %', v_collationid, v_colocationid;

        v_first_table := NULL;

        FOREACH v_table_to_move_out IN ARRAY v_tables_to_move_out
        LOOP
            IF v_first_table IS NULL THEN
                -- Move the first table out to start a new colocation group.
                --
                -- We could look for an existing colocation group that is
                -- suitable to move into instead of using 'none', but that is
                -- not straightforward. Besides, even if such a group existed,
                -- the user was already fine with keeping this table in a
                -- different colocation group in the first place.

                RAISE DEBUG 'Moving out table with oid % to a new colocation group', v_table_to_move_out;
                PERFORM update_distributed_table_colocation(v_table_to_move_out, colocate_with => 'none');

                -- save the first table to colocate the rest of the tables with it
                v_first_table := v_table_to_move_out;
            ELSE
                -- Move the rest of the tables to colocate with the first table.

                RAISE DEBUG 'Moving out table with oid % to colocate with table with oid %', v_table_to_move_out, v_first_table;
                PERFORM update_distributed_table_colocation(v_table_to_move_out, colocate_with => v_first_table::regclass::text);
            END IF;
        END LOOP;
    END LOOP;
END;
$func$
LANGUAGE plpgsql
SET search_path = pg_catalog;
82+
COMMENT ON FUNCTION pg_catalog.fix_pre_citus14_colocation_group_collation_mismatches()
83+
IS 'Fix distributed tables whose colocation group collations do not match their distribution columns by moving them to new colocation groups';

0 commit comments

Comments
 (0)