Skip to content

Commit 00dd182

Browse files
authored
pkg/capabilities/consensus/ocr3: use services.Engine (#1009)
1 parent 5c320a9 commit 00dd182

File tree

2 files changed

+32
-77
lines changed

2 files changed

+32
-77
lines changed

pkg/capabilities/consensus/ocr3/capability.go

Lines changed: 17 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,13 @@ var info = capabilities.MustNewCapabilityInfo(
3333
)
3434

3535
type capability struct {
36-
services.StateMachine
36+
services.Service
37+
eng *services.Engine
38+
3739
capabilities.CapabilityInfo
3840
capabilities.Validator[config, inputs, requests.Response]
3941

4042
reqHandler *requests.Handler
41-
stopCh services.StopChan
42-
wg sync.WaitGroup
43-
lggr logger.Logger
4443

4544
requestTimeout time.Duration
4645
requestTimeoutLock sync.RWMutex
@@ -67,11 +66,8 @@ func NewCapability(s *requests.Store, clock clockwork.Clock, requestTimeout time
6766
o := &capability{
6867
CapabilityInfo: info,
6968
Validator: capabilities.NewValidator[config, inputs, requests.Response](capabilities.ValidatorArgs{Info: info}),
70-
reqHandler: requests.NewHandler(lggr, s, clock, requestTimeout),
7169
clock: clock,
7270
requestTimeout: requestTimeout,
73-
stopCh: make(chan struct{}),
74-
lggr: logger.Named(lggr, "OCR3CapabilityClient"),
7571
aggregatorFactory: aggregatorFactory,
7672
aggregators: map[string]types.Aggregator{},
7773
encoderFactory: encoderFactory,
@@ -80,39 +76,16 @@ func NewCapability(s *requests.Store, clock clockwork.Clock, requestTimeout time
8076
callbackChannelBufferSize: callbackChannelBufferSize,
8177
registeredWorkflowsIDs: map[string]bool{},
8278
}
79+
o.Service, o.eng = services.Config{
80+
Name: "OCR3CapabilityClient",
81+
NewSubServices: func(l logger.Logger) []services.Service {
82+
o.reqHandler = requests.NewHandler(lggr, s, clock, requestTimeout)
83+
return []services.Service{o.reqHandler}
84+
},
85+
}.NewServiceEngine(lggr)
8386
return o
8487
}
8588

86-
func (o *capability) Start(ctx context.Context) error {
87-
return o.StartOnce("OCR3Capability", func() error {
88-
err := o.reqHandler.Start(ctx)
89-
if err != nil {
90-
return fmt.Errorf("failed to start request handler: %w", err)
91-
}
92-
93-
return nil
94-
})
95-
}
96-
97-
func (o *capability) Close() error {
98-
return o.StopOnce("OCR3Capability", func() error {
99-
close(o.stopCh)
100-
o.wg.Wait()
101-
err := o.reqHandler.Close()
102-
if err != nil {
103-
return fmt.Errorf("failed to close request handler: %w", err)
104-
}
105-
106-
return nil
107-
})
108-
}
109-
110-
func (o *capability) Name() string { return o.lggr.Name() }
111-
112-
func (o *capability) HealthReport() map[string]error {
113-
return map[string]error{o.Name(): o.Healthy()}
114-
}
115-
11689
func (o *capability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
11790
c, err := o.ValidateConfig(request.Config)
11891
if err != nil {
@@ -121,13 +94,13 @@ func (o *capability) RegisterToWorkflow(ctx context.Context, request capabilitie
12194

12295
o.mu.Lock()
12396
defer o.mu.Unlock()
124-
agg, err := o.aggregatorFactory(c.AggregationMethod, *c.AggregationConfig, o.lggr)
97+
agg, err := o.aggregatorFactory(c.AggregationMethod, *c.AggregationConfig, o.eng)
12598
if err != nil {
12699
return err
127100
}
128101
o.aggregators[request.Metadata.WorkflowID] = agg
129102

130-
encoder, err := o.encoderFactory(c.Encoder, c.EncoderConfig, o.lggr)
103+
encoder, err := o.encoderFactory(c.Encoder, c.EncoderConfig, o.eng)
131104
if err != nil {
132105
return err
133106
}
@@ -155,7 +128,7 @@ func (o *capability) GetEncoderByWorkflowID(workflowID string) (types.Encoder, e
155128
}
156129

157130
func (o *capability) GetEncoderByName(encoderName string, config *values.Map) (types.Encoder, error) {
158-
return o.encoderFactory(encoderName, config, o.lggr)
131+
return o.encoderFactory(encoderName, config, o.eng)
159132
}
160133

161134
func (o *capability) GetRegisteredWorkflowsIDs() []string {
@@ -205,7 +178,7 @@ func (o *capability) Execute(ctx context.Context, r capabilities.CapabilityReque
205178
}
206179
err := r.Inputs.UnwrapTo(&m)
207180
if err != nil {
208-
o.lggr.Warnf("could not unwrap method from CapabilityRequest, using default: %v", err)
181+
o.eng.Warnf("could not unwrap method from CapabilityRequest, using default: %v", err)
209182
}
210183

211184
switch m.Method {
@@ -214,10 +187,10 @@ func (o *capability) Execute(ctx context.Context, r capabilities.CapabilityReque
214187
if err != nil {
215188
return capabilities.CapabilityResponse{}, fmt.Errorf("failed to create map for response inputs: %w", err)
216189
}
217-
o.lggr.Debugw("Execute - sending response", "workflowExecutionID", r.Metadata.WorkflowExecutionID, "inputs", inputs, "terminate", m.Terminate)
190+
o.eng.Debugw("Execute - sending response", "workflowExecutionID", r.Metadata.WorkflowExecutionID, "inputs", inputs, "terminate", m.Terminate)
218191
var responseErr error
219192
if m.Terminate {
220-
o.lggr.Debugw("Execute - terminating execution", "workflowExecutionID", r.Metadata.WorkflowExecutionID)
193+
o.eng.Debugw("Execute - terminating execution", "workflowExecutionID", r.Metadata.WorkflowExecutionID)
221194
responseErr = capabilities.ErrStopExecution
222195
}
223196
out := requests.Response{
@@ -298,7 +271,7 @@ func (o *capability) queueRequestForProcessing(
298271
ExpiresAt: o.clock.Now().Add(requestTimeout),
299272
}
300273

301-
o.lggr.Debugw("Execute - adding to store", "workflowID", r.WorkflowID, "workflowExecutionID", r.WorkflowExecutionID, "observations", r.Observations)
274+
o.eng.Debugw("Execute - adding to store", "workflowID", r.WorkflowID, "workflowExecutionID", r.WorkflowExecutionID, "observations", r.Observations)
302275

303276
o.reqHandler.SendRequest(ctx, r)
304277
return callbackCh, nil

pkg/capabilities/consensus/ocr3/requests/handler.go

Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package requests
33
import (
44
"context"
55
"fmt"
6-
"sync"
76
"time"
87

98
"github.com/jonboulle/clockwork"
@@ -18,9 +17,8 @@ type responseCacheEntry struct {
1817
}
1918

2019
type Handler struct {
21-
services.StateMachine
22-
23-
lggr logger.Logger
20+
services.Service
21+
eng *services.Engine
2422

2523
store *Store
2624

@@ -33,23 +31,23 @@ type Handler struct {
3331
requestCh chan *Request
3432

3533
clock clockwork.Clock
36-
37-
stopCh services.StopChan
38-
wg sync.WaitGroup
3934
}
4035

4136
func NewHandler(lggr logger.Logger, s *Store, clock clockwork.Clock, responseExpiryTime time.Duration) *Handler {
42-
return &Handler{
43-
lggr: lggr,
37+
h := &Handler{
4438
store: s,
4539
pendingRequests: map[string]*Request{},
4640
responseCache: map[string]*responseCacheEntry{},
4741
responseCh: make(chan Response),
4842
requestCh: make(chan *Request),
4943
clock: clock,
5044
cacheExpiryTime: responseExpiryTime,
51-
stopCh: make(services.StopChan),
5245
}
46+
h.Service, h.eng = services.Config{
47+
Name: "Handler",
48+
Start: h.start,
49+
}.NewServiceEngine(lggr)
50+
return h
5351
}
5452

5553
func (h *Handler) SendResponse(ctx context.Context, resp Response) {
@@ -68,25 +66,9 @@ func (h *Handler) SendRequest(ctx context.Context, r *Request) {
6866
}
6967
}
7068

71-
func (h *Handler) Start(_ context.Context) error {
72-
return h.StartOnce("RequestHandler", func() error {
73-
h.wg.Add(1)
74-
go func() {
75-
defer h.wg.Done()
76-
ctx, cancel := h.stopCh.NewCtx()
77-
defer cancel()
78-
h.worker(ctx)
79-
}()
80-
return nil
81-
})
82-
}
83-
84-
func (h *Handler) Close() error {
85-
return h.StopOnce("RequestHandler", func() error {
86-
close(h.stopCh)
87-
h.wg.Wait()
88-
return nil
89-
})
69+
func (h *Handler) start(_ context.Context) error {
70+
h.eng.Go(h.worker)
71+
return nil
9072
}
9173

9274
func (h *Handler) worker(ctx context.Context) {
@@ -111,13 +93,13 @@ func (h *Handler) worker(ctx context.Context) {
11193
existingResponse := h.responseCache[req.WorkflowExecutionID]
11294
if existingResponse != nil {
11395
delete(h.responseCache, req.WorkflowExecutionID)
114-
h.lggr.Debugw("Found cached response for request", "workflowExecutionID", req.WorkflowExecutionID)
96+
h.eng.Debugw("Found cached response for request", "workflowExecutionID", req.WorkflowExecutionID)
11597
h.sendResponse(ctx, req, existingResponse.response)
11698
continue
11799
}
118100

119101
if err := h.store.Add(req); err != nil {
120-
h.lggr.Errorw("failed to add request to store", "err", err)
102+
h.eng.Errorw("failed to add request to store", "err", err)
121103
}
122104

123105
case resp := <-h.responseCh:
@@ -127,7 +109,7 @@ func (h *Handler) worker(ctx context.Context) {
127109
response: resp,
128110
entryTime: h.clock.Now(),
129111
}
130-
h.lggr.Debugw("Caching response without request", "workflowExecutionID", resp.WorkflowExecutionID)
112+
h.eng.Debugw("Caching response without request", "workflowExecutionID", resp.WorkflowExecutionID)
131113
continue
132114
}
133115

@@ -165,7 +147,7 @@ func (h *Handler) expireCachedResponses() {
165147
for k, v := range h.responseCache {
166148
if h.clock.Since(v.entryTime) > h.cacheExpiryTime {
167149
delete(h.responseCache, k)
168-
h.lggr.Debugw("Expired response", "workflowExecutionID", k)
150+
h.eng.Debugw("Expired response", "workflowExecutionID", k)
169151
}
170152
}
171153
}

0 commit comments

Comments
 (0)