Skip to content

Commit 82cb06f

Browse files
youen and adrienaury authored
feat: Add upsert operation mode (#400)
* feat: Add `upsert` operation mode * test(push): fix upsert test --------- Co-authored-by: Adrien Aury <[email protected]>
1 parent 5da0ca5 commit 82cb06f

14 files changed

+366
-28
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ Types of changes
1414
- `Fixed` for any bug fixes.
1515
- `Security` in case of vulnerabilities.
1616

17-
## [3.5.1]
17+
## [3.6.0]
1818

19+
- `Added` new push mode `upsert` to insert or update rows based on primary keys
1920
- `Fixed` relations extraction with Oracle databases creating duplicate relations
2021

2122
## [3.5.0]

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,18 @@ lino push update source --table actor <<<'{"actor_id":998,"last_name":"CHASE","_
449449

450450
The `__usingpk__` field can also be used with an ingress descriptor at any level in the data. The name of this field can be changed to another value with the `--using-pk-field` flag.
451451

452+
### Upsert
453+
454+
The `upsert` mode inserts new rows or updates existing ones, based on the primary key.
455+
456+
```bash
457+
$ lino push upsert dest < data.jsonl
458+
```
459+
460+
If a row with the same primary key already exists in the destination table, it will be updated with the new values. If it does not exist, it will be inserted.
461+
462+
**Note:** This feature is currently supported for PostgreSQL and Oracle databases.
463+
452464
### How to recover from error
453465

454466
Use options `lino pull --exclude-from-file` (shortcut `-X`) and `lino push --savepoint` combined to handle error recovery. The process will restart where it failed if an error has interrupted it in a previous run.

internal/app/push/cli.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
102102
)
103103

104104
cmd := &cobra.Command{
105-
Use: "push {<truncate>|<insert>|<update>|<delete>} [Data Connector Name]",
105+
Use: "push {<truncate>|<insert>|<update>|<delete>|<upsert>} [Data Connector Name]",
106106
Short: "Push data to a database with a pushing mode (insert by default)",
107107
Long: "",
108108
Example: fmt.Sprintf(" %[1]s push truncate dstdatabase\n %[1]s push dstdatabase", fullName),

internal/infra/push/datadestination_db2.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ func (d Db2Dialect) InsertStatement(tableName string, selectValues []ValueDescri
9191
return sql.String(), selectValues
9292
}
9393

94+
func (d Db2Dialect) UpsertStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error) {
95+
return "", nil, &push.Error{Description: "upsert not implemented for db2"}
96+
}
97+
9498
// UpdateStatement
9599
func (d Db2Dialect) UpdateStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error) {
96100
sql := &strings.Builder{}

internal/infra/push/datadestination_db2_dummy.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ func (d Db2Dialect) InsertStatement(tableName string, selectValues []ValueDescri
6767
panic(fmt.Errorf("Not implemented"))
6868
}
6969

70+
// UpsertStatement
71+
func (d Db2Dialect) UpsertStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error) {
72+
panic(fmt.Errorf("Not implemented"))
73+
}
74+
7075
// UpdateStatement
7176
func (d Db2Dialect) UpdateStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error) {
7277
panic(fmt.Errorf("Not implemented"))

internal/infra/push/datadestination_mariadb.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ func (d MariadbDialect) InsertStatement(tableName string, selectValues []ValueDe
8989
return sql.String(), selectValues
9090
}
9191

92+
func (d MariadbDialect) UpsertStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error) {
93+
return "", nil, &push.Error{Description: "upsert not implemented for mariadb"}
94+
}
95+
9296
func (d MariadbDialect) UpdateStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error) {
9397
sql := &strings.Builder{}
9498
sql.WriteString("UPDATE ")

internal/infra/push/datadestination_oracle.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,63 @@ func (d OracleDialect) InsertStatement(tableName string, selectValues []ValueDes
124124
return sql.String(), selectValues
125125
}
126126

127+
// UpsertStatement
128+
func (d OracleDialect) UpsertStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error) {
129+
sql := &strings.Builder{}
130+
sql.WriteString("MERGE INTO ")
131+
sql.WriteString(tableName)
132+
sql.WriteString(" target USING (SELECT ")
133+
134+
for i, col := range selectValues {
135+
if i > 0 {
136+
sql.WriteString(", ")
137+
}
138+
sql.WriteString(d.Placeholder(i + 1))
139+
sql.WriteString(" AS \"")
140+
sql.WriteString(col.name)
141+
sql.WriteString("\"")
142+
}
143+
sql.WriteString(" FROM dual) source ON (")
144+
145+
for i, pk := range primaryKeys {
146+
if i > 0 {
147+
sql.WriteString(" AND ")
148+
}
149+
sql.WriteString(fmt.Sprintf("target.\"%s\" = source.\"%s\"", pk, pk))
150+
}
151+
sql.WriteString(") WHEN MATCHED THEN UPDATE SET ")
152+
153+
first := true
154+
for _, col := range selectValues {
155+
if isAPrimaryKey(col.name, primaryKeys) {
156+
continue
157+
}
158+
if !first {
159+
sql.WriteString(", ")
160+
}
161+
sql.WriteString(fmt.Sprintf("target.\"%s\" = source.\"%s\"", col.name, col.name))
162+
first = false
163+
}
164+
165+
sql.WriteString(" WHEN NOT MATCHED THEN INSERT (")
166+
for i, col := range selectValues {
167+
if i > 0 {
168+
sql.WriteString(", ")
169+
}
170+
sql.WriteString(fmt.Sprintf("\"%s\"", col.name))
171+
}
172+
sql.WriteString(") VALUES (")
173+
for i, col := range selectValues {
174+
if i > 0 {
175+
sql.WriteString(", ")
176+
}
177+
sql.WriteString(fmt.Sprintf("source.\"%s\"", col.name))
178+
}
179+
sql.WriteString(")")
180+
181+
return sql.String(), selectValues, nil
182+
}
183+
127184
// UpdateStatement
128185
func (d OracleDialect) UpdateStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error) {
129186
sql := &strings.Builder{}

internal/infra/push/datadestination_postgres.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,49 @@ func (d PostgresDialect) InsertStatement(tableName string, selectValues []ValueD
100100
return sql.String(), selectValues
101101
}
102102

103+
func (d PostgresDialect) UpsertStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error) {
104+
protectedColumns := []string{}
105+
for _, c := range selectValues {
106+
protectedColumns = append(protectedColumns, fmt.Sprintf("\"%s\"", c.name))
107+
}
108+
109+
sql := &strings.Builder{}
110+
sql.WriteString("INSERT INTO ")
111+
sql.WriteString(tableName)
112+
sql.WriteString("(")
113+
sql.WriteString(strings.Join(protectedColumns, ","))
114+
sql.WriteString(") VALUES (")
115+
for i := 1; i <= len(selectValues); i++ {
116+
sql.WriteString(d.Placeholder(i))
117+
if i < len(selectValues) {
118+
sql.WriteString(", ")
119+
}
120+
}
121+
122+
if len(primaryKeys) > 0 {
123+
sql.WriteString(") ON CONFLICT (")
124+
sql.WriteString(strings.Join(primaryKeys, ","))
125+
sql.WriteString(") DO UPDATE SET ")
126+
127+
first := true
128+
for _, column := range selectValues {
129+
// Skip primary keys in update set
130+
if isAPrimaryKey(column.name, primaryKeys) {
131+
continue
132+
}
133+
if !first {
134+
sql.WriteString(", ")
135+
}
136+
sql.WriteString(fmt.Sprintf("\"%s\" = EXCLUDED.\"%s\"", column.name, column.name))
137+
first = false
138+
}
139+
} else {
140+
sql.WriteString(")")
141+
}
142+
143+
return sql.String(), selectValues, nil
144+
}
145+
103146
func (d PostgresDialect) UpdateStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error) {
104147
sql := &strings.Builder{}
105148
sql.WriteString("UPDATE ")

internal/infra/push/datadestination_sql.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,12 @@ func (rw *SQLRowWriter) createStatement(row push.Row, where push.Row) *push.Erro
299299
return pusherr
300300
}
301301

