Skip to content

Commit 75e3fff

Browse files
authored
feat: log sql (#365)
* feat(logsql): infra modification * feat(logsql): log sql query * feat(logsql): log sql query * feat(logsql): log data to csv * fix: nil pointer exception * test: log sql * test: log sql * test: log sql * test: log sql
1 parent 81ee5eb commit 75e3fff

File tree

10 files changed

+170
-1
lines changed

10 files changed

+170
-1
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ Types of changes
1414
- `Fixed` for any bug fixes.
1515
- `Security` in case of vulnerabilities.
1616

17+
## [3.2.0]
18+
19+
- `Added` flag `log-sql` (short `l`) to the push command
20+
1721
## [3.1.0]
1822

1923
- `Added` possibility to filter columns via `select` property in ingress descriptors

internal/app/push/cli.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
8585
savepoint string
8686
autoTruncate bool
8787
watch bool
88+
logSQLTo string
8889
)
8990

9091
cmd := &cobra.Command{
@@ -132,6 +133,12 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
132133
os.Exit(1)
133134
}
134135

136+
if logSQLTo != "" {
137+
if e := datadestination.OpenSQLLogger(logSQLTo); e != nil {
138+
log.Warn().Err(e).Msg("error while opening SQL logger")
139+
}
140+
}
141+
135142
plan, e2 := getPlan(idStorageFactory(table, ingressDescriptor), autoTruncate)
136143
if e2 != nil {
137144
fmt.Fprintln(err, e2.Error())
@@ -184,6 +191,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
184191
cmd.Flags().StringVar(&savepoint, "savepoint", "", "Name of a file to write primary keys of effectively processed lines (commit to database)")
185192
cmd.Flags().BoolVarP(&autoTruncate, "autotruncate", "a", false, "Automatically truncate values to the maximum length defined in table.yaml")
186193
cmd.Flags().BoolVarP(&watch, "watch", "w", false, "watch statistics about pushed lines")
194+
cmd.Flags().StringVarP(&logSQLTo, "log-sql", "l", "", "Log SQL requests and data to specified folder (1 file per table)")
187195
cmd.SetOut(out)
188196
cmd.SetErr(err)
189197
cmd.SetIn(in)

internal/infra/push/datadestination_http.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ func (dd *HTTPDataDestination) Commit() *push.Error {
9494
return nil
9595
}
9696

97+
func (dd *HTTPDataDestination) OpenSQLLogger(folderPath string) error {
98+
panic("not implemented")
99+
}
100+
97101
// RowWriter return HTTP table writer
98102
func (dd *HTTPDataDestination) RowWriter(table push.Table) (push.RowWriter, *push.Error) {
99103
rw, ok := dd.rowWriter[table.Name()]

internal/infra/push/datadestination_sql.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type SQLDataDestination struct {
3838
mode push.Mode
3939
disableConstraints bool
4040
dialect SQLDialect
41+
sqlLogger *SQLLogger
4142
}
4243

4344
// NewSQLDataDestination creates a new SQL datadestination.
@@ -180,6 +181,16 @@ func (dd *SQLDataDestination) RowWriter(table push.Table) (push.RowWriter, *push
180181
return rw, nil
181182
}
182183

184+
func (dd *SQLDataDestination) OpenSQLLogger(folderPath string) error {
185+
dd.sqlLogger = NewSQLLogger(folderPath)
186+
if err := dd.sqlLogger.Open(); err != nil {
187+
dd.sqlLogger = nil
188+
return &push.Error{Description: err.Error()}
189+
}
190+
191+
return nil
192+
}
193+
183194
// SQLRowWriter write data to a SQL table.
184195
type SQLRowWriter struct {
185196
table push.Table
@@ -188,6 +199,7 @@ type SQLRowWriter struct {
188199
statement *sql.Stmt
189200
headers ValueHeaders
190201
disabledConstraints []SQLConstraint
202+
sqlLogger *SQLLoggerWriter
191203
}
192204

193205
// NewSQLRowWriter creates a new SQL row writer.
@@ -230,6 +242,7 @@ func (rw *SQLRowWriter) close() *push.Error {
230242
rw.statement = nil
231243
log.Debug().Msg(fmt.Sprintf("close statement %s", rw.dd.mode))
232244
}
245+
rw.sqlLogger.Close()
233246
return nil
234247
}
235248

@@ -285,6 +298,7 @@ func (rw *SQLRowWriter) createStatement(row push.Row, where push.Row) *push.Erro
285298
return &push.Error{Description: err.Error()}
286299
}
287300
rw.statement = stmt
301+
rw.sqlLogger = rw.dd.sqlLogger.OpenWriter(rw.table, prepareStmt)
288302
return nil
289303
}
290304

@@ -347,6 +361,8 @@ func (rw *SQLRowWriter) Write(row push.Row, where push.Row) *push.Error {
347361
}
348362
log.Trace().Stringer("headers", rw.headers).Str("table", rw.table.Name()).Msg(fmt.Sprint(values))
349363

