@@ -11,7 +11,6 @@ import (
11
11
"time"
12
12
13
13
"github.com/bsm/redislock"
14
- "github.com/pace/bricks/backend/k8sapi"
15
14
"github.com/pace/bricks/maintenance/errors"
16
15
"github.com/pace/bricks/maintenance/health"
17
16
"github.com/pace/bricks/maintenance/log"
@@ -26,8 +25,6 @@ const (
26
25
ACTIVE status = 1
27
26
)
28
27
29
- const Label = "github.com.pace.bricks.activepassive"
30
-
31
28
// ActivePassive implements a failover mechanism that allows
32
29
// to deploy a service multiple times but ony one will accept
33
30
// traffic by using the label selector of kubernetes.
@@ -51,31 +48,76 @@ type ActivePassive struct {
51
48
timeToFailover time.Duration
52
49
locker * redislock.Client
53
50
54
- // access to the kubernetes api
55
- k8sClient * k8sapi.Client
51
+ stateSetter StateSetter
56
52
57
53
// current status of the failover (to show it in the readiness status)
58
54
state status
59
55
stateMu sync.RWMutex
60
56
}
61
57
58
+ type ActivePassiveOption func (* ActivePassive ) error
59
+
60
+ func WithCustomStateSetter (fn func (ctx context.Context , state string ) error ) ActivePassiveOption {
61
+ return func (ap * ActivePassive ) error {
62
+ stateSetter , err := NewCustomStateSetter (fn )
63
+ if err != nil {
64
+ return fmt .Errorf ("failed to create state setter: %w" , err )
65
+ }
66
+
67
+ ap .stateSetter = stateSetter
68
+
69
+ return nil
70
+ }
71
+ }
72
+
73
+ func WithNoopStateSetter () ActivePassiveOption {
74
+ return func (ap * ActivePassive ) error {
75
+ ap .stateSetter = & NoopStateSetter {}
76
+
77
+ return nil
78
+ }
79
+ }
80
+
81
+ func WithPodStateSetter () ActivePassiveOption {
82
+ return func (ap * ActivePassive ) error {
83
+ stateSetter , err := NewPodStateSetter ()
84
+ if err != nil {
85
+ return fmt .Errorf ("failed to create pod state setter: %w" , err )
86
+ }
87
+
88
+ ap .stateSetter = stateSetter
89
+
90
+ return nil
91
+ }
92
+ }
93
+
62
94
// NewActivePassive creates a new active passive cluster
63
95
// identified by the name. The time to fail over determines
64
96
// the frequency of checks performed against redis to
65
97
// keep the active state.
66
98
// NOTE: creating multiple ActivePassive in one process
67
99
// is not working correctly as there is only one readiness probe.
68
- func NewActivePassive (clusterName string , timeToFailover time.Duration , client * redis.Client ) (* ActivePassive , error ) {
69
- k8sClient , err := k8sapi .NewClient ()
70
- if err != nil {
71
- return nil , err
72
- }
73
-
100
+ func NewActivePassive (clusterName string , timeToFailover time.Duration , client * redis.Client , opts ... ActivePassiveOption ) (* ActivePassive , error ) {
74
101
activePassive := & ActivePassive {
75
102
clusterName : clusterName ,
76
103
timeToFailover : timeToFailover ,
77
104
locker : redislock .New (client ),
78
- k8sClient : k8sClient ,
105
+ }
106
+
107
+ for _ , opt := range opts {
108
+ if err := opt (activePassive ); err != nil {
109
+ return nil , fmt .Errorf ("failed to apply option: %w" , err )
110
+ }
111
+ }
112
+
113
+ if activePassive .stateSetter == nil {
114
+ var err error
115
+
116
+ // Default state setter uses the k8s api to set the state.
117
+ activePassive .stateSetter , err = NewPodStateSetter ()
118
+ if err != nil {
119
+ return nil , fmt .Errorf ("failed to create default state setter: %w" , err )
120
+ }
79
121
}
80
122
81
123
health .SetCustomReadinessCheck (activePassive .Handler )
@@ -198,7 +240,7 @@ func (a *ActivePassive) becomeUndefined(ctx context.Context) {
198
240
199
241
// setState returns true if the state was set successfully
200
242
func (a * ActivePassive ) setState (ctx context.Context , state status ) bool {
201
- err := a .k8sClient . SetCurrentPodLabel (ctx , Label , a .label (state ))
243
+ err := a .stateSetter . SetState (ctx , a .label (state ))
202
244
if err != nil {
203
245
log .Ctx (ctx ).Error ().Err (err ).Msg ("failed to mark pod as undefined" )
204
246
a .stateMu .Lock ()
0 commit comments