@@ -11,7 +11,6 @@ import (
11
11
"github.com/metrico/qryn/writer/utils/logger"
12
12
"github.com/metrico/qryn/writer/utils/promise"
13
13
"github.com/metrico/qryn/writer/utils/stat"
14
- "golang.org/x/sync/semaphore"
15
14
"math/rand"
16
15
"sync"
17
16
"sync/atomic"
@@ -106,8 +105,6 @@ type InsertServiceV2 struct {
106
105
client ch_wrapper.IChClient
107
106
108
107
state int32
109
-
110
- bandwithLimiter * semaphore.Weighted
111
108
}
112
109
113
110
func (svc * InsertServiceV2 ) PlanFlush () {
@@ -206,14 +203,6 @@ func (svc *InsertServiceV2) Request(req helpers.SizeGetter, insertMode int) *pro
206
203
}
207
204
var size int64
208
205
size = req .GetSize ()
209
- to , cancel := context .WithTimeout (context .Background (), time .Second )
210
- defer cancel ()
211
- err := svc .bandwithLimiter .Acquire (to , size )
212
- if err != nil {
213
- logger .Info ("service overflow" )
214
- p .Done (0 , fmt .Errorf ("service overflow" ))
215
- return p
216
- }
217
206
func () {
218
207
var (
219
208
inserted int
@@ -226,7 +215,6 @@ func (svc *InsertServiceV2) Request(req helpers.SizeGetter, insertMode int) *pro
226
215
227
216
if err != nil || inserted == 0 {
228
217
p .Done (0 , err )
229
- svc .bandwithLimiter .Release (size )
230
218
return
231
219
}
232
220
svc .size += size
@@ -292,7 +280,6 @@ func (svc *InsertServiceV2) fetchLoopIteration() {
292
280
input [i ] = c .Input ()
293
281
size += int64 (svc .IngestSize (& input [i ]))
294
282
}
295
- svc .bandwithLimiter .Release (portion .size )
296
283
297
284
svc .setState (INSERT_STATE_INSERTING )
298
285
defer svc .setState (INSERT_STATE_IDLE )
@@ -420,25 +407,20 @@ func (svc *InsertServiceV2RoundRobin) init() {
420
407
logger .Info (fmt .Sprintf ("creating %d services" , svc .svcNum ))
421
408
svc .services = make ([]* InsertServiceV2 , svc .svcNum )
422
409
svc .rand = rand .New (rand .NewSource (time .Now ().UnixNano ()))
423
- var bandwidthLimit int64 = BANDWITH_LIMIT
424
- if svc .maxQueueSize * 2 > BANDWITH_LIMIT {
425
- bandwidthLimit = svc .maxQueueSize * 2
426
- }
427
410
for i := range svc .services {
428
411
svc .services [i ] = & InsertServiceV2 {
429
- ID : fmt .Sprintf ("%s-%s-%v" , svc .DatabaseNode .Node , svc .insertRequest , svc .AsyncInsert ),
430
- ServiceData : ServiceData {},
431
- V3Session : svc .V3Session ,
432
- DatabaseNode : svc .DatabaseNode ,
433
- AsyncInsert : svc .AsyncInsert ,
434
- OnBeforeInsert : svc .OnBeforeInsert ,
435
- pushInterval : svc .pushInterval ,
436
- maxQueueSize : svc .maxQueueSize ,
437
- insertRequest : svc .insertRequest ,
438
- acquireColumns : svc .acquireColumns ,
439
- processRequest : svc .processRequest ,
440
- bandwithLimiter : semaphore .NewWeighted (bandwidthLimit ),
441
- serviceType : svc .serviceType ,
412
+ ID : fmt .Sprintf ("%s-%s-%v" , svc .DatabaseNode .Node , svc .insertRequest , svc .AsyncInsert ),
413
+ ServiceData : ServiceData {},
414
+ V3Session : svc .V3Session ,
415
+ DatabaseNode : svc .DatabaseNode ,
416
+ AsyncInsert : svc .AsyncInsert ,
417
+ OnBeforeInsert : svc .OnBeforeInsert ,
418
+ pushInterval : svc .pushInterval ,
419
+ maxQueueSize : svc .maxQueueSize ,
420
+ insertRequest : svc .insertRequest ,
421
+ acquireColumns : svc .acquireColumns ,
422
+ processRequest : svc .processRequest ,
423
+ serviceType : svc .serviceType ,
442
424
}
443
425
svc .services [i ].Init ()
444
426
}
0 commit comments