Skip to content

Commit b8e4870

Browse files
nicarlenzo-cappacyrilgdn
authored
feat: Add resource for subscriptions (#244)
Co-authored-by: Enzo Cappa <[email protected]> Co-authored-by: Cyril Gaudin <[email protected]>
1 parent 26ec0cf commit b8e4870

6 files changed

+695
-1
lines changed

postgresql/config.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ var (
104104

105105
// publication is Supported
106106
featurePublication: semver.MustParseRange(">=10.0.0"),
107+
107108
// We do not support CREATE FUNCTION for Postgresql < 8.4
108109
featureFunction: semver.MustParseRange(">=8.4.0"),
109110
// CREATE SERVER support
@@ -277,7 +278,8 @@ func (c *Client) Connect() (*DBConnection, error) {
277278
db, err = postgres.Open(context.Background(), dsn)
278279
}
279280
if err != nil {
280-
return nil, fmt.Errorf("Error connecting to PostgreSQL server %s (scheme: %s): %w", c.config.Host, c.config.Scheme, err)
281+
errString := strings.Replace(err.Error(), c.config.Password, "XXXX", 2)
282+
return nil, fmt.Errorf("Error connecting to PostgreSQL server %s (scheme: %s): %s", c.config.Host, c.config.Scheme, errString)
281283
}
282284

283285
// We don't want to retain connection

postgresql/provider.go

+1
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ func Provider() *schema.Provider {
171171
"postgresql_grant_role": resourcePostgreSQLGrantRole(),
172172
"postgresql_replication_slot": resourcePostgreSQLReplicationSlot(),
173173
"postgresql_publication": resourcePostgreSQLPublication(),
174+
"postgresql_subscription": resourcePostgreSQLSubscription(),
174175
"postgresql_physical_replication_slot": resourcePostgreSQLPhysicalReplicationSlot(),
175176
"postgresql_schema": resourcePostgreSQLSchema(),
176177
"postgresql_role": resourcePostgreSQLRole(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
package postgresql
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"log"
7+
"strings"
8+
9+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
10+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
11+
"github.com/lib/pq"
12+
)
13+
14+
func resourcePostgreSQLSubscription() *schema.Resource {
15+
return &schema.Resource{
16+
Create: PGResourceFunc(resourcePostgreSQLSubscriptionCreate),
17+
Read: PGResourceFunc(resourcePostgreSQLSubscriptionRead),
18+
Delete: PGResourceFunc(resourcePostgreSQLSubscriptionDelete),
19+
Exists: PGResourceExistsFunc(resourcePostgreSQLSubscriptionExists),
20+
Importer: &schema.ResourceImporter{StateContext: schema.ImportStatePassthroughContext},
21+
22+
Schema: map[string]*schema.Schema{
23+
"name": {
24+
Type: schema.TypeString,
25+
Required: true,
26+
ForceNew: true,
27+
Description: "The name of the subscription",
28+
ValidateFunc: validation.StringIsNotEmpty,
29+
},
30+
"database": {
31+
Type: schema.TypeString,
32+
Optional: true,
33+
Computed: true,
34+
ForceNew: true,
35+
Description: "Sets the database to add the subscription for",
36+
},
37+
"conninfo": {
38+
Type: schema.TypeString,
39+
Required: true,
40+
ForceNew: true,
41+
Sensitive: true,
42+
Description: "The connection string to the publisher. It should follow the keyword/value format (https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING)",
43+
ValidateFunc: validation.StringIsNotEmpty,
44+
},
45+
"publications": {
46+
Type: schema.TypeSet,
47+
Required: true,
48+
ForceNew: true,
49+
Elem: &schema.Schema{Type: schema.TypeString},
50+
Description: "Names of the publications on the publisher to subscribe to",
51+
},
52+
"create_slot": {
53+
Type: schema.TypeBool,
54+
Optional: true,
55+
ForceNew: true,
56+
Default: true,
57+
Description: "Specifies whether the command should create the replication slot on the publisher",
58+
},
59+
"slot_name": {
60+
Type: schema.TypeString,
61+
Optional: true,
62+
ForceNew: true,
63+
Description: "Name of the replication slot to use. The default behavior is to use the name of the subscription for the slot name",
64+
ValidateFunc: validation.StringIsNotEmpty,
65+
},
66+
},
67+
}
68+
}
69+
70+
func resourcePostgreSQLSubscriptionCreate(db *DBConnection, d *schema.ResourceData) error {
71+
subName := d.Get("name").(string)
72+
databaseName := getDatabaseForSubscription(d, db.client.databaseName)
73+
74+
publications, err := getPublicationsForSubscription(d)
75+
if err != nil {
76+
return fmt.Errorf("could not get publications: %w", err)
77+
}
78+
connInfo, err := getConnInfoForSubscription(d)
79+
if err != nil {
80+
return fmt.Errorf("could not get conninfo: %w", err)
81+
}
82+
83+
optionalParams := getOptionalParameters(d)
84+
85+
// Creating of a subscription can not be done in an transaction
86+
client := db.client.config.NewClient(databaseName)
87+
conn, err := client.Connect()
88+
if err != nil {
89+
return fmt.Errorf("could not establish database connection: %w", err)
90+
}
91+
92+
sql := fmt.Sprintf("CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s %s;",
93+
pq.QuoteIdentifier(subName),
94+
pq.QuoteLiteral(connInfo),
95+
publications,
96+
optionalParams,
97+
)
98+
if _, err := conn.Exec(sql); err != nil {
99+
return fmt.Errorf("could not execute sql: %w", err)
100+
}
101+
102+
d.SetId(generateSubscriptionID(d, databaseName))
103+
104+
return resourcePostgreSQLSubscriptionReadImpl(db, d)
105+
}
106+
107+
func resourcePostgreSQLSubscriptionRead(db *DBConnection, d *schema.ResourceData) error {
108+
return resourcePostgreSQLSubscriptionReadImpl(db, d)
109+
}
110+
111+
func resourcePostgreSQLSubscriptionReadImpl(db *DBConnection, d *schema.ResourceData) error {
112+
databaseName, subName, err := getDBSubscriptionName(d, db.client)
113+
if err != nil {
114+
return fmt.Errorf("could not get subscription name: %w", err)
115+
}
116+
117+
txn, err := startTransaction(db.client, databaseName)
118+
if err != nil {
119+
return fmt.Errorf("could not start transaction: %w", err)
120+
}
121+
defer deferredRollback(txn)
122+
123+
var publications []string
124+
var connInfo string
125+
var slotName string
126+
127+
var subExists bool
128+
queryExists := "SELECT TRUE FROM pg_catalog.pg_stat_subscription WHERE subname = $1"
129+
err = txn.QueryRow(queryExists, pqQuoteLiteral(subName)).Scan(&subExists)
130+
if err != nil {
131+
return fmt.Errorf("Failed to check subscription: %w", err)
132+
}
133+
134+
if !subExists {
135+
log.Printf("[WARN] PostgreSQL Subscription (%s) not found for database %s", subName, databaseName)
136+
d.SetId("")
137+
return nil
138+
}
139+
140+
// pg_subscription requires superuser permissions, it is okay to fail here
141+
query := "SELECT subconninfo, subpublications, subslotname FROM pg_catalog.pg_subscription WHERE subname = $1"
142+
err = txn.QueryRow(query, pqQuoteLiteral(subName)).Scan(&connInfo, pq.Array(&publications), &slotName)
143+
144+
if err != nil {
145+
// we already checked that the subscription exists
146+
connInfo, err := getConnInfoForSubscription(d)
147+
if err != nil {
148+
return fmt.Errorf("could not get conninfo: %w", err)
149+
}
150+
d.Set("conninfo", connInfo)
151+
152+
setPublications, ok := d.GetOk("publications")
153+
if !ok {
154+
return fmt.Errorf("Attribute publications is not set")
155+
}
156+
publications := setPublications.(*schema.Set).List()
157+
d.Set("publications", publications)
158+
} else {
159+
d.Set("conninfo", connInfo)
160+
d.Set("publications", publications)
161+
}
162+
d.Set("name", subName)
163+
d.Set("database", databaseName)
164+
d.SetId(generateSubscriptionID(d, databaseName))
165+
166+
createSlot, okCreate := d.GetOkExists("create_slot") //nolint:staticcheck
167+
if okCreate {
168+
d.Set("create_slot", createSlot.(bool))
169+
}
170+
_, okSlotName := d.GetOk("slot_name")
171+
if okSlotName {
172+
d.Set("slot_name", slotName)
173+
}
174+
175+
return nil
176+
}
177+
178+
func resourcePostgreSQLSubscriptionDelete(db *DBConnection, d *schema.ResourceData) error {
179+
subName := d.Get("name").(string)
180+
createSlot := d.Get("create_slot").(bool)
181+
182+
databaseName := getDatabaseForSubscription(d, db.client.databaseName)
183+
184+
// Dropping a subscription can not be done in a transaction
185+
client := db.client.config.NewClient(databaseName)
186+
conn, err := client.Connect()
187+
if err != nil {
188+
return fmt.Errorf("could not establish database connection: %w", err)
189+
}
190+
191+
// disable subscription and unset the slot before dropping in order to keep the replication slot
192+
if !createSlot {
193+
sql := fmt.Sprintf("ALTER SUBSCRIPTION %s DISABLE", pq.QuoteIdentifier(subName))
194+
if _, err := conn.Exec(sql); err != nil {
195+
return fmt.Errorf("could not execute sql: %w", err)
196+
}
197+
sql = fmt.Sprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)", pq.QuoteIdentifier(subName))
198+
if _, err := conn.Exec(sql); err != nil {
199+
return fmt.Errorf("could not execute sql: %w", err)
200+
}
201+
}
202+
203+
sql := fmt.Sprintf("DROP SUBSCRIPTION %s", pq.QuoteIdentifier(subName))
204+
205+
if _, err := conn.Exec(sql); err != nil {
206+
return fmt.Errorf("could not execute sql: %w", err)
207+
}
208+
209+
d.SetId("")
210+
211+
return nil
212+
}
213+
214+
func resourcePostgreSQLSubscriptionExists(db *DBConnection, d *schema.ResourceData) (bool, error) {
215+
var subName string
216+
217+
database, subName, err := getDBSubscriptionName(d, db.client)
218+
if err != nil {
219+
return false, err
220+
}
221+
222+
// Check if the database exists
223+
exists, err := dbExists(db, database)
224+
if err != nil || !exists {
225+
return false, err
226+
}
227+
228+
txn, err := startTransaction(db.client, database)
229+
if err != nil {
230+
return false, err
231+
}
232+
defer deferredRollback(txn)
233+
234+
query := "SELECT subname from pg_catalog.pg_stat_subscription WHERE subname = $1"
235+
err = txn.QueryRow(query, pqQuoteLiteral(subName)).Scan(&subName)
236+
237+
switch {
238+
case err == sql.ErrNoRows:
239+
return false, nil
240+
case err != nil:
241+
return false, err
242+
}
243+
244+
return true, nil
245+
}
246+
247+
func getPublicationsForSubscription(d *schema.ResourceData) (string, error) {
248+
var publicationsString string
249+
setPublications, ok := d.GetOk("publications")
250+
251+
if !ok {
252+
return publicationsString, fmt.Errorf("Attribute publications is not set")
253+
}
254+
255+
publications := setPublications.(*schema.Set).List()
256+
var plist []string
257+
if elem, ok := isUniqueArr(publications); !ok {
258+
return publicationsString, fmt.Errorf("'%s' is duplicated for attribute publications", elem.(string))
259+
}
260+
for _, p := range publications {
261+
plist = append(plist, pq.QuoteIdentifier(p.(string)))
262+
}
263+
264+
return strings.Join(plist, ", "), nil
265+
}
266+
267+
func getConnInfoForSubscription(d *schema.ResourceData) (string, error) {
268+
var connInfo string
269+
setConnInfo, ok := d.GetOk("conninfo")
270+
if !ok {
271+
return connInfo, fmt.Errorf("Attribute conninfo is not set")
272+
}
273+
return setConnInfo.(string), nil
274+
}
275+
276+
func generateSubscriptionID(d *schema.ResourceData, databaseName string) string {
277+
return strings.Join([]string{databaseName, d.Get("name").(string)}, ".")
278+
}
279+
280+
func getDatabaseForSubscription(d *schema.ResourceData, databaseName string) string {
281+
if v, ok := d.GetOk("database"); ok {
282+
databaseName = v.(string)
283+
}
284+
285+
return databaseName
286+
}
287+
288+
// getDBSubscriptionName returns database and subscription name. If we are importing this
289+
// resource, they will be parsed from the resource ID (it will return an error if parsing failed)
290+
// otherwise they will be simply get from the state.
291+
func getDBSubscriptionName(d *schema.ResourceData, client *Client) (string, string, error) {
292+
database := getDatabaseForSubscription(d, client.databaseName)
293+
subName := d.Get("name").(string)
294+
295+
// When importing, we have to parse the ID to find subscription and database names.
296+
if subName == "" {
297+
parsed := strings.Split(d.Id(), ".")
298+
if len(parsed) != 2 {
299+
return "", "", fmt.Errorf("Subscription ID %s has not the expected format 'database.subscriptionName': %v", d.Id(), parsed)
300+
}
301+
database = parsed[0]
302+
subName = parsed[1]
303+
}
304+
305+
return database, subName, nil
306+
}
307+
308+
// slotName and createSlot require recreation of the subscription, only return WITH ...
309+
func getOptionalParameters(d *schema.ResourceData) string {
310+
parameterSQLTemplate := "WITH (%s)"
311+
returnValue := ""
312+
313+
createSlot, okCreate := d.GetOkExists("create_slot") //nolint:staticcheck
314+
slotName, okName := d.GetOk("slot_name")
315+
316+
if !okCreate && !okName {
317+
// use default behavior, no WITH statement
318+
return ""
319+
}
320+
321+
var params []string
322+
if okCreate {
323+
params = append(params, fmt.Sprintf("%s = %t", "create_slot", createSlot.(bool)))
324+
}
325+
if okName {
326+
params = append(params, fmt.Sprintf("%s = %s", "slot_name", pq.QuoteLiteral(slotName.(string))))
327+
}
328+
329+
returnValue = fmt.Sprintf(parameterSQLTemplate, strings.Join(params, ", "))
330+
return returnValue
331+
}
332+
333+
func getSubscriptionNameFromID(ID string) string {
334+
splitted := strings.Split(ID, ".")
335+
return splitted[0]
336+
}

0 commit comments

Comments
 (0)