Skip to content

Commit f6adc61

Browse files
authored
fix: wire account type cache and register product types in seed-demo (#1272)
* fix: wire account type cache and register product types in seed-demo Three interrelated issues prevented seed-demo from creating internal accounts with product types on demo: 1. AccountTypeRegistryService was listed in gateway service names but never registered on the gRPC server. Wire it in wireReferenceData. 2. Internal account service had no account type cache (nil), causing FailedPrecondition when ProductTypeCode is provided. Wire LocalAccountTypeCache using the registry as loader. 3. seed-demo didn't register product type definitions before creating accounts. Add Step 2.5 to create and activate INVENTORY_KWH and ENERGY_TRADING product types via AccountTypeRegistryService. Also regenerate descriptor.binpb to fix stale proto descriptors that caused the HTTP transcoder to incorrectly map fields like instrument_code for newer proto definitions. * fix: remove dead AlreadyExists guard from seed-demo CreateDraft CreateDraft uses ON CONFLICT (code, version) DO NOTHING at the DB layer and returns the existing draft on conflict — it never returns AlreadyExists. Remove the misleading error handler that could mask legitimate errors. * fix: handle activation race conditions in seed-demo product types Treat AlreadyExists and FailedPrecondition from ActivateAccountType as non-fatal, so concurrent or repeated seed runs don't fail when the product type was already activated by another process. --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 36f0c73 commit f6adc61

4 files changed

Lines changed: 168 additions & 10 deletions

File tree

cmd/meridian/descriptor.binpb

2.5 KB
Binary file not shown.

cmd/meridian/main.go

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ import (
7575
positionkeepingservice "github.com/meridianhub/meridian/services/position-keeping/service"
7676
reconciliationpersistence "github.com/meridianhub/meridian/services/reconciliation/adapters/persistence"
7777
reconciliationservice "github.com/meridianhub/meridian/services/reconciliation/service"
78+
"github.com/meridianhub/meridian/services/reference-data/accounttype"
79+
refcache "github.com/meridianhub/meridian/services/reference-data/cache"
7880
refcel "github.com/meridianhub/meridian/services/reference-data/cel"
7981
refhandler "github.com/meridianhub/meridian/services/reference-data/handler"
8082
refnode "github.com/meridianhub/meridian/services/reference-data/node"
@@ -334,17 +336,24 @@ func registerServices(
334336
tracer *observability.Tracer,
335337
logger *slog.Logger,
336338
) error {
339+
// refDataComps is populated by wireReferenceData and used by wireInternalAccount for the account type cache.
340+
var refDataComps *refDataComponents
341+
337342
// Tier 0: No gRPC dependencies
338343
for _, wire := range []struct {
339344
name string
340345
fn func() error
341346
}{
342347
{"party", func() error { return wireParty(grpcServer, conns.gormDB("party"), logger) }},
343-
{"reference-data", func() error { return wireReferenceData(grpcServer, conns.pgxPool("reference-data"), logger) }},
348+
{"reference-data", func() error {
349+
var err error
350+
refDataComps, err = wireReferenceData(grpcServer, conns.pgxPool("reference-data"), logger)
351+
return err
352+
}},
344353
{"market-information", func() error { return wireMarketInformation(grpcServer, conns.pgxPool("market-information"), logger) }},
345354
{"tenant", func() error { return wireTenant(grpcServer, conns.gormDB("tenant"), logger) }},
346355
{"internal-account", func() error {
347-
return wireInternalAccount(grpcServer, conns.gormDB("internal-account"), logger)
356+
return wireInternalAccount(grpcServer, conns.gormDB("internal-account"), refDataComps, logger)
348357
}},
349358
{"control-plane", func() error { return wireControlPlane(grpcServer, conns.pgxPool("control-plane"), logger) }},
350359
{"audit", func() error { return wireAudit(grpcServer, conns.gormDB("tenant"), logger) }}, // audit uses platform DB
@@ -419,36 +428,58 @@ func wireParty(server *grpc.Server, db *gorm.DB, logger *slog.Logger) error {
419428
return nil
420429
}
421430

422-
func wireReferenceData(server *grpc.Server, pool *pgxpool.Pool, logger *slog.Logger) error {
431+
// refDataComponents holds objects created during wireReferenceData that other
432+
// services need (e.g., the account type cache loader for internal-account).
433+
type refDataComponents struct {
434+
accountTypeRegistry *accounttype.PostgresRegistry
435+
celCompiler *refcel.Compiler
436+
}
437+
438+
func wireReferenceData(server *grpc.Server, pool *pgxpool.Pool, logger *slog.Logger) (*refDataComponents, error) {
423439
instrumentRegistry, err := refregistry.NewPostgresRegistry(pool)
424440
if err != nil {
425-
return fmt.Errorf("instrument registry: %w", err)
441+
return nil, fmt.Errorf("instrument registry: %w", err)
426442
}
427443

428444
compiler, err := refcel.NewCompiler()
429445
if err != nil {
430-
return fmt.Errorf("CEL compiler: %w", err)
446+
return nil, fmt.Errorf("CEL compiler: %w", err)
431447
}
432448

433449
refDataSvc, err := refhandler.NewService(instrumentRegistry, compiler, logger)
434450
if err != nil {
435-
return fmt.Errorf("ref data service: %w", err)
451+
return nil, fmt.Errorf("ref data service: %w", err)
436452
}
437453

438454
nodeRepo := refnode.NewPostgresRepository(pool)
439455
nodeSvc, err := refhandler.NewNodeService(nodeRepo, logger)
440456
if err != nil {
441-
return fmt.Errorf("node service: %w", err)
457+
return nil, fmt.Errorf("node service: %w", err)
442458
}
443459

444460
sagaRegistry := refsaga.NewPostgresRegistry(pool, nil)
445461
sagaSvc := refsaga.NewRegistryHandler(sagaRegistry, nil, nil, logger)
446462

463+
// Account type registry and gRPC service
464+
acctTypeReg, err := accounttype.NewPostgresRegistry(pool)
465+
if err != nil {
466+
return nil, fmt.Errorf("account type registry: %w", err)
467+
}
468+
469+
acctTypeSvc, err := refhandler.NewAccountTypeService(acctTypeReg, instrumentRegistry, compiler, logger)
470+
if err != nil {
471+
return nil, fmt.Errorf("account type service: %w", err)
472+
}
473+
447474
referencedatav1.RegisterReferenceDataServiceServer(server, refDataSvc)
448475
referencedatav1.RegisterNodeServiceServer(server, nodeSvc)
476+
referencedatav1.RegisterAccountTypeRegistryServiceServer(server, acctTypeSvc)
449477
sagav1.RegisterSagaRegistryServiceServer(server, sagaSvc)
450478
logger.Info("registered reference-data service")
451-
return nil
479+
return &refDataComponents{
480+
accountTypeRegistry: acctTypeReg,
481+
celCompiler: compiler,
482+
}, nil
452483
}
453484

454485
func wireMarketInformation(server *grpc.Server, pool *pgxpool.Pool, logger *slog.Logger) error {
@@ -478,9 +509,32 @@ func wireTenant(server *grpc.Server, db *gorm.DB, logger *slog.Logger) error {
478509
return nil
479510
}
480511

481-
func wireInternalAccount(server *grpc.Server, db *gorm.DB, logger *slog.Logger) error {
512+
// registryAccountTypeLoader adapts accounttype.PostgresRegistry to cache.AccountTypeLoader.
513+
type registryAccountTypeLoader struct {
514+
registry *accounttype.PostgresRegistry
515+
}
516+
517+
func (l *registryAccountTypeLoader) LoadAccountType(ctx context.Context, code string) (*accounttype.Definition, error) {
518+
return l.registry.GetActiveDefinition(ctx, code)
519+
}
520+
521+
func (l *registryAccountTypeLoader) ListActiveAccountTypes(ctx context.Context) ([]*accounttype.Definition, error) {
522+
return l.registry.ListActive(ctx)
523+
}
524+
525+
func wireInternalAccount(server *grpc.Server, db *gorm.DB, refData *refDataComponents, logger *slog.Logger) error {
482526
repo := internalaccountpersistence.NewRepository(db)
483-
svc, err := internalaccountservice.NewService(repo)
527+
528+
var opts []internalaccountservice.Option
529+
530+
// Wire account type cache if reference-data components are available
531+
if refData != nil {
532+
loader := &registryAccountTypeLoader{registry: refData.accountTypeRegistry}
533+
cache := refcache.NewLocalAccountTypeCache(loader, refData.celCompiler)
534+
opts = append(opts, internalaccountservice.WithAccountTypeCache(cache))
535+
}
536+
537+
svc, err := internalaccountservice.NewServiceFull(repo, nil, nil, logger, nil, opts...)
484538
if err != nil {
485539
return err
486540
}

cmd/seed-demo/main.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
internalaccountv1 "github.com/meridianhub/meridian/api/proto/meridian/internal_account/v1"
3939
marketv1 "github.com/meridianhub/meridian/api/proto/meridian/market_information/v1"
4040
partyv1 "github.com/meridianhub/meridian/api/proto/meridian/party/v1"
41+
referencedatav1 "github.com/meridianhub/meridian/api/proto/meridian/reference_data/v1"
4142
tenantv1 "github.com/meridianhub/meridian/api/proto/meridian/tenant/v1"
4243
"github.com/meridianhub/meridian/shared/platform/await"
4344
"google.golang.org/genproto/googleapis/type/money"
@@ -126,6 +127,12 @@ func run() error {
126127
// Tenant-scoped context for all subsequent calls
127128
tCtx := withTenant(ctx)
128129

130+
// 4.5 Register product types (account type definitions)
131+
fmt.Println("\n=== Step 2.5: Register Product Types ===")
132+
if err := registerProductTypes(tCtx, conn); err != nil {
133+
return fmt.Errorf("register product types: %w", err)
134+
}
135+
129136
// 5. Register parties
130137
fmt.Println("\n=== Step 3: Register Parties ===")
131138
dnoPartyID, gspPartyIDs, customerPartyIDs, err := registerParties(tCtx, conn)
@@ -228,6 +235,94 @@ func applyManifest(ctx context.Context, conn *grpc.ClientConn) error {
228235
return nil
229236
}
230237

238+
// ─── Product Type Registration ───────────────────────────────────────────────
239+
240+
// productTypeDef defines a product type to register.
241+
type productTypeDef struct {
242+
code string
243+
displayName string
244+
description string
245+
normalBalance referencedatav1.NormalBalance
246+
behaviorClass referencedatav1.BehaviorClass
247+
instrumentCode string
248+
validationCEL string
249+
}
250+
251+
var productTypes = []productTypeDef{
252+
{
253+
code: "INVENTORY_KWH",
254+
displayName: "KWH Inventory Account",
255+
description: "Inventory account for tracking kilowatt-hour energy assets at grid supply points",
256+
normalBalance: referencedatav1.NormalBalance_NORMAL_BALANCE_DEBIT,
257+
behaviorClass: referencedatav1.BehaviorClass_BEHAVIOR_CLASS_INVENTORY,
258+
instrumentCode: "KWH",
259+
validationCEL: "amount > 0",
260+
},
261+
{
262+
code: "ENERGY_TRADING",
263+
displayName: "Energy Trading Account",
264+
description: "Customer-facing account for energy trading in GBP and KWH",
265+
normalBalance: referencedatav1.NormalBalance_NORMAL_BALANCE_DEBIT,
266+
behaviorClass: referencedatav1.BehaviorClass_BEHAVIOR_CLASS_CUSTOMER,
267+
instrumentCode: "GBP",
268+
validationCEL: "amount > 0",
269+
},
270+
}
271+
272+
// registerProductTypes creates and activates account type definitions via the
273+
// AccountTypeRegistryService. Each product type is created as a DRAFT then activated.
274+
// Idempotent: skips types that already exist.
275+
func registerProductTypes(ctx context.Context, conn *grpc.ClientConn) error {
276+
client := referencedatav1.NewAccountTypeRegistryServiceClient(conn)
277+
278+
for _, pt := range productTypes {
279+
// Check if already active
280+
_, err := client.GetActiveDefinition(ctx, &referencedatav1.GetActiveDefinitionRequest{
281+
Code: pt.code,
282+
})
283+
if err == nil {
284+
fmt.Printf(" Product type: %s (already active)\n", pt.code)
285+
continue
286+
}
287+
if st, ok := status.FromError(err); !ok || st.Code() != codes.NotFound {
288+
return fmt.Errorf("check product type %s: %w", pt.code, err)
289+
}
290+
291+
// Create draft (idempotent: ON CONFLICT returns existing draft)
292+
draftResp, err := client.CreateDraft(ctx, &referencedatav1.CreateDraftRequest{
293+
Code: pt.code,
294+
DisplayName: pt.displayName,
295+
Description: pt.description,
296+
NormalBalance: pt.normalBalance,
297+
BehaviorClass: pt.behaviorClass,
298+
InstrumentCode: pt.instrumentCode,
299+
ValidationCel: pt.validationCEL,
300+
})
301+
if err != nil {
302+
return fmt.Errorf("create draft %s: %w", pt.code, err)
303+
}
304+
305+
// Activate using the ID from the draft response
306+
defID := draftResp.GetDefinition().GetId()
307+
fmt.Printf(" Product type: %s (draft id=%s, activating...)\n", pt.code, defID)
308+
309+
_, err = client.ActivateAccountType(ctx, &referencedatav1.ActivateAccountTypeRequest{
310+
Id: defID,
311+
})
312+
if err != nil {
313+
if st, ok := status.FromError(err); ok &&
314+
(st.Code() == codes.AlreadyExists || st.Code() == codes.FailedPrecondition) {
315+
fmt.Printf(" Product type: %s (already active)\n", pt.code)
316+
continue
317+
}
318+
return fmt.Errorf("activate %s (id=%s): %w", pt.code, defID, err)
319+
}
320+
fmt.Printf(" Product type: %s (activated)\n", pt.code)
321+
}
322+
323+
return nil
324+
}
325+
231326
// ─── Party Registration ──────────────────────────────────────────────────────
232327

233328
type gspInfo struct {

examples/manifests/energy.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@
4444
"validation": "amount > 0"
4545
}
4646
},
47+
{
48+
"code": "INVENTORY_KWH",
49+
"name": "KWH Inventory Account",
50+
"normalBalance": "NORMAL_BALANCE_DEBIT",
51+
"allowedInstruments": ["KWH"],
52+
"policies": {
53+
"validation": "amount > 0"
54+
}
55+
},
4756
{
4857
"code": "CARBON_INVENTORY",
4958
"name": "Carbon Credit Inventory",

0 commit comments

Comments
 (0)