302+
case rw.dd.mode == push.Upsert:
303+
prepareStmt, rw.headers, pusherr = rw.dd.dialect.UpsertStatement(rw.tableName(), selectValues, whereValues, rw.table.PrimaryKey())
304+
if pusherr != nil {
305+
return pusherr
306+
}
307+
302308
default: // Insert:
303309
prepareStmt, rw.headers = rw.dd.dialect.InsertStatement(rw.tableName(), selectValues, rw.table.PrimaryKey())
304310
}
@@ -376,6 +382,7 @@ func (rw *SQLRowWriter) Write(row push.Row, where push.Row) *push.Error {
376382
rw.sqlLogger.Write(values)
377383

378384
_, err2 := rw.statement.Exec(values...)
385+
log.Trace().AnErr("error", err2).Msg("push error")
379386
if err2 != nil {
380387
// reset statement after error
381388
if err := rw.close(); err != nil {
@@ -483,6 +490,7 @@ type SQLDialect interface {
483490
EnableConstraintsStatement(tableName string) string
484491
TruncateStatement(tableName string) string
485492
InsertStatement(tableName string, selectValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor)
493+
UpsertStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error)
486494
UpdateStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error)
487495
IsDuplicateError(error) bool
488496
ConvertValue(push.Value, ValueDescriptor) push.Value

internal/infra/push/datadestination_sqlserver.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ func (d SQLServerDialect) InsertStatement(tableName string, selectValues []Value
8787
return sql.String(), selectValues
8888
}
8989

90+
func (d SQLServerDialect) UpsertStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error) {
91+
return "", nil, &push.Error{Description: "upsert not implemented for sqlserver"}
92+
}
93+
9094
func (d SQLServerDialect) UpdateStatement(tableName string, selectValues []ValueDescriptor, whereValues []ValueDescriptor, primaryKeys []string) (statement string, headers []ValueDescriptor, err *push.Error) {
9195
sql := &strings.Builder{}
9296
sql.WriteString("UPDATE ")

0 commit comments

Comments
 (0)