diff --git a/events/batcher/batcher.go b/events/batcher/batcher.go index e6f58e9..b5734fa 100644 --- a/events/batcher/batcher.go +++ b/events/batcher/batcher.go @@ -52,11 +52,6 @@ func New[T key](interval time.Duration) *Batcher[T] { } } -// WithClock sets the clock used by the batcher. Used for testing. -func (b *Batcher[T]) WithClock(clock clock.WithDelayedExecution) { - b.clock = clock -} - // Subscribe adds a new event channel subscriber. If the batcher is closed, the // subscriber is silently dropped. func (b *Batcher[T]) Subscribe(eventCh ...chan<- struct{}) { diff --git a/events/batcher/batcher_test.go b/events/batcher/batcher_test.go index a62e856..b504de8 100644 --- a/events/batcher/batcher_test.go +++ b/events/batcher/batcher_test.go @@ -30,13 +30,6 @@ func TestNew(t *testing.T) { assert.False(t, b.closed.Load()) } -func TestWithClock(t *testing.T) { - b := New[string](time.Millisecond * 10) - fakeClock := testingclock.NewFakeClock(time.Now()) - b.WithClock(fakeClock) - assert.Equal(t, fakeClock, b.clock) -} - func TestSubscribe(t *testing.T) { t.Parallel() diff --git a/events/batcher/unit.go b/events/batcher/unit.go new file mode 100644 index 0000000..51dd412 --- /dev/null +++ b/events/batcher/unit.go @@ -0,0 +1,26 @@ +//go:build unit +// +build unit + +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package batcher + +import ( + "k8s.io/utils/clock" +) + +// WithClock sets the clock used by the batcher. Used for testing. +func (b *Batcher[T]) WithClock(clock clock.WithDelayedExecution) { + b.clock = clock +} diff --git a/events/batcher/unit_test.go b/events/batcher/unit_test.go new file mode 100644 index 0000000..e55a42c --- /dev/null +++ b/events/batcher/unit_test.go @@ -0,0 +1,32 @@ +//go:build unit +// +build unit + +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package batcher + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + clocktesting "k8s.io/utils/clock/testing" +) + +func TestWithClock(t *testing.T) { + b := New[string](time.Millisecond * 10) + fakeClock := clocktesting.NewFakeClock(time.Now()) + b.WithClock(fakeClock) + assert.Equal(t, fakeClock, b.clock) +} diff --git a/events/queue/processor.go b/events/queue/processor.go index b7661b5..5ebc0f7 100644 --- a/events/queue/processor.go +++ b/events/queue/processor.go @@ -19,7 +19,7 @@ import ( "sync/atomic" "time" - kclock "k8s.io/utils/clock" + "k8s.io/utils/clock" ) // ErrProcessorStopped is returned when the processor is not running. @@ -29,7 +29,7 @@ var ErrProcessorStopped = errors.New("processor is stopped") type Processor[T queueable] struct { executeFn func(r T) queue queue[T] - clock kclock.Clock + clock clock.Clock lock sync.Mutex wg sync.WaitGroup processorRunningCh chan struct{} @@ -47,16 +47,10 @@ func NewProcessor[T queueable](executeFn func(r T)) *Processor[T] { processorRunningCh: make(chan struct{}, 1), stopCh: make(chan struct{}), resetCh: make(chan struct{}, 1), - clock: kclock.RealClock{}, + clock: clock.RealClock{}, } } -// WithClock sets the clock used by the processor. Used for testing. -func (p *Processor[T]) WithClock(clock kclock.Clock) *Processor[T] { - p.clock = clock - return p -} - // Enqueue adds a new item to the queue. // If a item with the same ID already exists, it'll be replaced. func (p *Processor[T]) Enqueue(r T) error { @@ -149,7 +143,7 @@ func (p *Processor[T]) processLoop() { var ( r T ok bool - t kclock.Timer + t clock.Timer scheduledTime time.Time deadline time.Duration ) diff --git a/events/queue/processor_unit.go b/events/queue/processor_unit.go new file mode 100644 index 0000000..2e119ec --- /dev/null +++ b/events/queue/processor_unit.go @@ -0,0 +1,27 @@ +//go:build unit +// +build unit + +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "k8s.io/utils/clock" +) + +// WithClock sets the clock used by the processor. Used for testing. +func (p *Processor[T]) WithClock(clock clock.Clock) *Processor[T] { + p.clock = clock + return p +} diff --git a/events/queue/processor_unit_test.go b/events/queue/processor_unit_test.go new file mode 100644 index 0000000..d1b3d0d --- /dev/null +++ b/events/queue/processor_unit_test.go @@ -0,0 +1,32 @@ +//go:build unit +// +build unit + +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + clocktesting "k8s.io/utils/clock/testing" +) + +func TestWithClock(t *testing.T) { + p := NewProcessor(func(*queueableItem) {}) + fakeClock := clocktesting.NewFakeClock(time.Now()) + p.WithClock(fakeClock) + assert.Equal(t, fakeClock, p.clock) +}