Skip to content

Commit 02af34a

Browse files
committed
feat: add observer
1 parent f997aca commit 02af34a

File tree

4 files changed

+240
-0
lines changed

4 files changed

+240
-0
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/spf13/cast v1.7.1
1010
github.com/stretchr/testify v1.10.0
1111
github.com/things-go/limiter v0.1.5
12+
github.com/things-go/proc v0.0.0-20250308140217-2fbab6367193
1213
github.com/xuri/excelize/v2 v2.9.0
1314
go.opentelemetry.io/otel/sdk v1.35.0
1415
go.opentelemetry.io/otel/trace v1.35.0

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
7777
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
7878
github.com/things-go/limiter v0.1.5 h1:vCVzm0897TSWFhy2G52rU27hdhJl/lgvfiRQrek53JQ=
7979
github.com/things-go/limiter v0.1.5/go.mod h1:C7SPsXIfiPPC44k/mb0TCwZa5DzqxXcL7Zg6g5hPL5I=
80+
github.com/things-go/proc v0.0.0-20250308140217-2fbab6367193 h1:3+vbs76RM7I1SsaOrxeSp/gvQzlu7+B8zodgPHxO4NU=
81+
github.com/things-go/proc v0.0.0-20250308140217-2fbab6367193/go.mod h1:aQJ7LTcxq+1R1Air5QGcYd9GsS5WPw45Ix/pHfQSPO4=
8082
github.com/xuri/efp v0.0.0-20250227110027-3491fafc2b79 h1:78nKszZqigiBRBVcoe/AuPzyLTWW5B+ltBaUX1rlIXA=
8183
github.com/xuri/efp v0.0.0-20250227110027-3491fafc2b79/go.mod h1:ybY/Jr0T0GTCnYjKqmdwxyxn2BQf2RcQIIvex5QldPI=
8284
github.com/xuri/excelize/v2 v2.9.0 h1:1tgOaEq92IOEumR1/JfYS/eR0KHOCsRv/rYXXh6YJQE=

observer/observer.go

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package observer
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/things-go/proc-extra/gpool"
9+
"github.com/things-go/proc/topic"
10+
)
11+
12+
type Message interface {
13+
IntoPayload() ([]byte, error)
14+
}
15+
16+
type Observer interface {
17+
Name() string
18+
Dispose(context.Context, Message) error
19+
}
20+
21+
type AllowOptions struct {
22+
spawn bool
23+
}
24+
25+
type AllowOption func(*AllowOptions)
26+
27+
func newAllowOption(opts ...AllowOption) AllowOptions {
28+
ao := AllowOptions{
29+
spawn: false,
30+
}
31+
for _, f := range opts {
32+
f(&ao)
33+
}
34+
return ao
35+
}
36+
37+
func AllowSpawn() AllowOption {
38+
return func(a *AllowOptions) {
39+
a.spawn = true
40+
}
41+
}
42+
43+
type ConcreteObserver struct {
44+
subs *topic.Tree
45+
wg sync.WaitGroup
46+
errHandler func(ctx context.Context, name string, err error)
47+
}
48+
49+
func NewConcreteObserver() *ConcreteObserver {
50+
return &ConcreteObserver{
51+
subs: topic.NewStandardTree(),
52+
errHandler: func(ctx context.Context, name string, err error) {},
53+
}
54+
}
55+
func (cc *ConcreteObserver) SetErrHandler(f func(ctx context.Context, name string, err error)) *ConcreteObserver {
56+
if f != nil {
57+
cc.errHandler = f
58+
}
59+
return cc
60+
}
61+
62+
func (cc *ConcreteObserver) GetObserverNames(topic string) []string {
63+
values := cc.subs.Match(topic)
64+
names := make([]string, 0, len(values))
65+
for _, v := range values {
66+
names = append(names, v.(Observer).Name())
67+
}
68+
return names
69+
}
70+
71+
func (cc *ConcreteObserver) Notify(ctx context.Context, topic string, m Message, opts ...AllowOption) error {
72+
ao := newAllowOption(opts...)
73+
values := cc.subs.Match(topic)
74+
for _, v := range values {
75+
ob := v.(Observer)
76+
if ao.spawn {
77+
cc.wg.Add(1)
78+
gpool.Go(func() {
79+
defer cc.wg.Done()
80+
err := ob.Dispose(ctx, m)
81+
if err != nil {
82+
cc.errHandler(ctx, ob.Name(), err)
83+
}
84+
})
85+
} else {
86+
err := ob.Dispose(ctx, m)
87+
if err != nil {
88+
return fmt.Errorf("observer: '%v' dispose failure, %w", ob.Name(), err)
89+
}
90+
}
91+
}
92+
return nil
93+
}
94+
95+
func (cc *ConcreteObserver) AddObserver(topic string, ob Observer) error {
96+
cc.subs.Add(topic, ob)
97+
return nil
98+
}
99+
100+
func (cc *ConcreteObserver) DeleteObserver(topic string, ob Observer) error {
101+
cc.subs.Remove(topic, ob)
102+
return nil
103+
}
104+
105+
func (cc *ConcreteObserver) Close() error {
106+
cc.subs.Reset()
107+
cc.wg.Wait()
108+
return nil
109+
}

