Skip to content

Commit 9a76c2e

Browse files
committed
Properly create/configure the data stream
1 parent d18d1fb commit 9a76c2e

File tree

1 file changed

+79
-25
lines changed

1 file changed

+79
-25
lines changed

libbeat/outputs/elasticsearch/client_integration_test.go

Lines changed: 79 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@ package elasticsearch
2222
import (
2323
"context"
2424
"fmt"
25+
"io"
2526
"math/rand/v2"
27+
"net/http"
28+
"strings"
2629
"testing"
2730
"time"
2831

2932
"go.elastic.co/apm/v2/apmtest"
3033
"go.uber.org/zap"
3134

35+
"github.com/gofrs/uuid/v5"
3236
"github.com/stretchr/testify/assert"
3337
"github.com/stretchr/testify/require"
3438

@@ -466,44 +470,97 @@ func randomClient(grp outputs.Group) outputs.NetworkClient {
466470
return client.(outputs.NetworkClient) //nolint:errcheck //This is a test file, can ignore
467471
}
468472

469-
func deleteDatastream(t *testing.T, client *Client, ds string) {
470-
status, _, err := client.conn.Request("DELETE", fmt.Sprintf("/_data_stream/%s", ds), "", nil, nil)
473+
func configureDatastream(t *testing.T, client *Client, ds string) {
474+
// Define the request body
475+
requestBody := `{
476+
"index_patterns": ["` + ds + `*"],
477+
"data_stream": { },
478+
"template": {
479+
"data_stream_options": {
480+
"failure_store": {
481+
"enabled": true
482+
}
483+
}
484+
}
485+
}`
486+
487+
// Create the HTTP request
488+
req, err := http.NewRequest("PUT", fmt.Sprintf("%s/_index_template/idx-tmpl-"+ds, client.conn.URL), strings.NewReader(requestBody))
471489
if err != nil {
472-
t.Fatalf("failed to delete datastream %s: %v", ds, err)
490+
t.Fatalf("failed to create HTTP request: %v", err)
473491
}
492+
req.Header.Set("Content-Type", "application/json")
493+
req.SetBasicAuth(client.conn.Username, client.conn.Password)
474494

475-
if status != 200 && status != 404 {
476-
t.Fatalf("unexpected status code %d while deleting datastream %s", status, ds)
495+
// Send the HTTP request
496+
resp, err := client.conn.HTTP.Do(req)
497+
if err != nil {
498+
t.Fatalf("failed to send HTTP request: %v", err)
499+
}
500+
defer resp.Body.Close()
501+
502+
// Check for non-2xx status codes
503+
if resp.StatusCode >= 300 {
504+
body, _ := io.ReadAll(resp.Body)
505+
t.Fatalf("unexpected status code %d while configuring datastream %s: %s", resp.StatusCode, ds, body)
506+
}
507+
}
508+
509+
func createDatastream(t *testing.T, client *Client, ds string) {
510+
configureDatastream(t, client, ds)
511+
timestamp := time.Now().Format(time.RFC3339)
512+
bulkRequest := fmt.Sprintf(`{"create":{}}
513+
{"@timestamp":"%s","foo":1234}
514+
`, timestamp)
515+
516+
// Create the HTTP request
517+
req, err := http.NewRequest(
518+
"POST",
519+
fmt.Sprintf("%s/%s/_bulk", client.conn.URL, ds),
520+
strings.NewReader(bulkRequest))
521+
522+
if err != nil {
523+
t.Fatalf("failed to create HTTP request: %v", err)
524+
}
525+
req.Header.Set("Content-Type", "application/x-ndjson")
526+
req.SetBasicAuth(client.conn.Username, client.conn.Password)
527+
528+
// Send the HTTP request
529+
resp, err := client.conn.HTTP.Do(req)
530+
if err != nil {
531+
t.Fatalf("failed to send HTTP request: %v", err)
532+
}
533+
defer resp.Body.Close()
534+
535+
// Check for non-2xx status codes
536+
if resp.StatusCode >= 300 {
537+
body, _ := io.ReadAll(resp.Body)
538+
t.Fatalf("unexpected status code %d while creating datastream %s: %s", resp.StatusCode, ds, body)
477539
}
478540
}
479541

480542
func TestFoo(t *testing.T) {
481-
index := "my-datastream-test"
543+
ds := uuid.Must(uuid.NewV4()).String()
482544
registry := monitoring.NewRegistry()
483545

484546
cfg := map[string]any{
485-
"index": index,
547+
"index": ds,
486548
}
487549
output, client := connectTestEs(t, cfg, outputs.NewStats(registry, logp.NewNopLogger()))
488550

489-
// drop old index preparing test
490-
// _, _, _ = client.conn.Delete(index, "", "", nil)
491-
492-
// deleteDatastream(t, client, index)
551+
createDatastream(t, client, ds)
493552

494553
batch := encodeBatch(client, outest.NewBatch(
495554
beat.Event{
496555
Timestamp: time.Now(),
497556
Fields: mapstr.M{
498-
"type": "test foo",
499557
"foo": "invalid type",
500558
"message": "this one works",
501559
},
502560
},
503561
beat.Event{
504562
Timestamp: time.Now(),
505563
Fields: mapstr.M{
506-
"type": "test foo",
507564
"foo": 42,
508565
"mesage": "success event",
509566
},
@@ -515,25 +572,22 @@ func TestFoo(t *testing.T) {
515572
t.Fatal(err)
516573
}
517574

518-
_, _, err = client.conn.Refresh(index)
575+
_, _, err = client.conn.Refresh(ds)
519576
if err != nil {
520577
t.Fatal(err)
521578
}
522579

523-
// _, resp, err := client.conn.CountSearchURI(index, "", nil)
524-
// if err != nil {
525-
// t.Fatal(err)
526-
// }
580+
_, resp, err := client.conn.CountSearchURI(ds, "", nil)
581+
if err != nil {
582+
t.Fatal(err)
583+
}
527584

528-
// assert.Equal(t, 2, resp.Count)
585+
// Expect two events in the index: one from the batch and another
586+
// from when we created the datastream
587+
assert.Equal(t, 2, resp.Count)
529588

530589
outputSnapshot := monitoring.CollectFlatSnapshot(registry, monitoring.Full, true)
590+
// Ensure the correct number of events was acked and sent to the failure store
531591
assert.EqualValues(t, 1, outputSnapshot.Ints["events.failure_store"], "failure store metric was not incremented")
532592
assert.EqualValues(t, 2, outputSnapshot.Ints["events.acked"], "wrong number of acked events")
533-
534-
assert.Greater(t, outputSnapshot.Ints["write.bytes"], int64(0), "output.events.write.bytes must be greater than 0")
535-
assert.Greater(t, outputSnapshot.Ints["read.bytes"], int64(0), "output.events.read.bytes must be greater than 0")
536-
assert.Equal(t, int64(0), outputSnapshot.Ints["write.errors"])
537-
assert.Equal(t, int64(0), outputSnapshot.Ints["read.errors"])
538-
539593
}

0 commit comments

Comments
 (0)