Skip to content

Commit dd0a96b

Browse files
committed
[coordinator] Introduce a unified service manager
Signed-off-by: Liran Funaro <liran.funaro@gmail.com>
1 parent 4322e38 commit dd0a96b

17 files changed

Lines changed: 784 additions & 804 deletions

docs/metrics_reference.md

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,22 +51,22 @@ The following Sidecar metrics are exported for consumption by Prometheus.
5151

5252
The following Coordinator metrics are exported for consumption by Prometheus.
5353

54-
| Name | Type | Labels | Description |
55-
|------------------------------------------------------------|---------|-------------|------------------------------------------------------------------------------------------------------------|
56-
| coordinator_grpc_received_transaction_total | counter | | Total number of transactions received by the coordinator service from the client. |
57-
| coordinator_grpc_committed_transaction_total | counter | status | Total number of transactions committed status sent by the coordinator service to the client. |
58-
| coordinator_verifier_input_tx_batch_queue_size | gauge | | Size of the input transaction batch queue of the signature verifier manager. |
59-
| coordinator_verifier_output_validated_tx_batch_queue_size | gauge | | Size of the output validated transaction batch queue of the signature verifier manager. |
60-
| coordinator_vcservice_output_tx_status_batch_queue_size | gauge | | Size of the output transaction status batch queue of the validation and committer service manager. |
61-
| coordinator_vcservice_output_validated_tx_batch_queue_size | gauge | | Size of the output validated transaction batch queue of the validation and committer service manager. |
62-
| coordinator_verifier_transaction_processed_total | counter | | Total number of transactions processed by the signature verifier manager. |
63-
| coordinator_vcservice_transaction_processed_total | counter | | Total number of transactions processed by the validation and committer service manager. |
64-
| coordinator_verifier_connection_status | gauge | grpc_target | Connection status to verifier service by grpc target (1 = connected, 0 = disconnected). |
65-
| coordinator_verifier_connection_failure_total | counter | grpc_target | Total number of connection failures to verifier service. Short-lived failures may not always be captured. |
66-
| coordinator_vcservice_retired_transaction_total | counter | | Total number of transactions retried by the validation and committer service manager. |
67-
| coordinator_vcservice_connection_status | gauge | grpc_target | Connection status to vcservice service by grpc target (1 = connected, 0 = disconnected). |
68-
| coordinator_vcservice_connection_failure_total | counter | grpc_target | Total number of connection failures to vcservice service. Short-lived failures may not always be captured. |
69-
| coordinator_verifier_retired_transaction_total | counter | | Total number of transactions retried by the signature verifier manager. |
54+
| Name | Type | Labels | Description |
55+
|---------------------------------------------------|---------|-------------|------------------------------------------------------------------------------------------------------------|
56+
| coordinator_grpc_received_transaction_total | counter | | Total number of transactions received by the coordinator service from the client. |
57+
| coordinator_grpc_committed_transaction_total | counter | status | Total number of transactions committed status sent by the coordinator service to the client. |
58+
| coordinator_verifier_connection_status | gauge | grpc_target | Connection status to verifier service by grpc target (1 = connected, 0 = disconnected). |
59+
| coordinator_verifier_connection_failure_total | counter | grpc_target | Total number of connection failures to verifier service. Short-lived failures may not always be captured. |
60+
| coordinator_verifier_transaction_processed_total | counter | | Total number of transactions processed by the manager. |
61+
| coordinator_verifier_transaction_retired_total | counter | | Total number of transactions retried by the manager. |
62+
| coordinator_verifier_input_batch_queue_size | gauge | | Size of the input batch queue of the manager. |
63+
| coordinator_verifier_output_batch_queue_size | gauge | | Size of the output batch queue of the manager. |
64+
| coordinator_vcservice_connection_status | gauge | grpc_target | Connection status to vcservice service by grpc target (1 = connected, 0 = disconnected). |
65+
| coordinator_vcservice_connection_failure_total | counter | grpc_target | Total number of connection failures to vcservice service. Short-lived failures may not always be captured. |
66+
| coordinator_vcservice_transaction_processed_total | counter | | Total number of transactions processed by the manager. |
67+
| coordinator_vcservice_transaction_retired_total | counter | | Total number of transactions retried by the manager. |
68+
| coordinator_vcservice_input_batch_queue_size | gauge | | Size of the input batch queue of the manager. |
69+
| coordinator_vcservice_output_batch_queue_size | gauge | | Size of the output batch queue of the manager. |
7070

