@@ -14,6 +14,7 @@ import (
14
14
"github.com/pkg/errors"
15
15
corev1 "k8s.io/api/core/v1"
16
16
"k8s.io/client-go/tools/record"
17
+ "k8s.io/utils/pointer"
17
18
controller "sigs.k8s.io/controller-runtime/pkg/reconcile"
18
19
19
20
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
@@ -33,6 +34,7 @@ import (
33
34
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/cleanup"
34
35
esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
35
36
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/configmap"
37
+ "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/hints"
36
38
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/initcontainer"
37
39
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label"
38
40
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/license"
@@ -44,6 +46,8 @@ import (
44
46
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/stackmon"
45
47
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/user"
46
48
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
49
+ "github.com/elastic/cloud-on-k8s/pkg/utils/optional"
50
+ "github.com/elastic/cloud-on-k8s/pkg/utils/set"
47
51
)
48
52
49
53
var (
@@ -257,6 +261,10 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
257
261
}
258
262
}
259
263
264
+ // Update the service account orchestration hint. This is done early in the reconciliation loop to unblock association
265
+ // controllers that may be waiting for the orchestration hint.
266
+ results .WithError (d .maybeSetServiceAccountsOrchestrationHint (ctx , esReachable , esClient , resourcesState ))
267
+
260
268
// reconcile the Elasticsearch license
261
269
if esReachable {
262
270
err = license .Reconcile (ctx , d .Client , d .ES , esClient , currentLicense )
@@ -354,6 +362,112 @@ func (d *defaultDriver) newElasticsearchClient(
354
362
)
355
363
}
356
364
365
+ // maybeSetServiceAccountsOrchestrationHint attempts to update an orchestration hint to let the association controllers
366
+ // know whether all the nodes in the cluster are ready to authenticate service accounts.
367
+ func (d * defaultDriver ) maybeSetServiceAccountsOrchestrationHint (
368
+ ctx context.Context ,
369
+ esReachable bool ,
370
+ securityClient esclient.SecurityClient ,
371
+ resourcesState * reconcile.ResourcesState ,
372
+ ) error {
373
+ if d .ReconcileState .OrchestrationHints ().ServiceAccounts .IsTrue () {
374
+ // Orchestration hint is already set to true, there is no point going back to false.
375
+ return nil
376
+ }
377
+
378
+ // Case 1: New cluster, we can immediately set the orchestration hint.
379
+ if ! bootstrap .AnnotatedForBootstrap (d .ES ) {
380
+ allNodesRunningServiceAccounts , err := esv1 .AreServiceAccountsSupported (d .ES .Spec .Version )
381
+ if err != nil {
382
+ return err
383
+ }
384
+ d .ReconcileState .UpdateOrchestrationHints (
385
+ d .ReconcileState .OrchestrationHints ().Merge (hints.OrchestrationsHints {ServiceAccounts : optional .NewBool (allNodesRunningServiceAccounts )}),
386
+ )
387
+ return nil
388
+ }
389
+
390
+ // Case 2: This is an existing cluster, but actual cluster version does not support service accounts.
391
+ if d .ES .Status .Version == "" {
392
+ return nil
393
+ }
394
+ supportServiceAccounts , err := esv1 .AreServiceAccountsSupported (d .ES .Status .Version )
395
+ if err != nil {
396
+ return err
397
+ }
398
+ if ! supportServiceAccounts {
399
+ d .ReconcileState .UpdateOrchestrationHints (
400
+ d .ReconcileState .OrchestrationHints ().Merge (hints.OrchestrationsHints {ServiceAccounts : optional .NewBool (false )}),
401
+ )
402
+ return nil
403
+ }
404
+
405
+ // Case 3: cluster is already running with a version that does support service account and tokens have already been created.
406
+ // We don't however know if all nodes have been migrated and are running with the service_tokens file mounted from the configuration Secret.
407
+ // Let's try to detect that situation by comparing the existing nodes and the ones returned by the /_security/service API.
408
+ // Note that starting with release 2.3 the association controller does not create the service account token until Elasticsearch is annotated
409
+ // as compatible with service accounts. This is mostly to unblock situation described in https://github.com/elastic/cloud-on-k8s/issues/5684
410
+ if ! esReachable {
411
+ // This requires the Elasticsearch API to be available
412
+ return nil
413
+ }
414
+ allPods := names (resourcesState .AllPods )
415
+ // Detect if some service tokens are expected
416
+ saTokens , err := user .GetServiceAccountTokens (d .Client , d .ES )
417
+ if err != nil {
418
+ log .Info ("Could not detect if service accounts are expected" , "err" , err , "namespace" , d .ES .Namespace , "es_name" , d .ES .Name )
419
+ return err
420
+ }
421
+
422
+ allNodesRunningServiceAccounts , err := allNodesRunningServiceAccounts (ctx , saTokens , set .Make (allPods ... ), securityClient )
423
+ if err != nil {
424
+ log .Info ("Could not detect if all nodes are ready for using service accounts" , "err" , err , "namespace" , d .ES .Namespace , "es_name" , d .ES .Name )
425
+ return err
426
+ }
427
+ if allNodesRunningServiceAccounts != nil {
428
+ d .ReconcileState .UpdateOrchestrationHints (
429
+ d .ReconcileState .OrchestrationHints ().Merge (hints.OrchestrationsHints {ServiceAccounts : optional .NewBool (* allNodesRunningServiceAccounts )}),
430
+ )
431
+ }
432
+
433
+ return nil
434
+ }
435
+
436
+ // allNodesRunningServiceAccounts attempts to detect if all the nodes in the clusters have loaded the service_tokens file.
437
+ // It returns nil if no decision can be made, for example when there is no tokens are expected to be found.
438
+ func allNodesRunningServiceAccounts (
439
+ ctx context.Context ,
440
+ saTokens user.ServiceAccountTokens ,
441
+ allPods set.StringSet ,
442
+ securityClient esclient.SecurityClient ,
443
+ ) (* bool , error ) {
444
+ if len (allPods ) == 0 {
445
+ return nil , nil
446
+ }
447
+ if len (saTokens ) == 0 {
448
+ // No tokens are expected: we cannot call the Elasticsearch API to detect which nodes are
449
+ // running with the conf/service_tokens file.
450
+ return nil , nil
451
+ }
452
+
453
+ // Get the namespaced service name to call the /_security/service/<namespace>/<service>/credential API
454
+ namespacedServices := saTokens .NamespacedServices ()
455
+
456
+ // Get the nodes which have loaded tokens from the conf/service_tokens file.
457
+ for namespacedService := range namespacedServices {
458
+ credentials , err := securityClient .GetServiceAccountCredentials (ctx , namespacedService )
459
+ if err != nil {
460
+ return nil , err
461
+ }
462
+ diff := allPods .Diff (credentials .Nodes ())
463
+ if len (diff ) == 0 {
464
+ return pointer .Bool (true ), nil
465
+ }
466
+ }
467
+ // Some nodes are running but did not show up in the security API.
468
+ return pointer .Bool (false ), nil
469
+ }
470
+
357
471
// warnUnsupportedDistro sends an event of type warning if the Elasticsearch Docker image is not a supported
358
472
// distribution by looking at if the prepare fs init container terminated with the UnsupportedDistro exit code.
359
473
func warnUnsupportedDistro (pods []corev1.Pod , recorder * events.Recorder ) {
0 commit comments