Skip to content

Commit ddf5f37

Browse files
Classify missing table on PG (#2906)
Co-authored-by: Kevin Biju <[email protected]>
1 parent 4b45b15 commit ddf5f37

File tree

3 files changed

+21
-3
lines changed

3 files changed

+21
-3
lines changed

flow/alerting/classifier.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/jackc/pgx/v5/pgconn"
2020
"golang.org/x/crypto/ssh"
2121

22+
"github.com/PeerDB-io/peerdb/flow/shared"
2223
"github.com/PeerDB-io/peerdb/flow/shared/exceptions"
2324
)
2425

@@ -89,6 +90,9 @@ var (
8990
ErrorNotifySlotInvalid = ErrorClass{
9091
Class: "NOTIFY_SLOT_INVALID", action: NotifyUser,
9192
}
93+
ErrorNotifySourceTableMissing = ErrorClass{
94+
Class: "NOTIFY_SOURCE_TABLE_MISSING", action: NotifyUser,
95+
}
9296
ErrorNotifyPublicationMissing = ErrorClass{
9397
Class: "NOTIFY_PUBLICATION_MISSING", action: NotifyUser,
9498
}
@@ -210,6 +214,12 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
210214
}
211215
}
212216

217+
if errors.Is(err, shared.ErrTableDoesNotExist) {
218+
return ErrorNotifySourceTableMissing, ErrorInfo{
219+
Source: ErrorSourcePostgres,
220+
Code: "TABLE_DOES_NOT_EXIST",
221+
}
222+
}
213223
if pgErr != nil {
214224
switch pgErr.Code {
215225
case pgerrcode.InvalidAuthorizationSpecification,

flow/connectors/postgres/client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,6 @@ const (
100100
ReplicaIdentityNothing ReplicaIdentityType = 'n'
101101
)
102102

103-
var ErrSlotAlreadyExists error = errors.New("slot already exists")
104-
105103
// getRelIDForTable returns the relation ID for a table.
106104
func (c *PostgresConnector) getRelIDForTable(ctx context.Context, schemaTable *utils.SchemaTable) (uint32, error) {
107105
var relID pgtype.Uint32
@@ -110,6 +108,9 @@ func (c *PostgresConnector) getRelIDForTable(ctx context.Context, schemaTable *u
110108
ON n.oid = c.relnamespace WHERE n.nspname=$1 AND c.relname=$2`,
111109
schemaTable.Schema, schemaTable.Table).Scan(&relID)
112110
if err != nil {
111+
if errors.Is(err, pgx.ErrNoRows) {
112+
return 0, shared.ErrTableDoesNotExist
113+
}
113114
return 0, fmt.Errorf("error getting relation ID for table %s: %w", schemaTable, err)
114115
}
115116

@@ -440,7 +441,7 @@ func (c *PostgresConnector) createSlotAndPublication(
440441
c.logger.Info(fmt.Sprintf("Replication slot '%s' already exists", slot))
441442
var err error
442443
if doInitialCopy {
443-
err = ErrSlotAlreadyExists
444+
err = shared.ErrSlotAlreadyExists
444445
}
445446
return model.SetupReplicationResult{SlotName: slot}, err
446447
}

flow/shared/err_types.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
package shared
22

3+
import "errors"
4+
5+
var (
6+
ErrSlotAlreadyExists error = errors.New("slot already exists")
7+
ErrTableDoesNotExist error = errors.New("table does not exist")
8+
)
9+
310
type ErrType string
411

512
const (

0 commit comments

Comments
 (0)