Skip to content

Commit aac0bf8

Browse files
fix: fix read split buffer function gr block bug (#18)
1 parent afec806 commit aac0bf8

5 files changed

Lines changed: 41 additions & 8 deletions

File tree

example/dubbo/go-client/cmd/client.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ package main
2020
import (
2121
"context"
2222
"fmt"
23+
"net/http"
24+
_ "net/http/pprof"
25+
"runtime"
2326
"sync"
2427
"time"
2528
)
@@ -47,10 +50,13 @@ var greeterProvider = new(pkg.GreeterClientImpl)
4750

4851
func init() {
4952
config.SetConsumerService(greeterProvider)
53+
runtime.SetMutexProfileFraction(1)
5054
}
51-
5255
// need to setup environment variable "CONF_CONSUMER_FILE_PATH" to "conf/client.yml" before run
5356
func main() {
57+
go func() {
58+
_ = http.ListenAndServe("0.0.0.0:6061", nil)
59+
}()
5460
config.Load()
5561
time.Sleep(time.Second * 3)
5662
testSayHello()
@@ -94,7 +100,7 @@ func testSayHelloWithHighParallel() {
94100
wg := sync.WaitGroup{}
95101
goodCounter := atomic.Uint32{}
96102
badCounter := atomic.Uint32{}
97-
for i := 0; i < 1000; i ++{
103+
for i := 0; i < 2000; i ++{
98104
wg.Add(1)
99105
go func() {
100106
defer wg.Done()

example/dubbo/go-server/cmd/server.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ package main
1919

2020
import (
2121
"fmt"
22+
"net/http"
23+
_ "net/http/pprof"
2224
"os"
2325
"os/signal"
26+
"runtime"
2427
"syscall"
2528
"time"
2629
)
@@ -36,20 +39,28 @@ import (
3639
_ "dubbo.apache.org/dubbo-go/v3/registry/nacos"
3740
_ "dubbo.apache.org/dubbo-go/v3/registry/protocol"
3841
_ "dubbo.apache.org/dubbo-go/v3/registry/zookeeper"
42+
43+
_ "github.com/dubbogo/triple/pkg/triple"
3944
)
4045

4146
import (
4247
"github.com/dubbogo/triple/example/dubbo/go-server/pkg"
43-
_ "github.com/dubbogo/triple/pkg/triple"
4448
)
4549

4650
var (
4751
survivalTimeout = int(3 * time.Second)
4852
)
4953

54+
func init() {
55+
runtime.SetMutexProfileFraction(1)
56+
}
57+
5058
// need to setup environment variable "CONF_PROVIDER_FILE_PATH" to "conf/server.yml" before run
5159
func main() {
5260
config.SetProviderService(pkg.NewGreeterProvider())
61+
go func() {
62+
_ = http.ListenAndServe("0.0.0.0:6060", nil)
63+
}()
5364
config.Load()
5465
initSignal()
5566
}

example/go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ go 1.13
44

55
require (
66
dubbo.apache.org/dubbo-go/v3 v3.0.0-rc2
7-
github.com/dubbogo/gost v1.11.16
87
github.com/dubbogo/triple v1.0.1
98
github.com/golang/protobuf v1.5.2
109
github.com/stretchr/testify v1.7.0

pkg/http2/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package http2
22

33
import (
44
"bytes"
5+
"context"
56
"crypto/tls"
67
"net"
78
"net/http"
@@ -84,7 +85,7 @@ func (h *Client) StreamPost(addr, path string, sendChan chan *bytes.Buffer, opts
8485
close(closeChan)
8586
return
8687
}
87-
ch := readSplitData(rsp.Body)
88+
ch := readSplitData(context.Background(), rsp.Body)
8889
Loop:
8990
for {
9091
select {

pkg/http2/server.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package http2
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/binary"
67
"fmt"
78
"io"
@@ -179,7 +180,7 @@ func skipHeader(frameData []byte) ([]byte, uint32) {
179180
return frameData[5:], length
180181
}
181182

182-
func readSplitData(rBody io.ReadCloser) chan *bytes.Buffer {
183+
func readSplitData(ctx context.Context, rBody io.ReadCloser) chan *bytes.Buffer {
183184
cbm := make(chan *bytes.Buffer)
184185
go func() {
185186
buf := make([]byte, 4098) // todo configurable
@@ -224,7 +225,14 @@ func readSplitData(rBody io.ReadCloser) chan *bytes.Buffer {
224225
if err != nil {
225226
fmt.Printf("read SplitedDatas error = %v\n", err)
226227
}
227-
cbm <- bytes.NewBuffer(allDataBody)
228+
select {
229+
case <-ctx.Done():
230+
close(cbm)
231+
return
232+
default:
233+
cbm <- bytes.NewBuffer(allDataBody)
234+
}
235+
228236
// temp data is sent, and reset wanting data size
229237
fromFrameHeaderDataSize = 0
230238
}
@@ -236,7 +244,15 @@ func readSplitData(rBody io.ReadCloser) chan *bytes.Buffer {
236244

237245
func (s *Server) http2HandleFunction(wi http.ResponseWriter, r *http.Request) {
238246
// body data from http
239-
bodyCh := readSplitData(r.Body)
247+
ctx, cancel := context.WithCancel(context.Background())
248+
bodyCh := readSplitData(ctx, r.Body)
249+
defer func() {
250+
cancel()
251+
select {
252+
case <-bodyCh:
253+
default:
254+
}
255+
}()
240256
sendChan := make(chan *bytes.Buffer)
241257
ctrlChan := make(chan http.Header)
242258
errChan := make(chan interface{})

0 commit comments

Comments
 (0)