Skip to content

Commit 6dbc752

Browse files
authored
feat: add energy meter read double-entry with GSP clearing accounts (#1261)
* feat: add energy meter read double-entry with GSP clearing accounts Add clearing_account_id to ExecuteDepositRequest so callers can override the debit side of the deposit double-entry. The seed-demo now creates GSP KWH inventory internal accounts and routes meter read deposits through them, producing balanced DEBIT GSP / CREDIT customer postings that power the GSP Exposure view. * fix: address review feedback — proto pattern, pagination, fail-fast guard - Add pattern constraint to clearing_account_id proto field - Add pagination handling to findInternalAccountByCode - Add fail-fast guard when gspKwhAccountID is empty * fix: validate clearing account override is a valid UUID Add UUID format validation and Warn-level audit log when a clearing account override is applied. The saga further validates the account exists in Financial Accounting. --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 60c8cc5 commit 6dbc752

4 files changed

Lines changed: 140 additions & 27 deletions

File tree

api/proto/meridian/current_account/v1/current_account.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,14 @@ message ExecuteDepositRequest {
420420
// the instrument's fungibility_key_expression.
421421
// Example: {"batch_id": "2024-A", "grade": "premium"}
422422
map<string, string> attributes = 6;
423+
424+
// clearing_account_id optionally overrides the clearing account for the debit
425+
// side of the double-entry. Used for meter reads where the debit targets a
426+
// GSP-specific internal account rather than a generic clearing account.
427+
string clearing_account_id = 7 [(buf.validate.field).string = {
428+
max_len: 100
429+
pattern: "^[a-zA-Z0-9_-]*$"
430+
}];
423431
}
424432

425433
// Response for deposit execution

cmd/seed-demo/main.go

Lines changed: 109 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@
44
// - An energy company tenant ("volterra-energy")
55
// - The energy manifest (instruments: GBP, KWH, CARBON_CREDIT)
66
// - A DNO organization (UK Power Networks) with 4 Grid Supply Points
7+
// - 4 GSP KWH inventory internal accounts (one per grid supply point)
78
// - 10 residential customers each with:
89
// - A GBP billing account (charges in pounds sterling)
910
// - A KWH consumption tracking account (meter reading credits)
10-
// - Initial deposits: GBP billing charges and KWH consumption credits (30 days simulated)
11+
// - 30 days of simulated meter reads using double-entry:
12+
// - CREDIT customer KWH account (asset: energy consumed)
13+
// - DEBIT GSP KWH inventory account (liability: energy owed to grid)
14+
// - 30 days of GBP billing at fixed retail tariff (24.5p/kWh)
1115
// - A wholesale energy price dataset with 30 days of historical prices
1216
//
1317
// All operations are idempotent — safe to run multiple times.
@@ -31,6 +35,7 @@ import (
3135
commonv1 "github.com/meridianhub/meridian/api/proto/meridian/common/v1"
3236
controlplanev1 "github.com/meridianhub/meridian/api/proto/meridian/control_plane/v1"
3337
currentaccountv1 "github.com/meridianhub/meridian/api/proto/meridian/current_account/v1"
38+
internalaccountv1 "github.com/meridianhub/meridian/api/proto/meridian/internal_account/v1"
3439
marketv1 "github.com/meridianhub/meridian/api/proto/meridian/market_information/v1"
3540
partyv1 "github.com/meridianhub/meridian/api/proto/meridian/party/v1"
3641
tenantv1 "github.com/meridianhub/meridian/api/proto/meridian/tenant/v1"
@@ -52,8 +57,9 @@ const (
5257

5358
// Sentinel errors for idempotent lookups.
5459
var (
55-
errPartyNotFoundInListing = fmt.Errorf("party reported as existing but not found in listing")
56-
errAccountNotFoundInListing = fmt.Errorf("account reported as existing but not found in listing")
60+
errPartyNotFoundInListing = fmt.Errorf("party reported as existing but not found in listing")
61+
errAccountNotFoundInListing = fmt.Errorf("account reported as existing but not found in listing")
62+
errMissingGSPClearingAccount = fmt.Errorf("missing GSP KWH clearing account")
5763
)
5864

5965
var (
@@ -127,30 +133,38 @@ func run() error {
127133
return fmt.Errorf("register parties: %w", err)
128134
}
129135

130-
// 6. Create accounts
131-
fmt.Println("\n=== Step 4: Create Current Accounts ===")
132-
customerAccounts, err := createAccounts(tCtx, conn, dnoPartyID, customerPartyIDs)
136+
// 6. Create GSP internal accounts (KWH inventory — debit side of meter read double-entry)
137+
fmt.Println("\n=== Step 4: Create GSP Internal Accounts ===")
138+
gspKwhAccountIDs, err := createGSPInternalAccounts(tCtx, conn, gspPartyIDs)
139+
if err != nil {
140+
return fmt.Errorf("create GSP internal accounts: %w", err)
141+
}
142+
143+
// 7. Create customer accounts
144+
fmt.Println("\n=== Step 5: Create Current Accounts ===")
145+
customerAccounts, err := createAccounts(tCtx, conn, dnoPartyID, customerPartyIDs, gspKwhAccountIDs)
133146
if err != nil {
134147
return fmt.Errorf("create accounts: %w", err)
135148
}
136149

137-
// 7. Deposit initial balances
138-
fmt.Println("\n=== Step 5: Seed Account Balances ===")
150+
// 8. Deposit initial balances (KWH meter reads target GSP clearing accounts)
151+
fmt.Println("\n=== Step 6: Seed Account Balances ===")
139152
if err := seedBalances(tCtx, conn, customerAccounts); err != nil {
140153
return fmt.Errorf("seed balances: %w", err)
141154
}
142155

143-
// 8. Seed market data
144-
fmt.Println("\n=== Step 6: Seed Wholesale Energy Prices ===")
156+
// 9. Seed market data
157+
fmt.Println("\n=== Step 7: Seed Wholesale Energy Prices ===")
145158
if err := seedMarketData(tCtx, conn); err != nil {
146159
return fmt.Errorf("seed market data: %w", err)
147160
}
148161

149162
fmt.Println("\n=== Demo Seed Complete ===")
150163
fmt.Printf("Tenant: %s (slug: %s)\n", tenantID, tenantSlug)
151164
fmt.Printf("DNO: %s\n", dnoPartyID)
152-
fmt.Printf("GSPs: %d grid supply points\n", len(gspPartyIDs))
165+
fmt.Printf("GSPs: %d grid supply points with KWH inventory accounts\n", len(gspPartyIDs))
153166
fmt.Printf("Customers: %d customers with GBP billing + KWH consumption accounts\n", len(customerPartyIDs))
167+
fmt.Printf("Double-entry: Each KWH meter read DEBITs GSP inventory, CREDITs customer account\n")
154168
fmt.Printf("Market: 30 days of wholesale energy prices\n")
155169
return nil
156170
}
@@ -324,23 +338,87 @@ func findPartyByExternalRef(ctx context.Context, client partyv1.PartyServiceClie
324338
return "", fmt.Errorf("%w: external_reference=%q", errPartyNotFoundInListing, extRef)
325339
}
326340

341+
// ─── GSP Internal Account Creation ──────────────────────────────────────────
342+
343+
// createGSPInternalAccounts creates KWH inventory internal accounts for each GSP.
344+
// These are the debit side of the meter read double-entry: when a customer consumes
345+
// energy, the GSP's inventory account is debited (liability to the grid).
346+
func createGSPInternalAccounts(ctx context.Context, conn *grpc.ClientConn, gspPartyIDs []string) ([]string, error) {
347+
client := internalaccountv1.NewInternalAccountServiceClient(conn)
348+
349+
gspKwhAccountIDs := make([]string, len(gspDefinitions))
350+
for i, gsp := range gspDefinitions {
351+
accountCode := fmt.Sprintf("GSP-KWH-%s", gsp.region)
352+
resp, err := client.InitiateInternalAccount(ctx, &internalaccountv1.InitiateInternalAccountRequest{
353+
AccountCode: accountCode,
354+
Name: fmt.Sprintf("%s KWH Inventory", gsp.name),
355+
InstrumentCode: "KWH",
356+
Description: fmt.Sprintf("KWH inventory for %s — tracks energy owed to the grid", gsp.region),
357+
OrgPartyId: gspPartyIDs[i],
358+
ProductTypeCode: "INVENTORY_KWH",
359+
})
360+
if err != nil {
361+
if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists {
362+
existingID, findErr := findInternalAccountByCode(ctx, client, accountCode)
363+
if findErr != nil {
364+
return nil, fmt.Errorf("find existing GSP account %s: %w", gsp.region, findErr)
365+
}
366+
gspKwhAccountIDs[i] = existingID
367+
fmt.Printf(" GSP-KWH: %s (%s, existing)\n", existingID, gsp.region)
368+
continue
369+
}
370+
return nil, fmt.Errorf("create GSP KWH account for %s: %w", gsp.region, err)
371+
}
372+
gspKwhAccountIDs[i] = resp.GetAccountId()
373+
fmt.Printf(" GSP-KWH: %s (%s)\n", resp.GetAccountId(), gsp.region)
374+
}
375+
376+
return gspKwhAccountIDs, nil
377+
}
378+
379+
func findInternalAccountByCode(ctx context.Context, client internalaccountv1.InternalAccountServiceClient, accountCode string) (string, error) {
380+
var pageToken string
381+
for {
382+
listResp, err := client.ListInternalAccounts(ctx, &internalaccountv1.ListInternalAccountsRequest{
383+
Pagination: &commonv1.Pagination{PageSize: 100, PageToken: pageToken},
384+
})
385+
if err != nil {
386+
return "", fmt.Errorf("list internal accounts to find %q: %w", accountCode, err)
387+
}
388+
for _, a := range listResp.GetFacilities() {
389+
if a.GetAccountCode() == accountCode {
390+
return a.GetAccountId(), nil
391+
}
392+
}
393+
pageToken = listResp.GetPagination().GetNextPageToken()
394+
if pageToken == "" {
395+
break
396+
}
397+
}
398+
return "", fmt.Errorf("%w: account_code=%q", errAccountNotFoundInListing, accountCode)
399+
}
400+
327401
// ─── Account Creation ────────────────────────────────────────────────────────
328402

329403
type customerAccountPair struct {
330-
customerName string
331-
partyID string
332-
gbpAccountID string
333-
kwhAccountID string
404+
customerName string
405+
partyID string
406+
gbpAccountID string
407+
kwhAccountID string
408+
gspRegion string
409+
gspKwhAccountID string // GSP internal account for the debit side of KWH double-entry
334410
}
335411

336-
func createAccounts(ctx context.Context, conn *grpc.ClientConn, dnoPartyID string, customerPartyIDs []string) ([]customerAccountPair, error) {
412+
func createAccounts(ctx context.Context, conn *grpc.ClientConn, dnoPartyID string, customerPartyIDs []string, gspKwhAccountIDs []string) ([]customerAccountPair, error) {
337413
client := currentaccountv1.NewCurrentAccountServiceClient(conn)
338414

339415
accounts := make([]customerAccountPair, len(customerPartyIDs))
340416
for i, partyID := range customerPartyIDs {
341417
cust := customerDefinitions[i]
342418
accounts[i].customerName = cust.legalName
343419
accounts[i].partyID = partyID
420+
accounts[i].gspRegion = gspDefinitions[cust.gspIndex].region
421+
accounts[i].gspKwhAccountID = gspKwhAccountIDs[cust.gspIndex]
344422

345423
// GBP billing account — charges in pounds sterling
346424
gbpID, err := createAccountIdempotent(ctx, client, partyID, fmt.Sprintf("VE-GBP-%03d", i+1), "GBP", dnoPartyID)
@@ -356,7 +434,7 @@ func createAccounts(ctx context.Context, conn *grpc.ClientConn, dnoPartyID strin
356434
return nil, fmt.Errorf("create KWH account for %s: %w", cust.legalName, err)
357435
}
358436
accounts[i].kwhAccountID = kwhID
359-
fmt.Printf(" KWH: %s (%s)\n", kwhID, cust.legalName)
437+
fmt.Printf(" KWH: %s (%s, GSP: %s)\n", kwhID, cust.legalName, accounts[i].gspRegion)
360438
}
361439

362440
return accounts, nil
@@ -433,15 +511,22 @@ func seedCustomerBalances(ctx context.Context, client currentaccountv1.CurrentAc
433511
if err := depositIdempotent(ctx, client, acct.gbpAccountID, dailyGBP, "GBP",
434512
fmt.Sprintf("Energy billing %s: %.2f kWh @ %.1fp/kWh", date.Format("2006-01-02"), dailyKWH, fixedRate*100),
435513
fmt.Sprintf("BILL-%s-%s", acct.partyID, date.Format("20060102")),
514+
"", // no clearing override for GBP — uses default clearing account
436515
); err != nil {
437516
return fmt.Errorf("deposit GBP for %s day %d: %w", acct.customerName, day, err)
438517
}
439518

440-
// KWH consumption deposit — meter reading credit for energy consumed
519+
// KWH meter read deposit — CREDIT customer (asset: energy consumed),
520+
// DEBIT GSP inventory (liability: energy owed to grid).
521+
// The clearing_account_id override routes the debit to the customer's GSP.
441522
if acct.kwhAccountID != "" {
523+
if acct.gspKwhAccountID == "" {
524+
return fmt.Errorf("%w: %s (%s)", errMissingGSPClearingAccount, acct.customerName, acct.gspRegion)
525+
}
442526
if err := depositIdempotent(ctx, client, acct.kwhAccountID, dailyKWH, "KWH",
443527
fmt.Sprintf("Meter reading %s: %.3f kWh consumed", date.Format("2006-01-02"), dailyKWH),
444528
fmt.Sprintf("METER-%s-%s", acct.partyID, date.Format("20060102")),
529+
acct.gspKwhAccountID, // GSP inventory account as clearing target
445530
); err != nil {
446531
return fmt.Errorf("deposit KWH for %s day %d: %w", acct.customerName, day, err)
447532
}
@@ -452,12 +537,13 @@ func seedCustomerBalances(ctx context.Context, client currentaccountv1.CurrentAc
452537
return nil
453538
}
454539

455-
func depositIdempotent(ctx context.Context, client currentaccountv1.CurrentAccountServiceClient, accountID string, amount float64, currency, description, reference string) error {
540+
func depositIdempotent(ctx context.Context, client currentaccountv1.CurrentAccountServiceClient, accountID string, amount float64, currency, description, reference, clearingAccountID string) error {
456541
_, err := client.ExecuteDeposit(ctx, &currentaccountv1.ExecuteDepositRequest{
457-
AccountId: accountID,
458-
Amount: toMoney(amount, currency),
459-
Description: description,
460-
Reference: reference,
542+
AccountId: accountID,
543+
Amount: toMoney(amount, currency),
544+
Description: description,
545+
Reference: reference,
546+
ClearingAccountId: clearingAccountID,
461547
})
462548
if err != nil {
463549
if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists {

services/current-account/service/deposit_orchestrator.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,10 @@ func NewDepositOrchestrator(cfg DepositOrchestratorConfig) (*DepositOrchestrator
125125
// instruments (e.g., RICE-KG with batch tracking), both debit and credit sides
126126
// of the double-entry must have matching fungibility keys. If nil, no fungibility
127127
// validation is performed (suitable for fully fungible instruments like USD).
128-
func (o *DepositOrchestrator) Orchestrate(ctx context.Context, account domain.CurrentAccount, amount domain.Amount, transactionID string, attributes map[string]string) (*pb.ExecuteDepositResponse, error) {
128+
// - clearingAccountIDOverride: If non-empty, used as the clearing account for the debit
129+
// side instead of the dynamically resolved or statically configured clearing account.
130+
// Used for energy meter reads where the debit targets a GSP-specific internal account.
131+
func (o *DepositOrchestrator) Orchestrate(ctx context.Context, account domain.CurrentAccount, amount domain.Amount, transactionID string, attributes map[string]string, clearingAccountIDOverride string) (*pb.ExecuteDepositResponse, error) {
129132
sagaStart := time.Now()
130133
sagaStatus := operationStatusSuccess
131134
defer func() {
@@ -161,8 +164,24 @@ func (o *DepositOrchestrator) Orchestrate(ctx context.Context, account domain.Cu
161164
"instrument", instrumentCode)
162165
}
163166

164-
// Resolve clearing account ID (dynamic resolver preferred, fallback to static config)
165-
clearingAccountID := o.resolveClearingAccountID(ctx, amount.InstrumentCode())
167+
// Resolve clearing account ID: explicit override > dynamic resolver > static config
168+
var clearingAccountID string
169+
if clearingAccountIDOverride != "" {
170+
// Validate override is a well-formed account identifier (UUID format).
171+
// The saga will further validate the account exists in Financial Accounting.
172+
if _, parseErr := uuid.Parse(clearingAccountIDOverride); parseErr != nil {
173+
sagaStatus = operationStatusFailed
174+
return nil, status.Errorf(codes.InvalidArgument,
175+
"clearing_account_id override is not a valid UUID: %s", clearingAccountIDOverride)
176+
}
177+
clearingAccountID = clearingAccountIDOverride
178+
o.logger.Warn("clearing account override applied",
179+
"clearing_account_id", clearingAccountID,
180+
"account_id", account.AccountID(),
181+
"instrument", amount.InstrumentCode())
182+
} else {
183+
clearingAccountID = o.resolveClearingAccountID(ctx, amount.InstrumentCode())
184+
}
166185

167186
// Parse correlation ID safely - fallback to generated ID if invalid
168187
correlationUUID, parseErr := uuid.Parse(correlationID)

services/current-account/service/grpc_deposit_endpoints.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func (s *Service) ExecuteDeposit(ctx context.Context, req *pb.ExecuteDepositRequ
161161
}
162162

163163
// Orchestrate transaction with saga pattern - Position Keeping is the source of truth for balance
164-
resp, err := s.depositOrchestrator.Orchestrate(ctx, account, amount, transactionID, req.Attributes)
164+
resp, err := s.depositOrchestrator.Orchestrate(ctx, account, amount, transactionID, req.Attributes, req.ClearingAccountId)
165165
if err != nil {
166166
operationStatus = opStatusSagaFailed
167167
return nil, err

0 commit comments

Comments
 (0)