@@ -15,16 +15,13 @@ import (
1515 "github.com/stretchr/testify/require"
1616
1717 "go.opentelemetry.io/collector/component/componenttest"
18+ "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1819)
1920
20- type sizerInt64 struct {}
21-
22- func (s sizerInt64 ) Sizeof (el int64 ) int64 {
23- return el
24- }
25-
2621func TestMemoryQueue (t * testing.T ) {
27- q := newMemoryQueue [int64 ](Settings [int64 ]{Sizer : sizerInt64 {}, Capacity : 7 })
22+ set := newSettings (request .SizerTypeItems , 7 )
23+ q := newMemoryQueue [int64 ](set )
24+ require .NoError (t , q .Start (context .Background (), componenttest .NewNopHost ()))
2825 require .NoError (t , q .Offer (context .Background (), 1 ))
2926 assert .EqualValues (t , 1 , q .Size ())
3027 assert .EqualValues (t , 7 , q .Capacity ())
@@ -50,10 +47,14 @@ func TestMemoryQueue(t *testing.T) {
5047
5148 require .NoError (t , q .Shutdown (context .Background ()))
5249 assert .False (t , consume (q , func (context.Context , int64 ) error { t .FailNow (); return nil }))
50+ require .NoError (t , q .Shutdown (context .Background ()))
5351}
5452
5553func TestMemoryQueueBlockingCancelled (t * testing.T ) {
56- q := newMemoryQueue [int64 ](Settings [int64 ]{Sizer : sizerInt64 {}, Capacity : 5 , BlockOnOverflow : true })
54+ set := newSettings (request .SizerTypeItems , 5 )
55+ set .BlockOnOverflow = true
56+ q := newMemoryQueue [int64 ](set )
57+ require .NoError (t , q .Start (context .Background (), componenttest .NewNopHost ()))
5758 require .NoError (t , q .Offer (context .Background (), 3 ))
5859 ctx , cancel := context .WithCancel (context .Background ())
5960 wg := sync.WaitGroup {}
@@ -73,7 +74,9 @@ func TestMemoryQueueBlockingCancelled(t *testing.T) {
7374}
7475
7576func TestMemoryQueueDrainWhenShutdown (t * testing.T ) {
76- q := newMemoryQueue [int64 ](Settings [int64 ]{Sizer : sizerInt64 {}, Capacity : 7 })
77+ set := newSettings (request .SizerTypeItems , 7 )
78+ q := newMemoryQueue [int64 ](set )
79+ require .NoError (t , q .Start (context .Background (), componenttest .NewNopHost ()))
7780 require .NoError (t , q .Offer (context .Background (), 1 ))
7881 require .NoError (t , q .Offer (context .Background (), 3 ))
7982
@@ -90,28 +93,40 @@ func TestMemoryQueueDrainWhenShutdown(t *testing.T) {
9093 }))
9194 assert .EqualValues (t , 0 , q .Size ())
9295 assert .False (t , consume (q , func (context.Context , int64 ) error { t .FailNow (); return nil }))
96+ require .NoError (t , q .Shutdown (context .Background ()))
9397}
9498
9599func TestMemoryQueueOfferInvalidSize (t * testing.T ) {
96- q := newMemoryQueue [int64 ](Settings [int64 ]{Sizer : sizerInt64 {}, Capacity : 1 })
100+ set := newSettings (request .SizerTypeItems , 1 )
101+ q := newMemoryQueue [int64 ](set )
102+ require .NoError (t , q .Start (context .Background (), componenttest .NewNopHost ()))
97103 require .ErrorIs (t , q .Offer (context .Background (), - 1 ), errInvalidSize )
104+ require .NoError (t , q .Shutdown (context .Background ()))
98105}
99106
100107func TestMemoryQueueRejectOverCapacityElements (t * testing.T ) {
101- q := newMemoryQueue [int64 ](Settings [int64 ]{Sizer : sizerInt64 {}, Capacity : 7 , BlockOnOverflow : true })
108+ set := newSettings (request .SizerTypeItems , 1 )
109+ set .BlockOnOverflow = true
110+ q := newMemoryQueue [int64 ](set )
111+ require .NoError (t , q .Start (context .Background (), componenttest .NewNopHost ()))
102112 require .ErrorIs (t , q .Offer (context .Background (), 8 ), errSizeTooLarge )
113+ require .NoError (t , q .Shutdown (context .Background ()))
103114}
104115
105116func TestMemoryQueueOfferZeroSize (t * testing.T ) {
106- q := newMemoryQueue [int64 ](Settings [int64 ]{Sizer : sizerInt64 {}, Capacity : 1 })
117+ set := newSettings (request .SizerTypeItems , 1 )
118+ q := newMemoryQueue [int64 ](set )
119+ require .NoError (t , q .Start (context .Background (), componenttest .NewNopHost ()))
107120 require .NoError (t , q .Offer (context .Background (), 0 ))
108121 require .NoError (t , q .Shutdown (context .Background ()))
109122 // Because the size 0 is ignored, nothing to drain.
110123 assert .False (t , consume (q , func (context.Context , int64 ) error { t .FailNow (); return nil }))
111124}
112125
113- func TestMemoryQueueZeroCapacity (t * testing.T ) {
114- q := newMemoryQueue [int64 ](Settings [int64 ]{Sizer : sizerInt64 {}, Capacity : 1 })
126+ func TestMemoryQueueOverflow (t * testing.T ) {
127+ set := newSettings (request .SizerTypeItems , 1 )
128+ q := newMemoryQueue [int64 ](set )
129+ require .NoError (t , q .Start (context .Background (), componenttest .NewNopHost ()))
115130 require .NoError (t , q .Offer (context .Background (), 1 ))
116131 require .ErrorIs (t , q .Offer (context .Background (), 1 ), ErrQueueIsFull )
117132 require .NoError (t , q .Shutdown (context .Background ()))
@@ -120,7 +135,9 @@ func TestMemoryQueueZeroCapacity(t *testing.T) {
120135func TestMemoryQueueWaitForResultPassErrorBack (t * testing.T ) {
121136 wg := sync.WaitGroup {}
122137 myErr := errors .New ("test error" )
123- q := newMemoryQueue [int64 ](Settings [int64 ]{Sizer : sizerInt64 {}, Capacity : 100 , WaitForResult : true })
138+ set := newSettings (request .SizerTypeItems , 100 )
139+ set .WaitForResult = true
140+ q := newMemoryQueue [int64 ](set )
124141 require .NoError (t , q .Start (context .Background (), componenttest .NewNopHost ()))
125142 wg .Add (1 )
126143 go func () {
@@ -138,7 +155,9 @@ func TestMemoryQueueWaitForResultPassErrorBack(t *testing.T) {
138155func TestMemoryQueueWaitForResultCancelIncomingRequest (t * testing.T ) {
139156 wg := sync.WaitGroup {}
140157 stop := make (chan struct {})
141- q := newMemoryQueue [int64 ](Settings [int64 ]{Sizer : sizerInt64 {}, Capacity : 100 , WaitForResult : true })
158+ set := newSettings (request .SizerTypeItems , 100 )
159+ set .WaitForResult = true
160+ q := newMemoryQueue [int64 ](set )
142161 require .NoError (t , q .Start (context .Background (), componenttest .NewNopHost ()))
143162
144163 // Consume async new data.
@@ -167,7 +186,9 @@ func TestMemoryQueueWaitForResultCancelIncomingRequest(t *testing.T) {
167186func TestMemoryQueueWaitForResultSizeAndCapacity (t * testing.T ) {
168187 wg := sync.WaitGroup {}
169188 stop := make (chan struct {})
170- q := newMemoryQueue [int64 ](Settings [int64 ]{Sizer : sizerInt64 {}, Capacity : 100 , WaitForResult : true })
189+ set := newSettings (request .SizerTypeItems , 100 )
190+ set .WaitForResult = true
191+ q := newMemoryQueue [int64 ](set )
171192 require .NoError (t , q .Start (context .Background (), componenttest .NewNopHost ()))
172193
173194 // Consume async new data.
@@ -197,7 +218,9 @@ func TestMemoryQueueWaitForResultSizeAndCapacity(t *testing.T) {
197218func BenchmarkMemoryQueueWaitForResult (b * testing.B ) {
198219 wg := sync.WaitGroup {}
199220 consumed := & atomic.Int64 {}
200- q := newMemoryQueue [int64 ](Settings [int64 ]{Sizer : sizerInt64 {}, Capacity : 1000 , WaitForResult : true })
221+ set := newSettings (request .SizerTypeItems , 100 )
222+ set .WaitForResult = true
223+ q := newMemoryQueue [int64 ](set )
201224 require .NoError (b , q .Start (context .Background (), componenttest .NewNopHost ()))
202225
203226 // Consume async new data.
0 commit comments