File tree 2 files changed +8
-3
lines changed
2 files changed +8
-3
lines changed Original file line number Diff line number Diff line change @@ -10,6 +10,7 @@ type Observable[T any] struct {
10
10
listener map [Subscription [T ]]* Subscriber [T ]
11
11
mux sync.Mutex
12
12
done bool
13
+ stopCh chan struct {}
13
14
}
14
15
15
16
func (o * Observable [T ]) process () {
@@ -31,6 +32,7 @@ func (o *Observable[T]) close() {
31
32
for _ , sub := range o .listener {
32
33
sub .Close ()
33
34
}
35
+ close (o .stopCh )
34
36
}
35
37
36
38
func (o * Observable [T ]) Subscribe () (Subscription [T ], error ) {
@@ -59,6 +61,7 @@ func NewObservable[T any](iter Iterable[T]) *Observable[T] {
59
61
observable := & Observable [T ]{
60
62
iterable : iter ,
61
63
listener : map [Subscription [T ]]* Subscriber [T ]{},
64
+ stopCh : make (chan struct {}),
62
65
}
63
66
go observable .process ()
64
67
return observable
Original file line number Diff line number Diff line change @@ -70,9 +70,11 @@ func TestObservable_SubscribeClosedSource(t *testing.T) {
70
70
src := NewObservable [int ](iter )
71
71
data , _ := src .Subscribe ()
72
72
<- data
73
-
74
- _ , closed := src .Subscribe ()
75
- assert .NotNil (t , closed )
73
+ select {
74
+ case <- src .stopCh :
75
+ case <- time .After (time .Second ):
76
+ assert .Fail (t , "timeout not stop" )
77
+ }
76
78
}
77
79
78
80
func TestObservable_UnSubscribeWithNotExistSubscription (t * testing.T ) {
You can’t perform that action at this time.
0 commit comments