Commit 321ac3d

feat(ingest): support setting fields for csv without header row + csv batching
1 parent c4e9464 commit 321ac3d

1 file changed (+26, -7)


internal/cmd/ingest/ingest.go

Lines changed: 26 additions & 7 deletions
@@ -65,6 +65,10 @@ type options struct {
     // Labels attached to every event, server-side.
     Labels []ingest.Option
     labels []string // for the flag value
+    // CSVFields are the field names for the CSV data. This is handy if the data
+    // to ingest does not have a header row.
+    CSVFields []ingest.Option
+    csvFields []string
     // ContinueOnError will continue ingesting, even if an error is returned
     // from the server.
     ContinueOnError bool
@@ -135,6 +139,11 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
             # to every events, so there is no need to add them to the data,
             # locally:
             $ cat log*.json.gz | axiom ingest http-logs -t=json -e=gzip -l=env:prod -l=app:webserver
+
+            # Send a CSV file to a dataset called "sec-logs". The CSV file does
+            # not have a header row, so the field names are set manually. This
+            # also comes in handy as the file is now automatically batched.
+            $ axiom ingest sec-logs -f sec-logs.csv -t=csv --csv-fields=timestamp,source,severity,message
        `),

        Annotations: map[string]string{
@@ -176,10 +185,15 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
                opts.Labels = append(opts.Labels, ingest.SetEventLabel(splits[0], splits[1]))
            }

+           // Populate the CSV fields.
+           for _, field := range opts.csvFields {
+               opts.CSVFields = append(opts.CSVFields, ingest.AddCSVField(field))
+           }
+
            if err := complete(cmd.Context(), opts); err != nil {
                return err
            }
-           return run(cmd.Context(), opts, cmd.Flag("flush-every").Changed)
+           return run(cmd.Context(), opts, cmd.Flag("flush-every").Changed, cmd.Flag("csv-fields").Changed)
        },
    }

@@ -191,6 +205,7 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
    cmd.Flags().StringVarP(&opts.contentType, "content-type", "t", "", "Content type of the data to ingest (will auto-detect if not set, must be set if content encoding is set and content type is not identity)")
    cmd.Flags().StringVarP(&opts.contentEncoding, "content-encoding", "e", axiom.Identity.String(), "Content encoding of the data to ingest")
    cmd.Flags().StringSliceVarP(&opts.labels, "label", "l", nil, "Labels to attach to the ingested events, server side")
+   cmd.Flags().StringSliceVar(&opts.csvFields, "csv-fields", nil, "CSV header fields to use as event field names, server side (e.g. if there is no header row)")
    cmd.Flags().BoolVar(&opts.ContinueOnError, "continue-on-error", false, "Don't fail on ingest errors (use with care!)")

    _ = cmd.RegisterFlagCompletionFunc("timestamp-field", cmdutil.NoCompletion)
@@ -250,7 +265,7 @@ func complete(ctx context.Context, opts *options) error {
    }, &opts.Dataset, opts.IO.SurveyIO())
}

-func run(ctx context.Context, opts *options, flushEverySet bool) error {
+func run(ctx context.Context, opts *options, flushEverySet, csvFieldsSet bool) error {
    client, err := opts.Client(ctx)
    if err != nil {
        return err
@@ -297,9 +312,12 @@ func run(ctx context.Context, opts *options, flushEverySet bool) error {
        return cmdutil.NewFlagErrorf("--delimier/-d not valid when content type is not CSV")
    }

-   var ingestRes *ingest.Status
-   if filename == "stdin" && typ == axiom.NDJSON && opts.ContentEncoding == axiom.Identity {
-       ingestRes, err = ingestEvery(ctx, client, r, opts)
+   var (
+       batchable = typ == axiom.NDJSON || (typ == axiom.CSV && csvFieldsSet)
+       ingestRes *ingest.Status
+   )
+   if filename == "stdin" && batchable && opts.ContentEncoding == axiom.Identity {
+       ingestRes, err = ingestEvery(ctx, client, r, typ, opts)
    } else {
        ingestRes, err = ingestReader(ctx, client, r, typ, opts)
    }
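
Why the new batchable gate: NDJSON can be split between lines anywhere, and a headerless CSV stream can be split too once --csv-fields supplies the field names server-side, whereas CSV that depends on its header row would lose it after the first chunk. The following table-driven check is hypothetical, not part of the commit; it only spells out the predicate from the hunk above (the axiom content-type constants are the real ones).

package ingest_test

import (
    "testing"

    "github.com/axiomhq/axiom-go/axiom"
)

// TestBatchable is a hypothetical test spelling out the batchable predicate
// introduced above; it is not part of this commit.
func TestBatchable(t *testing.T) {
    for _, tt := range []struct {
        typ          axiom.ContentType
        csvFieldsSet bool
        want         bool
    }{
        {axiom.NDJSON, false, true}, // NDJSON chunks are always self-describing
        {axiom.CSV, true, true},     // headerless CSV works once --csv-fields names the fields
        {axiom.CSV, false, false},   // CSV that relies on a header row must be sent in one piece
        {axiom.JSON, false, false},  // a single JSON document cannot be split
    } {
        batchable := tt.typ == axiom.NDJSON || (tt.typ == axiom.CSV && tt.csvFieldsSet)
        if batchable != tt.want {
            t.Errorf("typ=%v csvFieldsSet=%v: got %v, want %v", tt.typ, tt.csvFieldsSet, batchable, tt.want)
        }
    }
}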
@@ -352,7 +370,7 @@ func run(ctx context.Context, opts *options, flushEverySet bool) error {
    return lastErr
}

-func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, opts *options) (*ingest.Status, error) {
+func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, typ axiom.ContentType, opts *options) (*ingest.Status, error) {
    t := time.NewTicker(opts.FlushEvery)
    defer t.Stop()

@@ -424,7 +442,7 @@ func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, opts *o

    var res ingest.Status
    for r := range readers {
-       ingestRes, err := ingestReader(ctx, client, r, axiom.NDJSON, opts)
+       ingestRes, err := ingestReader(ctx, client, r, typ, opts)
        if err != nil {
            if opts.ContinueOnError {
                fmt.Fprintf(opts.IO.ErrOut(), "%s Failed to ingest: %v, continuing...\n",
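
The commit only changes ingestEvery's signature and its per-chunk ingestReader call; the splitting machinery itself is unchanged and not shown in this diff. For orientation, here is a rough standalone sketch of that ticker-driven batching pattern and why typ now has to be threaded through. The chunking strategy, dataset name, and error handling are assumptions for the sketch, not the CLI's actual code.

package sketch

import (
    "bufio"
    "bytes"
    "context"
    "io"
    "time"

    "github.com/axiomhq/axiom-go/axiom"
    "github.com/axiomhq/axiom-go/axiom/ingest"
)

// batchEvery is an illustrative stand-in for the CLI's ingestEvery: it splits
// r into one chunk per flush interval and ingests each chunk with the
// caller-supplied content type. Because --csv-fields makes every CSV chunk
// self-describing, the same path now works for headerless CSV, not just NDJSON.
func batchEvery(ctx context.Context, client *axiom.Client, r io.Reader, typ axiom.ContentType, flushEvery time.Duration, opts ...ingest.Option) error {
    chunks := make(chan io.Reader)

    go func() {
        defer close(chunks)

        t := time.NewTicker(flushEvery)
        defer t.Stop()

        var buf bytes.Buffer
        scanner := bufio.NewScanner(r)
        for scanner.Scan() {
            buf.Write(scanner.Bytes())
            buf.WriteByte('\n')

            select {
            case <-t.C: // flush whatever accumulated during the interval
                if buf.Len() > 0 {
                    chunks <- bytes.NewReader(append([]byte(nil), buf.Bytes()...))
                    buf.Reset()
                }
            case <-ctx.Done():
                return
            default:
            }
        }
        if buf.Len() > 0 { // flush the remainder
            chunks <- bytes.NewReader(buf.Bytes())
        }
    }()

    for chunk := range chunks {
        // "my-dataset" is a placeholder; the real code uses opts.Dataset and
        // goes through ingestReader so labels, delimiter and CSV fields apply.
        // Error handling is simplified: on failure the producer goroutine is abandoned.
        if _, err := client.Datasets.Ingest(ctx, "my-dataset", chunk, typ, axiom.Identity, opts...); err != nil {
            return err
        }
    }
    return nil
}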
@@ -463,6 +481,7 @@ func ingestReader(ctx context.Context, client *axiom.Client, r io.Reader, typ ax
        ingestOptions = append(ingestOptions, ingest.SetCSVDelimiter(v))
    }
    ingestOptions = append(ingestOptions, opts.Labels...)
+   ingestOptions = append(ingestOptions, opts.CSVFields...)

    res, err := client.Datasets.Ingest(ctx, opts.Dataset, r, typ, enc, ingestOptions...)
    if err != nil {
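
Putting the pieces together: a minimal sketch, not part of the commit, of the equivalent ingest call through axiom-go, using the same option constructors the CLI wires up above (ingest.SetEventLabel, ingest.SetCSVDelimiter, ingest.AddCSVField). The dataset, file, and field names are placeholders taken from the help-text example.

package main

import (
    "context"
    "log"
    "os"

    "github.com/axiomhq/axiom-go/axiom"
    "github.com/axiomhq/axiom-go/axiom/ingest"
)

func main() {
    // Assumes the usual axiom-go environment configuration (URL, token, org).
    client, err := axiom.NewClient()
    if err != nil {
        log.Fatal(err)
    }

    f, err := os.Open("sec-logs.csv") // a CSV file without a header row
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    // The same option set ingestReader assembles: labels, an optional CSV
    // delimiter, and, new with this commit, the CSV field names.
    opts := []ingest.Option{
        ingest.SetEventLabel("env", "prod"),
        ingest.SetCSVDelimiter(","),
        ingest.AddCSVField("timestamp"),
        ingest.AddCSVField("source"),
        ingest.AddCSVField("severity"),
        ingest.AddCSVField("message"),
    }

    res, err := client.Datasets.Ingest(context.Background(), "sec-logs", f, axiom.CSV, axiom.Identity, opts...)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("ingested %d events, %d failed", res.Ingested, res.Failed)
}

The CLI equivalent is the command added to the help text above: axiom ingest sec-logs -f sec-logs.csv -t=csv --csv-fields=timestamp,source,severity,message.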
