
Commit b2ca5f1

Merge pull request #1795 from jaffee/close-client-responses

Ensure internal client closes all response bodies

2 parents 4a2ab77 + 31ab3d3

File tree

3 files changed: +125 -6 lines changed

http/client.go (+17 -5)

@@ -164,7 +164,7 @@ func (c *InternalClient) CreateIndex(ctx context.Context, index string, opt pilo
 		}
 		return err
 	}
-	return nil
+	return errors.Wrap(resp.Body.Close(), "closing response body")
 }
 
 // FragmentNodes returns a list of nodes that own a shard.
@@ -705,6 +705,9 @@ func (c *InternalClient) exportNodeCSV(ctx context.Context, node *pilosa.Node, i
 	return nil
 }
 
+// RetrieveShardFromURI returns a ReadCloser which contains the data of the
+// specified shard from the specified node. Caller *must* close the returned
+// ReadCloser or risk leaking goroutines/tcp connections.
 func (c *InternalClient) RetrieveShardFromURI(ctx context.Context, index, field, view string, shard uint64, uri pilosa.URI) (io.ReadCloser, error) {
 	span, ctx := tracing.StartSpanFromContext(ctx, "InternalClient.RetrieveShardFromURI")
 	defer span.Finish()
@@ -800,7 +803,7 @@ func (c *InternalClient) CreateFieldWithOptions(ctx context.Context, index, fiel
 		return err
 	}
 
-	return nil
+	return errors.Wrap(resp.Body.Close(), "closing response body")
 }
 
 // FragmentBlocks returns a list of block checksums for a fragment on a host.
@@ -994,15 +997,24 @@ func (c *InternalClient) SendMessage(ctx context.Context, uri *pilosa.URI, msg [
 	req.Header.Set("Accept", "application/json")
 
 	// Execute request.
-	_, err = c.executeRequest(req.WithContext(ctx))
-	return err
+	resp, err := c.executeRequest(req.WithContext(ctx))
+	if err != nil {
+		return errors.Wrap(err, "executing request")
+	}
+	return errors.Wrap(resp.Body.Close(), "closing response body")
 }
 
-// executeRequest executes the given request and checks the Response
+// executeRequest executes the given request and checks the Response. For
+// responses with non-2XX status, the body is read and closed, and an error is
+// returned. If the error is nil, the caller must ensure that the response body
+// is closed.
 func (c *InternalClient) executeRequest(req *http.Request) (*http.Response, error) {
 	tracing.GlobalTracer.InjectHTTPHeaders(req)
 	resp, err := c.httpClient.Do(req)
 	if err != nil {
+		if resp != nil {
+			resp.Body.Close()
+		}
 		return nil, errors.Wrap(err, "executing request")
 	}
 	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
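The recurring change in this file swaps a bare "return nil" for "return errors.Wrap(resp.Body.Close(), ...)". In Go's net/http, the caller must close every response body even when it ignores the payload; an unclosed body pins the underlying TCP connection (preventing keep-alive reuse) and leaks the goroutine servicing it, which is the leak this commit plugs. Below is a standalone sketch of that pattern, not Pilosa code; the ping helper and the status URL are hypothetical.

// A standalone sketch (not Pilosa code) of the pattern this commit enforces:
// every http.Response body must be drained and closed, even when the caller
// ignores the payload. The ping helper and URL below are hypothetical.
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
)

func ping(client *http.Client, url string) error {
	resp, err := client.Get(url)
	if err != nil {
		return fmt.Errorf("executing request: %v", err)
	}
	// Drain the body so the keep-alive connection can be reused, then close
	// it; leaving it open pins the connection and its reader goroutine.
	if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
		resp.Body.Close()
		return fmt.Errorf("draining response body: %v", err)
	}
	return resp.Body.Close()
}

func main() {
	if err := ping(http.DefaultClient, "http://localhost:10101/status"); err != nil {
		fmt.Println(err)
	}
}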

pilosa.go (+3 -1)

@@ -56,7 +56,9 @@ var (
 	ErrQueryTimeout  = errors.New("query timeout")
 	ErrTooManyWrites = errors.New("too many write commands")
 
-	ErrClusterDoesNotOwnShard = errors.New("cluster does not own shard")
+	// TODO(2.0) poorly named - used when a *node* doesn't own a shard. Probably
+	// we won't need this error at all by 2.0 though.
+	ErrClusterDoesNotOwnShard = errors.New("node does not own shard")
 
 	ErrNodeIDNotExists    = errors.New("node with provided ID does not exist")
 	ErrNodeNotCoordinator = errors.New("node is not the coordinator")
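One note on compatibility: ErrClusterDoesNotOwnShard is a sentinel created with errors.New, and sentinels are compared by identity rather than by message text, so rewording the string cannot break existing callers. A small self-contained sketch of that pattern; fetchShard is a hypothetical stand-in for a caller, not code from this repository.

package main

import (
	"errors"
	"fmt"
)

// Mirrors the pilosa.go pattern: a package-level sentinel error.
var ErrClusterDoesNotOwnShard = errors.New("node does not own shard")

// fetchShard is hypothetical; it fails when this node doesn't own the shard.
func fetchShard(owned bool) error {
	if !owned {
		return ErrClusterDoesNotOwnShard
	}
	return nil
}

func main() {
	// Identity comparison still matches after the message text changes.
	if err := fetchShard(false); err == ErrClusterDoesNotOwnShard {
		fmt.Println("redirect to the owning node:", err)
	}
}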

server/server_test.go (+105 -0)

@@ -15,8 +15,10 @@
 package server_test
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
+	"flag"
 	"fmt"
 	"io/ioutil"
 	"math/rand"
@@ -28,13 +30,22 @@ import (
 	"testing/quick"
 	"time"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/pelletier/go-toml"
 	"github.com/pilosa/pilosa"
 	"github.com/pilosa/pilosa/http"
+	"github.com/pilosa/pilosa/roaring"
 	"github.com/pilosa/pilosa/server"
 	"github.com/pilosa/pilosa/test"
 )
 
+var runStress bool
+
+func init() { // nolint: gochecknoinits
+	flag.BoolVar(&runStress, "stress", false, "Enable stress tests (time consuming)")
+}
+
 // Ensure program can process queries and maintain consistency.
 func TestMain_Set_Quick(t *testing.T) {
 	if testing.Short() {
@@ -754,3 +765,97 @@ func TestClusterQueriesAfterRestart(t *testing.T) {
 }
 
 // TODO: confirm that things keep working if a node is hard-closed (no nodeLeave event) and immediately restarted with a different address.
+
+func TestClusterExhaustingConnections(t *testing.T) {
+	if !runStress {
+		t.Skip("stress")
+	}
+	cluster := test.MustRunCluster(t, 5)
+	defer cluster.Close()
+	cmd1 := cluster[1]
+
+	for _, com := range cluster {
+		nodes := com.API.Hosts(context.Background())
+		for _, n := range nodes {
+			if n.State != "READY" {
+				t.Fatalf("unexpected node state after upping cluster: %v", nodes)
+			}
+		}
+	}
+
+	cmd1.MustCreateIndex(t, "testidx", pilosa.IndexOptions{})
+	cmd1.MustCreateField(t, "testidx", "testfield", pilosa.OptFieldTypeSet(pilosa.CacheTypeRanked, 10))
+
+	eg := errgroup.Group{}
+	for i := 0; i < 20; i++ {
+		i := i
+		eg.Go(func() error {
+			for j := i; j < 10000; j += 20 {
+				_, err := cluster[i%5].API.Query(context.Background(), &pilosa.QueryRequest{
+					Index: "testidx",
+					Query: fmt.Sprintf("Set(%d, testfield=0)", j*pilosa.ShardWidth),
+				})
+				if err != nil {
+					return err
+				}
+			}
+			return nil
+		})
+	}
+	err := eg.Wait()
+	if err != nil {
+		t.Fatalf("setting lots of shards: %v", err)
+	}
+}
+
+func TestClusterExhaustingConnectionsImport(t *testing.T) {
+	if !runStress {
+		t.Skip("stress")
+	}
+	cluster := test.MustRunCluster(t, 5)
+	defer cluster.Close()
+	cmd1 := cluster[1]
+
+	for _, com := range cluster {
+		nodes := com.API.Hosts(context.Background())
+		for _, n := range nodes {
+			if n.State != "READY" {
+				t.Fatalf("unexpected node state after upping cluster: %v", nodes)
+			}
+		}
+	}
+
+	cmd1.MustCreateIndex(t, "testidx", pilosa.IndexOptions{})
+	cmd1.MustCreateField(t, "testidx", "testfield", pilosa.OptFieldTypeSet(pilosa.CacheTypeRanked, 10))
+
+	bm := roaring.NewBitmap()
+	bm.DirectAdd(0)
+	buf := &bytes.Buffer{}
+	bm.WriteTo(buf)
+	data := buf.Bytes()
+
+	eg := errgroup.Group{}
+	for i := uint64(0); i < 20; i++ {
+		i := i
+		eg.Go(func() error {
+			for j := i; j < 10000; j += 20 {
+				if (j-i)%1000 == 0 {
+					fmt.Printf("%d is %.2f%% done.\n", i, float64(j-i)*100/10000)
+				}
+				err := cluster[i%5].API.ImportRoaring(context.Background(), "testidx", "testfield", j, false, &pilosa.ImportRoaringRequest{
+					Views: map[string][]byte{
+						"": data,
+					},
+				})
+				if err != nil {
+					return err
+				}
+			}
+			return nil
+		})
+	}
+	err := eg.Wait()
+	if err != nil {
+		t.Fatalf("setting lots of shards: %v", err)
+	}
+}
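The two new stress tests are skipped unless the -stress flag registered in init is set. Because the flag is declared with the standard flag package inside the test package, go test forwards it to the compiled test binary, so (assuming you run from the repository root) an invocation along these lines should work:

go test ./server -run TestClusterExhausting -stress -timeout 30m

The longer -timeout is precautionary rather than required: each test issues 10,000 requests across a 5-node cluster and may exceed go test's default 10-minute limit.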
