Skip to content

Commit 9a01700

Browse files
authored
feat: implement ObservationContext for market data observations (#1456)
* feat: implement ObservationContext for market data observations Add typed ObservationContext domain struct to replace the hardcoded `[]byte("{}")` placeholder in observation persistence. The context carries source metadata, collection parameters, and the attributes map used by CEL expressions for resolution key computation. - Define ObservationContext with Attributes, SourceSystem, CollectionMethod, Unit, and Notes fields - Add observationContext field to MarketPriceObservation domain model with getter and builder support - Wire JSON serialization in mappers with backward-compatible deserialization (handles null, empty, and legacy "{}" records) - Update all call sites (service layer and tests) * fix: add defensive map copies to preserve ObservationContext immutability Copy the Attributes map in both NewObservationContext (inbound) and the ObservationContext getter (outbound) to prevent callers from mutating the internal state of MarketPriceObservation through shared map references. --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 2a047c6 commit 9a01700

10 files changed

Lines changed: 365 additions & 31 deletions

File tree

services/market-information/adapters/persistence/mappers.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package persistence
33

44
import (
55
"database/sql"
6+
"encoding/json"
67

78
"github.com/google/uuid"
89
"github.com/meridianhub/meridian/services/market-information/domain"
@@ -157,7 +158,7 @@ func ObservationToEntity(o domain.MarketPriceObservation, dataSetDefinitionID uu
157158
ObservedAt: o.ObservedAt(),
158159
CreatedAt: o.CreatedAt(),
159160
Quality: o.QualityLevel().Int(),
160-
ObservationContext: []byte("{}"), // Placeholder - actual context handling TBD
161+
ObservationContext: marshalObservationContext(o.ObservationContext()),
161162
}
162163

163164
// Set numeric value from decimal
@@ -224,5 +225,35 @@ func EntityToObservation(e MarketPriceObservationEntity, dataSetCode string, tru
224225
builder.WithCausationID(e.CausationID.UUID)
225226
}
226227

228+
// Set observation context (backward-compatible with empty/null JSONB)
229+
builder.WithObservationContext(unmarshalObservationContext(e.ObservationContext))
230+
227231
return builder.Build()
228232
}
233+
234+
// marshalObservationContext serializes an ObservationContext to JSON bytes for JSONB storage.
235+
// Returns "{}" for empty contexts to maintain a valid JSONB value.
236+
func marshalObservationContext(ctx domain.ObservationContext) []byte {
237+
if ctx.IsEmpty() {
238+
return []byte("{}")
239+
}
240+
data, err := json.Marshal(ctx)
241+
if err != nil {
242+
return []byte("{}")
243+
}
244+
return data
245+
}
246+
247+
// unmarshalObservationContext deserializes JSON bytes from JSONB into an ObservationContext.
248+
// Returns an empty ObservationContext for nil, empty, or invalid JSON (backward compatibility
249+
// with existing records that store "{}").
250+
func unmarshalObservationContext(data []byte) domain.ObservationContext {
251+
if len(data) == 0 {
252+
return domain.ObservationContext{}
253+
}
254+
var ctx domain.ObservationContext
255+
if err := json.Unmarshal(data, &ctx); err != nil {
256+
return domain.ObservationContext{}
257+
}
258+
return ctx
259+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package persistence
2+
3+
import (
4+
"testing"
5+
6+
"github.com/meridianhub/meridian/services/market-information/domain"
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestMarshalObservationContext_Empty(t *testing.T) {
11+
ctx := domain.ObservationContext{}
12+
data := marshalObservationContext(ctx)
13+
assert.Equal(t, []byte("{}"), data)
14+
}
15+
16+
func TestMarshalObservationContext_WithAttributes(t *testing.T) {
17+
ctx := domain.NewObservationContext(map[string]string{
18+
"base_code": "USD",
19+
"quote_code": "EUR",
20+
})
21+
data := marshalObservationContext(ctx)
22+
23+
// Unmarshal back and verify round-trip
24+
result := unmarshalObservationContext(data)
25+
assert.Equal(t, "USD", result.Attributes["base_code"])
26+
assert.Equal(t, "EUR", result.Attributes["quote_code"])
27+
}
28+
29+
func TestMarshalObservationContext_WithAllFields(t *testing.T) {
30+
ctx := domain.ObservationContext{
31+
Attributes: map[string]string{"tenor": "1M"},
32+
SourceSystem: "bloomberg",
33+
CollectionMethod: "api-poll",
34+
Unit: "USD/oz",
35+
Notes: "manual correction",
36+
}
37+
data := marshalObservationContext(ctx)
38+
39+
result := unmarshalObservationContext(data)
40+
assert.Equal(t, "1M", result.Attributes["tenor"])
41+
assert.Equal(t, "bloomberg", result.SourceSystem)
42+
assert.Equal(t, "api-poll", result.CollectionMethod)
43+
assert.Equal(t, "USD/oz", result.Unit)
44+
assert.Equal(t, "manual correction", result.Notes)
45+
}
46+
47+
func TestUnmarshalObservationContext_Nil(t *testing.T) {
48+
result := unmarshalObservationContext(nil)
49+
assert.True(t, result.IsEmpty())
50+
}
51+
52+
func TestUnmarshalObservationContext_EmptyBytes(t *testing.T) {
53+
result := unmarshalObservationContext([]byte{})
54+
assert.True(t, result.IsEmpty())
55+
}
56+
57+
func TestUnmarshalObservationContext_EmptyJSON(t *testing.T) {
58+
result := unmarshalObservationContext([]byte("{}"))
59+
assert.True(t, result.IsEmpty())
60+
}
61+
62+
func TestUnmarshalObservationContext_InvalidJSON(t *testing.T) {
63+
result := unmarshalObservationContext([]byte("not json"))
64+
assert.True(t, result.IsEmpty())
65+
}
66+
67+
func TestUnmarshalObservationContext_LegacyEmptyObject(t *testing.T) {
68+
// Existing records store "{}" - must deserialize gracefully
69+
result := unmarshalObservationContext([]byte("{}"))
70+
assert.True(t, result.IsEmpty())
71+
assert.Nil(t, result.Attributes) // omitempty means no "attributes" key in "{}"
72+
}
73+
74+
func TestObservationContextRoundTrip_ViaMapper(t *testing.T) {
75+
ctx := domain.ObservationContext{
76+
Attributes: map[string]string{"base_code": "GBP", "quote_code": "USD"},
77+
SourceSystem: "internal-engine",
78+
CollectionMethod: "streaming-feed",
79+
}
80+
81+
// Build a domain observation with context
82+
obs := domain.NewMarketPriceObservationBuilder().
83+
WithObservationContext(ctx).
84+
Build()
85+
86+
// Assert the getter returns what we set
87+
assert.Equal(t, ctx, obs.ObservationContext())
88+
}

services/market-information/adapters/persistence/observation_repository_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func TestObservationRepository_Record_NewObservation(t *testing.T) {
8383
uuid.New(),
8484
domain.QualityLevelActual,
8585
source.TrustLevel(),
86+
domain.ObservationContext{},
8687
)
8788
require.NoError(t, err)
8889

@@ -122,6 +123,7 @@ func TestObservationRepository_Record_SupersessionByQuality(t *testing.T) {
122123
uuid.New(),
123124
domain.QualityLevelEstimate,
124125
source.TrustLevel(),
126+
domain.ObservationContext{},
125127
)
126128
require.NoError(t, err)
127129
err = tc.Repos.Observation.Record(ctx, estimate)
@@ -145,6 +147,7 @@ func TestObservationRepository_Record_SupersessionByQuality(t *testing.T) {
145147
uuid.New(),
146148
domain.QualityLevelActual,
147149
source.TrustLevel(),
150+
domain.ObservationContext{},
148151
)
149152
require.NoError(t, err)
150153
err = tc.Repos.Observation.Record(ctx, actual)
@@ -196,6 +199,7 @@ func TestObservationRepository_Query_ByDataSetCode(t *testing.T) {
196199
uuid.New(),
197200
domain.QualityLevelActual,
198201
source.TrustLevel(),
202+
domain.ObservationContext{},
199203
)
200204
require.NoError(t, err)
201205
err = tc.Repos.Observation.Record(ctx, obs)
@@ -237,6 +241,7 @@ func TestObservationRepository_Query_ByResolutionKey(t *testing.T) {
237241
uuid.New(),
238242
domain.QualityLevelActual,
239243
source.TrustLevel(),
244+
domain.ObservationContext{},
240245
)
241246
require.NoError(t, err)
242247
err = tc.Repos.Observation.Record(ctx, obs)
@@ -286,6 +291,7 @@ func TestObservationRepository_Query_ByQualityLevel(t *testing.T) {
286291
uuid.New(),
287292
q,
288293
source.TrustLevel(),
294+
domain.ObservationContext{},
289295
)
290296
require.NoError(t, err)
291297
err = tc.Repos.Observation.Record(ctx, obs)
@@ -325,6 +331,7 @@ func TestObservationRepository_Query_IncludeSuperseded(t *testing.T) {
325331
uuid.New(),
326332
domain.QualityLevelEstimate,
327333
source.TrustLevel(),
334+
domain.ObservationContext{},
328335
)
329336
require.NoError(t, err)
330337
err = tc.Repos.Observation.Record(ctx, estimate)
@@ -343,6 +350,7 @@ func TestObservationRepository_Query_IncludeSuperseded(t *testing.T) {
343350
uuid.New(),
344351
domain.QualityLevelActual,
345352
source.TrustLevel(),
353+
domain.ObservationContext{},
346354
)
347355
require.NoError(t, err)
348356
err = tc.Repos.Observation.Record(ctx, actual)
@@ -391,6 +399,7 @@ func TestObservationRepository_GetLatest_QualityLadder(t *testing.T) {
391399
uuid.New(),
392400
domain.QualityLevelVerified,
393401
source.TrustLevel(),
402+
domain.ObservationContext{},
394403
)
395404
require.NoError(t, err)
396405
err = tc.Repos.Observation.Record(ctx, verified)
@@ -409,6 +418,7 @@ func TestObservationRepository_GetLatest_QualityLadder(t *testing.T) {
409418
uuid.New(),
410419
domain.QualityLevelActual,
411420
source.TrustLevel(),
421+
domain.ObservationContext{},
412422
)
413423
require.NoError(t, err)
414424
err = tc.Repos.Observation.Record(ctx, actual)
@@ -427,6 +437,7 @@ func TestObservationRepository_GetLatest_QualityLadder(t *testing.T) {
427437
uuid.New(),
428438
domain.QualityLevelEstimate,
429439
source.TrustLevel(),
440+
domain.ObservationContext{},
430441
)
431442
require.NoError(t, err)
432443
err = tc.Repos.Observation.Record(ctx, estimate)
@@ -464,6 +475,7 @@ func TestObservationRepository_RetrieveObservation_KnowledgeBaseTime(t *testing.
464475
uuid.New(),
465476
domain.QualityLevelActual,
466477
source.TrustLevel(),
478+
domain.ObservationContext{},
467479
)
468480
require.NoError(t, err)
469481
err = tc.Repos.Observation.Record(ctx, obs)
@@ -514,6 +526,7 @@ func TestObservationRepository_Record_InvalidDataSetCode(t *testing.T) {
514526
uuid.New(),
515527
domain.QualityLevelActual,
516528
source.TrustLevel(),
529+
domain.ObservationContext{},
517530
)
518531
require.NoError(t, err)
519532

@@ -610,6 +623,7 @@ func TestObservationRepository_HierarchicalLookup_TenantOverride(t *testing.T) {
610623
uuid.New(),
611624
domain.QualityLevelActual,
612625
source.TrustLevel(),
626+
domain.ObservationContext{},
613627
)
614628
require.NoError(t, err)
615629
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -633,6 +647,7 @@ func TestObservationRepository_HierarchicalLookup_TenantOverride(t *testing.T) {
633647
uuid.New(),
634648
domain.QualityLevelActual,
635649
source.TrustLevel(),
650+
domain.ObservationContext{},
636651
)
637652
require.NoError(t, err)
638653
err = tc.Repos.Observation.Record(tenantCtx, tenantObs)
@@ -669,6 +684,7 @@ func TestObservationRepository_HierarchicalLookup_MasterFallback(t *testing.T) {
669684
uuid.New(),
670685
domain.QualityLevelActual,
671686
source.TrustLevel(),
687+
domain.ObservationContext{},
672688
)
673689
require.NoError(t, err)
674690
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -763,6 +779,7 @@ func TestObservationRepository_HierarchicalLookup_PrivateDatasetNoFallback(t *te
763779
uuid.New(),
764780
domain.QualityLevelActual,
765781
source.TrustLevel(),
782+
domain.ObservationContext{},
766783
)
767784
require.NoError(t, err)
768785
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -802,6 +819,7 @@ func TestObservationRepository_HierarchicalLookup_RestrictedAccessDenied(t *test
802819
uuid.New(),
803820
domain.QualityLevelActual,
804821
source.TrustLevel(),
822+
domain.ObservationContext{},
805823
)
806824
require.NoError(t, err)
807825
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -840,6 +858,7 @@ func TestObservationRepository_HierarchicalLookup_RestrictedAccessGranted(t *tes
840858
uuid.New(),
841859
domain.QualityLevelActual,
842860
source.TrustLevel(),
861+
domain.ObservationContext{},
843862
)
844863
require.NoError(t, err)
845864
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -884,6 +903,7 @@ func TestObservationRepository_HierarchicalLookup_RestrictedAccessExpired(t *tes
884903
uuid.New(),
885904
domain.QualityLevelActual,
886905
source.TrustLevel(),
906+
domain.ObservationContext{},
887907
)
888908
require.NoError(t, err)
889909
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -927,6 +947,7 @@ func TestObservationRepository_HierarchicalLookup_RestrictedAccessInactive(t *te
927947
uuid.New(),
928948
domain.QualityLevelActual,
929949
source.TrustLevel(),
950+
domain.ObservationContext{},
930951
)
931952
require.NoError(t, err)
932953
err = tc.Repos.Observation.Record(ctx, masterObs)
@@ -974,6 +995,7 @@ func TestObservationRepository_CountByDataset_Basic(t *testing.T) {
974995
uuid.New(),
975996
domain.QualityLevelActual,
976997
source.TrustLevel(),
998+
domain.ObservationContext{},
977999
)
9781000
require.NoError(t, err)
9791001
err = tc.Repos.Observation.Record(ctx, obs)
@@ -1017,6 +1039,7 @@ func TestObservationRepository_CountByDataset_ExcludesSuperseded(t *testing.T) {
10171039
uuid.New(),
10181040
quality,
10191041
source.TrustLevel(),
1042+
domain.ObservationContext{},
10201043
)
10211044
require.NoError(t, err)
10221045
err = tc.Repos.Observation.Record(ctx, obs)

0 commit comments

Comments
 (0)