364+
rw.sqlLogger.Write(values)
365+
350366
_, err2 := rw.statement.Exec(values...)
351367
if err2 != nil {
352368
// reset statement after error

internal/infra/push/datadestination_ws.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ func (dd *WebSocketDataDestination) Commit() *push.Error {
207207
return nil
208208
}
209209

210+
func (dd *WebSocketDataDestination) OpenSQLLogger(folderPath string) error {
211+
panic("not implemented")
212+
}
213+
210214
// RowWriter return web socket table writer
211215
func (dd *WebSocketDataDestination) RowWriter(table push.Table) (push.RowWriter, *push.Error) {
212216
return &WebSocketRowWriter{dd, table}, nil

internal/infra/push/sql_logger.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package push
2+
3+
import (
4+
"encoding/csv"
5+
"fmt"
6+
"os"
7+
8+
"github.com/cgi-fr/lino/pkg/push"
9+
"github.com/rs/zerolog/log"
10+
)
11+
12+
type SQLLogger struct {
13+
folderPath string
14+
}
15+
16+
func NewSQLLogger(folderPath string) *SQLLogger {
17+
return &SQLLogger{
18+
folderPath: folderPath,
19+
}
20+
}
21+
22+
func (s *SQLLogger) Open() error {
23+
if s == nil {
24+
// SQLLogger is not set.
25+
return nil
26+
}
27+
28+
// Check if the folder exists
29+
if _, err := os.Stat(s.folderPath); os.IsNotExist(err) {
30+
// Create the folder if it doesn't exist
31+
if err := os.MkdirAll(s.folderPath, 0o755); err != nil { //nolint:mnd
32+
return err
33+
}
34+
}
35+
// Check if we have permission to write in the folder
36+
if err := os.WriteFile(s.folderPath+"/.test", []byte("test"), 0o600); err != nil {
37+
return err
38+
}
39+
// Remove the test file
40+
_ = os.Remove(s.folderPath + "/.test")
41+
return nil
42+
}
43+
44+
type SQLLoggerWriter struct {
45+
writer *csv.Writer
46+
}
47+
48+
func (s *SQLLogger) OpenWriter(table push.Table, sqlquery string) *SQLLoggerWriter {
49+
if s == nil {
50+
// SQLLogger is not set.
51+
return nil
52+
}
53+
54+
filename := s.folderPath + "/" + table.Name() + ".csv"
55+
writer, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
56+
if err != nil {
57+
log.Warn().Err(err).Msgf("Cannot open file %v for SQL logger", filename)
58+
}
59+
60+
logger := &SQLLoggerWriter{
61+
writer: csv.NewWriter(writer),
62+
}
63+
64+
if _, err := writer.WriteString("# " + sqlquery + "\n"); err != nil {
65+
log.Warn().Err(err).Msgf("Cannot write into file %v for SQL logger", filename)
66+
}
67+
68+
return logger
69+
}
70+
71+
func (w *SQLLoggerWriter) Write(data []any) {
72+
if w == nil {
73+
// SQLLoggerWriter is not set.
74+
return
75+
}
76+
77+
// Write the data to the file in CSV format
78+
if err := w.writer.Write(toStrings(data)); err != nil {
79+
log.Warn().Err(err).Msgf("Cannot log SQL statement")
80+
}
81+
}
82+
83+
func toStrings(data []any) []string {
84+
strings := make([]string, len(data))
85+
for i, v := range data {
86+
strings[i] = toString(v)
87+
}
88+
return strings
89+
}
90+
91+
func toString(data any) string {
92+
return fmt.Sprintf("%v", data)
93+
}
94+
95+
func (w *SQLLoggerWriter) Close() {
96+
if w == nil {
97+
// SQLLoggerWriter is not set.
98+
return
99+
}
100+
101+
w.writer.Flush()
102+
if err := w.writer.Error(); err != nil {
103+
log.Warn().Err(err).Msgf("Cannot flush SQL logger")
104+
}
105+
}

pkg/push/driven.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type DataDestination interface {
2727
Open(plan Plan, mode Mode, disableConstraints bool) *Error
2828
Commit() *Error
2929
RowWriter(table Table) (RowWriter, *Error)
30+
OpenSQLLogger(folderPath string) error
3031
Close() *Error
3132
}
3233

pkg/push/driven_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ func (mdd *memoryDataDestination) RowWriter(table push.Table) (push.RowWriter, *
6161
return mdd.tables[table.Name()], nil
6262
}
6363

64+
func (mdd *memoryDataDestination) OpenSQLLogger(string) error {
65+
return nil
66+
}
67+
6468
func (mdd *memoryDataDestination) Open(pla push.Plan, mode push.Mode, disableConstraints bool) *push.Error {
6569
mdd.opened = true
6670
return nil

tests/suites/push/parent_relation.yml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,29 @@ testcases:
3131
- script: lino id set-parent-lookup city_country_id_fkey true
3232
- script: lino pull source -f address_id=1 > store1.json
3333

34+
- name: log-sql feature
35+
steps:
36+
- script: lino pull source --limit 10 | lino push truncate -d dest --log-sql logs
37+
assertions:
38+
- result.code ShouldEqual 0
39+
- result.systemout ShouldBeEmpty
40+
- result.systemerr ShouldBeEmpty
41+
- script: cat logs/address.csv | wc -l
42+
assertions:
43+
- result.code ShouldEqual 0
44+
- result.systemout ShouldEqual 11
45+
- result.systemerr ShouldBeEmpty
46+
- script: cat logs/city.csv | wc -l
47+
assertions:
48+
- result.code ShouldEqual 0
49+
- result.systemout ShouldEqual 11
50+
- result.systemerr ShouldBeEmpty
51+
- script: cat logs/country.csv | wc -l
52+
assertions:
53+
- result.code ShouldEqual 0
54+
- result.systemout ShouldEqual 11
55+
- result.systemerr ShouldBeEmpty
56+
3457
- name: push truncate
3558
steps:
3659
- script: cat store1.json | lino push truncate -d dest

tests/suites/push/single_relation_delete.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ testcases:
2020
- name: prepare test
2121
steps:
2222
# Clean working directory
23-
- script: rm -f *
23+
- script: rm -rf *
2424
- script: lino dataconnector add --read-only source postgresql://postgres:sakila@source:5432/postgres?sslmode=disable
2525
- script: lino relation extract source
2626
- script: lino table extract source --only-tables

0 commit comments

Comments
 (0)