Skip to content

Commit 64e102f

Browse files
committed
fix(ducklake): validate TableKey and where clause before dynamic SQL
Centralize allowlist validation for proto_type/sub_type (GetTableSchemas), strict where parsing with column allowlist, and limit clamping. Apply on MultiTableReader and HTTP handlers; return 400 on client input errors. Addresses CodeQL alert #29 (SQL built from user-controlled sources).
1 parent 9db94d3 commit 64e102f

5 files changed

Lines changed: 492 additions & 44 deletions

File tree

src/storage/ducklake/README.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,20 @@ Response:
275275

276276
### List Snapshots (per proto_type)
277277

278+
Allowed `proto_type` / `sub_type` pairs match the canonical DuckLake schemas (see `GetTableSchemas()` in `tables.go`):
279+
280+
| proto_type | sub_type | Table |
281+
|------------|----------|-------|
282+
| 1 (SIP) | `call` (default), `registration`, `default` | SIP call / registration / other |
283+
| 5 | _(empty)_ | RTCP JSON |
284+
| 34, 35 | _(empty)_ | RTCP / RTP |
285+
| 53 | _(empty)_ | DNS |
286+
| 100 | _(empty)_ | LOG |
287+
288+
Invalid keys return **HTTP 400**. `limit` is clamped to 1000.
289+
278290
```bash
279-
GET /api/v1/ducklake/snapshots?proto_type=1&limit=10
291+
GET /api/v1/ducklake/snapshots?proto_type=1&sub_type=call&limit=10
280292
```
281293

282294
Response:
@@ -300,6 +312,12 @@ Response:
300312

301313
### Query
302314

315+
The `where` field is **not** arbitrary SQL. It must be a simple expression built from allowed columns for the target table(s):
316+
317+
- Predicates: `column = 'value'`, comparisons with integers, `column IS NULL`, combined with `AND` / `OR`
318+
- Column names must exist on the schema (e.g. `session_id`, `src_ip`, `timestamp`)
319+
- Max length 512; blocklisted tokens (`;`, `--`, `UNION`, `SELECT`, etc.) return **HTTP 400**
320+
303321
Query specific proto_type:
304322

