diff --git a/internal/db/postgres/context/table.go b/internal/db/postgres/context/table.go index ad14a4b9..67143a55 100644 --- a/internal/db/postgres/context/table.go +++ b/internal/db/postgres/context/table.go @@ -267,7 +267,9 @@ func getColumnsConfig(ctx context.Context, tx pgx.Tx, oid toolkit.Oid, version i if ok { column.CanonicalTypeName = canonicalType.Name } - res = append(res, &column) + if !column.IsGenerated { + res = append(res, &column) + } idx++ } diff --git a/internal/db/postgres/dumpers/table.go b/internal/db/postgres/dumpers/table.go index 48cc97e1..db9fa89b 100644 --- a/internal/db/postgres/dumpers/table.go +++ b/internal/db/postgres/dumpers/table.go @@ -74,6 +74,7 @@ func (td *TableDumper) dumper(ctx context.Context, eg *errgroup.Group, w io.Writ return fmt.Errorf("cannot initialize validation pipeline: %w", err) } } else { + log.Debug().Msg("table has transformers") pipeline, err = NewTransformationPipeline(ctx, eg, td.table, w) if err != nil { return fmt.Errorf("cannot initialize transformation pipeline: %w", err) @@ -127,7 +128,7 @@ func (td *TableDumper) process(ctx context.Context, tx pgx.Tx, w io.WriteCloser, frontend := tx.Conn().PgConn().Frontend() query, err := td.table.GetCopyFromStatement() - log.Debug(). + log.Warn(). Str("query", query). Msgf("dumping table %s.%s using pgcopy query", td.table.Schema, td.table.Name) if err != nil { diff --git a/internal/db/postgres/dumpers/transformation_pipeline.go b/internal/db/postgres/dumpers/transformation_pipeline.go index f083253e..98bc3378 100644 --- a/internal/db/postgres/dumpers/transformation_pipeline.go +++ b/internal/db/postgres/dumpers/transformation_pipeline.go @@ -143,6 +143,7 @@ func (tp *TransformationPipeline) TransformSync(ctx context.Context, r *toolkit. func (tp *TransformationPipeline) TransformAsync(ctx context.Context, r *toolkit.Record) (*toolkit.Record, error) { var err error + log.Debug().Msg("transforming async") for _, w := range tp.transformationWindows { _, err = w.Transform(ctx, r) if err != nil { @@ -172,6 +173,15 @@ func (tp *TransformationPipeline) Dump(ctx context.Context, data []byte) (err er return NewDumpError(tp.table.Schema, tp.table.Name, tp.line, fmt.Errorf("error encoding RowDriver to []byte: %w", err)) } + // Print out the table column names + // log.Debug().Msg(fmt.Sprintf("Table columns: %s", tp.table.Columns)) + // log.Debug().Msg(fmt.Sprintf("Table schema: %s", tp.table.Schema)) + // Print out the table schema from the record + // log.Debug().Msg(fmt.Sprintf("Record schema: %s", tp.record.Driver.Table.Schema)) + // log.Debug().Msg(tp.row.Decode(data[:len(data)-1]).Error()) + // Print out the row + // log.Debug().Msg(fmt.Sprintf("Row: %s", tp.row)) + _, err = tp.w.Write(res) if err != nil { return NewDumpError(tp.table.Schema, tp.table.Name, tp.line, fmt.Errorf("error writing dumped data: %w", err)) diff --git a/internal/db/postgres/entries/table.go b/internal/db/postgres/entries/table.go index bc389a40..f7969f4a 100644 --- a/internal/db/postgres/entries/table.go +++ b/internal/db/postgres/entries/table.go @@ -24,6 +24,7 @@ import ( "github.com/greenmaskio/greenmask/internal/db/postgres/transformers/custom" "github.com/greenmaskio/greenmask/internal/db/postgres/transformers/utils" "github.com/greenmaskio/greenmask/pkg/toolkit" + "github.com/rs/zerolog/log" ) // Table - godoc @@ -83,6 +84,8 @@ func (t *Table) Entry() (*toc.Entry, error) { columns = append(columns, fmt.Sprintf(`"%s"`, column.Name)) } } + log.Warn().Msg("columns here") + log.Warn().Msg(strings.Join(columns, "")) var query = `COPY "%s"."%s" (%s) FROM stdin` var schemaName, tableName string @@ -121,6 +124,7 @@ func (t *Table) Entry() (*toc.Entry, error) { Owner: &owner, Desc: &toc.TableDataDesc, CopyStmt: ©Stmt, + Columns: columns, Dependencies: dependencies, NDeps: int32(len(dependencies)), FileName: &fileName, @@ -130,7 +134,15 @@ func (t *Table) Entry() (*toc.Entry, error) { } func (t *Table) GetCopyFromStatement() (string, error) { - query := fmt.Sprintf("COPY \"%s\".\"%s\" TO STDOUT", t.Schema, t.Name) + + columns := make([]string, 0, len(t.Columns)) + for _, column := range t.Columns { + if !column.IsGenerated { + columns = append(columns, fmt.Sprintf(`"%s"`, column.Name)) + } + } + + query := fmt.Sprintf("COPY \"%s\".\"%s\" (%s) TO STDOUT", t.Schema, t.Name, strings.Join(columns, ", ")) if t.Query != "" { query = fmt.Sprintf("COPY (%s) TO STDOUT", t.Query) } diff --git a/internal/db/postgres/restorers/table_insert_format.go b/internal/db/postgres/restorers/table_insert_format.go index 5700493e..09f00555 100644 --- a/internal/db/postgres/restorers/table_insert_format.go +++ b/internal/db/postgres/restorers/table_insert_format.go @@ -168,7 +168,7 @@ func (td *TableRestorerInsertFormat) streamInsertData(ctx context.Context, conn func (td *TableRestorerInsertFormat) generateInsertStmt(row *pgcopy.Row, onConflictDoNothing bool) string { var placeholders []string - for i := 0; i < row.Length(); i++ { + for i := 0; i < len(td.Entry.Columns); i++ { placeholders = append(placeholders, fmt.Sprintf("$%d", i+1)) } var onConflict string @@ -176,10 +176,16 @@ func (td *TableRestorerInsertFormat) generateInsertStmt(row *pgcopy.Row, onConfl onConflict = " ON CONFLICT DO NOTHING" } + // var columns []string = make([]string, row.Length()) + // for i := 0; i < row.Length(); i++ { + // columns[i] = fmt.Sprintf(`"%s"`, row.GetColumnName(i)) + // } + res := fmt.Sprintf( - `INSERT INTO %s.%s VALUES (%s)%s`, + `INSERT INTO %s.%s (%s) VALUES (%s)%s`, *td.Entry.Namespace, *td.Entry.Tag, + strings.Join(td.Entry.Columns, ", "), strings.Join(placeholders, ", "), onConflict, ) @@ -192,6 +198,22 @@ func (td *TableRestorerInsertFormat) insertDataOnConflictDoNothing( if td.query == "" { td.query = td.generateInsertStmt(row, td.doNothing) } + // log.Warn().Msg(td.query) + // log.Warn().Msg(fmt.Sprintf("%v", getAllArguments(row))) + // log.Warn().Msg(fmt.Sprintf("%v", td.Entry.Columns)) + + // log.Warn().Msg(fmt.Sprintf("%v", row.Length())) + // log.Warn().Msg(fmt.Sprintf("%v", len(td.Entry.Columns))) + // log.Warn().Msg(fmt.Sprintf("%v", len(getAllArguments(row)))) + + // array to string with comma separated values + + var displayString string + for i := 0; i < len(getAllArguments(row)); i++ { + displayString += fmt.Sprintf("%v, ", getAllArguments(row)[i]) + } + + // log.Warn().Msg(displayString) // TODO: The implementation based on pgx.Conn.Exec is not efficient for bulk inserts. // Consider rewrite to string literal that contains generated statement instead of using prepared statement diff --git a/internal/db/postgres/toc/entry.go b/internal/db/postgres/toc/entry.go index af902ed3..4abe1bc6 100644 --- a/internal/db/postgres/toc/entry.go +++ b/internal/db/postgres/toc/entry.go @@ -49,6 +49,7 @@ type Entry struct { Defn *string DropStmt *string CopyStmt *string + Columns []string Dependencies []int32 /* dumpIds of objects this one depends on */ NDeps int32 /* number of Dependencies */ FileName *string @@ -94,6 +95,10 @@ func (e *Entry) Copy() *Entry { if e.FileName != nil { res.FileName = NewObj(*e.FileName) } + if e.Columns != nil { + res.Columns = make([]string, len(e.Columns)) + copy(res.Columns, e.Columns) + } return res } diff --git a/internal/db/postgres/toc/reader.go b/internal/db/postgres/toc/reader.go index 264661d0..2caab6bc 100644 --- a/internal/db/postgres/toc/reader.go +++ b/internal/db/postgres/toc/reader.go @@ -20,6 +20,7 @@ import ( "io" "slices" "strconv" + "strings" "github.com/rs/zerolog/log" ) @@ -402,6 +403,20 @@ func (r *Reader) readEntries() ([]*Entry, error) { return nil, fmt.Errorf("cannot read Defn: %w", err) } entry.CopyStmt = copyStmt + + entry.Columns = make([]string, 0) + // pull column names from copyStmt + // example: COPY "public"."user_community_notification_preference" ("id", "person_id", "community_id", "notification_preference", "created_at", "updated_at", "church_slug") FROM stdin + if copyStmt != nil { + columns := strings.Split(*copyStmt, "(") + if len(columns) > 1 { + columns = strings.Split(columns[1], ")") + if len(columns) > 0 { + entry.Columns = strings.Split(columns[0], ",") + } + } + } + } if r.version >= BackupVersions["1.6"] {