44 "context"
55 "sync"
66 "testing"
7+ "time"
78
89 "github.com/oklog/ulid/v2"
910 "github.com/samber/lo"
@@ -16,7 +17,9 @@ import (
1617 "github.com/openmeterio/openmeter/openmeter/customer"
1718 customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter"
1819 customerservice "github.com/openmeterio/openmeter/openmeter/customer/service"
20+ entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
1921 "github.com/openmeterio/openmeter/openmeter/entitlement"
22+ "github.com/openmeterio/openmeter/openmeter/meter"
2023 meteradapter "github.com/openmeterio/openmeter/openmeter/meter/mockadapter"
2124 "github.com/openmeterio/openmeter/openmeter/productcatalog/feature"
2225 "github.com/openmeterio/openmeter/openmeter/registry"
@@ -27,24 +30,28 @@ import (
2730 subjectservice "github.com/openmeterio/openmeter/openmeter/subject/service"
2831 "github.com/openmeterio/openmeter/openmeter/testutils"
2932 "github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
33+ "github.com/openmeterio/openmeter/pkg/clock"
3034 "github.com/openmeterio/openmeter/pkg/datetime"
3135 "github.com/openmeterio/openmeter/pkg/framework/lockr"
36+ "github.com/openmeterio/openmeter/pkg/timeutil"
3237)
3338
3439func newTestNamespace (t * testing.T ) string {
3540 t .Helper ()
3641 return ulid .Make ().String ()
3742}
3843
39- // migrateOnce serialises schema migrations to avoid concurrent-write errors from ent.
44+ // migrateOnce serializes schema migrations to avoid concurrent-write errors from ent.
4045var migrateOnce sync.Mutex
4146
4247type testDeps struct {
43- dbClient * testutils.TestDB
44- subjectService subject.Service
45- customerService customer.Service
46- featureRepo feature.FeatureRepo
47- registry * registry.Entitlement
48+ dbClient * testutils.TestDB
49+ subjectService subject.Service
50+ customerService customer.Service
51+ meterService meter.ManageService
52+ featureRepo feature.FeatureRepo
53+ registry * registry.Entitlement
54+ streamingConnector * streamingtestutils.MockStreamingConnector
4855}
4956
5057func (d * testDeps ) close (t * testing.T ) {
@@ -92,9 +99,11 @@ func setupTestDeps(t *testing.T) *testDeps {
9299 locker , err := lockr .NewLocker (& lockr.LockerConfig {Logger : logger })
93100 require .NoError (t , err )
94101
102+ streamingConnector := streamingtestutils .NewMockStreamingConnector (t )
103+
95104 reg := registrybuilder .GetEntitlementRegistry (registrybuilder.EntitlementOptions {
96105 DatabaseClient : dbClient ,
97- StreamingConnector : streamingtestutils . NewMockStreamingConnector ( t ) ,
106+ StreamingConnector : streamingConnector ,
98107 Logger : logger ,
99108 Tracer : noop .NewTracerProvider ().Tracer ("test" ),
100109 MeterService : meterService ,
@@ -107,11 +116,13 @@ func setupTestDeps(t *testing.T) *testDeps {
107116 })
108117
109118 return & testDeps {
110- dbClient : testdb ,
111- subjectService : subjectSvc ,
112- customerService : customerSvc ,
113- featureRepo : reg .FeatureRepo ,
114- registry : reg ,
119+ dbClient : testdb ,
120+ subjectService : subjectSvc ,
121+ customerService : customerSvc ,
122+ meterService : meterService ,
123+ featureRepo : reg .FeatureRepo ,
124+ registry : reg ,
125+ streamingConnector : streamingConnector ,
115126 }
116127}
117128
@@ -372,3 +383,133 @@ func TestQueryGovernanceAccess_NoFeatureKeysReturnsAll(t *testing.T) {
372383 assert .True (t , resp .Data [0 ].Features ["feat-1" ].HasAccess )
373384 assert .True (t , resp .Data [0 ].Features ["feat-2" ].HasAccess )
374385}
386+
387+ // createMeterInPG writes a meter row to ent DB (FK constraint on features.meter_id).
388+ // The mock meter adapter only stores in memory; this must be called after CreateMeter.
389+ func createMeterInPG (t * testing.T , dbClient * entdb.Client , mtr meter.Meter ) {
390+ t .Helper ()
391+ _ , err := dbClient .Meter .Create ().
392+ SetID (mtr .ID ).
393+ SetNamespace (mtr .Namespace ).
394+ SetName (mtr .Name ).
395+ SetKey (mtr .Key ).
396+ SetAggregation (mtr .Aggregation ).
397+ SetEventType (mtr .EventType ).
398+ SetNillableValueProperty (mtr .ValueProperty ).
399+ Save (t .Context ())
400+ require .NoError (t , err )
401+ }
402+
403+ func createMeter (t * testing.T , deps * testDeps , ns , key string ) meter.Meter {
404+ t .Helper ()
405+ mtr , err := deps .meterService .CreateMeter (t .Context (), meter.CreateMeterInput {
406+ Namespace : ns ,
407+ Name : key ,
408+ Key : key ,
409+ Aggregation : meter .MeterAggregationSum ,
410+ EventType : "test" ,
411+ ValueProperty : lo .ToPtr ("$.value" ),
412+ })
413+ require .NoError (t , err )
414+ createMeterInPG (t , deps .dbClient .EntDriver .Client (), mtr )
415+ return mtr
416+ }
417+
418+ func createMeteredFeatureAndEntitlement (t * testing.T , deps * testDeps , ns , featureKey string , mtr meter.Meter , cust * customer.Customer , issueAfterReset * float64 ) {
419+ t .Helper ()
420+ feat , err := deps .featureRepo .CreateFeature (t .Context (), feature.CreateFeatureInputs {
421+ Key : featureKey ,
422+ Name : featureKey ,
423+ Namespace : ns ,
424+ MeterID : lo .ToPtr (mtr .ID ),
425+ })
426+ require .NoError (t , err )
427+
428+ _ , err = deps .registry .Entitlement .CreateEntitlement (t .Context (), entitlement.CreateEntitlementInputs {
429+ Namespace : ns ,
430+ UsageAttribution : cust .GetUsageAttribution (),
431+ FeatureKey : lo .ToPtr (featureKey ),
432+ FeatureID : lo .ToPtr (feat .ID ),
433+ EntitlementType : entitlement .EntitlementTypeMetered ,
434+ UsagePeriod : lo .ToPtr (entitlement .NewUsagePeriodInputFromRecurrence (timeutil.Recurrence {
435+ Interval : timeutil .RecurrencePeriodDaily ,
436+ Anchor : clock .Now (),
437+ })),
438+ IssueAfterReset : issueAfterReset ,
439+ }, nil )
440+ require .NoError (t , err )
441+ }
442+
443+ func TestQueryGovernanceAccess_MeteredEntitlement_HasAccess (t * testing.T ) {
444+ deps := setupTestDeps (t )
445+ t .Cleanup (func () { deps .close (t ) })
446+
447+ now := time .Date (2025 , 1 , 1 , 0 , 0 , 0 , 0 , time .UTC )
448+ clock .SetTime (now )
449+ defer clock .ResetTime ()
450+
451+ h := newTestHandler (deps )
452+ ns := newTestNamespace (t )
453+
454+ mtr := createMeter (t , deps , ns , "api-calls" )
455+ cust := createCustomer (t , deps , ns , "acme" , []string {"acme" })
456+ // IssueAfterReset=10.0 → balance starts at 10, HasAccess=true
457+ createMeteredFeatureAndEntitlement (t , deps , ns , "premium" , mtr , cust , lo .ToPtr (10.0 ))
458+
459+ // Add an event so the streaming mock has data for the meter.
460+ deps .streamingConnector .AddSimpleEvent (mtr .Key , 1 , now )
461+
462+ clock .SetTime (now .Add (time .Hour ))
463+
464+ resp , err := h .processGovernanceQuery (t .Context (), queryGovernanceAccessRequest {
465+ Namespace : ns ,
466+ Body : api.GovernanceQueryRequest {
467+ Customer : api.GovernanceQueryRequestCustomers {Keys : []string {"acme" }},
468+ Feature : & api.GovernanceQueryRequestFeatures {Keys : []string {"premium" }},
469+ },
470+ PageSize : defaultPageSize ,
471+ })
472+ require .NoError (t , err )
473+ require .Len (t , resp .Data , 1 )
474+ assert .Empty (t , resp .Errors )
475+ featureAccess := resp .Data [0 ].Features ["premium" ]
476+ assert .True (t , featureAccess .HasAccess )
477+ assert .Nil (t , featureAccess .Reason )
478+ }
479+
480+ func TestQueryGovernanceAccess_MeteredEntitlement_Exhausted (t * testing.T ) {
481+ deps := setupTestDeps (t )
482+ t .Cleanup (func () { deps .close (t ) })
483+
484+ now := time .Date (2025 , 1 , 1 , 0 , 0 , 0 , 0 , time .UTC )
485+ clock .SetTime (now )
486+ defer clock .ResetTime ()
487+
488+ h := newTestHandler (deps )
489+ ns := newTestNamespace (t )
490+
491+ mtr := createMeter (t , deps , ns , "api-calls" )
492+ cust := createCustomer (t , deps , ns , "acme" , []string {"acme" })
493+ // No IssueAfterReset → balance=0, HasAccess=false → UsageLimitReached
494+ createMeteredFeatureAndEntitlement (t , deps , ns , "premium" , mtr , cust , nil )
495+
496+ deps .streamingConnector .AddSimpleEvent (mtr .Key , 1 , now )
497+
498+ clock .SetTime (now .Add (time .Hour ))
499+
500+ resp , err := h .processGovernanceQuery (t .Context (), queryGovernanceAccessRequest {
501+ Namespace : ns ,
502+ Body : api.GovernanceQueryRequest {
503+ Customer : api.GovernanceQueryRequestCustomers {Keys : []string {"acme" }},
504+ Feature : & api.GovernanceQueryRequestFeatures {Keys : []string {"premium" }},
505+ },
506+ PageSize : defaultPageSize ,
507+ })
508+ require .NoError (t , err )
509+ require .Len (t , resp .Data , 1 )
510+ assert .Empty (t , resp .Errors )
511+ featureAccess := resp .Data [0 ].Features ["premium" ]
512+ assert .False (t , featureAccess .HasAccess )
513+ require .NotNil (t , featureAccess .Reason )
514+ assert .Equal (t , api .GovernanceFeatureAccessReasonCodeUsageLimitReached , featureAccess .Reason .Code )
515+ }
0 commit comments