7171
## Verifier Metrics
7272

scripts/metrics_doc_extract.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,14 @@ def print_all_metrics_from_content(content: str):
6464
print_single_metric_from_block(metric_type, is_vec, params_block)
6565
continue
6666

67+
params_block = extract_block(params_block, "{}")
68+
if not params_block:
69+
continue
70+
6771
# match_type == 'method'
6872
package_name, function_name = rest
6973

70-
# Determine the source file based on the package name
74+
# Determine the source file based on the package name.
7175
source_file = repo_root_dir / "utils" / package_name / "metrics.go"
7276
if not source_file.exists():
7377
print(f"Warning: {source_file} not found", file=sys.stderr)
@@ -78,11 +82,13 @@ def print_all_metrics_from_content(content: str):
7882
if not func_body:
7983
continue
8084

81-
# Substitute params.Namespace and params.Subsystem in the function body
85+
# Substitute "params.Namespace" and "params.Subsystem" in the function body.
8286
namespace = extract_field(params_block, "Namespace")
8387
func_body = re.sub(r'\bparams\.Namespace\b', f'"{namespace}"', func_body)
8488
subsystem = extract_field(params_block, "Subsystem")
8589
func_body = re.sub(r'\bparams\.Subsystem\b', f'"{subsystem}"', func_body)
90+
# Substitute "params" with the actual params definition, so we can extract nested metrics.
91+
func_body = re.sub(r'\bparams\b', f'monitoring.MetricsParameters{{{params_block}}}', func_body)
8692

8793
# Extract metrics from the substituted function body
8894
print_all_metrics_from_content(func_body)

service/coordinator/coordinator.go

Lines changed: 17 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"context"
1111
"sync"
1212
"sync/atomic"
13-
"time"
1413

1514
"github.com/cockroachdb/errors"
1615
"github.com/hyperledger/fabric-lib-go/common/flogging"
@@ -29,6 +28,7 @@ import (
2928
"github.com/hyperledger/fabric-x-committer/utils/monitoring"
3029
"github.com/hyperledger/fabric-x-committer/utils/monitoring/promutil"
3130
"github.com/hyperledger/fabric-x-committer/utils/serve"
31+
"github.com/hyperledger/fabric-x-committer/utils/servicemanager"
3232
)
3333

