Skip to content

Commit 96cdbaa

Browse files
committed
feat: implement ObservationContext for market data observations
Add typed ObservationContext domain struct to replace the hardcoded `[]byte("{}")` placeholder in observation persistence. The context carries source metadata, collection parameters, and the attributes map used by CEL expressions for resolution key computation. - Define ObservationContext with Attributes, SourceSystem, CollectionMethod, Unit, and Notes fields - Add observationContext field to MarketPriceObservation domain model with getter and builder support - Wire JSON serialization in mappers with backward-compatible deserialization (handles null, empty, and legacy "{}" records) - Update all call sites (service layer and tests)
1 parent 765be40 commit 96cdbaa

10 files changed

Lines changed: 354 additions & 31 deletions

File tree

services/market-information/adapters/persistence/mappers.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package persistence
33

44
import (
55
"database/sql"
6+
"encoding/json"
67

78
"github.com/google/uuid"
89
"github.com/meridianhub/meridian/services/market-information/domain"
@@ -157,7 +158,7 @@ func ObservationToEntity(o domain.MarketPriceObservation, dataSetDefinitionID uu
157158
ObservedAt: o.ObservedAt(),
158159
CreatedAt: o.CreatedAt(),
159160
Quality: o.QualityLevel().Int(),
160-
ObservationContext: []byte("{}"), // Placeholder - actual context handling TBD
161+
ObservationContext: marshalObservationContext(o.ObservationContext()),
161162
}
162163

163164
// Set numeric value from decimal
@@ -224,5 +225,35 @@ func EntityToObservation(e MarketPriceObservationEntity, dataSetCode string, tru
224225
builder.WithCausationID(e.CausationID.UUID)
225226
}
226227

228+
// Set observation context (backward-compatible with empty/null JSONB)
229+
builder.WithObservationContext(unmarshalObservationContext(e.ObservationContext))
230+
227231
return builder.Build()
228232
}
233+
234+
// marshalObservationContext serializes an ObservationContext to JSON bytes for JSONB storage.
235+
// Returns "{}" for empty contexts to maintain a valid JSONB value.
236+
func marshalObservationContext(ctx domain.ObservationContext) []byte {
237+
if ctx.IsEmpty() {
238+
return []byte("{}")
239+
}
240+
data, err := json.Marshal(ctx)
241+
if err != nil {
242+
return []byte("{}")
243+
}
244+
return data
245+
}
246+
247+
// unmarshalObservationContext deserializes JSON bytes from JSONB into an ObservationContext.
248+
// Returns an empty ObservationContext for nil, empty, or invalid JSON (backward compatibility
249+
// with existing records that store "{}").
250+
func unmarshalObservationContext(data []byte) domain.ObservationContext {
251+
if len(data) == 0 {
252+
return domain.ObservationContext{}
253+
}
254+
var ctx domain.ObservationContext
255+
if err := json.Unmarshal(data, &ctx); err != nil {
256+
return domain.ObservationContext{}
257+
}
258+
return ctx
259+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package persistence
2+
3+
import (
4+
"testing"
5+
6+
"github.com/meridianhub/meridian/services/market-information/domain"
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestMarshalObservationContext_Empty(t *testing.T) {
11+
ctx := domain.ObservationContext{}
12+
data := marshalObservationContext(ctx)
13+
assert.Equal(t, []byte("{}"), data)
14+
}
15+
16+
func TestMarshalObservationContext_WithAttributes(t *testing.T) {
17+
ctx := domain.NewObservationContext(map[string]string{
18+
"base_code": "USD",
19+
"quote_code": "EUR",
20+
})
21+
data := marshalObservationContext(ctx)
22+
23+
// Unmarshal back and verify round-trip
24+
result := unmarshalObservationContext(data)
25+
assert.Equal(t, "USD", result.Attributes["base_code"])
26+
assert.Equal(t, "EUR", result.Attributes["quote_code"])
27+
}
28+
29+
func TestMarshalObservationContext_WithAllFields(t *testing.T) {
30+
ctx := domain.ObservationContext{
31+
Attributes: map[string]string{"tenor": "1M"},
32+
SourceSystem: "bloomberg",
33+
CollectionMethod: "api-poll",
34+
Unit: "USD/oz",
35+
Notes: "manual correction",
36+
}
37+
data := marshalObservationContext(ctx)
38+
39+
result := unmarshalObservationContext(data)
40+
assert.Equal(t, "1M", result.Attributes["tenor"])
41+
assert.Equal(t, "bloomberg", result.SourceSystem)
42+
assert.Equal(t, "api-poll", result.CollectionMethod)
43+
assert.Equal(t, "USD/oz", result.Unit)
44+
assert.Equal(t, "manual correction", result.Notes)
45+
}
46+
47+
func TestUnmarshalObservationContext_Nil(t *testing.T) {
48+
result := unmarshalObservationContext(nil)
49+
assert.True(t, result.IsEmpty())
50+
}
51+
52+
func TestUnmarshalObservationContext_EmptyBytes(t *testing.T) {
53+
result := unmarshalObservationContext([]byte{})
54+
assert.True(t, result.IsEmpty())
55+
}
56+
57+
func TestUnmarshalObservationContext_EmptyJSON(t *testing.T) {
58+
result := unmarshalObservationContext([]byte("{}"))
59+
assert.True(t, result.IsEmpty())
60+
}
61+
62+
func TestUnmarshalObservationContext_InvalidJSON(t *testing.T) {
63+
result := unmarshalObservationContext([]byte("not json"))
64+
assert.True(t, result.IsEmpty())
65+
}
66+
67+
func TestUnmarshalObservationContext_LegacyEmptyObject(t *testing.T) {
68+
// Existing records store "{}" - must deserialize gracefully
69+
result := unmarshalObservationContext([]byte("{}"))
70+
assert.True(t, result.IsEmpty())
71+
assert.Nil(t, result.Attributes) // omitempty means no "attributes" key in "{}"
72+
}
73+
74+
func TestObservationContextRoundTrip_ViaMapper(t *testing.T) {
75+
ctx := domain.ObservationContext{
76+
Attributes: map[string]string{"base_code": "GBP", "quote_code": "USD"},
77+
SourceSystem: "internal-engine",
78+
CollectionMethod: "streaming-feed",
79+
}
80+
81+
// Build a domain observation with context
82+
obs := domain.NewMarketPriceObservationBuilder().
83+
WithObservationContext(ctx).
84+
Build()
85+
86+
// Assert the getter returns what we set
87+
assert.Equal(t, ctx, obs.ObservationContext())
88+
}

services/market-information/adapters/persistence/observation_repository_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func TestObservationRepository_Record_NewObservation(t *testing.T) {
8383
uuid.New(),
8484
domain.QualityLevelActual,
8585
source.TrustLevel(),
86+
domain.ObservationContext{},
8687
)
8788
require.NoError(t, err)
8889

@@ -122,6 +123,7 @@ func TestObservationRepository_Record_SupersessionByQuality(t *testing.T) {
122123
uuid.New(),
123124
domain.QualityLevelEstimate,
124125
source.TrustLevel(),
126+
domain.ObservationContext{},
125127
)
126128
require.NoError(t, err)
127129
err = tc.Repos.Observation.Record(ctx, estimate)
@@ -145,6 +147,7 @@ func TestObservationRepository_Record_SupersessionByQuality(t *testing.T) {
145147
uuid.New(),
146148
domain.QualityLevelActual,
147149
source.TrustLevel(),
150+
domain.ObservationContext{},
148151
)
149152
require.NoError(t, err)
150153
err = tc.Repos.Observation.Record(ctx, actual)
@@ -196,6 +199,7 @@ func TestObservationRepository_Query_ByDataSetCode(t *testing.T) {
196199
uuid.New(),
197200
domain.QualityLevelActual,
198201
source.TrustLevel(),
202+
domain.ObservationContext{},
199203
)
200204
require.NoError(t, err)
201205
err = tc.Repos.Observation.Record(ctx, obs)
@@ -237,6 +241,7 @@ func TestObservationRepository_Query_ByResolutionKey(t *testing.T) {
237241
uuid.New(),
238242
domain.QualityLevelActual,
239243
source.TrustLevel(),
244+
domain.ObservationContext{},
240245
)
241246
require.NoError(t, err)
242247
err = tc.Repos.Observation.Record(ctx, obs)
@@ -286,6 +291,7 @@ func TestObservationRepository_Query_ByQualityLevel(t *testing.T) {
286291
uuid.New(),
287292
q,
288293
source.TrustLevel(),
294+
domain.ObservationContext{},
289295
)
290296
require.NoError(t, err)
291297
err = tc.Repos.Observation.Record(ctx, obs)
@@ -325,6 +331,7 @@ func TestObservationRepository_Query_IncludeSuperseded(t *testing.T) {
325331
uuid.New(),
326332
domain.QualityLevelEstimate,
327333
source.TrustLevel(),
334+
domain.ObservationContext{},
328335
)
329336
require.NoError(t, err)
330337
err = tc.Repos.Observation.Record(ctx, estimate)
@@ -343,6 +350,7 @@ func TestObservationRepository_Query_IncludeSuperseded(t *testing.T) {
343350
uuid.New(),
344351
domain.QualityLevelActual,
345352
source.TrustLevel(),
353+
domain.ObservationContext{},
346354
)
347355
require.NoError(t, err)
348356
err = tc.Repos.Observation.Record(ctx, actual)
@@ -391,6 +399,7 @@ func TestObservationRepository_GetLatest_QualityLadder(t *testing.T) {
391399
uuid.New(),
392400
domain.QualityLevelVerified,
393401
source.TrustLevel(),
402+
domain.ObservationContext{},
394403
)
395404
require.NoError(t, err)
396405
err = tc.Repos.Observation.Record(ctx, verified)
@@ -409,6 +418,7 @@ func TestObservationRepository_GetLatest_QualityLadder(t *testing.T) {
409418
uuid.New(),
410419
domain.QualityLevelActual,
411420
source.TrustLevel(),
421+
domain.ObservationContext{},
412422
)
413423
require.NoError(t, err)
414424
err = tc.Repos.Observation.Record(ctx, actual)
@@ -427,6 +437,7 @@ func TestObservationRepository_GetLatest_QualityLadder(t *testing.T) {
427437
uuid.New(),
428438
domain.QualityLevelEstimate,
429439
source.TrustLevel(),
440+
domain.ObservationContext{},
430441
)
431442
require.NoError(t, err)
432443
err = tc.Repos.Observation.Record(ctx, estimate)
@@ -464,6 +475,7 @@ func TestObservationRepository_RetrieveObservation_KnowledgeBaseTime(t *testing.
464475
uuid.New(),
465476
domain.QualityLevelActual,
466477
source.TrustLevel(),
478+
domain.ObservationContext{},
467479
)
468480
require.NoError(t, err)
469481
err = tc.Repos.Observation.Record(ctx, obs)
@@ -514,6 +526,7 @@ func TestObservationRepository_Record_InvalidDataSetCode(t *testing.T) {
514526
uuid.New(),
515527
domain.QualityLevelActual,
516528
source.TrustLevel(),
529+
domain.ObservationContext{},
517530
)
518531
require.NoError(t, err)
519532

@@ -610,6 +623,7 @@ func TestObservationRepository_HierarchicalLookup_TenantOverride(t *testing.T) {
610623
uuid.New(),
611624
domain.QualityLevelActual,
612625
source.TrustLevel(),
626+
domain.ObservationContext{},
613627
)
614628
require.NoError(t, err)
615629
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -633,6 +647,7 @@ func TestObservationRepository_HierarchicalLookup_TenantOverride(t *testing.T) {
633647
uuid.New(),
634648
domain.QualityLevelActual,
635649
source.TrustLevel(),
650+
domain.ObservationContext{},
636651
)
637652
require.NoError(t, err)
638653
err = tc.Repos.Observation.Record(tenantCtx, tenantObs)
@@ -669,6 +684,7 @@ func TestObservationRepository_HierarchicalLookup_MasterFallback(t *testing.T) {
669684
uuid.New(),
670685
domain.QualityLevelActual,
671686
source.TrustLevel(),
687+
domain.ObservationContext{},
672688
)
673689
require.NoError(t, err)
674690
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -763,6 +779,7 @@ func TestObservationRepository_HierarchicalLookup_PrivateDatasetNoFallback(t *te
763779
uuid.New(),
764780
domain.QualityLevelActual,
765781
source.TrustLevel(),
782+
domain.ObservationContext{},
766783
)
767784
require.NoError(t, err)
768785
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -802,6 +819,7 @@ func TestObservationRepository_HierarchicalLookup_RestrictedAccessDenied(t *test
802819
uuid.New(),
803820
domain.QualityLevelActual,
804821
source.TrustLevel(),
822+
domain.ObservationContext{},
805823
)
806824
require.NoError(t, err)
807825
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -840,6 +858,7 @@ func TestObservationRepository_HierarchicalLookup_RestrictedAccessGranted(t *tes
840858
uuid.New(),
841859
domain.QualityLevelActual,
842860
source.TrustLevel(),
861+
domain.ObservationContext{},
843862
)
844863
require.NoError(t, err)
845864
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -884,6 +903,7 @@ func TestObservationRepository_HierarchicalLookup_RestrictedAccessExpired(t *tes
884903
uuid.New(),
885904
domain.QualityLevelActual,
886905
source.TrustLevel(),
906+
domain.ObservationContext{},
887907
)
888908
require.NoError(t, err)
889909
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -927,6 +947,7 @@ func TestObservationRepository_HierarchicalLookup_RestrictedAccessInactive(t *te
927947
uuid.New(),
928948
domain.QualityLevelActual,
929949
source.TrustLevel(),
950+
domain.ObservationContext{},
930951
)
931952
require.NoError(t, err)
932953
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -974,6 +995,7 @@ func TestObservationRepository_CountByDataset_Basic(t *testing.T) {
974995
uuid.New(),
975996
domain.QualityLevelActual,
976997
source.TrustLevel(),
998+
domain.ObservationContext{},
977999
)
9781000
require.NoError(t, err)
9791001
err = tc.Repos.Observation.Record(ctx, obs)
@@ -1017,6 +1039,7 @@ func TestObservationRepository_CountByDataset_ExcludesSuperseded(t *testing.T) {
10171039
uuid.New(),
10181040
quality,
10191041
source.TrustLevel(),
1042+
domain.ObservationContext{},
10201043
)
10211044
require.NoError(t, err)
10221045
err = tc.Repos.Observation.Record(ctx, obs)

0 commit comments

Comments
 (0)