Skip to content

Commit dd57f51

Browse files
co42disaster37
andauthored
Add resource data stream (#11)
* Add resource data stream * Update resource_elasticsearch_data_stream.go Use the right elasticsearch version Co-authored-by: disaster37 <linuxworkgroup@hotmail.com>
1 parent 1bfdad5 commit dd57f51

2 files changed

Lines changed: 161 additions & 0 deletions

File tree

es/provider.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ func Provider() *schema.Provider {
8181
"elasticsearch_snapshot_repository": resourceElasticsearchSnapshotRepository(),
8282
"elasticsearch_snapshot_lifecycle_policy": resourceElasticsearchSnapshotLifecyclePolicy(),
8383
"elasticsearch_watcher": resourceElasticsearchWatcher(),
84+
"elasticsearch_data_stream": resourceElasticsearchDataStream(),
8485
"elasticsearch_transform": resourceElasticsearchTransform(),
8586
"elasticsearch_ingest_pipeline": resourceElasticsearchIngestPipeline(),
8687
},
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
// Manage data stream in Elasticsearch
2+
// API documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-apis.html
3+
// Supported version:
4+
// - v7
5+
6+
package es
7+
8+
import (
9+
"context"
10+
"encoding/json"
11+
"fmt"
12+
elastic "github.com/elastic/go-elasticsearch/v8"
13+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
14+
"github.com/pkg/errors"
15+
log "github.com/sirupsen/logrus"
16+
"io/ioutil"
17+
)
18+
19+
type IndicesGetDataStreamResponse struct {
20+
DataStreams []interface{} `json:"data_streams,omitempty"`
21+
}
22+
23+
// resourceElasticsearchDataStream handle the data stream API call
24+
func resourceElasticsearchDataStream() *schema.Resource {
25+
return &schema.Resource{
26+
Create: resourceElasticsearchDataStreamCreate,
27+
Read: resourceElasticsearchDataStreamRead,
28+
Delete: resourceElasticsearchDataStreamDelete,
29+
30+
Importer: &schema.ResourceImporter{
31+
State: schema.ImportStatePassthrough,
32+
},
33+
34+
Schema: map[string]*schema.Schema{
35+
"name": {
36+
Type: schema.TypeString,
37+
ForceNew: true,
38+
Required: true,
39+
},
40+
},
41+
}
42+
}
43+
44+
// resourceElasticsearchDataStreamCreate create data stream
45+
func resourceElasticsearchDataStreamCreate(d *schema.ResourceData, meta interface{}) error {
46+
47+
err := createDataStream(d, meta)
48+
if err != nil {
49+
return err
50+
}
51+
d.SetId(d.Get("name").(string))
52+
return resourceElasticsearchDataStreamRead(d, meta)
53+
}
54+
55+
// resourceElasticsearchDataStreamRead read data stream
56+
func resourceElasticsearchDataStreamRead(d *schema.ResourceData, meta interface{}) error {
57+
id := d.Id()
58+
59+
client := meta.(*elastic.Client)
60+
res, err := client.API.Indices.GetDataStream(
61+
client.API.Indices.GetDataStream.WithName(id),
62+
client.API.Indices.GetDataStream.WithContext(context.Background()),
63+
client.API.Indices.GetDataStream.WithPretty(),
64+
)
65+
if err != nil {
66+
return err
67+
}
68+
defer res.Body.Close()
69+
if res.IsError() {
70+
if res.StatusCode == 404 {
71+
fmt.Printf("[WARN] Data stream %s not found - removing from state", id)
72+
log.Warnf("Data stream %s not found - removing from state", id)
73+
d.SetId("")
74+
return nil
75+
}
76+
return errors.Errorf("Error when get data stream %s: %s", id, res.String())
77+
78+
}
79+
b, err := ioutil.ReadAll(res.Body)
80+
if err != nil {
81+
return err
82+
}
83+
dataStream := IndicesGetDataStreamResponse{}
84+
if err := json.Unmarshal(b, &dataStream); err != nil {
85+
return err
86+
}
87+
88+
if len(dataStream.DataStreams) == 0 {
89+
fmt.Printf("[WARN] Data stream %s not found - removing from state", id)
90+
log.Warnf("Data stream %s not found - removing from state", id)
91+
d.SetId("")
92+
return nil
93+
}
94+
95+
dataStreamJSON, err := json.Marshal(dataStream.DataStreams[0])
96+
if err != nil {
97+
return err
98+
}
99+
100+
log.Debugf("Get data stream %s successfully:%+v", id, dataStreamJSON)
101+
d.Set("name", d.Id())
102+
return nil
103+
}
104+
105+
// resourceElasticsearchDataStreamDelete delete data stream
106+
func resourceElasticsearchDataStreamDelete(d *schema.ResourceData, meta interface{}) error {
107+
108+
id := d.Id()
109+
110+
client := meta.(*elastic.Client)
111+
res, err := client.API.Indices.DeleteDataStream(
112+
[]string{id},
113+
client.API.Indices.DeleteDataStream.WithContext(context.Background()),
114+
client.API.Indices.DeleteDataStream.WithPretty(),
115+
)
116+
117+
if err != nil {
118+
return err
119+
}
120+
121+
defer res.Body.Close()
122+
123+
if res.IsError() {
124+
if res.StatusCode == 404 {
125+
fmt.Printf("[WARN] Data stream %s not found - removing from state", id)
126+
log.Warnf("Data stream %s not found - removing from state", id)
127+
d.SetId("")
128+
return nil
129+
}
130+
return errors.Errorf("Error when delete data stream %s: %s", id, res.String())
131+
132+
}
133+
134+
d.SetId("")
135+
return nil
136+
}
137+
138+
// createDataStream create a data stream
139+
func createDataStream(d *schema.ResourceData, meta interface{}) error {
140+
name := d.Get("name").(string)
141+
142+
client := meta.(*elastic.Client)
143+
res, err := client.API.Indices.CreateDataStream(
144+
name,
145+
client.API.Indices.CreateDataStream.WithContext(context.Background()),
146+
client.API.Indices.CreateDataStream.WithPretty(),
147+
)
148+
149+
if err != nil {
150+
return err
151+
}
152+
153+
defer res.Body.Close()
154+
155+
if res.IsError() {
156+
return errors.Errorf("Error when add data stream %s: %s", name, res.String())
157+
}
158+
159+
return nil
160+
}

0 commit comments

Comments
 (0)