Skip to content

Commit 8ad4ecb

Browse files
committed
feat: add output format and limit options for conns command; enhance Arrow IPC streaming support
1 parent aeb770a commit 8ad4ecb

4 files changed

Lines changed: 196 additions & 17 deletions

File tree

cmd/sling/sling_cli.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,12 @@ var cliConns = &g.CliSC{
264264
Type: "bool",
265265
Description: "Show column level metadata.",
266266
},
267+
{
268+
Name: "output",
269+
ShortName: "o",
270+
Type: "string",
271+
Description: "Output format: text (default), csv, json. Overrides SLING_OUTPUT.",
272+
},
267273
{
268274
Name: "debug",
269275
ShortName: "d",
@@ -390,6 +396,18 @@ var cliConns = &g.CliSC{
390396
},
391397
},
392398
Flags: []g.Flag{
399+
{
400+
Name: "output",
401+
ShortName: "o",
402+
Type: "string",
403+
Description: "Output format: text (default), csv, json, arrow. Overrides SLING_OUTPUT.",
404+
},
405+
{
406+
Name: "limit",
407+
ShortName: "l",
408+
Type: "string",
409+
Description: "Maximum number of rows to return. Default 100. Use 0 for no limit.",
410+
},
393411
{
394412
Name: "debug",
395413
ShortName: "d",

cmd/sling/sling_conns.go

Lines changed: 130 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"encoding/csv"
45
"fmt"
56
"os"
67
"strings"
@@ -12,6 +13,7 @@ import (
1213
"github.com/slingdata-io/sling-cli/core/dbio"
1314
"github.com/slingdata-io/sling-cli/core/dbio/connection"
1415
"github.com/slingdata-io/sling-cli/core/dbio/database"
16+
"github.com/slingdata-io/sling-cli/core/dbio/iop"
1517
"github.com/slingdata-io/sling-cli/core/env"
1618
"github.com/slingdata-io/sling-cli/core/sling"
1719
"github.com/spf13/cast"
@@ -32,7 +34,8 @@ func processConns(c *g.CliSC) (ok bool, err error) {
3234

3335
ef := env.LoadSlingEnvFile()
3436
ec := connection.EnvFileConns{EnvFile: &ef}
35-
asJSON := os.Getenv("SLING_OUTPUT") == "json"
37+
// resolved per-subcommand below; default falls back to SLING_OUTPUT
38+
var asJSON, asArrow, asCSV bool
3639

3740
entries := connection.GetLocalConns(true)
3841
defer connection.CloseAll()
@@ -86,6 +89,27 @@ func processConns(c *g.CliSC) (ok bool, err error) {
8689
case "exec":
8790
env.SetTelVal("task", g.Marshal(g.M("type", sling.ConnExec)))
8891

92+
var output string
93+
output, err = ResolveOutputFormat(c, "json", "csv", "arrow")
94+
if err != nil {
95+
return ok, err
96+
}
97+
asJSON = output == "json"
98+
asCSV = output == "csv"
99+
asArrow = output == "arrow"
100+
101+
// --limit: default 100, "0" means no limit. String type so we can
102+
// distinguish "not provided" (use default) from explicit 0.
103+
limit := uint64(100)
104+
if raw := strings.TrimSpace(cast.ToString(c.Vals["limit"])); raw != "" {
105+
n, parseErr := cast.ToUint64E(raw)
106+
if parseErr != nil {
107+
return ok, g.Error("invalid --limit %q; expected a non-negative integer", raw)
108+
}
109+
limit = n
110+
}
111+
queryOpts := g.M("limit", limit)
112+
89113
name := cast.ToString(c.Vals["name"])
90114
conn := entries.Get(name)
91115
if conn.Name == "" {
@@ -126,22 +150,97 @@ func processConns(c *g.CliSC) (ok bool, err error) {
126150

127151
if len(database.ParseSQLMultiStatements(query)) == 1 && (!sQuery.IsQuery() || (strings.Contains(strings.ToLower(query), "select") && !strings.Contains(strings.ToLower(query), "insert")) || g.In(conn.Connection.Type, dbio.TypeDbPrometheus, dbio.TypeDbMongoDB, dbio.TypeDbElasticsearch)) {
128152

129-
sql := sQuery.Select(database.SelectOptions{Limit: g.Ptr(100)})
130-
if sQuery.IsQuery() || sQuery.IsProcedural() {
131-
sql = sQuery.Raw
153+
// Limit handling:
154+
// - limit > 0: wrap the SQL with the dialect's limit_sql template via
155+
// sQuery.Select(...) so the database truncates server-side. This
156+
// works for both bare tables and raw SELECT queries.
157+
// - limit == 0: unlimited; for raw queries fall back to sQuery.Raw so
158+
// we don't wrap with `LIMIT 0`.
159+
// - procedural calls (stored procs, etc.) cannot be wrapped, so they
160+
// always use sQuery.Raw and ignore the limit.
161+
var selectOpts database.SelectOptions
162+
if limit > 0 {
163+
n := int(limit)
164+
selectOpts.Limit = &n
132165
}
133-
data, err := dbConn.Query(sql)
134-
if err != nil {
135-
return ok, g.Error(err, "cannot execute query")
166+
sql := sQuery.Select(selectOpts)
167+
if sQuery.IsProcedural() || (limit == 0 && sQuery.IsQuery()) {
168+
sql = sQuery.Raw
136169
}
137170

138-
if asJSON {
139-
fmt.Println(g.Marshal(g.M("fields", data.GetFields(), "rows", data.Rows)))
171+
if asArrow || asCSV {
172+
// Streaming path: pull rows from StreamRowsContext and write them
173+
// directly to stdout (Arrow IPC stream or CSV). Logs go to stderr
174+
// so the output stays clean. Memory stays bounded for large queries.
175+
ds, err := dbConn.Self().StreamRowsContext(dbConn.Context().Ctx, sql, queryOpts)
176+
if err != nil {
177+
return ok, g.Error(err, "cannot execute query")
178+
}
179+
if err := ds.WaitReady(); err != nil {
180+
return ok, g.Error(err, "datastream not ready")
181+
}
182+
183+
var rowCount int64
184+
if asArrow {
185+
aw, err := iop.NewArrowStreamWriter(os.Stdout, ds.Columns)
186+
if err != nil {
187+
return ok, g.Error(err, "could not create arrow writer")
188+
}
189+
for row := range ds.Rows() {
190+
if err := aw.WriteRow(row); err != nil {
191+
aw.Close()
192+
return ok, g.Error(err, "could not write arrow row")
193+
}
194+
rowCount++
195+
}
196+
if err := ds.Err(); err != nil {
197+
aw.Close()
198+
return ok, g.Error(err, "error while streaming rows")
199+
}
200+
if err := aw.Close(); err != nil {
201+
return ok, g.Error(err, "could not close arrow writer")
202+
}
203+
} else { // asCSV
204+
w := csv.NewWriter(os.Stdout)
205+
if err := w.Write(ds.Columns.Names()); err != nil {
206+
return ok, g.Error(err, "could not write csv header")
207+
}
208+
rec := make([]string, len(ds.Columns))
209+
for row := range ds.Rows() {
210+
for i, val := range row {
211+
if i >= len(ds.Columns) {
212+
break
213+
}
214+
rec[i] = ds.Sp.CastToStringCSV(i, val, ds.Columns[i].Type)
215+
}
216+
if err := w.Write(rec); err != nil {
217+
return ok, g.Error(err, "could not write csv row")
218+
}
219+
rowCount++
220+
}
221+
w.Flush()
222+
if err := w.Error(); err != nil {
223+
return ok, g.Error(err, "csv writer error")
224+
}
225+
if err := ds.Err(); err != nil {
226+
return ok, g.Error(err, "error while streaming rows")
227+
}
228+
}
229+
totalAffected = rowCount
140230
} else {
141-
fmt.Println(g.PrettyTable(data.GetFields(), data.Rows))
142-
}
231+
data, err := dbConn.Query(sql, queryOpts)
232+
if err != nil {
233+
return ok, g.Error(err, "cannot execute query")
234+
}
143235

144-
totalAffected = cast.ToInt64(len(data.Rows))
236+
if asJSON {
237+
fmt.Println(g.Marshal(g.M("fields", data.GetFields(), "rows", data.Rows)))
238+
} else {
239+
fmt.Println(g.PrettyTable(data.GetFields(), data.Rows))
240+
}
241+
242+
totalAffected = cast.ToInt64(len(data.Rows))
243+
}
145244
} else {
146245
if len(queries) > 1 {
147246
if strings.HasPrefix(query, "file://") {
@@ -176,7 +275,7 @@ func processConns(c *g.CliSC) (ok bool, err error) {
176275

177276
case "list":
178277
fields, rows := entries.List()
179-
if asJSON {
278+
if os.Getenv("SLING_OUTPUT") == "json" {
180279
fmt.Println(g.Marshal(g.M("fields", fields, "rows", rows)))
181280
} else {
182281
fmt.Println(g.PrettyTable(fields, rows))
@@ -201,7 +300,7 @@ func processConns(c *g.CliSC) (ok bool, err error) {
201300
err = g.Error(err, "could not test %s", name)
202301
}
203302

204-
if asJSON {
303+
if os.Getenv("SLING_OUTPUT") == "json" {
205304
fmt.Println(g.Marshal(g.M("success", err == nil, "error", g.ErrMsg(err))))
206305
return
207306
}
@@ -222,3 +321,20 @@ func processConns(c *g.CliSC) (ok bool, err error) {
222321
}
223322
return ok, nil
224323
}
324+
325+
// ResolveOutputFormat resolves the output format for `conns` subcommands.
326+
func ResolveOutputFormat(c *g.CliSC, allowed ...string) (string, error) {
327+
output := strings.ToLower(strings.TrimSpace(cast.ToString(c.Vals["output"])))
328+
if output == "" {
329+
output = strings.ToLower(strings.TrimSpace(os.Getenv("SLING_OUTPUT")))
330+
}
331+
if output == "" || output == "text" {
332+
return "", nil
333+
}
334+
for _, a := range allowed {
335+
if output == a {
336+
return output, nil
337+
}
338+
}
339+
return "", g.Error("invalid --output %q; expected one of: text, %s", output, strings.Join(allowed, ", "))
340+
}

core/dbio/iop/arrow.go

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,16 @@ retry:
350350
return false
351351
}
352352

353+
// arrowRecordWriter is the subset of *ipc.FileWriter / *ipc.Writer we use,
354+
// so ArrowWriter can hold either format behind one field.
355+
type arrowRecordWriter interface {
356+
Write(arrow.RecordBatch) error
357+
Close() error
358+
}
359+
353360
// ArrowWriter is an arrow writer object using arrow v18
354361
type ArrowWriter struct {
355-
Writer *ipc.FileWriter
362+
Writer arrowRecordWriter
356363
columns Columns
357364
arrowSchema *arrow.Schema
358365
mem memory.Allocator
@@ -399,6 +406,37 @@ func NewArrowWriter(w io.Writer, columns Columns, opts ...ipc.Option) (a *ArrowW
399406
return a, nil
400407
}
401408

409+
// NewArrowStreamWriter creates an Arrow IPC stream writer (vs. the file format
410+
// produced by NewArrowWriter). Use this when the destination is not seekable
411+
// — e.g. stdout, a network socket, or a pipe. Stream consumers read it with
412+
// pyarrow.ipc.open_stream / arrow::ipc::RecordBatchStreamReader.
413+
func NewArrowStreamWriter(w io.Writer, columns Columns, opts ...ipc.Option) (a *ArrowWriter, err error) {
414+
415+
for i, col := range columns {
416+
if col.IsDecimal() {
417+
columns[i].DbPrecision = lo.Ternary(col.DbPrecision < env.DdlMinDecLength, int(env.DdlMinDecLength), col.DbPrecision)
418+
columns[i].DbScale = lo.Ternary(col.DbScale < env.DdlMinDecScale, env.DdlMinDecScale, col.DbScale)
419+
}
420+
}
421+
422+
a = &ArrowWriter{
423+
columns: columns,
424+
mem: memory.NewGoAllocator(),
425+
}
426+
427+
a.arrowSchema = ColumnsToArrowSchema(columns)
428+
429+
writerOpts := append([]ipc.Option{ipc.WithSchema(a.arrowSchema), ipc.WithAllocator(a.mem)}, opts...)
430+
a.Writer = ipc.NewWriter(w, writerOpts...)
431+
432+
a.builders = make([]array.Builder, len(columns))
433+
for i, field := range a.arrowSchema.Fields() {
434+
a.builders[i] = a.createBuilder(field.Type)
435+
}
436+
437+
return a, nil
438+
}
439+
402440
func (a *ArrowWriter) createBuilder(dtype arrow.DataType) array.Builder {
403441
switch dtype.ID() {
404442
case arrow.BOOL:
@@ -465,7 +503,7 @@ func (a *ArrowWriter) flushBatch() error {
465503
}
466504

467505
// Create record batch
468-
batch := array.NewRecord(a.arrowSchema, arrays, int64(a.rowsBuffered))
506+
batch := array.NewRecordBatch(a.arrowSchema, arrays, int64(a.rowsBuffered))
469507
defer batch.Release()
470508

471509
// Write the batch

justfile

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,15 @@ test-python-arrow-true:
107107
export SLING_BINARY="$PWD/cmd/sling/sling"
108108
cd ../sling-python/sling && SLING_USE_ARROW=true python -m pytest tests/test_sling_class.py -v && cd -
109109

110+
# Test Python Connection class via tests/test_connection.py, with SLING_BINARY set to cmd/sling/sling (sling conns exec/test, arrow IPC, CSV streaming, limit)
111+
test-python-conns:
112+
#!/usr/bin/env bash
113+
echo "TESTING Python Connection class"
114+
export SLING_BINARY="$PWD/cmd/sling/sling"
115+
cd ../sling-python/sling && python -m pytest tests/test_connection.py -v && cd -
116+
110117
# Run all Python tests
111-
test-python: test-python-main test-python-arrow-false test-python-arrow-true
118+
test-python: test-python-main test-python-arrow-false test-python-arrow-true test-python-conns
112119

113120
test-cdc-basic:
114121
#!/usr/bin/env bash

0 commit comments

Comments
 (0)