observer/observer_test.go

+128
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package observer
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"log"
8+
"testing"
9+
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
var _ Observer = (*c1)(nil)
14+
var _ Observer = (*c2)(nil)
15+
var _ Observer = (*c3)(nil)
16+
17+
type c1 struct{}
18+
19+
func (c *c1) Name() string { return "c1" }
20+
func (c *c1) Dispose(ctx context.Context, m Message) error {
21+
log.Printf("dispose c1! notifier: %v\n", m.(*CallMessage).Name)
22+
return nil
23+
}
24+
25+
type c2 struct{}
26+
27+
func (c *c2) Name() string { return "c2" }
28+
func (c *c2) Dispose(ctx context.Context, m Message) error {
29+
cm := m.(*CallMessage)
30+
if cm.Name == "c2nodo" {
31+
return errors.New("no do for this notifier!")
32+
}
33+
log.Printf("dispose c2! notifier: %v\n", cm.Name)
34+
return nil
35+
}
36+
37+
type c3 struct{}
38+
39+
func (c *c3) Name() string { return "c3" }
40+
func (c *c3) Dispose(ctx context.Context, m Message) error {
41+
log.Printf("dispose c3! notifier: %v\n", m.(*CallMessage).Name)
42+
return nil
43+
}
44+
45+
type CallMessage struct {
46+
Name string `json:"name"`
47+
}
48+
49+
func (c *CallMessage) IntoPayload() ([]byte, error) {
50+
return json.Marshal(c)
51+
}
52+
53+
func Test_ObserverCallChain(t *testing.T) {
54+
var err error
55+
56+
topic := "/observer/call-chain"
57+
cc := NewConcreteObserver()
58+
defer cc.Close()
59+
cc1 := &c1{}
60+
cc2 := &c2{}
61+
cc3 := &c3{}
62+
err = cc.AddObserver(topic, cc1)
63+
require.NoError(t, err)
64+
err = cc.AddObserver(topic, cc2)
65+
require.NoError(t, err)
66+
err = cc.AddObserver(topic, cc3)
67+
require.NoError(t, err)
68+
69+
t.Run("match names", func(t *testing.T) {
70+
names := cc.GetObserverNames(topic)
71+
t.Logf("Observers for topic '%s': %v", topic, names)
72+
})
73+
74+
t.Run("normal", func(t *testing.T) {
75+
err := cc.Notify(context.Background(), topic, &CallMessage{
76+
Name: "aaa",
77+
})
78+
require.NoError(t, err)
79+
})
80+
81+
t.Run("error stop", func(t *testing.T) {
82+
err := cc.Notify(context.Background(), topic, &CallMessage{
83+
Name: "c2nodo",
84+
})
85+
require.Error(t, err)
86+
t.Log(err)
87+
})
88+
89+
t.Run("drop c2(which can't do) then notify", func(t *testing.T) {
90+
err := cc.DeleteObserver(topic, cc2)
91+
require.NoError(t, err)
92+
err = cc.Notify(context.Background(), topic, &CallMessage{
93+
Name: "c2nodo",
94+
})
95+
require.NoError(t, err)
96+
})
97+
}
98+
99+
func Test_ObserverSpawn(t *testing.T) {
100+
var err error
101+
102+
topic := "/observer/spawn"
103+
cc := NewConcreteObserver()
104+
defer cc.Close()
105+
cc1 := &c1{}
106+
cc2 := &c2{}
107+
cc3 := &c3{}
108+
err = cc.AddObserver(topic, cc1)
109+
require.NoError(t, err)
110+
err = cc.AddObserver(topic, cc2)
111+
require.NoError(t, err)
112+
err = cc.AddObserver(topic, cc3)
113+
require.NoError(t, err)
114+
115+
t.Run("normal", func(t *testing.T) {
116+
err := cc.Notify(context.Background(), topic, &CallMessage{
117+
Name: "aaa",
118+
}, AllowSpawn())
119+
require.NoError(t, err)
120+
})
121+
122+
t.Run("error stop", func(t *testing.T) {
123+
err := cc.Notify(context.Background(), topic, &CallMessage{
124+
Name: "c2nodo",
125+
}, AllowSpawn())
126+
require.NoError(t, err)
127+
})
128+
}

0 commit comments

Comments
 (0)