File tree 7 files changed +121
-22
lines changed
7 files changed +121
-22
lines changed Original file line number Diff line number Diff line change @@ -52,11 +52,6 @@ func New[T key](interval time.Duration) *Batcher[T] {
52
52
}
53
53
}
54
54
55
- // WithClock sets the clock used by the batcher. Used for testing.
56
- func (b * Batcher [T ]) WithClock (clock clock.WithDelayedExecution ) {
57
- b .clock = clock
58
- }
59
-
60
55
// Subscribe adds a new event channel subscriber. If the batcher is closed, the
61
56
// subscriber is silently dropped.
62
57
func (b * Batcher [T ]) Subscribe (eventCh ... chan <- struct {}) {
Original file line number Diff line number Diff line change @@ -30,13 +30,6 @@ func TestNew(t *testing.T) {
30
30
assert .False (t , b .closed .Load ())
31
31
}
32
32
33
- func TestWithClock (t * testing.T ) {
34
- b := New [string ](time .Millisecond * 10 )
35
- fakeClock := testingclock .NewFakeClock (time .Now ())
36
- b .WithClock (fakeClock )
37
- assert .Equal (t , fakeClock , b .clock )
38
- }
39
-
40
33
func TestSubscribe (t * testing.T ) {
41
34
t .Parallel ()
42
35
Original file line number Diff line number Diff line change
1
+ //go:build unit
2
+ // +build unit
3
+
4
+ /*
5
+ Copyright 2023 The Dapr Authors
6
+ Licensed under the Apache License, Version 2.0 (the "License");
7
+ you may not use this file except in compliance with the License.
8
+ You may obtain a copy of the License at
9
+ http://www.apache.org/licenses/LICENSE-2.0
10
+ Unless required by applicable law or agreed to in writing, software
11
+ distributed under the License is distributed on an "AS IS" BASIS,
12
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ See the License for the specific language governing permissions and
14
+ limitations under the License.
15
+ */
16
+
17
+ package batcher
18
+
19
+ import (
20
+ "k8s.io/utils/clock"
21
+ )
22
+
23
+ // WithClock sets the clock used by the batcher. Used for testing.
24
+ func (b * Batcher [T ]) WithClock (clock clock.WithDelayedExecution ) {
25
+ b .clock = clock
26
+ }
Original file line number Diff line number Diff line change
1
+ //go:build unit
2
+ // +build unit
3
+
4
+ /*
5
+ Copyright 2023 The Dapr Authors
6
+ Licensed under the Apache License, Version 2.0 (the "License");
7
+ you may not use this file except in compliance with the License.
8
+ You may obtain a copy of the License at
9
+ http://www.apache.org/licenses/LICENSE-2.0
10
+ Unless required by applicable law or agreed to in writing, software
11
+ distributed under the License is distributed on an "AS IS" BASIS,
12
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ See the License for the specific language governing permissions and
14
+ limitations under the License.
15
+ */
16
+
17
+ package batcher
18
+
19
+ import (
20
+ "testing"
21
+ "time"
22
+
23
+ "github.com/stretchr/testify/assert"
24
+ clocktesting "k8s.io/utils/clock/testing"
25
+ )
26
+
27
+ func TestWithClock (t * testing.T ) {
28
+ b := New [string ](time .Millisecond * 10 )
29
+ fakeClock := clocktesting .NewFakeClock (time .Now ())
30
+ b .WithClock (fakeClock )
31
+ assert .Equal (t , fakeClock , b .clock )
32
+ }
Original file line number Diff line number Diff line change @@ -19,7 +19,7 @@ import (
19
19
"sync/atomic"
20
20
"time"
21
21
22
- kclock "k8s.io/utils/clock"
22
+ "k8s.io/utils/clock"
23
23
)
24
24
25
25
// ErrProcessorStopped is returned when the processor is not running.
@@ -29,7 +29,7 @@ var ErrProcessorStopped = errors.New("processor is stopped")
29
29
type Processor [T queueable ] struct {
30
30
executeFn func (r T )
31
31
queue queue [T ]
32
- clock kclock .Clock
32
+ clock clock .Clock
33
33
lock sync.Mutex
34
34
wg sync.WaitGroup
35
35
processorRunningCh chan struct {}
@@ -47,16 +47,10 @@ func NewProcessor[T queueable](executeFn func(r T)) *Processor[T] {
47
47
processorRunningCh : make (chan struct {}, 1 ),
48
48
stopCh : make (chan struct {}),
49
49
resetCh : make (chan struct {}, 1 ),
50
- clock : kclock .RealClock {},
50
+ clock : clock .RealClock {},
51
51
}
52
52
}
53
53
54
- // WithClock sets the clock used by the processor. Used for testing.
55
- func (p * Processor [T ]) WithClock (clock kclock.Clock ) * Processor [T ] {
56
- p .clock = clock
57
- return p
58
- }
59
-
60
54
// Enqueue adds a new item to the queue.
61
55
// If a item with the same ID already exists, it'll be replaced.
62
56
func (p * Processor [T ]) Enqueue (r T ) error {
@@ -149,7 +143,7 @@ func (p *Processor[T]) processLoop() {
149
143
var (
150
144
r T
151
145
ok bool
152
- t kclock .Timer
146
+ t clock .Timer
153
147
scheduledTime time.Time
154
148
deadline time.Duration
155
149
)
Original file line number Diff line number Diff line change
1
+ //go:build unit
2
+ // +build unit
3
+
4
+ /*
5
+ Copyright 2023 The Dapr Authors
6
+ Licensed under the Apache License, Version 2.0 (the "License");
7
+ you may not use this file except in compliance with the License.
8
+ You may obtain a copy of the License at
9
+ http://www.apache.org/licenses/LICENSE-2.0
10
+ Unless required by applicable law or agreed to in writing, software
11
+ distributed under the License is distributed on an "AS IS" BASIS,
12
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ See the License for the specific language governing permissions and
14
+ limitations under the License.
15
+ */
16
+
17
+ package queue
18
+
19
+ import (
20
+ "k8s.io/utils/clock"
21
+ )
22
+
23
+ // WithClock sets the clock used by the processor. Used for testing.
24
+ func (p * Processor [T ]) WithClock (clock clock.Clock ) * Processor [T ] {
25
+ p .clock = clock
26
+ return p
27
+ }
Original file line number Diff line number Diff line change
1
+ //go:build unit
2
+ // +build unit
3
+
4
+ /*
5
+ Copyright 2023 The Dapr Authors
6
+ Licensed under the Apache License, Version 2.0 (the "License");
7
+ you may not use this file except in compliance with the License.
8
+ You may obtain a copy of the License at
9
+ http://www.apache.org/licenses/LICENSE-2.0
10
+ Unless required by applicable law or agreed to in writing, software
11
+ distributed under the License is distributed on an "AS IS" BASIS,
12
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ See the License for the specific language governing permissions and
14
+ limitations under the License.
15
+ */
16
+
17
+ package queue
18
+
19
+ import (
20
+ "testing"
21
+ "time"
22
+
23
+ "github.com/stretchr/testify/assert"
24
+ clocktesting "k8s.io/utils/clock/testing"
25
+ )
26
+
27
+ func TestWithClock (t * testing.T ) {
28
+ p := NewProcessor (func (* queueableItem ) {})
29
+ fakeClock := clocktesting .NewFakeClock (time .Now ())
30
+ p .WithClock (fakeClock )
31
+ assert .Equal (t , fakeClock , p .clock )
32
+ }
You can’t perform that action at this time.
0 commit comments