Skip to content

Commit bd6d703

Browse files
committed
Updates
1 parent 452a0f8 commit bd6d703

7 files changed

Lines changed: 42 additions & 69 deletions

File tree

core/pkg/service/framer/calculation/group.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,10 @@ func (g *group) Close() error {
3939
return g.shutdown.Close()
4040
}
4141

42-
type OnStatusChange = func(ctx context.Context, stats ...calculator.Status)
42+
type OnStatusChange = func(context.Context, ...calculator.Status)
4343

4444
type groupConfig struct {
45+
//
4546
alamos.Instrumentation
4647
Framer *framer.Service
4748
OnStatusChange OnStatusChange

core/pkg/service/framer/calculation/service.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,6 @@ type Status = calculation.Status
4545

4646
// ServiceConfig is the configuration for opening the calculation service.
4747
type ServiceConfig struct {
48-
// ChannelObservable is used to listen to real-time changes in calculated channels
49-
// so the calculation routines can be updated accordingly.
50-
//
51-
// [REQUIRED]
52-
ChannelObservable observe.Observable[gorp.TxReader[channel.Key, channel.Channel]]
5348
// DB is the underlying database for transactional operations.
5449
//
5550
// [REQUIRED]
@@ -82,7 +77,6 @@ func (c ServiceConfig) Validate() error {
8277
validate.NotNil(v, "framer", c.Framer)
8378
validate.NotNil(v, "writer", c.Writer)
8479
validate.NotNil(v, "channel", c.Channel)
85-
validate.NotNil(v, "channel_observable", c.ChannelObservable)
8680
validate.NotNil(v, "db", c.DB)
8781
validate.NotNil(v, "status", c.Status)
8882
return v.Error()
@@ -94,7 +88,6 @@ func (c ServiceConfig) Override(other ServiceConfig) ServiceConfig {
9488
c.Framer = override.Nil(c.Framer, other.Framer)
9589
c.Writer = override.Nil(c.Writer, other.Writer)
9690
c.Channel = override.Nil(c.Channel, other.Channel)
97-
c.ChannelObservable = override.Nil(c.ChannelObservable, other.ChannelObservable)
9891
c.DB = override.Nil(c.DB, other.DB)
9992
c.Status = override.Nil(c.Status, other.Status)
10093
return c
@@ -133,7 +126,7 @@ func OpenService(ctx context.Context, cfgs ...ServiceConfig) (*Service, error) {
133126
writer: cfg.Writer,
134127
statusWriter: status.NewWriter[types.Nil](cfg.Status, nil),
135128
}
136-
s.disconnectFromChannelChanges = cfg.ChannelObservable.OnChange(s.handleChange)
129+
s.disconnectFromChannelChanges = cfg.Channel.Observe().OnChange(s.handleChange)
137130
s.mu.graph = g
138131
s.mu.calculators = make(map[channel.Key]*calculator.Calculator)
139132
s.mu.groups = make(map[int]*group)

core/pkg/service/framer/calculation/service_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,11 @@ var _ = Describe("Calculation", Ordered, func() {
123123
}))
124124
wr = MustSucceed(writer.NewService(writer.ServiceConfig{Framer: dist.Framer, Channel: channelSvc}))
125125
c = MustOpen(calculation.OpenService(ctx, calculation.ServiceConfig{
126-
DB: dist.DB,
127-
Framer: dist.Framer,
128-
Writer: wr,
129-
Channel: channelSvc,
130-
ChannelObservable: channelSvc.Observe(),
131-
Status: statusSvc,
126+
DB: dist.DB,
127+
Framer: dist.Framer,
128+
Writer: wr,
129+
Channel: channelSvc,
130+
Status: statusSvc,
132131
}))
133132
})
134133

core/pkg/service/framer/service.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,12 @@ func OpenService(ctx context.Context, cfgs ...ServiceConfig) (s *Service, err er
162162
defer func() { err = cleanup(err) }()
163163
var calcSvc *calculation.Service
164164
if calcSvc, err = calculation.OpenService(ctx, calculation.ServiceConfig{
165-
Instrumentation: cfg.Child("calculation"),
166-
DB: cfg.DB,
167-
Channel: cfg.Channel,
168-
Framer: cfg.Framer,
169-
Writer: s.Writer,
170-
ChannelObservable: cfg.Channel.Observe(),
171-
Status: cfg.Status,
165+
Instrumentation: cfg.Child("calculation"),
166+
DB: cfg.DB,
167+
Channel: cfg.Channel,
168+
Framer: cfg.Framer,
169+
Writer: s.Writer,
170+
Status: cfg.Status,
172171
}); !ok(err, calcSvc) {
173172
return nil, err
174173
}

core/pkg/service/framer/streamer/bench_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,11 @@ func newBenchStreamerEnv(b *testing.B) *benchStreamerEnv {
8585
b.Fatalf("failed to open writer service: %v", err)
8686
}
8787
calc, err := calculation.OpenService(b.Context(), calculation.ServiceConfig{
88-
DB: dist.DB,
89-
Framer: dist.Framer,
90-
Writer: wr,
91-
Channel: channelSvc,
92-
ChannelObservable: channelSvc.Observe(),
93-
Status: statusSvc,
88+
DB: dist.DB,
89+
Framer: dist.Framer,
90+
Writer: wr,
91+
Channel: channelSvc,
92+
Status: statusSvc,
9493
})
9594
if err != nil {
9695
b.Fatalf("failed to open calculation service: %v", err)

core/pkg/service/framer/streamer/streamer_test.go

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ var _ = Describe("Streamer", Ordered, func() {
3535
var (
3636
dist mock.Node
3737
channelSvc *channel.Service
38-
wr *writer.Service
38+
writerSvc *writer.Service
3939
streamerSvc *streamer.Service
4040
)
4141
BeforeAll(func(ctx SpecContext) {
@@ -62,14 +62,16 @@ var _ = Describe("Streamer", Ordered, func() {
6262
Group: dist.Group,
6363
Search: dist.Search,
6464
}))
65-
wr = MustSucceed(writer.NewService(writer.ServiceConfig{Framer: dist.Framer, Channel: channelSvc}))
65+
writerSvc = MustSucceed(writer.NewService(writer.ServiceConfig{
66+
Framer: dist.Framer,
67+
Channel: channelSvc,
68+
}))
6669
calc := MustOpen(calculation.OpenService(ctx, calculation.ServiceConfig{
67-
DB: dist.DB,
68-
Framer: dist.Framer,
69-
Writer: wr,
70-
Channel: channelSvc,
71-
ChannelObservable: channelSvc.Observe(),
72-
Status: statusSvc,
70+
DB: dist.DB,
71+
Framer: dist.Framer,
72+
Writer: writerSvc,
73+
Channel: channelSvc,
74+
Status: statusSvc,
7375
}))
7476
streamerSvc = MustSucceed(streamer.NewService(streamer.ServiceConfig{
7577
DistFramer: dist.Framer,
@@ -87,7 +89,7 @@ var _ = Describe("Streamer", Ordered, func() {
8789
}
8890
Expect(channelSvc.Create(ctx, ch)).To(Succeed())
8991
keys := []channel.Key{ch.Key()}
90-
w := MustSucceed(wr.Open(ctx, framer.WriterConfig{
92+
w := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
9193
Start: telem.Now(),
9294
Keys: keys,
9395
}))
@@ -144,7 +146,7 @@ var _ = Describe("Streamer", Ordered, func() {
144146
}
145147
Expect(channelSvc.Create(ctx, calculation)).To(Succeed())
146148
keys := []channel.Key{indexCh.Key(), dataCh1.Key(), dataCh2.Key()}
147-
w := MustSucceed(wr.Open(ctx, framer.WriterConfig{
149+
w := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
148150
Start: telem.SecondTS,
149151
Keys: keys,
150152
}))
@@ -183,7 +185,7 @@ var _ = Describe("Streamer", Ordered, func() {
183185
}
184186
Expect(channelSvc.Create(ctx, calculation)).To(Succeed())
185187
keys := []channel.Key{indexCh.Key(), dataCh1.Key(), dataCh2.Key()}
186-
w := MustSucceed(wr.Open(ctx, framer.WriterConfig{
188+
w := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
187189
Start: telem.SecondTS,
188190
Keys: keys,
189191
}))
@@ -250,12 +252,12 @@ var _ = Describe("Streamer", Ordered, func() {
250252
Expect(channelSvc.Create(ctx, calculation)).To(Succeed())
251253

252254
keysA := []channel.Key{idxA.Key(), dataA.Key()}
253-
wA := MustSucceed(wr.Open(ctx, framer.WriterConfig{
255+
wA := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
254256
Start: telem.SecondTS,
255257
Keys: keysA,
256258
}))
257259
keysB := []channel.Key{idxB.Key(), dataB.Key()}
258-
wB := MustSucceed(wr.Open(ctx, framer.WriterConfig{
260+
wB := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
259261
Start: telem.SecondTS,
260262
Keys: keysB,
261263
}))
@@ -310,7 +312,7 @@ var _ = Describe("Streamer", Ordered, func() {
310312
}
311313
Expect(channelSvc.Create(ctx, ch)).To(Succeed())
312314
keys := []channel.Key{ch.Key()}
313-
w := MustSucceed(wr.Open(ctx, framer.WriterConfig{
315+
w := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
314316
Start: telem.Now(),
315317
Keys: keys,
316318
}))
@@ -381,7 +383,7 @@ var _ = Describe("Streamer", Ordered, func() {
381383
Expect(channelSvc.Create(ctx, calculation)).To(Succeed())
382384

383385
keys := []channel.Key{indexCh.Key(), dataCh1.Key(), dataCh2.Key()}
384-
w := MustSucceed(wr.Open(ctx, framer.WriterConfig{
386+
w := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
385387
Start: telem.SecondTS,
386388
Keys: keys,
387389
}))
@@ -428,7 +430,7 @@ var _ = Describe("Streamer", Ordered, func() {
428430
}
429431
Expect(channelSvc.Create(ctx, ch)).To(Succeed())
430432
keys := []channel.Key{ch.Key()}
431-
w := MustSucceed(wr.Open(ctx, framer.WriterConfig{
433+
w := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
432434
Start: telem.Now(),
433435
Keys: keys,
434436
}))
@@ -467,7 +469,7 @@ var _ = Describe("Streamer", Ordered, func() {
467469
}
468470
Expect(channelSvc.Create(ctx, ch)).To(Succeed())
469471
keys := []channel.Key{ch.Key()}
470-
w := MustSucceed(wr.Open(ctx, framer.WriterConfig{
472+
w := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
471473
Start: telem.Now(),
472474
Keys: keys,
473475
}))
@@ -505,7 +507,7 @@ var _ = Describe("Streamer", Ordered, func() {
505507
}
506508
Expect(channelSvc.Create(ctx, ch)).To(Succeed())
507509
keys := []channel.Key{ch.Key()}
508-
w := MustSucceed(wr.Open(ctx, framer.WriterConfig{
510+
w := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
509511
Start: telem.Now(),
510512
Keys: keys,
511513
}))
@@ -546,7 +548,7 @@ var _ = Describe("Streamer", Ordered, func() {
546548
}
547549
Expect(channelSvc.Create(ctx, ch)).To(Succeed())
548550
keys := []channel.Key{ch.Key()}
549-
w := MustSucceed(wr.Open(ctx, framer.WriterConfig{
551+
w := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
550552
Start: telem.Now(),
551553
Keys: keys,
552554
}))
@@ -585,7 +587,7 @@ var _ = Describe("Streamer", Ordered, func() {
585587
}
586588
Expect(channelSvc.Create(ctx, ch)).To(Succeed())
587589
keys := []channel.Key{ch.Key()}
588-
w := MustSucceed(wr.Open(ctx, framer.WriterConfig{
590+
w := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
589591
Start: telem.Now(),
590592
Keys: keys,
591593
}))
@@ -623,7 +625,7 @@ var _ = Describe("Streamer", Ordered, func() {
623625
}
624626
Expect(channelSvc.Create(ctx, ch)).To(Succeed())
625627
keys := []channel.Key{ch.Key()}
626-
w := MustSucceed(wr.Open(ctx, framer.WriterConfig{
628+
w := MustSucceed(writerSvc.Open(ctx, framer.WriterConfig{
627629
Start: telem.Now(),
628630
Keys: keys,
629631
}))

core/pkg/service/imex/imex_test.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -402,26 +402,6 @@ var _ = Describe("ImEx", func() {
402402
Expect(round).NotTo(HaveKey("-"))
403403
})
404404

405-
It("Should skip unexported fields even when they carry a json tag", func() {
406-
// Encode goes through structToMap, which only reflects over exported
407-
// fields. An unexported field with a json tag should never reach the
408-
// wire — calling Interface() on it would panic, and exposing it would
409-
// leak private state.
410-
type payload struct {
411-
Name string `json:"name"`
412-
//nolint:govet,staticcheck
413-
secret string `json:"secret"`
414-
}
415-
env := imex.Envelope{Version: 1, Type: "log"}
416-
Expect(imex.Encode(
417-
&env, payload{Name: "n", secret: "shh"},
418-
)).To(Succeed())
419-
b := MustSucceed(json.Marshal(env))
420-
var round map[string]any
421-
Expect(json.Unmarshal(b, &round)).To(Succeed())
422-
Expect(round).NotTo(HaveKey("secret"))
423-
})
424-
425405
It("Should skip exported fields that have no json tag", func() {
426406
type payload struct {
427407
Name string `json:"name"`

0 commit comments

Comments
 (0)