Skip to content

Commit 252ae54

Browse files
authored
feat: wire AccountReconciliationService in cmd/main.go (#891)
Wire the full reconciliation service in the main entry point: - Instantiate all persistence repositories (run, snapshot, variance, dispute) - Wire SnapshotCapturer conditionally when POSITION_KEEPING_URL is set - Wire VarianceDetector with repository dependencies - Register AccountReconciliationService with gRPC server - Log warnings for unavailable optional dependencies (valuator, assertor) - Gracefully degrade: missing optional deps return UNIMPLEMENTED Tests cover service registration, health check, graceful shutdown, missing optional deps behavior, and AssertBalance regression. Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 396ec9b commit 252ae54

2 files changed

Lines changed: 456 additions & 1 deletion

File tree

services/reconciliation/cmd/main.go

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,17 @@ import (
1212
"strings"
1313
"time"
1414

15+
positionkeepingv1 "github.com/meridianhub/meridian/api/proto/meridian/position_keeping/v1"
16+
reconciliationv1 "github.com/meridianhub/meridian/api/proto/meridian/reconciliation/v1"
17+
"github.com/meridianhub/meridian/services/reconciliation/adapters/persistence"
1518
"github.com/meridianhub/meridian/services/reconciliation/config"
1619
"github.com/meridianhub/meridian/services/reconciliation/observability"
20+
"github.com/meridianhub/meridian/services/reconciliation/service"
1721
"github.com/meridianhub/meridian/shared/pkg/health"
1822
"github.com/meridianhub/meridian/shared/platform/bootstrap"
1923
"github.com/prometheus/client_golang/prometheus/promhttp"
24+
"google.golang.org/grpc"
25+
"google.golang.org/grpc/credentials/insecure"
2026
"google.golang.org/grpc/health/grpc_health_v1"
2127
"google.golang.org/grpc/reflection"
2228
)
@@ -90,6 +96,69 @@ func run(logger *slog.Logger) error {
9096

9197
logger.Info("database connection established")
9298

99+
// Instantiate persistence repositories
100+
runRepo := persistence.NewSettlementRunRepository(db)
101+
snapshotRepo := persistence.NewSettlementSnapshotRepository(db)
102+
varianceRepo := persistence.NewVarianceRepository(db)
103+
disputeRepo := persistence.NewDisputeRepository(db)
104+
105+
// Build service options with repositories (always available)
106+
serviceOpts := []service.Option{
107+
service.WithSettlementRunRepository(runRepo),
108+
service.WithDisputeRepository(disputeRepo),
109+
service.WithVarianceRepository(varianceRepo),
110+
service.WithVarianceListRepository(varianceRepo),
111+
service.WithLogger(logger),
112+
}
113+
114+
// Wire SnapshotCapturer if Position Keeping URL is configured
115+
var pkConn *grpc.ClientConn
116+
if cfg.Services.PositionKeepingURL != "" {
117+
var connErr error
118+
pkConn, connErr = grpc.NewClient(
119+
cfg.Services.PositionKeepingURL,
120+
grpc.WithTransportCredentials(insecure.NewCredentials()),
121+
)
122+
if connErr != nil {
123+
return fmt.Errorf("failed to create position keeping client at %s: %w",
124+
cfg.Services.PositionKeepingURL, connErr)
125+
}
126+
127+
pkClient := positionkeepingv1.NewPositionKeepingServiceClient(pkConn)
128+
provider := service.NewPKPositionProvider(pkClient)
129+
capturer := service.NewSnapshotCapturer(provider, runRepo, snapshotRepo)
130+
serviceOpts = append(serviceOpts,
131+
service.WithSnapshotCapturer(capturer.CaptureSnapshots),
132+
)
133+
134+
logger.Info("snapshot capturer configured",
135+
"position_keeping_url", cfg.Services.PositionKeepingURL)
136+
} else {
137+
logger.Warn("snapshot capturer not configured: POSITION_KEEPING_URL not set")
138+
}
139+
defer func() {
140+
if pkConn != nil {
141+
if err := pkConn.Close(); err != nil {
142+
logger.Error("failed to close position keeping connection", "error", err)
143+
}
144+
}
145+
}()
146+
147+
// Wire VarianceDetector (depends on repos only, always available)
148+
detector := service.NewVarianceDetector(runRepo, snapshotRepo, varianceRepo)
149+
serviceOpts = append(serviceOpts,
150+
service.WithVarianceDetector(detector.DetectVariances),
151+
)
152+
153+
// VarianceValuator requires valuation engine + reference data provider (not yet available)
154+
// BalanceAssertor requires assertion repo + PK client (not yet available)
155+
// Both will return UNIMPLEMENTED until their dependencies are wired in future tasks
156+
logger.Warn("variance valuator not configured: valuation engine not yet available")
157+
logger.Warn("balance assertor not configured: assertion repository not yet available")
158+
159+
// Create AccountReconciliationService
160+
reconciliationSvc := service.NewAccountReconciliationService(serviceOpts...)
161+
93162
// Initialize auth interceptor
94163
authConfig := bootstrap.DefaultAuthConfig(logger)
95164
authInterceptor, err := bootstrap.NewAuthInterceptor(ctx, authConfig)
@@ -102,6 +171,9 @@ func run(logger *slog.Logger) error {
102171
WithAuthInterceptor(authInterceptor).
103172
Build()
104173

174+
// Register AccountReconciliationService
175+
reconciliationv1.RegisterAccountReconciliationServiceServer(grpcServer, reconciliationSvc)
176+
105177
// Register health check service
106178
healthAggregator := health.NewAggregator([]health.Checker{
107179
observability.NewDatabaseChecker(db),
@@ -113,7 +185,7 @@ func run(logger *slog.Logger) error {
113185
reflection.Register(grpcServer)
114186

115187
logger.Info("gRPC services registered",
116-
"services", []string{"Health", "Reflection"})
188+
"services", []string{"AccountReconciliationService", "Health", "Reflection"})
117189

118190
// Create gRPC listener
119191
grpcAddress := fmt.Sprintf(":%s", cfg.Server.Port)

0 commit comments

Comments
 (0)