
Commit 2cd4257

Author: vkropotko
feat(esClientDrain): enhance Drain ES Client function
Add the ability to set parameters for the retry logic of the Resty lib. Add handling for unfinished shards migration.

Signed-off-by: vkropotko <[email protected]>
1 parent 97eefb7 commit 2cd4257

File tree

7 files changed: +227 -44 lines changed

main.go

Lines changed: 24 additions & 12 deletions

@@ -26,22 +26,28 @@ const (
 	defaultMetricsAddress   = ":7979"
 	defaultClientGoTimeout  = 30 * time.Second
 	defaultClusterDNSZone   = "cluster.local."
+	defaultRetryCount       = "999"
+	defaultRetryWaitTime    = "10s"
+	defaultRetryMaxWaitTime = "30s"
 )

 var (
 	config struct {
-		Interval              time.Duration
-		AutoscalerInterval    time.Duration
-		APIServer             *url.URL
-		PodSelectors          Labels
-		PriorityNodeSelectors Labels
-		MetricsAddress        string
-		ClientGoTimeout       time.Duration
-		Debug                 bool
-		OperatorID            string
-		Namespace             string
-		ClusterDNSZone        string
-		ElasticsearchEndpoint *url.URL
+		Interval                 time.Duration
+		AutoscalerInterval       time.Duration
+		APIServer                *url.URL
+		PodSelectors             Labels
+		PriorityNodeSelectors    Labels
+		MetricsAddress           string
+		ClientGoTimeout          time.Duration
+		Debug                    bool
+		OperatorID               string
+		Namespace                string
+		ClusterDNSZone           string
+		ElasticsearchEndpoint    *url.URL
+		EsClientRetryCount       int
+		EsClientRetryWaitTime    time.Duration
+		EsClientRetryMaxWaitTime time.Duration
 	}
 )

@@ -71,6 +77,9 @@ func main() {
 		URLVar(&config.ElasticsearchEndpoint)
 	kingpin.Flag("namespace", "Limit operator to a certain namespace").
 		Default(v1.NamespaceAll).StringVar(&config.Namespace)
+	kingpin.Flag("esclient-retry-count", "Count of retry operations conducted by EsClient. Used when the operator is waiting for an ES cluster condition, e.g. shards relocation.").Default(defaultRetryCount).IntVar(&config.EsClientRetryCount)
+	kingpin.Flag("esclient-retry-waittime", "Wait time between two operations of EsClient. Used when the operator is waiting for an ES cluster condition, e.g. shards relocation.").Default(defaultRetryWaitTime).DurationVar(&config.EsClientRetryWaitTime)
+	kingpin.Flag("esclient-retry-max-waittime", "Max wait time between two operations of EsClient. Used when the operator is waiting for an ES cluster condition, e.g. shards relocation.").Default(defaultRetryMaxWaitTime).DurationVar(&config.EsClientRetryMaxWaitTime)

 	kingpin.Parse()

@@ -98,6 +107,9 @@ func main() {
 		config.Namespace,
 		config.ClusterDNSZone,
 		config.ElasticsearchEndpoint,
+		config.EsClientRetryCount,
+		config.EsClientRetryWaitTime,
+		config.EsClientRetryMaxWaitTime,
 	)

 	go handleSigterm(cancel)
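
These three flags are declared with kingpin, so the string defaults above ("999", "10s", "30s") are parsed into an int and two time.Duration values at startup. Below is a minimal stand-alone sketch of that pattern; it uses kingpin's value-returning variants rather than the operator's IntVar/DurationVar wiring, and the import path gopkg.in/alecthomas/kingpin.v2 is an assumption about the pinned version.

package main

import (
	"fmt"

	kingpin "gopkg.in/alecthomas/kingpin.v2"
)

var (
	// Defaults mirror main.go; they can be overridden at runtime, e.g.
	//   --esclient-retry-count=100 --esclient-retry-waittime=5s --esclient-retry-max-waittime=1m
	retryCount       = kingpin.Flag("esclient-retry-count", "Count of retry operations conducted by EsClient.").Default("999").Int()
	retryWaitTime    = kingpin.Flag("esclient-retry-waittime", "Wait time between two operations of EsClient.").Default("10s").Duration()
	retryMaxWaitTime = kingpin.Flag("esclient-retry-max-waittime", "Max wait time between two operations of EsClient.").Default("30s").Duration()
)

func main() {
	kingpin.Parse()
	fmt.Printf("retries=%d wait=%v maxWait=%v\n", *retryCount, *retryWaitTime, *retryMaxWaitTime)
}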

operator/elasticsearch.go

Lines changed: 13 additions & 3 deletions

@@ -47,7 +47,8 @@ type ElasticsearchOperator struct {
 	elasticsearchEndpoint *url.URL
 	operating             map[types.UID]operatingEntry
 	sync.Mutex
-	recorder kube_record.EventRecorder
+	recorder            kube_record.EventRecorder
+	esClientRestyConfig *RestyConfig
 }

 type operatingEntry struct {

@@ -66,6 +67,9 @@ func NewElasticsearchOperator(
 	namespace,
 	clusterDNSZone string,
 	elasticsearchEndpoint *url.URL,
+	esClientRetryCount int,
+	esClientRetryWaitTime,
+	esClientRetryMaxWaitTime time.Duration,
 ) *ElasticsearchOperator {
 	return &ElasticsearchOperator{
 		logger: log.WithFields(

@@ -84,6 +88,11 @@ func NewElasticsearchOperator(
 		elasticsearchEndpoint: elasticsearchEndpoint,
 		operating:             make(map[types.UID]operatingEntry),
 		recorder:              createEventRecorder(client),
+		esClientRestyConfig: &RestyConfig{
+			ClientRetryCount:       esClientRetryCount,
+			ClientRetryWaitTime:    esClientRetryWaitTime,
+			ClientRetryMaxWaitTime: esClientRetryMaxWaitTime,
+		},
 	}
 }

@@ -497,8 +506,8 @@ func (r *EDSResource) ensureService(ctx context.Context) error {
 }

 // Drain drains a pod for Elasticsearch data.
-func (r *EDSResource) Drain(ctx context.Context, pod *v1.Pod) error {
-	return r.esClient.Drain(ctx, pod)
+func (r *EDSResource) Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) error {
+	return r.esClient.Drain(ctx, pod, config)
 }

 // PreScaleDownHook ensures that the IndexReplicas is set as defined in the EDS

@@ -641,6 +650,7 @@ func (o *ElasticsearchOperator) operateEDS(eds *zv1.ElasticsearchDataSet, delete
 		interval: o.interval,
 		logger:   logger,
 		recorder: o.recorder,
+		esClientRestyConfig: o.esClientRestyConfig,
 	}

 	rs := &EDSResource{
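
The retry settings are assembled once in NewElasticsearchOperator and the same *RestyConfig is then handed to every drain call. A minimal, self-contained sketch of that threading follows; the esClient, operator, and newOperator stubs are stand-ins for illustration only, not the operator's real types.

package main

import (
	"context"
	"fmt"
	"time"
)

// RestyConfig mirrors the struct added in operator/es_client.go.
type RestyConfig struct {
	ClientRetryCount       int
	ClientRetryWaitTime    time.Duration
	ClientRetryMaxWaitTime time.Duration
}

// esClient and operator are simplified stand-ins for ESClient and ElasticsearchOperator.
type esClient struct{}

func (c *esClient) Drain(ctx context.Context, podName string, config *RestyConfig) error {
	fmt.Printf("draining %s with up to %d retries\n", podName, config.ClientRetryCount)
	return nil
}

type operator struct {
	esClientRestyConfig *RestyConfig
}

func newOperator(retryCount int, wait, maxWait time.Duration) *operator {
	// Assemble the shared config once, as NewElasticsearchOperator does.
	return &operator{
		esClientRestyConfig: &RestyConfig{
			ClientRetryCount:       retryCount,
			ClientRetryWaitTime:    wait,
			ClientRetryMaxWaitTime: maxWait,
		},
	}
}

func main() {
	o := newOperator(999, 10*time.Second, 30*time.Second)
	c := &esClient{}
	// Every drain issued by the operator reuses the same retry settings.
	if err := c.Drain(context.Background(), "es-data-0", o.esClientRestyConfig); err != nil {
		fmt.Println("drain failed:", err)
	}
}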

operator/elasticsearch_test.go

Lines changed: 10 additions & 2 deletions

@@ -15,6 +15,12 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 )

+const (
+	defaultRetryCount       = 999
+	defaultRetryWaitTime    = 10 * time.Second
+	defaultRetryMaxWaitTime = 30 * time.Second
+)
+
 func TestHasOwnership(t *testing.T) {
 	eds := &zv1.ElasticsearchDataSet{
 		ObjectMeta: metav1.ObjectMeta{

@@ -43,7 +49,8 @@ func TestGetElasticsearchEndpoint(t *testing.T) {
 	faker := &clientset.Clientset{
 		Interface: fake.NewSimpleClientset(),
 	}
-	esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", "cluster.local.", nil)
+	esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", "cluster.local.", nil,
+		defaultRetryCount, defaultRetryWaitTime, defaultRetryMaxWaitTime)

 	eds := &zv1.ElasticsearchDataSet{
 		ObjectMeta: metav1.ObjectMeta{

@@ -59,7 +66,8 @@ func TestGetElasticsearchEndpoint(t *testing.T) {
 	customEndpoint, err := url.Parse(customURL)
 	assert.NoError(t, err)

-	esOperator = NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", ".cluster.local.", customEndpoint)
+	esOperator = NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", ".cluster.local.", customEndpoint,
+		defaultRetryCount, defaultRetryWaitTime, defaultRetryMaxWaitTime)
 	url = esOperator.getElasticsearchEndpoint(eds)
 	assert.Equal(t, customURL, url.String())
 }

operator/es_client.go

Lines changed: 75 additions & 15 deletions

@@ -17,12 +17,12 @@ import (
 	v1 "k8s.io/api/core/v1"
 )

-// TODO make configurable as flags.
-var (
-	defaultRetryCount       = 999
-	defaultRetryWaitTime    = 10 * time.Second
-	defaultRetryMaxWaitTime = 30 * time.Second
-)
+// RestyConfig holds the retry configuration for the Resty client.
+type RestyConfig struct {
+	ClientRetryCount       int
+	ClientRetryWaitTime    time.Duration
+	ClientRetryMaxWaitTime time.Duration
+}

 // ESClient is a pod drainer which can drain data from Elasticsearch pods.
 type ESClient struct {

@@ -91,7 +91,7 @@ func (c *ESClient) logger() *log.Entry {
 }

 // Drain drains data from an Elasticsearch pod.
-func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error {
+func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod, config *RestyConfig) error {

 	c.logger().Info("Ensuring cluster is in green state")

@@ -111,7 +111,7 @@ func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error {
 	}

 	c.logger().Info("Waiting for draining to finish")
-	return c.waitForEmptyEsNode(ctx, pod)
+	return c.waitForEmptyEsNode(ctx, pod, config)
 }

 func (c *ESClient) Cleanup(ctx context.Context) error {

@@ -220,6 +220,7 @@ func (c *ESClient) excludePodIP(pod *v1.Pod) error {
 	if excludeString != "" {
 		ips = strings.Split(excludeString, ",")
 	}
+
 	var foundPodIP bool
 	for _, ip := range ips {
 		if ip == podIP {

@@ -256,6 +257,42 @@ func (c *ESClient) setExcludeIPs(ips string) error {
 	return nil
 }

+// removeFromExcludeIPList removes the podIP from the Elasticsearch exclude._ip list.
+func (c *ESClient) removeFromExcludeIPList(pod *v1.Pod) error {
+
+	c.mux.Lock()
+	defer c.mux.Unlock()
+
+	podIP := pod.Status.PodIP
+
+	esSettings, err := c.getClusterSettings()
+	if err != nil {
+		return err
+	}
+
+	excludedIPsString := esSettings.Transient.Cluster.Routing.Allocation.Exclude.IP
+	excludedIPs := strings.Split(excludedIPsString, ",")
+	var newExcludedIPs []string
+	for _, excludeIP := range excludedIPs {
+		if excludeIP != podIP {
+			newExcludedIPs = append(newExcludedIPs, excludeIP)
+			sort.Strings(newExcludedIPs)
+		}
+	}
+
+	newExcludedIPsString := strings.Join(newExcludedIPs, ",")
+	if newExcludedIPsString != excludedIPsString {
+		c.logger().Infof("Setting exclude list to '%s'", newExcludedIPsString)
+
+		err = c.setExcludeIPs(newExcludedIPsString)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func (c *ESClient) updateAutoRebalance(value string) error {
 	resp, err := resty.New().R().
 		SetHeader("Content-Type", "application/json").

@@ -276,13 +313,13 @@ func (c *ESClient) updateAutoRebalance(value string) error {
 }

 // repeatedly query shard allocations to ensure success of drain operation.
-func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error {
+func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod, config *RestyConfig) error {
 	// TODO: implement context handling
 	podIP := pod.Status.PodIP
-	_, err := resty.New().
-		SetRetryCount(defaultRetryCount).
-		SetRetryWaitTime(defaultRetryWaitTime).
-		SetRetryMaxWaitTime(defaultRetryMaxWaitTime).
+	resp, err := resty.New().
+		SetRetryCount(config.ClientRetryCount).
+		SetRetryWaitTime(config.ClientRetryWaitTime).
+		SetRetryMaxWaitTime(config.ClientRetryMaxWaitTime).
 		AddRetryCondition(
 			// It is expected to return (bool, error) pair. Resty will retry
 			// in case condition returns true or non nil error.

@@ -292,7 +329,6 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error {
 				if err != nil {
 					return true, err
 				}
-				// shardIP := make(map[string]bool)
 				remainingShards := 0
 				for _, shard := range shards {
 					if shard.IP == podIP {

@@ -312,9 +348,33 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error {
 			},
 		).R().
 		Get(c.Endpoint.String() + "/_cat/shards?h=index,ip&format=json")
+
 	if err != nil {
 		return err
 	}
+
+	// make sure the IP is still excluded, this could have been updated in the meantime.
+	if err = c.excludePodIP(pod); err != nil {
+		return err
+	}
+
+	var shards []ESShard
+	err = json.Unmarshal(resp.Body(), &shards)
+	if err != nil {
+		return err
+	}
+
+	for _, shard := range shards {
+		if shard.IP == podIP {
+			err = fmt.Errorf("cannot migrate shards from pod '%s' with IP '%s' within the provided intervals", pod.ObjectMeta.Name, pod.Status.PodIP)
+			// if we cannot drain the node, return it to the active nodes pool
+			if errExclude := c.removeFromExcludeIPList(pod); errExclude != nil {
+				return fmt.Errorf("while handling error '%v', another error was raised: '%v'", err, errExclude)
+			}
+			return err
+		}
+	}
+
 	return nil
 }

@@ -452,7 +512,7 @@ func (c *ESClient) CreateIndex(indexName, groupName string, shards, replicas int
 		SetHeader("Content-Type", "application/json").
 		SetBody([]byte(
 			fmt.Sprintf(
-				`{"settings": {"index" : {"number_of_replicas" : "%d", "number_of_shards": "%d",
+				`{"settings": {"index" : {"number_of_replicas" : "%d", "number_of_shards": "%d",
 				"routing.allocation.include.group": "%s"}}}`,
 				replicas,
 				shards,
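
With this change the drain wait is bounded by the configured retry budget: the Resty client keeps polling /_cat/shards until no shard remains on the pod IP, and if the retries run out with shards still present, removeFromExcludeIPList puts the pod IP back into the allocation pool instead of leaving it excluded. The sketch below shows how RestyConfig maps onto the Resty retry options and what the worst-case polling window looks like; the import path github.com/go-resty/resty/v2 is an assumption, the repository may pin a different resty version.

package main

import (
	"fmt"
	"time"

	resty "github.com/go-resty/resty/v2"
)

// RestyConfig mirrors the struct introduced in operator/es_client.go.
type RestyConfig struct {
	ClientRetryCount       int
	ClientRetryWaitTime    time.Duration
	ClientRetryMaxWaitTime time.Duration
}

func main() {
	// Defaults from main.go: 999 retries, 10s wait, 30s max wait.
	cfg := &RestyConfig{
		ClientRetryCount:       999,
		ClientRetryWaitTime:    10 * time.Second,
		ClientRetryMaxWaitTime: 30 * time.Second,
	}

	// waitForEmptyEsNode configures its client the same way before polling
	// the _cat/shards endpoint.
	client := resty.New().
		SetRetryCount(cfg.ClientRetryCount).
		SetRetryWaitTime(cfg.ClientRetryWaitTime).
		SetRetryMaxWaitTime(cfg.ClientRetryMaxWaitTime)
	_ = client // the real code attaches a retry condition and issues the GET here

	// Rough upper bound on how long the drain may keep polling before the pod
	// IP is returned to the active list: count * max wait (~8h20m with defaults).
	fmt.Println(time.Duration(cfg.ClientRetryCount) * cfg.ClientRetryMaxWaitTime)
}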
