Skip to content

Commit 9a94144

Browse files
Gal TopperGal Topper
andauthored
IG-15850: Add connection semaphore per context. (#66)
* Add connection semaphore per context. * Default to no semaphore. * int64 -> int Co-authored-by: Gal Topper <galt@iguazio.com>
1 parent 3bbe8d8 commit 9a94144

File tree

4 files changed

+21
-4
lines changed

4 files changed

+21
-4
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ require (
1818
github.com/valyala/fasthttp v1.2.0
1919
go.uber.org/atomic v1.3.2 // indirect
2020
go.uber.org/multierr v1.1.0 // indirect
21+
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
2122
zombiezen.com/go/capnproto2 v2.17.0+incompatible
2223
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
4444
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
4545
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI=
4646
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
47+
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
48+
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
4749
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
4850
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
4951
zombiezen.com/go/capnproto2 v2.17.0+incompatible h1:sIoKPFGNlM38Qh+PBLa9Wzg1j99oInS/Qlk+5N/CHa4=

pkg/dataplane/http/context.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package v3iohttp
22

33
import (
44
"bytes"
5+
goctx "context"
56
"crypto/tls"
67
"encoding/base64"
78
"encoding/json"
@@ -26,17 +27,19 @@ import (
2627
"github.com/nuclio/errors"
2728
"github.com/nuclio/logger"
2829
"github.com/valyala/fasthttp"
30+
"golang.org/x/sync/semaphore"
2931
"zombiezen.com/go/capnproto2"
3032
)
3133

3234
// TODO: Request should have a global pool
3335
var requestID uint64
3436

3537
type context struct {
36-
logger logger.Logger
37-
requestChan chan *v3io.Request
38-
httpClient *fasthttp.Client
39-
numWorkers int
38+
logger logger.Logger
39+
requestChan chan *v3io.Request
40+
httpClient *fasthttp.Client
41+
numWorkers int
42+
connSemaphore *semaphore.Weighted
4043
}
4144

4245
type NewClientInput struct {
@@ -89,6 +92,10 @@ func NewContext(parentLogger logger.Logger, newContextInput *NewContextInput) (v
8992
numWorkers: numWorkers,
9093
}
9194

95+
if newContextInput.MaxConns > 0 {
96+
newContext.connSemaphore = semaphore.NewWeighted(int64(newContextInput.MaxConns))
97+
}
98+
9299
for workerIndex := 0; workerIndex < numWorkers; workerIndex++ {
93100
go newContext.workerEntry(workerIndex)
94101
}
@@ -997,11 +1004,17 @@ func (c *context) sendRequest(dataPlaneInput *v3io.DataPlaneInput,
9971004
"method", method,
9981005
"body-length", len(body))
9991006

1007+
if c.connSemaphore != nil {
1008+
c.connSemaphore.Acquire(goctx.TODO(), 1)
1009+
}
10001010
if dataPlaneInput.Timeout <= 0 {
10011011
err = c.httpClient.Do(request, response.HTTPResponse)
10021012
} else {
10031013
err = c.httpClient.DoTimeout(request, response.HTTPResponse, dataPlaneInput.Timeout)
10041014
}
1015+
if c.connSemaphore != nil {
1016+
c.connSemaphore.Release(1)
1017+
}
10051018

10061019
if err != nil {
10071020
goto cleanup

pkg/dataplane/http/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ type NewContextInput struct {
66
HTTPClient *fasthttp.Client
77
NumWorkers int
88
RequestChanLen int
9+
MaxConns int
910
}

0 commit comments

Comments
 (0)