305323
```bash
@@ -308,6 +326,7 @@ Content-Type: application/json
308326

309327
{
310328
"proto_type": 1,
329+
"sub_type": "call",
311330
"where": "session_id = 'abc123@host'",
312331
"limit": 100
313332
}

src/storage/ducklake/api.go

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,18 @@ import (
1414
"time"
1515
)
1616

17+
func writeDuckLakeHandlerError(w http.ResponseWriter, err error, serverMsg string) {
18+
if err == nil {
19+
http.Error(w, serverMsg, http.StatusInternalServerError)
20+
return
21+
}
22+
if IsClientInputError(err) {
23+
http.Error(w, err.Error(), http.StatusBadRequest)
24+
return
25+
}
26+
http.Error(w, serverMsg, http.StatusInternalServerError)
27+
}
28+
1729
// API provides HTTP API for DuckLake operations (legacy single-table)
1830
type API struct {
1931
reader *Reader
@@ -148,12 +160,13 @@ func (a *API) HandleSnapshots(w http.ResponseWriter, r *http.Request) {
148160
return
149161
}
150162

151-
limit := 100
163+
limit := DefaultQueryLimit
152164
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
153-
if l, err := strconv.Atoi(limitStr); err == nil && l > 0 {
165+
if l, err := strconv.Atoi(limitStr); err == nil {
154166
limit = l
155167
}
156168
}
169+
limit = ClampLimit(limit, DefaultQueryLimit, MaxQueryLimit)
157170

158171
snapshots, err := a.reader.ListSnapshots(limit)
159172
if err != nil {
@@ -194,9 +207,7 @@ func (a *API) HandleQuery(w http.ResponseWriter, r *http.Request) {
194207
return
195208
}
196209

197-
if req.Limit <= 0 || req.Limit > 1000 {
198-
req.Limit = 100
199-
}
210+
req.Limit = ClampLimit(req.Limit, DefaultQueryLimit, MaxQueryLimit)
200211

201212
var records []HEPRecord
202213
var err error
@@ -218,7 +229,7 @@ func (a *API) HandleQuery(w http.ResponseWriter, r *http.Request) {
218229
}
219230

220231
if err != nil {
221-
http.Error(w, "Query failed: "+err.Error(), http.StatusInternalServerError)
232+
writeDuckLakeHandlerError(w, err, "Query failed")
222233
return
223234
}
224235

@@ -382,28 +393,32 @@ func (a *MultiTableAPI) HandleSnapshots(w http.ResponseWriter, r *http.Request)
382393
return
383394
}
384395

385-
// Build TableKey from query params
386-
key := TableKey{ProtoType: 1, SubType: SIPTypeCall} // Default to SIP calls
387-
396+
protoType := uint32(ProtoTypeSIP)
388397
if ptStr := r.URL.Query().Get("proto_type"); ptStr != "" {
389-
if pt, err := strconv.ParseUint(ptStr, 10, 32); err == nil {
390-
key.ProtoType = uint32(pt)
398+
pt, err := strconv.ParseUint(ptStr, 10, 32)
399+
if err != nil {
400+
http.Error(w, "invalid proto_type", http.StatusBadRequest)
401+
return
391402
}
403+
protoType = uint32(pt)
392404
}
393-
if subType := r.URL.Query().Get("sub_type"); subType != "" {
394-
key.SubType = subType
405+
key, err := ParseTableKey(protoType, r.URL.Query().Get("sub_type"))
406+
if err != nil {
407+
writeDuckLakeHandlerError(w, err, "invalid table key")
408+
return
395409
}
396410

397-
limit := 100
411+
limit := DefaultQueryLimit
398412
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
399-
if l, err := strconv.Atoi(limitStr); err == nil && l > 0 {
413+
if l, err := strconv.Atoi(limitStr); err == nil {
400414
limit = l
401415
}
402416
}
417+
limit = ClampLimit(limit, DefaultQueryLimit, MaxQueryLimit)
403418

404419
snapshots, err := a.reader.ListSnapshots(key, limit)
405420
if err != nil {
406-
http.Error(w, "Failed to list snapshots", http.StatusInternalServerError)
421+
writeDuckLakeHandlerError(w, err, "Failed to list snapshots")
407422
return
408423
}
409424

@@ -442,24 +457,24 @@ func (a *MultiTableAPI) HandleQuery(w http.ResponseWriter, r *http.Request) {
442457
return
443458
}
444459

445-
if req.Limit <= 0 || req.Limit > 1000 {
446-
req.Limit = 100
447-
}
460+
req.Limit = ClampLimit(req.Limit, DefaultQueryLimit, MaxQueryLimit)
448461

449462
var records []map[string]interface{}
450463
var err error
451464

452465
if req.ProtoType != nil {
453-
// Query specific table by TableKey
454-
key := TableKey{ProtoType: *req.ProtoType, SubType: req.SubType}
466+
key, keyErr := ParseTableKey(*req.ProtoType, req.SubType)
467+
if keyErr != nil {
468+
writeDuckLakeHandlerError(w, keyErr, "invalid table key")
469+
return
470+
}
455471
records, err = a.reader.Query(key, req.Where, req.Limit)
456472
} else {
457-
// Query all tables
458473
records, err = a.reader.QueryAll(req.Where, req.Limit)
459474
}
460475

461476
if err != nil {
462-
http.Error(w, "Query failed: "+err.Error(), http.StatusInternalServerError)
477+
writeDuckLakeHandlerError(w, err, "Query failed")
463478
return
464479
}
465480

src/storage/ducklake/timetravel.go

Lines changed: 83 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ func NewReader(w *Writer) *Reader {
3131

3232
// Query executes a query on current data
3333
func (r *Reader) Query(whereClause string, limit int) ([]HEPRecord, error) {
34+
if err := ValidateWhereClause(whereClause, AllQueryColumns()); err != nil {
35+
return nil, err
36+
}
37+
limit = ClampLimit(limit, DefaultQueryLimit, MaxQueryLimit)
3438
query := fmt.Sprintf("SELECT * FROM %s", r.tableFQN)
3539
if whereClause != "" {
3640
query += " WHERE " + whereClause
@@ -45,6 +49,10 @@ func (r *Reader) Query(whereClause string, limit int) ([]HEPRecord, error) {
4549

4650
// QueryAtSnapshot queries data at a specific snapshot
4751
func (r *Reader) QueryAtSnapshot(snapshotID int64, whereClause string, limit int) ([]HEPRecord, error) {
52+
if err := ValidateWhereClause(whereClause, AllQueryColumns()); err != nil {
53+
return nil, err
54+
}
55+
limit = ClampLimit(limit, DefaultQueryLimit, MaxQueryLimit)
4856
// DuckLake syntax: SELECT * FROM table AT SNAPSHOT snapshot_id
4957
query := fmt.Sprintf("SELECT * FROM %s AT SNAPSHOT %d", r.tableFQN, snapshotID)
5058
if whereClause != "" {
@@ -60,6 +68,10 @@ func (r *Reader) QueryAtSnapshot(snapshotID int64, whereClause string, limit int
6068

6169
// QueryAtTime queries data as it was at a specific timestamp
6270
func (r *Reader) QueryAtTime(asOf time.Time, whereClause string, limit int) ([]HEPRecord, error) {
71+
if err := ValidateWhereClause(whereClause, AllQueryColumns()); err != nil {
72+
return nil, err
73+
}
74+
limit = ClampLimit(limit, DefaultQueryLimit, MaxQueryLimit)
6375
// DuckLake syntax: SELECT * FROM table AT TIMESTAMP 'timestamp'
6476
timestamp := asOf.Format("2006-01-02 15:04:05")
6577
query := fmt.Sprintf("SELECT * FROM %s AT TIMESTAMP '%s'", r.tableFQN, timestamp)
@@ -108,9 +120,7 @@ func (r *Reader) executeQuery(query string) ([]HEPRecord, error) {
108120

109121
// ListSnapshots returns all available snapshots
110122
func (r *Reader) ListSnapshots(limit int) ([]Snapshot, error) {
111-
if limit <= 0 {
112-
limit = 100
113-
}
123+
limit = ClampLimit(limit, DefaultQueryLimit, MaxQueryLimit)
114124

115125
// DuckLake provides snapshot info via system function
116126
query := fmt.Sprintf(`
@@ -259,16 +269,20 @@ func NewMultiTableReader(w *MultiTableWriter) *MultiTableReader {
259269

260270
// allTablesForKey returns the DuckLake table and, when search_buffer is enabled,
261271
// both in-memory buffer tables for a key.
262-
func (r *MultiTableReader) allTablesForKey(key TableKey) []string {
263-
tables := []string{r.writer.GetTableFQN(key)}
272+
func (r *MultiTableReader) allTablesForKey(key TableKey) ([]string, error) {
273+
fqn, err := ResolveTableFQN(r.writer, key)
274+
if err != nil {
275+
return nil, err
276+
}
277+
tables := []string{fqn}
264278
if r.searchBuffer {
265279
if tw := r.writer.GetTable(key); tw != nil {
266280
for _, mem := range tw.MemTableNames() {
267281
tables = append(tables, mem)
268282
}
269283
}
270284
}
271-
return tables
285+
return tables, nil
272286
}
273287

274288
// GetTimeRange returns min/max timestamps across all tables including buffers
@@ -279,7 +293,11 @@ func (r *MultiTableReader) GetTimeRange() (minTs, maxTs int64, err error) {
279293
}
280294

281295
for _, key := range keys {
282-
for _, tbl := range r.allTablesForKey(key) {
296+
tbls, tblErr := r.allTablesForKey(key)
297+
if tblErr != nil {
298+
continue
299+
}
300+
for _, tbl := range tbls {
283301
query := fmt.Sprintf("SELECT MIN(timestamp), MAX(timestamp) FROM %s", tbl)
284302
var minNull, maxNull sql.NullInt64
285303
if err := r.db.QueryRow(query).Scan(&minNull, &maxNull); err != nil {
@@ -299,7 +317,14 @@ func (r *MultiTableReader) GetTimeRange() (minTs, maxTs int64, err error) {
299317

300318
// GetTimeRangeForTableKey returns time range for specific TableKey including buffers
301319
func (r *MultiTableReader) GetTimeRangeForTableKey(key TableKey) (minTs, maxTs int64, err error) {
302-
for _, tbl := range r.allTablesForKey(key) {
320+
if err := ValidateTableKey(key); err != nil {
321+
return 0, 0, err
322+
}
323+
tbls, err := r.allTablesForKey(key)
324+
if err != nil {
325+
return 0, 0, err
326+
}
327+
for _, tbl := range tbls {
303328
query := fmt.Sprintf("SELECT MIN(timestamp), MAX(timestamp) FROM %s", tbl)
304329
var minNull, maxNull sql.NullInt64
305330
if err := r.db.QueryRow(query).Scan(&minNull, &maxNull); err != nil {
@@ -338,7 +363,11 @@ func (r *MultiTableReader) GetRowCount() (int64, error) {
338363
var total int64
339364

340365
for _, key := range keys {
341-
for _, tbl := range r.allTablesForKey(key) {
366+
tbls, tblErr := r.allTablesForKey(key)
367+
if tblErr != nil {
368+
continue
369+
}
370+
for _, tbl := range tbls {
342371
var count int64
343372
query := fmt.Sprintf("SELECT COUNT(*) FROM %s", tbl)
344373
if err := r.db.QueryRow(query).Scan(&count); err != nil {
@@ -353,8 +382,15 @@ func (r *MultiTableReader) GetRowCount() (int64, error) {
353382

354383
// GetRowCountForTableKey returns row count for specific TableKey including buffers
355384
func (r *MultiTableReader) GetRowCountForTableKey(key TableKey) (int64, error) {
385+
if err := ValidateTableKey(key); err != nil {
386+
return 0, err
387+
}
388+
tbls, err := r.allTablesForKey(key)
389+
if err != nil {
390+
return 0, err
391+
}
356392
var total int64
357-
for _, tbl := range r.allTablesForKey(key) {
393+
for _, tbl := range tbls {
358394
var count int64
359395
query := fmt.Sprintf("SELECT COUNT(*) FROM %s", tbl)
360396
if err := r.db.QueryRow(query).Scan(&count); err != nil {
@@ -369,8 +405,12 @@ func (r *MultiTableReader) GetRowCountForTableKey(key TableKey) (int64, error) {
369405
// is enabled, it builds a UNION ALL across the DuckLake persistent table and
370406
// both in-memory buffer tables so queries see the freshest data even before
371407
// flush. When search_buffer is disabled, it queries only the DuckLake table.
372-
func (r *MultiTableReader) buildUnionQuery(key TableKey, whereClause string, limit int) string {
373-
tables := r.allTablesForKey(key)
408+
func (r *MultiTableReader) buildUnionQuery(key TableKey, whereClause string, limit int) (string, error) {
409+
tables, err := r.allTablesForKey(key)
410+
if err != nil {
411+
return "", err
412+
}
413+
limit = ClampLimit(limit, DefaultQueryLimit, MaxQueryLimit)
374414

375415
if len(tables) == 1 {
376416
query := fmt.Sprintf("SELECT * FROM %s", tables[0])
@@ -381,7 +421,7 @@ func (r *MultiTableReader) buildUnionQuery(key TableKey, whereClause string, lim
381421
if limit > 0 {
382422
query += fmt.Sprintf(" LIMIT %d", limit)
383423
}
384-
return query
424+
return query, nil
385425
}
386426

387427
var parts []string
@@ -398,28 +438,51 @@ func (r *MultiTableReader) buildUnionQuery(key TableKey, whereClause string, lim
398438
if limit > 0 {
399439
query += fmt.Sprintf(" LIMIT %d", limit)
400440
}
401-
return query
441+
return query, nil
402442
}
403443

404444
// Query executes a query on specific table by TableKey.
405445
// Includes data from both in-memory buffers via UNION ALL so that
406446
// un-flushed records are visible to search.
407447
func (r *MultiTableReader) Query(key TableKey, whereClause string, limit int) ([]map[string]interface{}, error) {
408-
query := r.buildUnionQuery(key, whereClause, limit)
448+
if err := ValidateTableKey(key); err != nil {
449+
return nil, err
450+
}
451+
cols, err := ColumnsForTableKey(key)
452+
if err != nil {
453+
return nil, err
454+
}
455+
if err := ValidateWhereClause(whereClause, cols); err != nil {
456+
return nil, err
457+
}
458+
query, err := r.buildUnionQuery(key, whereClause, limit)
459+
if err != nil {
460+
return nil, err
461+
}
409462
return r.executeQueryGeneric(query)
410463
}
411464

412465
// QueryAll executes a query across all tables.
413466
// Includes data from both in-memory buffers via UNION ALL.
414467
func (r *MultiTableReader) QueryAll(whereClause string, limit int) ([]map[string]interface{}, error) {
468+
if err := ValidateWhereClause(whereClause, AllQueryColumns()); err != nil {
469+
return nil, err
470+
}
471+
limit = ClampLimit(limit, DefaultQueryLimit, MaxQueryLimit)
415472
keys := r.writer.ListTableKeys()
416473
if len(keys) == 0 {
417474
return nil, nil
418475
}
419476

420477
var allResults []map[string]interface{}
421478
for _, key := range keys {
422-
query := r.buildUnionQuery(key, whereClause, limit)
479+
if err := ValidateTableKey(key); err != nil {
480+
continue
481+
}
482+
query, err := r.buildUnionQuery(key, whereClause, limit)
483+
if err != nil {
484+
continue
485+
}
423486
results, err := r.executeQueryGeneric(query)
424487
if err != nil {
425488
continue
@@ -497,12 +560,12 @@ func extractTimestamp(row map[string]interface{}) time.Time {
497560

498561
// ListSnapshots returns snapshots for a specific table by TableKey
499562
func (r *MultiTableReader) ListSnapshots(key TableKey, limit int) ([]Snapshot, error) {
500-
if limit <= 0 {
501-
limit = 100
563+
limit = ClampLimit(limit, DefaultQueryLimit, MaxQueryLimit)
564+
tableFQN, err := ResolveTableFQN(r.writer, key)
565+
if err != nil {
566+
return nil, err
502567
}
503568

504-
tableFQN := r.writer.GetTableFQN(key)
505-
506569
query := fmt.Sprintf(`
507570
SELECT snapshot_id, snapshot_time, row_count
508571
FROM ducklake_snapshots('%s')

0 commit comments

Comments
 (0)