Skip to content

Commit 9cf017a

Browse files
committed
[opendistro destination] Fix datasource get in odfe>=1.11.0
1 parent cb71889 commit 9cf017a

File tree

2 files changed

+85
-21
lines changed

2 files changed

+85
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22
## Unreleased
33
### Changed
4+
- [opendistro destination] Use API to get in odfe>=1.11.0 (#158)
45

56
### Added
67

es/data_source_elasticsearch_opendistro_destination.go

Lines changed: 84 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"log"
99

1010
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
11+
"github.com/olivere/elastic/uritemplates"
1112

1213
elastic7 "github.com/olivere/elastic/v7"
1314
elastic6 "gopkg.in/olivere/elastic.v6"
@@ -47,19 +48,26 @@ func dataSourceElasticsearchOpenDistroDestination() *schema.Resource {
4748
func dataSourceElasticsearchOpenDistroDestinationRead(d *schema.ResourceData, m interface{}) error {
4849
destinationName := d.Get("name").(string)
4950

50-
// See https://github.com/opendistro-for-elasticsearch/alerting/issues/70, no tags or API endpoint for searching destination
5151
var id string
52-
var body *json.RawMessage
52+
var destination map[string]interface{}
5353
var err error
5454
esClient, err := getClient(m.(*ProviderConf))
5555
if err != nil {
5656
return err
5757
}
5858
switch client := esClient.(type) {
5959
case *elastic7.Client:
60-
id, body, err = destinationElasticsearch7Search(client, DESTINATION_INDEX, destinationName)
60+
// See https://github.com/opendistro-for-elasticsearch/alerting/issues/70,
61+
// no tags or API endpoint for searching destination. In ODFE >= 1.11.0,
62+
// the index has become a "system index", so it cannot be searched:
63+
// https://opendistro.github.io/for-elasticsearch-docs/docs/alerting/settings/#alerting-indices
64+
// instead we paginate through all destinations to find the first name match :|
65+
id, destination, err = destinationElasticsearch7GetAll(client, destinationName)
66+
if err != nil {
67+
id, destination, err = destinationElasticsearch7Search(client, DESTINATION_INDEX, destinationName)
68+
}
6169
case *elastic6.Client:
62-
id, body, err = destinationElasticsearch6Search(client, DESTINATION_INDEX, destinationName)
70+
id, destination, err = destinationElasticsearch6Search(client, DESTINATION_INDEX, destinationName)
6371
default:
6472
err = errors.New("destination resource not implemented prior to Elastic v6")
6573
}
@@ -71,17 +79,13 @@ func dataSourceElasticsearchOpenDistroDestinationRead(d *schema.ResourceData, m
7179
return nil
7280
}
7381

74-
destination := make(map[string]interface{})
75-
if err := json.Unmarshal(*body, &destination); err != nil {
76-
return fmt.Errorf("error unmarshalling destination body: %+v: %+v", err, body)
77-
}
78-
7982
d.SetId(id)
8083

8184
// we get a non-uniform map[string]interface{} back for the body, terraform
82-
// only accepts a mapping of string to primitive values
85+
// only accepts a mapping of string to primitive values. We want to save
86+
// this as a map so that attributes are accessible
8387
simplifiedBody := map[string]string{}
84-
for key, value := range destination["destination"].(map[string]interface{}) {
88+
for key, value := range destination {
8589
if stringified, ok := value.(string); ok {
8690
simplifiedBody[key] = stringified
8791
} else {
@@ -92,40 +96,99 @@ func dataSourceElasticsearchOpenDistroDestinationRead(d *schema.ResourceData, m
9296
return err
9397
}
9498

95-
func destinationElasticsearch7Search(client *elastic7.Client, index string, name string) (string, *json.RawMessage, error) {
99+
func destinationElasticsearch7Search(client *elastic7.Client, index string, name string) (string, map[string]interface{}, error) {
96100
termQuery := elastic7.NewTermQuery(DESTINATION_NAME_FIELD, name)
97101
result, err := client.Search().
98102
Index(index).
99103
Query(termQuery).
100104
Do(context.TODO())
101105

106+
destination := make(map[string]interface{})
102107
if err != nil {
103-
return "", nil, err
108+
return "", destination, err
104109
}
105110
if result.TotalHits() == 1 {
106-
return result.Hits.Hits[0].Id, &result.Hits.Hits[0].Source, nil
111+
if err := json.Unmarshal(result.Hits.Hits[0].Source, &destination); err != nil {
112+
return "", destination, fmt.Errorf("error unmarshalling destination body: %+v", err)
113+
}
114+
115+
return result.Hits.Hits[0].Id, destination["destination"].(map[string]interface{}), nil
107116
} else if result.TotalHits() < 1 {
108-
return "", nil, err
117+
return "", destination, err
109118
} else {
110-
return "", nil, fmt.Errorf("1 result expected, found %d.", result.TotalHits())
119+
return "", destination, fmt.Errorf("1 result expected, found %d.", result.TotalHits())
111120
}
112121
}
113122

114-
func destinationElasticsearch6Search(client *elastic6.Client, index string, name string) (string, *json.RawMessage, error) {
123+
func destinationElasticsearch6Search(client *elastic6.Client, index string, name string) (string, map[string]interface{}, error) {
115124
termQuery := elastic6.NewTermQuery(DESTINATION_NAME_FIELD, name)
116125
result, err := client.Search().
117126
Index(index).
118127
Query(termQuery).
119128
Do(context.TODO())
120129

130+
destination := make(map[string]interface{})
121131
if err != nil {
122-
return "", nil, err
132+
return "", destination, err
123133
}
124134
if result.TotalHits() == 1 {
125-
return result.Hits.Hits[0].Id, result.Hits.Hits[0].Source, nil
135+
if err := json.Unmarshal(*result.Hits.Hits[0].Source, &destination); err != nil {
136+
return "", destination, fmt.Errorf("error unmarshalling destination body: %+v", err)
137+
}
138+
139+
return result.Hits.Hits[0].Id, destination["destination"].(map[string]interface{}), nil
126140
} else if result.TotalHits() < 1 {
127-
return "", nil, err
141+
return "", destination, err
128142
} else {
129-
return "", nil, fmt.Errorf("1 result expected, found %d.", result.TotalHits())
143+
return "", destination, fmt.Errorf("1 result expected, found %d.", result.TotalHits())
130144
}
131145
}
146+
147+
func destinationElasticsearch7GetAll(client *elastic7.Client, name string) (string, map[string]interface{}, error) {
148+
offset := 0
149+
pageSize := 1000
150+
destination := make(map[string]interface{})
151+
for {
152+
path, err := uritemplates.Expand("/_opendistro/_alerting/destinations?startIndex={startIndex}&size={size}", map[string]string{
153+
"startIndex": fmt.Sprint(offset),
154+
"size": fmt.Sprint(pageSize),
155+
})
156+
if err != nil {
157+
return "", destination, fmt.Errorf("error building URL path for destination: %+v", err)
158+
}
159+
160+
httpResponse, err := client.PerformRequest(context.TODO(), elastic7.PerformRequestOptions{
161+
Method: "GET",
162+
Path: path,
163+
})
164+
if err != nil {
165+
return "", destination, err
166+
}
167+
168+
var drg destinationResponseGet
169+
if err := json.Unmarshal(httpResponse.Body, &drg); err != nil {
170+
return "", destination, fmt.Errorf("error unmarshalling destination body: %+v", err)
171+
}
172+
173+
for _, d := range drg.Destinations {
174+
if d.Name == name {
175+
j, err := json.Marshal(d)
176+
if err != nil {
177+
return "", destination, fmt.Errorf("error marshalling destination: %+v", err)
178+
}
179+
if err := json.Unmarshal(j, &destination); err != nil {
180+
return "", destination, fmt.Errorf("error unmarshalling destination body: %+v", err)
181+
}
182+
return d.ID, destination, nil
183+
}
184+
}
185+
186+
if drg.Total > offset {
187+
offset += pageSize
188+
} else {
189+
break
190+
}
191+
}
192+
193+
return "", destination, fmt.Errorf("destination not found")
194+
}

0 commit comments

Comments
 (0)