Skip to content

Commit 38fb3a9

Browse files
authored
[v0.6] Prevent queries being interrupted by the sqlite layer (#897)
1 parent 68fccf9 commit 38fb3a9

File tree

6 files changed

+33
-133
lines changed

6 files changed

+33
-133
lines changed

pkg/sqlcache/db/client.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,16 @@ func (c *client) QueryForRows(ctx context.Context, stmt transaction.Stmt, params
208208
c.connLock.RLock()
209209
defer c.connLock.RUnlock()
210210

211-
return stmt.QueryContext(ctx, params...)
211+
// The underlying sqlite implementation seems to not cleanly close transactions when a query is interrupted
212+
// We use context.Background to let the query finish correctly, immediately checking the original context afterward and properly closing Rows if canceled
213+
rows, err := stmt.QueryContext(context.Background(), params...)
214+
if err != nil {
215+
return nil, err
216+
} else if ctx.Err() != nil {
217+
rows.Close()
218+
return nil, ctx.Err()
219+
}
220+
return rows, nil
212221
}
213222

214223
// CloseStmt will call close on the given Closable. It is intended to be used with a sql statement. This function is meant

pkg/sqlcache/db/transaction/transaction.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,5 @@ func (c client) Stmt(stmt *sql.Stmt) Stmt {
3939
// rationale: allow mocking
4040
type Stmt interface {
4141
Exec(args ...any) (sql.Result, error)
42-
Query(args ...any) (*sql.Rows, error)
4342
QueryContext(ctx context.Context, args ...any) (*sql.Rows, error)
44-
QueryRowContext(ctx context.Context, args ...any) *sql.Row
4543
}

pkg/sqlcache/db/transaction_mocks_test.go

Lines changed: 0 additions & 38 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/sqlcache/informer/listoption_indexer.go

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -363,23 +363,32 @@ func (l *ListOptionIndexer) Watch(ctx context.Context, opts WatchOptions, events
363363
}
364364

365365
if err := l.WithTransaction(ctx, false, func(tx transaction.Client) error {
366-
rowIDRow := tx.Stmt(l.findEventsRowByRVStmt).QueryRowContext(ctx, targetRV)
367-
if err := rowIDRow.Err(); err != nil {
368-
return &db.QueryError{QueryString: l.findEventsRowByRVQuery, Err: err}
369-
}
370-
371366
var rowID int
372-
err := rowIDRow.Scan(&rowID)
373-
if errors.Is(err, sql.ErrNoRows) {
374-
if targetRV != latestRV {
375-
return ErrTooOld
367+
// use a closure to ensure rows is always closed immediately after it's needed
368+
if err := func() error {
369+
rows, err := l.QueryForRows(ctx, tx.Stmt(l.findEventsRowByRVStmt), targetRV)
370+
if err != nil {
371+
return &db.QueryError{QueryString: l.findEventsRowByRVQuery, Err: err}
372+
}
373+
defer rows.Close()
374+
375+
if !rows.Next() {
376+
// query returned no results
377+
if targetRV != latestRV {
378+
return ErrTooOld
379+
}
380+
return nil
381+
}
382+
if err := rows.Scan(&rowID); err != nil {
383+
return fmt.Errorf("failed scan rowid: %w", err)
376384
}
377-
} else if err != nil {
378-
return fmt.Errorf("failed scan rowid: %w", err)
385+
return nil
386+
}(); err != nil {
387+
return err
379388
}
380389

381390
// Backfilling previous events from resourceVersion
382-
rows, err := tx.Stmt(l.listEventsAfterStmt).QueryContext(ctx, rowID)
391+
rows, err := l.QueryForRows(ctx, tx.Stmt(l.listEventsAfterStmt), rowID)
383392
if err != nil {
384393
return &db.QueryError{QueryString: l.listEventsAfterQuery, Err: err}
385394
}
@@ -1027,9 +1036,8 @@ func (l *ListOptionIndexer) executeQuery(ctx context.Context, queryInfo *QueryIn
10271036

10281037
var items []any
10291038
err = l.WithTransaction(ctx, false, func(tx transaction.Client) error {
1030-
txStmt := tx.Stmt(stmt)
10311039
now := time.Now()
1032-
rows, err := txStmt.QueryContext(ctx, queryInfo.params...)
1040+
rows, err := l.QueryForRows(ctx, tx.Stmt(stmt), queryInfo.params...)
10331041
if err != nil {
10341042
return &db.QueryError{QueryString: queryInfo.query, Err: err}
10351043
}
@@ -1049,9 +1057,8 @@ func (l *ListOptionIndexer) executeQuery(ctx context.Context, queryInfo *QueryIn
10491057
err = errors.Join(err, &db.QueryError{QueryString: queryInfo.countQuery, Err: cerr})
10501058
}
10511059
}()
1052-
txStmt := tx.Stmt(countStmt)
10531060
now = time.Now()
1054-
rows, err := txStmt.QueryContext(ctx, queryInfo.countParams...)
1061+
rows, err := l.QueryForRows(ctx, tx.Stmt(countStmt), queryInfo.countParams...)
10551062
if err != nil {
10561063
return &db.QueryError{QueryString: queryInfo.countQuery, Err: err}
10571064
}

pkg/sqlcache/informer/transaction_mocks_test.go

Lines changed: 0 additions & 38 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/sqlcache/store/transaction_mocks_test.go

Lines changed: 0 additions & 38 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)