3434
var logger = flogging.MustGetLogger("coordinator")
@@ -39,8 +39,9 @@ type (
3939
Service struct {
4040
servicepb.UnimplementedCoordinatorServer
4141
dependencyMgr *dependencygraph.Manager
42-
signatureVerifierMgr *signatureVerifierManager
43-
validatorCommitterMgr *validatorCommitterManager
42+
signatureVerifierMgr *servicemanager.Manager
43+
validatorCommitterMgr *servicemanager.Manager
44+
validatorCommitterAPI *validatorCommitterAPI
4445
policyMgr *policyManager
4546
queues *channels
4647
config *Config
@@ -142,8 +143,8 @@ func NewCoordinatorService(c *Config) *Service {
142143

143144
policyMgr := newPolicyManager()
144145

145-
svMgr := newSignatureVerifierManager(
146-
&signVerifierManagerConfig{
146+
svMgr := newVerifierManager(
147+
&verifierManagerParams{
147148
clientConfig: &c.Verifier,
148149
incomingTxsForValidation: queues.depGraphToSigVerifierFreeTxs,
149150
outgoingValidatedTxs: queues.sigVerifierToVCServiceValidatedTxs,
@@ -184,11 +185,6 @@ func (c *Service) Run(ctx context.Context) error {
184185
defer cancel()
185186
g, eCtx := errgroup.WithContext(canCtx)
186187

187-
g.Go(func() error {
188-
c.monitorQueues(eCtx)
189-
return nil
190-
})
191-
192188
g.Go(func() error {
193189
logger.Info("Starting dependency graph manager")
194190
c.dependencyMgr.Run(eCtx)
@@ -197,7 +193,7 @@ func (c *Service) Run(ctx context.Context) error {
197193

198194
g.Go(func() error {
199195
logger.Info("Starting signature verifier manager")
200-
if err := c.signatureVerifierMgr.run(eCtx); err != nil {
196+
if err := c.signatureVerifierMgr.Run(eCtx); err != nil {
201197
logger.Errorf("coordinator service stops due to an error returned by signature verifier manager: %v", err)
202198
return err
203199
}
@@ -206,20 +202,23 @@ func (c *Service) Run(ctx context.Context) error {
206202

207203
g.Go(func() error {
208204
logger.Info("Starting validator committer manager")
209-
if err := c.validatorCommitterMgr.run(eCtx); err != nil {
205+
if err := c.validatorCommitterMgr.Run(eCtx); err != nil {
210206
logger.Errorf("coordinator service stopped due "+
211207
"to an error returned by validator committer manager: %v", err)
212208
return err
213209
}
214210
return nil
215211
})
216212

217-
if !c.validatorCommitterMgr.ready.WaitForReady(eCtx) {
218-
return g.Wait()
213+
var err error
214+
c.validatorCommitterAPI, err = newValidatorCommitterAPI(ctx, &c.config.ValidatorCommitter, c.policyMgr)
215+
if err != nil {
216+
return err
219217
}
218+
defer c.validatorCommitterAPI.close()
220219

221220
// We attempt to recover the policy manager and the last committed block number from the state DB.
222-
if err := c.validatorCommitterMgr.recoverPolicyManagerFromStateDB(ctx); err != nil {
221+
if err = c.validatorCommitterAPI.recoverPolicyManagerFromStateDB(ctx); err != nil {
223222
return err
224223
}
225224

@@ -250,7 +249,7 @@ func (c *Service) SetLastCommittedBlockNumber(
250249
lastBlock *servicepb.BlockRef,
251250
) (*emptypb.Empty, error) {
252251
// Error is already wrapped with proper gRPC status code by validatorCommitterMgr.
253-
return &emptypb.Empty{}, c.validatorCommitterMgr.setLastCommittedBlockNumber(ctx, lastBlock)
252+
return &emptypb.Empty{}, c.validatorCommitterAPI.setLastCommittedBlockNumber(ctx, lastBlock)
254253
}
255254

256255
// GetNextBlockNumberToCommit returns the next expected block number to be received by the coordinator.
@@ -259,7 +258,7 @@ func (c *Service) GetNextBlockNumberToCommit(
259258
_ *emptypb.Empty,
260259
) (*servicepb.BlockRef, error) {
261260
// Error is already wrapped with proper gRPC status code by validatorCommitterMgr.
262-
return c.validatorCommitterMgr.getNextBlockNumberToCommit(ctx)
261+
return c.validatorCommitterAPI.getNextBlockNumberToCommit(ctx)
263262
}
264263

265264
// GetTransactionsStatus returns the status of given transactions identifiers.
@@ -268,7 +267,7 @@ func (c *Service) GetTransactionsStatus(
268267
q *committerpb.TxIDsBatch,
269268
) (*committerpb.TxStatusBatch, error) {
270269
// Error is already wrapped with proper gRPC status code by validatorCommitterMgr.
271-
return c.validatorCommitterMgr.getTransactionsStatus(ctx, q)
270+
return c.validatorCommitterAPI.getTransactionsStatus(ctx, q)
272271
}
273272

274273
// NoPendingTransactionProcessing returns true when all previously submitted
@@ -417,22 +416,3 @@ func (c *Service) sendTxStatus(
417416
}
418417
}
419418
}
420-
421-
func (c *Service) monitorQueues(ctx context.Context) {
422-
// TODO: make sampling time configurable
423-
ticker := time.NewTicker(100 * time.Millisecond)
424-
for {
425-
select {
426-
case <-ctx.Done():
427-
return
428-
case <-ticker.C:
429-
}
430-
431-
m := c.metrics
432-
q := c.queues
433-
promutil.SetGauge(m.sigverifierInputTxBatchQueueSize, len(q.depGraphToSigVerifierFreeTxs))
434-
promutil.SetGauge(m.sigverifierOutputValidatedTxBatchQueueSize, len(q.sigVerifierToVCServiceValidatedTxs))
435-
promutil.SetGauge(m.vcserviceOutputValidatedTxBatchQueueSize, len(q.vcServiceToDepGraphValidatedTxs))
436-
promutil.SetGauge(m.vcserviceOutputTxStatusBatchQueueSize, q.vcServiceToCoordinatorTxStatus.len())
437-
}
438-
}

service/coordinator/coordinator_test.go

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/hyperledger/fabric-x-committer/api/servicepb"
2727
"github.com/hyperledger/fabric-x-committer/mock"
2828
"github.com/hyperledger/fabric-x-committer/service/coordinator/dependencygraph"
29+
"github.com/hyperledger/fabric-x-committer/service/vc"
2930
"github.com/hyperledger/fabric-x-committer/service/verifier/policy"
3031
"github.com/hyperledger/fabric-x-committer/utils"
3132
"github.com/hyperledger/fabric-x-committer/utils/channel"
@@ -523,41 +524,6 @@ func TestCoordinatorServiceDependentOrderedTxs(t *testing.T) {
523524
require.Less(t, blindV2, updateV3, "v2 writer must follow blind-write; order: %v", order)
524525
}
525526

526-
func TestQueueSize(t *testing.T) {
527-
t.Parallel()
528-
env := newCoordinatorTestEnv(t, &testConfig{numSigService: 2, numVcService: 2})
529-
ctx, cancel := context.WithTimeout(t.Context(), 2*time.Minute)
530-
t.Cleanup(cancel)
531-
go env.coordinator.monitorQueues(ctx)
532-
533-
q := env.coordinator.queues
534-
m := env.coordinator.metrics
535-
q.depGraphToSigVerifierFreeTxs <- dependencygraph.TxNodeBatch{}
536-
q.sigVerifierToVCServiceValidatedTxs <- dependencygraph.TxNodeBatch{}
537-
q.vcServiceToDepGraphValidatedTxs <- dependencygraph.TxNodeBatch{}
538-
require.True(t, q.vcServiceToCoordinatorTxStatus.write(t.Context(), &committerpb.TxStatusBatch{}))
539-
540-
require.Eventually(t, func() bool {
541-
return test.GetIntMetricValue(t, m.sigverifierInputTxBatchQueueSize) == 1 &&
542-
test.GetIntMetricValue(t, m.sigverifierOutputValidatedTxBatchQueueSize) == 1 &&
543-
test.GetIntMetricValue(t, m.vcserviceOutputValidatedTxBatchQueueSize) == 1 &&
544-
test.GetIntMetricValue(t, m.vcserviceOutputTxStatusBatchQueueSize) == 1
545-
}, 3*time.Second, 500*time.Millisecond)
546-
547-
<-q.depGraphToSigVerifierFreeTxs
548-
<-q.sigVerifierToVCServiceValidatedTxs
549-
<-q.vcServiceToDepGraphValidatedTxs
550-
_, ok := q.vcServiceToCoordinatorTxStatus.read(t.Context())
551-
require.True(t, ok)
552-
553-
require.Eventually(t, func() bool {
554-
return test.GetIntMetricValue(t, m.sigverifierInputTxBatchQueueSize) == 0 &&
555-
test.GetIntMetricValue(t, m.sigverifierOutputValidatedTxBatchQueueSize) == 0 &&
556-
test.GetIntMetricValue(t, m.vcserviceOutputValidatedTxBatchQueueSize) == 0 &&
557-
test.GetIntMetricValue(t, m.vcserviceOutputTxStatusBatchQueueSize) == 0
558-
}, 3*time.Second, 500*time.Millisecond)
559-
}
560-
561527
func TestCoordinatorRecovery(t *testing.T) {
562528
t.Parallel()
563529
env := newCoordinatorTestEnv(t, &testConfig{numSigService: 1, numVcService: 1})

service/coordinator/dependencygraph/transaction_node.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type (
4646
// Note that a transaction T2 can depend on another transaction T1 only if T1
4747
// has arrived before T2 or T1 precedes T2 in transaction order in the block.
4848
// Hence, we will not have cyclic dependencies.
49-
dependsOnTxs TxNodeBatch
49+
dependsOnTxs []*TransactionNode
5050
// dependentTxs is a set of transactions that depend on this transaction.
5151
// After validating this transaction, dependentTxs is used to remove dependencies
5252
// from each dependent transaction.
@@ -59,7 +59,7 @@ type (
5959
}
6060

6161
// TxNodeBatch is a batch of transaction nodes.
62-
TxNodeBatch []*TransactionNode
62+
TxNodeBatch = []*TransactionNode
6363

6464
// readWriteKeys holds the read and write keys of a transaction.
6565
readWriteKeys struct {

0 commit comments

Comments
 (0)