Skip to content

Commit 573c2f2

Browse files
karakanb and claude
committed
Stabilize NetSuite SuiteQL pagination with best-effort ORDER BY
Offset/limit pagination over SuiteQL is only deterministic with a stable ORDER BY. Plain table reads now sort by the incremental key (when set) followed by the primary keys as a unique tie-breaker, so multi-page extracts no longer skip or duplicate rows. Ordering is best-effort and never blocks a load: if NetSuite rejects the auto-added ORDER BY with a 400 (e.g. a table that exposes no sortable column), the read retries once without it, warns that multi-page results may be inconsistent, and continues. Custom (query:) reads are sent as-is. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent c977c7c commit 573c2f2

4 files changed

Lines changed: 147 additions & 17 deletions

File tree

docs/supported-sources/netsuite.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,6 @@ ingestr ingest \
6565
--dest-table "main.netsuite_transactions"
6666
```
6767

68-
For plain table names, ingestr runs `SELECT * FROM <source-table>` through SuiteQL. For joins, selected columns, built-in functions, or aliases, use the `query:` source-table form.
68+
For plain table names, ingestr runs `SELECT * FROM <source-table>` through SuiteQL, appending an `ORDER BY` on the incremental key (when set) followed by the primary key, so that offset-based pagination returns a stable result set. If NetSuite rejects the added `ORDER BY` with a 400 error (for example, because the table exposes no such column), ingestr retries once without the `ORDER BY` and still loads the data, warning that results spanning more than one page (1000 rows) may be inconsistent; pass `--primary-key` to point at a sortable unique key. For joins, selected columns, built-in functions, or aliases, use the `query:` source-table form; custom queries are sent as-is, so add your own `ORDER BY` when the result spans more than one page.
6969

7070
NetSuite SuiteQL responses do not expose a static schema through this connector, so ingestr infers the schema from extracted rows.

pkg/source/netsuite/client.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,18 @@ type SuiteQLResponse struct {
266266
Items []map[string]interface{} `json:"items"`
267267
}
268268

269+
// SuiteQLError is returned when the SuiteQL endpoint responds with a non-success
270+
// status. StatusCode lets callers distinguish client errors (e.g. a 400 from an
271+
// invalid ORDER BY column) from transient failures.
272+
type SuiteQLError struct {
273+
// StatusCode is the status code of the non-success response.
StatusCode int
274+
// Body is the response rendered as a string; it is included verbatim in
// the message produced by Error.
Body string
275+
}
276+
277+
// Error implements the error interface. The message contains both the status
// code and the response body.
func (e *SuiteQLError) Error() string {
278+
return fmt.Sprintf("netsuite suiteql returned status %d: %s", e.StatusCode, e.Body)
279+
}
280+
269281
func (c *Client) SuiteQL(ctx context.Context, query string, limit, offset int) (*SuiteQLResponse, error) {
270282
if limit <= 0 {
271283
limit = maxSuiteQLPageSize
@@ -294,7 +306,7 @@ func (c *Client) SuiteQL(ctx context.Context, query string, limit, offset int) (
294306
return nil, fmt.Errorf("netsuite suiteql request failed: %w", err)
295307
}
296308
if !resp.IsSuccess() {
297-
return nil, fmt.Errorf("netsuite suiteql returned status %d: %s", resp.StatusCode(), resp.String())
309+
return nil, &SuiteQLError{StatusCode: resp.StatusCode(), Body: resp.String()}
298310
}
299311

300312
var out SuiteQLResponse

pkg/source/netsuite/netsuite.go

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package netsuite
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"net/http"
68
"net/url"
79
"strings"
810
"time"
@@ -93,14 +95,14 @@ func (s *NetSuiteSource) GetTable(ctx context.Context, req source.TableRequest)
9395
return nil, fmt.Errorf("netsuite source does not have a predefined schema; schema inference is required")
9496
},
9597
ReadFn: func(ctx context.Context, opts source.ReadOptions) (<-chan source.RecordBatchResult, error) {
96-
return s.readTable(ctx, tableName, opts)
98+
return s.readTable(ctx, tableName, primaryKeys, opts)
9799
},
98100
}, nil
99101
}
100102

101-
func (s *NetSuiteSource) readTable(ctx context.Context, tableName string, opts source.ReadOptions) (<-chan source.RecordBatchResult, error) {
102-
query := buildSuiteQL(tableName, opts)
103-
return s.readSuiteQL(ctx, query, opts)
103+
// readTable reads a plain table: it asks buildSuiteQL for the query and its
// fallback for tableName, then passes both to readSuiteQL together with opts.
func (s *NetSuiteSource) readTable(ctx context.Context, tableName string, primaryKeys []string, opts source.ReadOptions) (<-chan source.RecordBatchResult, error) {
104+
query, fallback := buildSuiteQL(tableName, primaryKeys, opts)
105+
return s.readSuiteQL(ctx, query, fallback, opts)
104106
}
105107

106108
func (s *NetSuiteSource) ExecuteCustomQuery(ctx context.Context, query string, opts source.ReadOptions) (<-chan source.RecordBatchResult, error) {
@@ -109,10 +111,10 @@ func (s *NetSuiteSource) ExecuteCustomQuery(ctx context.Context, query string, o
109111
if query == "" {
110112
return nil, fmt.Errorf("netsuite custom query cannot be empty")
111113
}
112-
return s.readSuiteQL(ctx, query, opts)
114+
return s.readSuiteQL(ctx, query, "", opts)
113115
}
114116

115-
func (s *NetSuiteSource) readSuiteQL(ctx context.Context, query string, opts source.ReadOptions) (<-chan source.RecordBatchResult, error) {
117+
func (s *NetSuiteSource) readSuiteQL(ctx context.Context, query, fallbackQuery string, opts source.ReadOptions) (<-chan source.RecordBatchResult, error) {
116118
if s.client == nil {
117119
return nil, fmt.Errorf("netsuite source is not connected")
118120
}
@@ -137,6 +139,17 @@ func (s *NetSuiteSource) readSuiteQL(ctx context.Context, query string, opts sou
137139

138140
resp, err := s.client.SuiteQL(ctx, query, pageSize, offset)
139141
if err != nil {
142+
// Ordering is best-effort: if NetSuite rejects the auto-added
143+
// ORDER BY (e.g. the table exposes no such column), retry once
144+
// without it so the load still succeeds. Pagination may then be
145+
// inconsistent across pages, so warn the user.
146+
if fallbackQuery != "" && offset == 0 && isInvalidQueryError(err) {
147+
fmt.Printf("Warning: netsuite rejected the ordered query; retrying without ORDER BY. Multi-page results may be inconsistent—pass --primary-key to set a sortable key.\n")
148+
config.Debug("[NETSUITE] Falling back to unordered query: %s", fallbackQuery)
149+
query = fallbackQuery
150+
fallbackQuery = ""
151+
continue
152+
}
140153
results <- source.RecordBatchResult{Err: err}
141154
return
142155
}
@@ -267,8 +280,14 @@ func (c uriConfig) authProvider() (AuthProvider, []interface{ Close() error }, e
267280
return provider, []interface{ Close() error }{provider}, nil
268281
}
269282

270-
func buildSuiteQL(tableName string, opts source.ReadOptions) string {
271-
query := fmt.Sprintf("SELECT * FROM %s", strings.TrimSpace(tableName))
283+
// buildSuiteQL returns the query to run and an unordered fallback. Offset/limit
284+
// pagination is only stable with a deterministic ORDER BY, so the primary query
285+
// sorts by the incremental key first, then the primary keys as a unique
286+
// tie-breaker. The fallback drops the ORDER BY (but keeps any WHERE filter) so a
287+
// table without a sortable key can still be loaded; it is empty when no ordering
288+
// was added.
289+
func buildSuiteQL(tableName string, primaryKeys []string, opts source.ReadOptions) (string, string) {
290+
base := fmt.Sprintf("SELECT * FROM %s", strings.TrimSpace(tableName))
272291

273292
var conditions []string
274293
if opts.IncrementalKey != "" {
@@ -280,11 +299,40 @@ func buildSuiteQL(tableName string, opts source.ReadOptions) string {
280299
}
281300
}
282301
if len(conditions) > 0 {
283-
query += " WHERE " + strings.Join(conditions, " AND ")
284-
query += " ORDER BY " + opts.IncrementalKey + " ASC"
302+
base += " WHERE " + strings.Join(conditions, " AND ")
303+
}
304+
305+
orderBy := buildOrderBy(opts.IncrementalKey, primaryKeys)
306+
if orderBy == "" {
307+
return base, ""
308+
}
309+
return base + " ORDER BY " + orderBy, base
310+
}
311+
312+
// isInvalidQueryError reports whether err is, or wraps, a *SuiteQLError whose
// status code is 400 Bad Request.
func isInvalidQueryError(err error) bool {
313+
var apiErr *SuiteQLError
314+
return errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusBadRequest
315+
}
316+
317+
// buildOrderBy returns the comma-separated column list for an ORDER BY clause:
// the incremental key first, then each primary key in the order given, every
// column suffixed with " ASC". Names are whitespace-trimmed; empty names and
// case-insensitive duplicates are skipped. The result is the empty string when
// no column remains.
func buildOrderBy(incrementalKey string, primaryKeys []string) string {
318+
// seen is keyed by the lower-cased column name so "ID" and "id" count as
// the same column.
seen := make(map[string]bool)
319+
var columns []string
320+
add := func(col string) {
321+
col = strings.TrimSpace(col)
322+
key := strings.ToLower(col)
323+
if col == "" || seen[key] {
324+
return
325+
}
326+
seen[key] = true
327+
columns = append(columns, col+" ASC")
328+
}
329+
330+
// The incremental key is added first, so a primary key with the same name
// is dropped as a duplicate.
add(incrementalKey)
331+
for _, pk := range primaryKeys {
332+
add(pk)
285333
}
286334

287-
return query
335+
return strings.Join(columns, ", ")
288336
}
289337

290338
func suiteQLTimestampLiteral(t time.Time) string {

pkg/source/netsuite/netsuite_test.go

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,32 @@ func TestBuildSuiteQL(t *testing.T) {
7878
start := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)
7979
end := time.Date(2026, 1, 3, 3, 4, 5, 0, time.UTC)
8080

81-
got := buildSuiteQL("transaction", source.ReadOptions{
81+
got, fallback := buildSuiteQL("transaction", []string{"id"}, source.ReadOptions{
8282
IncrementalKey: "lastmodifieddate",
8383
IntervalStart: &start,
8484
IntervalEnd: &end,
8585
})
8686

87-
assert.Equal(t, `SELECT * FROM transaction WHERE lastmodifieddate >= TO_TIMESTAMP_TZ('2026-01-02T03:04:05.000 +00:00', 'YYYY-MM-DD"T"HH24:MI:SS.FF TZH:TZM') AND lastmodifieddate < TO_TIMESTAMP_TZ('2026-01-03T03:04:05.000 +00:00', 'YYYY-MM-DD"T"HH24:MI:SS.FF TZH:TZM') ORDER BY lastmodifieddate ASC`, got)
87+
where := `SELECT * FROM transaction WHERE lastmodifieddate >= TO_TIMESTAMP_TZ('2026-01-02T03:04:05.000 +00:00', 'YYYY-MM-DD"T"HH24:MI:SS.FF TZH:TZM') AND lastmodifieddate < TO_TIMESTAMP_TZ('2026-01-03T03:04:05.000 +00:00', 'YYYY-MM-DD"T"HH24:MI:SS.FF TZH:TZM')`
88+
assert.Equal(t, where+" ORDER BY lastmodifieddate ASC, id ASC", got)
89+
assert.Equal(t, where, fallback)
90+
}
91+
92+
// TestBuildSuiteQLOrdersByPrimaryKeyWithoutIncrementalKey checks that, with no
// incremental key, the query is ordered by the primary key alone and the
// fallback is the same query without the ORDER BY.
func TestBuildSuiteQLOrdersByPrimaryKeyWithoutIncrementalKey(t *testing.T) {
93+
got, fallback := buildSuiteQL("customer", []string{"id"}, source.ReadOptions{})
94+
assert.Equal(t, "SELECT * FROM customer ORDER BY id ASC", got)
95+
assert.Equal(t, "SELECT * FROM customer", fallback)
96+
}
97+
98+
// TestBuildSuiteQLDedupesIncrementalKeyInOrderBy checks that a column used as
// both the incremental key and a primary key appears only once in the ORDER BY.
func TestBuildSuiteQLDedupesIncrementalKeyInOrderBy(t *testing.T) {
99+
got, _ := buildSuiteQL("customer", []string{"id"}, source.ReadOptions{IncrementalKey: "id"})
100+
assert.Equal(t, "SELECT * FROM customer ORDER BY id ASC", got)
101+
}
102+
103+
// TestBuildSuiteQLWithoutPrimaryKeyHasNoFallback checks that with neither a
// primary key nor an incremental key no ORDER BY is added and the fallback
// query is empty.
func TestBuildSuiteQLWithoutPrimaryKeyHasNoFallback(t *testing.T) {
104+
got, fallback := buildSuiteQL("customer", nil, source.ReadOptions{})
105+
assert.Equal(t, "SELECT * FROM customer", got)
106+
assert.Empty(t, fallback)
88107
}
89108

90109
func TestNetSuiteSourceGetTable(t *testing.T) {
@@ -115,7 +134,7 @@ func TestNetSuiteSourceReadUsesSuiteQLPagination(t *testing.T) {
115134

116135
var body map[string]string
117136
require.NoError(t, json.NewDecoder(r.Body).Decode(&body))
118-
assert.Equal(t, "SELECT * FROM customer", body["q"])
137+
assert.Equal(t, "SELECT * FROM customer ORDER BY id ASC", body["q"])
119138

120139
w.Header().Set("Content-Type", "application/json")
121140
call := requestCount.Add(1)
@@ -194,7 +213,7 @@ func TestNetSuiteSourceReadHonorsLimit(t *testing.T) {
194213
}
195214
defer func() { _ = s.Close(context.Background()) }()
196215

197-
ch, err := s.readTable(context.Background(), "customer", source.ReadOptions{Limit: 2})
216+
ch, err := s.readTable(context.Background(), "customer", []string{"id"}, source.ReadOptions{Limit: 2})
198217
require.NoError(t, err)
199218

200219
batches := collectResults(t, ch)
@@ -203,6 +222,57 @@ func TestNetSuiteSourceReadHonorsLimit(t *testing.T) {
203222
assert.Equal(t, int32(1), requestCount.Load())
204223
}
205224

225+
// TestNetSuiteSourceReadFallsBackWhenOrderByRejected runs readTable against a
// stub server that answers the ordered query with a 400 and the unordered query
// with a single one-row page. It expects the read to succeed with one batch,
// after exactly one ordered attempt and one unordered request.
func TestNetSuiteSourceReadFallsBackWhenOrderByRejected(t *testing.T) {
226+
var orderedAttempts atomic.Int32
227+
var unorderedRequests atomic.Int32
228+
229+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
230+
var body map[string]string
231+
require.NoError(t, json.NewDecoder(r.Body).Decode(&body))
232+
233+
w.Header().Set("Content-Type", "application/json")
234+
switch body["q"] {
235+
// Ordered query: reject it with a 400 so the reader has to fall back.
case "SELECT * FROM customrecord ORDER BY id ASC":
236+
orderedAttempts.Add(1)
237+
w.WriteHeader(http.StatusBadRequest)
238+
_ = json.NewEncoder(w).Encode(map[string]interface{}{
239+
"o:errorDetails": []map[string]interface{}{
240+
{"detail": "Search error occurred: Field 'id' was not found."},
241+
},
242+
})
243+
// Unordered fallback query: serve one row and report no further pages.
case "SELECT * FROM customrecord":
244+
unorderedRequests.Add(1)
245+
_ = json.NewEncoder(w).Encode(map[string]interface{}{
246+
"count": 1,
247+
"offset": 0,
248+
"hasMore": false,
249+
"totalResults": 1,
250+
"items": []map[string]interface{}{
251+
{"name": "row-1"},
252+
},
253+
})
254+
// Any other query text fails the test.
default:
255+
t.Errorf("unexpected query: %q", body["q"])
256+
w.WriteHeader(http.StatusInternalServerError)
257+
}
258+
}))
259+
defer server.Close()
260+
261+
s := &NetSuiteSource{
262+
client: NewClient(server.URL, NewStaticTokenProvider("test-token")),
263+
}
264+
defer func() { _ = s.Close(context.Background()) }()
265+
266+
ch, err := s.readTable(context.Background(), "customrecord", []string{"id"}, source.ReadOptions{})
267+
require.NoError(t, err)
268+
269+
batches := collectResults(t, ch)
270+
require.Len(t, batches, 1)
271+
assert.Equal(t, int64(1), batches[0].Batch.NumRows())
272+
// The ordered query must be tried exactly once and the fallback sent once.
assert.Equal(t, int32(1), orderedAttempts.Load())
273+
assert.Equal(t, int32(1), unorderedRequests.Load())
274+
}
275+
206276
func TestExecuteCustomQueryTrimsSemicolon(t *testing.T) {
207277
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
208278
var body map[string]string

0 commit comments

Comments
 (0)