Skip to content

Commit 81c7e78

Browse files
authored
fix: differentiate loadInstrument errors and skip cardinality check for existing buckets (#2037)
Two position-keeping fixes: 1. validateMeasurementWithCEL now distinguishes ErrInstrumentNotFound (codes.NotFound) from backend errors like cache connection failures (codes.Internal), preventing misclassification of infrastructure errors as missing instruments. 2. checkBucketCardinality now checks BucketExists before enforcing the limit, allowing measurements against existing buckets even when the cardinality limit has been reached. Only truly new buckets are subject to the limit. Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 062f558 commit 81c7e78

3 files changed

Lines changed: 166 additions & 5 deletions

File tree

services/position-keeping/service/adapters.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ type BucketCounter interface {
5757
// CountBuckets returns the number of distinct buckets for an account and instrument.
5858
// Returns the count and any error encountered during the query.
5959
CountBuckets(ctx context.Context, accountID string, instrumentCode string) (int, error)
60+
61+
// BucketExists checks whether a specific bucket already exists for the given account, instrument, and bucket ID.
62+
// Used to distinguish new buckets from existing ones during cardinality enforcement.
63+
BucketExists(ctx context.Context, accountID string, instrumentCode string, bucketID string) (bool, error)
6064
}
6165

6266
// ErrEmptyUUID is returned when UUID string is empty

services/position-keeping/service/record_measurement.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -305,9 +305,14 @@ func (s *PositionKeepingService) validateMeasurementWithCEL(
305305

306306
instrument, err := s.loadInstrument(ctx, instrumentCode)
307307
if err != nil {
308-
RecordValidationFailure(instrumentCode, ValidationFailureReasonInstrumentNotFound)
309-
return nil, status.Errorf(codes.NotFound,
310-
"instrument definition not found for measurement type '%s': %v", instrumentCode, err)
308+
if errors.Is(err, ErrInstrumentNotFound) {
309+
RecordValidationFailure(instrumentCode, ValidationFailureReasonInstrumentNotFound)
310+
return nil, status.Errorf(codes.NotFound,
311+
"instrument definition not found for measurement type '%s': %v", instrumentCode, err)
312+
}
313+
RecordValidationFailure(instrumentCode, ValidationFailureReasonCELError)
314+
return nil, status.Errorf(codes.Internal,
315+
"failed to load instrument definition for measurement type '%s': %v", instrumentCode, err)
311316
}
312317

313318
activation := buildCELActivation(metadata, amount)
@@ -370,11 +375,22 @@ func (s *PositionKeepingService) evalBucketKeyProgram(
370375
}
371376

372377
// checkBucketCardinality checks whether adding a new bucket would exceed the cardinality limit.
378+
// Existing buckets are always allowed through - only truly new buckets are subject to the limit.
373379
func (s *PositionKeepingService) checkBucketCardinality(ctx context.Context, accountID, instrumentCode, bucketID string) error {
374380
if s.bucketCounter == nil || bucketID == "" {
375381
return nil
376382
}
377383

384+
exists, err := s.bucketCounter.BucketExists(ctx, accountID, instrumentCode, bucketID)
385+
if err != nil {
386+
return status.Errorf(codes.Internal,
387+
"failed to check bucket existence for account '%s' instrument '%s': %v",
388+
accountID, instrumentCode, err)
389+
}
390+
if exists {
391+
return nil
392+
}
393+
378394
count, err := s.bucketCounter.CountBuckets(ctx, accountID, instrumentCode)
379395
if err != nil {
380396
return status.Errorf(codes.Internal,

services/position-keeping/service/record_measurement_test.go

Lines changed: 143 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package service_test
22

33
import (
44
"context"
5+
"errors"
56
"testing"
67
"time"
78

@@ -865,6 +866,65 @@ func TestRecordMeasurement_CEL_InstrumentNotFound(t *testing.T) {
865866
mockMeasurementRepo.AssertNotCalled(t, "Create")
866867
}
867868

869+
// TestRecordMeasurement_CEL_InstrumentBackendError tests that non-NotFound errors from
870+
// loadInstrument are returned as Internal, not collapsed to NotFound.
871+
func TestRecordMeasurement_CEL_InstrumentBackendError(t *testing.T) {
872+
ctx := context.Background()
873+
mockRepo := new(MockRepository)
874+
mockMeasurementRepo := new(MockMeasurementRepository)
875+
mockEventPublisher := domain.NewInMemoryEventPublisher()
876+
mockIdempotency := new(MockIdempotencyService)
877+
mockCache := new(MockInstrumentCache)
878+
879+
svc, err := service.NewPositionKeepingService(
880+
mockRepo,
881+
mockMeasurementRepo,
882+
mockEventPublisher,
883+
mockIdempotency,
884+
newTestOutboxPublisher(t),
885+
service.WithInstrumentCache(mockCache),
886+
)
887+
require.NoError(t, err)
888+
889+
logID := uuid.New()
890+
now := time.Now().UTC()
891+
892+
positionLog := &domain.FinancialPositionLog{
893+
LogID: logID,
894+
AccountID: "test-account-123",
895+
StatusTracking: &domain.StatusTracking{
896+
CurrentStatus: domain.TransactionStatusPending,
897+
},
898+
CreatedAt: now,
899+
UpdatedAt: now,
900+
Version: 1,
901+
}
902+
903+
mockRepo.On("FindByID", ctx, logID).Return(positionLog, nil)
904+
// Cache returns a backend error (e.g., connection failure), NOT ErrInstrumentNotFound
905+
mockCache.On("GetOrLoad", ctx, "kWh", 1).Return(nil, errors.New("connection refused"))
906+
907+
req := &positionkeepingv1.RecordMeasurementRequest{
908+
PositionStateId: logID.String(),
909+
MeasurementType: "kWh",
910+
Value: "100.5",
911+
Unit: "kWh",
912+
Timestamp: timestamppb.New(now.Add(-1 * time.Hour)),
913+
}
914+
915+
resp, err := svc.RecordMeasurement(ctx, req)
916+
917+
require.Error(t, err)
918+
require.Nil(t, resp)
919+
st, ok := status.FromError(err)
920+
require.True(t, ok)
921+
assert.Equal(t, codes.Internal, st.Code())
922+
assert.Contains(t, st.Message(), "failed to load instrument definition")
923+
mockRepo.AssertExpectations(t)
924+
mockCache.AssertExpectations(t)
925+
mockMeasurementRepo.AssertNotCalled(t, "Create")
926+
}
927+
868928
// =============================================================================
869929
// Bucket Key Generation Tests
870930
// =============================================================================
@@ -880,6 +940,12 @@ func (m *MockBucketCounter) CountBuckets(ctx context.Context, accountID string,
880940
return args.Int(0), args.Error(1)
881941
}
882942

943+
// BucketExists implements service.BucketCounter.
944+
func (m *MockBucketCounter) BucketExists(ctx context.Context, accountID string, instrumentCode string, bucketID string) (bool, error) {
945+
args := m.Called(ctx, accountID, instrumentCode, bucketID)
946+
return args.Bool(0), args.Error(1)
947+
}
948+
883949
// createTestBucketKeyProgram creates a CEL program for testing bucket key generation.
884950
// The expression should evaluate to a string.
885951
func createTestBucketKeyProgram(t *testing.T, expression string) cel.Program {
@@ -1160,7 +1226,8 @@ func TestRecordMeasurement_Cardinality_RejectsWhenLimitExceeded(t *testing.T) {
11601226

11611227
mockRepo.On("FindByID", ctx, logID).Return(positionLog, nil)
11621228
mockCache.On("GetOrLoad", ctx, "kWh", 1).Return(cachedInstrument, nil)
1163-
// Return count at limit
1229+
// New bucket (does not exist yet) and count at limit
1230+
mockBucketCounter.On("BucketExists", ctx, "test-account-123", "kWh", "new-meter-999").Return(false, nil)
11641231
mockBucketCounter.On("CountBuckets", ctx, "test-account-123", "kWh").Return(service.MaxBucketsPerAccountInstrument, nil)
11651232

11661233
req := &positionkeepingv1.RecordMeasurementRequest{
@@ -1188,6 +1255,79 @@ func TestRecordMeasurement_Cardinality_RejectsWhenLimitExceeded(t *testing.T) {
11881255
mockMeasurementRepo.AssertNotCalled(t, "Create")
11891256
}
11901257

1258+
// TestRecordMeasurement_Cardinality_AllowsExistingBucketAtLimit tests that existing buckets
1259+
// are allowed through even when the cardinality limit has been reached.
1260+
func TestRecordMeasurement_Cardinality_AllowsExistingBucketAtLimit(t *testing.T) {
1261+
ctx := context.Background()
1262+
mockRepo := new(MockRepository)
1263+
mockMeasurementRepo := new(MockMeasurementRepository)
1264+
mockEventPublisher := domain.NewInMemoryEventPublisher()
1265+
mockIdempotency := new(MockIdempotencyService)
1266+
mockCache := new(MockInstrumentCache)
1267+
mockBucketCounter := new(MockBucketCounter)
1268+
1269+
svc, err := service.NewPositionKeepingService(
1270+
mockRepo,
1271+
mockMeasurementRepo,
1272+
mockEventPublisher,
1273+
mockIdempotency,
1274+
newTestOutboxPublisher(t),
1275+
service.WithInstrumentCache(mockCache),
1276+
service.WithBucketCounter(mockBucketCounter),
1277+
)
1278+
require.NoError(t, err)
1279+
1280+
logID := uuid.New()
1281+
now := time.Now().UTC()
1282+
1283+
positionLog := &domain.FinancialPositionLog{
1284+
LogID: logID,
1285+
AccountID: "test-account-123",
1286+
StatusTracking: &domain.StatusTracking{
1287+
CurrentStatus: domain.TransactionStatusPending,
1288+
},
1289+
CreatedAt: now,
1290+
UpdatedAt: now,
1291+
Version: 1,
1292+
}
1293+
1294+
bucketKeyProgram := createTestBucketKeyProgram(t, `attributes["meter_id"]`)
1295+
1296+
cachedInstrument := &service.CachedInstrument{
1297+
InstrumentCode: "kWh",
1298+
BucketKeyProgram: bucketKeyProgram,
1299+
}
1300+
1301+
mockRepo.On("FindByID", ctx, logID).Return(positionLog, nil)
1302+
mockCache.On("GetOrLoad", ctx, "kWh", 1).Return(cachedInstrument, nil)
1303+
// Bucket already exists - should skip cardinality count entirely
1304+
mockBucketCounter.On("BucketExists", ctx, "test-account-123", "kWh", "existing-meter").Return(true, nil)
1305+
mockMeasurementRepo.On("Create", ctx, mock.AnythingOfType("*domain.Measurement")).Return(nil)
1306+
1307+
req := &positionkeepingv1.RecordMeasurementRequest{
1308+
PositionStateId: logID.String(),
1309+
MeasurementType: "kWh",
1310+
Value: "100.5",
1311+
Unit: "kWh",
1312+
Timestamp: timestamppb.New(now.Add(-1 * time.Hour)),
1313+
Metadata: map[string]string{
1314+
"meter_id": "existing-meter",
1315+
},
1316+
}
1317+
1318+
resp, err := svc.RecordMeasurement(ctx, req)
1319+
1320+
require.NoError(t, err)
1321+
require.NotNil(t, resp)
1322+
assert.NotEmpty(t, resp.MeasurementId)
1323+
mockRepo.AssertExpectations(t)
1324+
mockCache.AssertExpectations(t)
1325+
mockBucketCounter.AssertExpectations(t)
1326+
mockMeasurementRepo.AssertExpectations(t)
1327+
// CountBuckets should NOT be called since bucket already exists
1328+
mockBucketCounter.AssertNotCalled(t, "CountBuckets")
1329+
}
1330+
11911331
// TestRecordMeasurement_Cardinality_AllowsUnderLimit tests requests under limit succeed.
11921332
func TestRecordMeasurement_Cardinality_AllowsUnderLimit(t *testing.T) {
11931333
ctx := context.Background()
@@ -1232,7 +1372,8 @@ func TestRecordMeasurement_Cardinality_AllowsUnderLimit(t *testing.T) {
12321372

12331373
mockRepo.On("FindByID", ctx, logID).Return(positionLog, nil)
12341374
mockCache.On("GetOrLoad", ctx, "kWh", 1).Return(cachedInstrument, nil)
1235-
// Return count well under limit
1375+
// New bucket (does not exist yet) but count well under limit
1376+
mockBucketCounter.On("BucketExists", ctx, "test-account-123", "kWh", "meter-001").Return(false, nil)
12361377
mockBucketCounter.On("CountBuckets", ctx, "test-account-123", "kWh").Return(100, nil)
12371378
mockMeasurementRepo.On("Create", ctx, mock.AnythingOfType("*domain.Measurement")).Return(nil)
12381379

0 commit comments

Comments
 (0)