Skip to content

Commit 37aa6d3

Browse files
authored
feat: Bring back pipelining feature to Golang client (#174)
* feat: add initial implementation of new pipeline API * feat: initial working version of pipelining * feat: add atomic commands * feat: add Pipeline method to DMap interface * feat: add Discard and Close to DMapPipeline * feat: add TestDMapPipeline_ErrNotReady * fix: use the same key to test GetPut * chore: add documentation to Pipeline implementation * chore: add missing docs to Client interface * feat: add ErrConnRefused to deal with dead cluster members * chore: add inline documentation
1 parent b661514 commit 37aa6d3

File tree

10 files changed

+1120
-34
lines changed

10 files changed

+1120
-34
lines changed

client.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ const DefaultScanCount = 10
2727

2828
// Member denotes a member of the Olric cluster.
2929
type Member struct {
30-
// Member name in the cluster
30+
// Member name in the cluster. It's also host:port of the node.
3131
Name string
3232

3333
// ID of the Member in the cluster. Hash of Name and Birthdate of the member
@@ -178,7 +178,8 @@ type DMap interface {
178178
// after being decremented or an error.
179179
Decr(ctx context.Context, key string, delta int) (int, error)
180180

181-
// GetPut atomically sets the key to value and returns the old value stored at key.
181+
// GetPut atomically sets the key to value and returns the old value stored at key. It returns nil if there is no
182+
// previous value.
182183
GetPut(ctx context.Context, key string, value interface{}) (*GetResponse, error)
183184

184185
// IncrByFloat atomically increments the key by delta. The return value is the new value
@@ -223,6 +224,19 @@ type DMap interface {
223224
// is no global lock on DMaps. So if you call Put/PutEx and Destroy methods
224225
// concurrently on the cluster, Put call may set new values to the DMap.
225226
Destroy(ctx context.Context) error
227+
228+
// Pipeline is a mechanism to realise Redis Pipeline technique.
229+
//
230+
// Pipelining is a technique to extremely speed up processing by packing
231+
// operations into batches, sending them to Redis at once and reading the replies in a
232+
// single step.
233+
// See https://redis.io/topics/pipelining
234+
//
235+
// Note that Pipeline is not a transaction, so you can get unexpected
236+
// results in case of big pipelines and small read/write timeouts.
237+
// Redis client has retransmission logic in case of timeouts, pipeline
238+
// can be retransmitted and commands can be executed more than once.
239+
Pipeline() (*DMapPipeline, error)
226240
}
227241

228242
type statsConfig struct {
@@ -275,6 +289,10 @@ type Client interface {
275289
// Members returns a thread-safe list of cluster members.
276290
Members(ctx context.Context) ([]Member, error)
277291

292+
// RefreshMetadata fetches a list of available members and the latest routing
293+
// table version. It also closes stale clients, if there are any.
294+
RefreshMetadata(ctx context.Context) error
295+
278296
// Close stops background routines and frees allocated resources.
279297
Close(ctx context.Context) error
280298
}

cluster_client.go

Lines changed: 82 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@ package olric
1717
import (
1818
"context"
1919
"encoding/json"
20+
"errors"
2021
"fmt"
21-
"github.com/buraksezer/olric/hasher"
2222
"log"
23+
"net"
2324
"os"
2425
"sync"
2526
"sync/atomic"
27+
"syscall"
2628
"time"
2729

2830
"github.com/buraksezer/olric/config"
31+
"github.com/buraksezer/olric/hasher"
2932
"github.com/buraksezer/olric/internal/bufpool"
3033
"github.com/buraksezer/olric/internal/cluster/partitions"
3134
"github.com/buraksezer/olric/internal/discovery"
@@ -69,6 +72,10 @@ func processProtocolError(err error) error {
6972
if err == redis.Nil {
7073
return ErrKeyNotFound
7174
}
75+
if errors.Is(err, syscall.ECONNREFUSED) {
76+
opErr := err.(*net.OpError)
77+
return fmt.Errorf("%s %s %s: %w", opErr.Op, opErr.Net, opErr.Addr, ErrConnRefused)
78+
}
7279
return convertDMapError(protocol.ConvertError(err))
7380
}
7481

@@ -95,10 +102,7 @@ func (dm *ClusterDMap) writePutCommand(c *dmap.PutConfig, key string, value []by
95102
return cmd
96103
}
97104

98-
func (cl *ClusterClient) smartPick(dmap, key string) (*redis.Client, error) {
99-
hkey := partitions.HKey(dmap, key)
100-
partID := hkey % cl.partitionCount
101-
105+
func (cl *ClusterClient) clientByPartID(partID uint64) (*redis.Client, error) {
102106
raw := cl.routingTable.Load()
103107
if raw == nil {
104108
return nil, fmt.Errorf("routing table is empty")
@@ -118,6 +122,12 @@ func (cl *ClusterClient) smartPick(dmap, key string) (*redis.Client, error) {
118122
return cl.client.Get(primaryOwner), nil
119123
}
120124

125+
func (cl *ClusterClient) smartPick(dmap, key string) (*redis.Client, error) {
126+
hkey := partitions.HKey(dmap, key)
127+
partID := hkey % cl.partitionCount
128+
return cl.clientByPartID(partID)
129+
}
130+
121131
// Put sets the value for the given key. It overwrites any previous value for
122132
// that key, and it's thread-safe. The key has to be a string. value type is arbitrary.
123133
// It is safe to modify the contents of the arguments after Put returns but not before.
@@ -150,31 +160,33 @@ func (dm *ClusterDMap) Put(ctx context.Context, key string, value interface{}, o
150160
return processProtocolError(cmd.Err())
151161
}
152162

163+
func (dm *ClusterDMap) makeGetResponse(cmd *redis.StringCmd) (*GetResponse, error) {
164+
raw, err := cmd.Bytes()
165+
if err != nil {
166+
return nil, processProtocolError(err)
167+
}
168+
169+
e := dm.newEntry()
170+
e.Decode(raw)
171+
return &GetResponse{
172+
entry: e,
173+
}, nil
174+
}
175+
153176
// Get gets the value for the given key. It returns ErrKeyNotFound if the DB
154177
// does not contain the key. It's thread-safe. It is safe to modify the contents
155178
// of the returned value. See GetResponse for the details.
156179
func (dm *ClusterDMap) Get(ctx context.Context, key string) (*GetResponse, error) {
180+
cmd := protocol.NewGet(dm.name, key).SetRaw().Command(ctx)
157181
rc, err := dm.clusterClient.smartPick(dm.name, key)
158182
if err != nil {
159183
return nil, err
160184
}
161-
162-
cmd := protocol.NewGet(dm.name, key).SetRaw().Command(ctx)
163185
err = rc.Process(ctx, cmd)
164186
if err != nil {
165187
return nil, processProtocolError(err)
166188
}
167-
168-
raw, err := cmd.Bytes()
169-
if err != nil {
170-
return nil, processProtocolError(err)
171-
}
172-
173-
e := dm.newEntry()
174-
e.Decode(raw)
175-
return &GetResponse{
176-
entry: e,
177-
}, nil
189+
return dm.makeGetResponse(cmd)
178190
}
179191

180192
// Delete deletes values for the given keys. Delete will not return error
@@ -241,7 +253,8 @@ func (dm *ClusterDMap) Decr(ctx context.Context, key string, delta int) (int, er
241253
return int(res), nil
242254
}
243255

244-
// GetPut atomically sets the key to value and returns the old value stored at key.
256+
// GetPut atomically sets the key to value and returns the old value stored at key. It returns nil if there is no
257+
// previous value.
245258
func (dm *ClusterDMap) GetPut(ctx context.Context, key string, value interface{}) (*GetResponse, error) {
246259
rc, err := dm.clusterClient.smartPick(dm.name, key)
247260
if err != nil {
@@ -604,6 +617,43 @@ func (cl *ClusterClient) Members(ctx context.Context) ([]Member, error) {
604617
return members, nil
605618
}
606619

620+
// RefreshMetadata fetches a list of available members and the latest routing
621+
// table version. It also closes stale clients, if there are any.
622+
func (cl *ClusterClient) RefreshMetadata(ctx context.Context) error {
623+
// Fetch a list of currently available cluster members.
624+
var members []Member
625+
var err error
626+
for {
627+
members, err = cl.Members(ctx)
628+
if errors.Is(err, ErrConnRefused) {
629+
err = nil
630+
continue
631+
}
632+
if err != nil {
633+
return err
634+
}
635+
break
636+
}
637+
// Use a map for fast access.
638+
addresses := make(map[string]struct{})
639+
for _, member := range members {
640+
addresses[member.Name] = struct{}{}
641+
}
642+
643+
// Clean stale client connections
644+
for addr := range cl.client.Addresses() {
645+
if _, ok := addresses[addr]; !ok {
646+
// Gone
647+
if err := cl.client.Close(addr); err != nil {
648+
return err
649+
}
650+
}
651+
}
652+
653+
// Re-fetch the routing table, we should use the latest routing table version.
654+
return cl.fetchRoutingTable()
655+
}
656+
607657
// Close stops background routines and frees allocated resources.
608658
func (cl *ClusterClient) Close(ctx context.Context) error {
609659
select {
@@ -705,7 +755,7 @@ func (cl *ClusterClient) fetchRoutingTablePeriodically() {
705755
case <-ticker.C:
706756
err := cl.fetchRoutingTable()
707757
if err != nil {
708-
cl.logger.Printf("[ERROR] Failed to fetch the latest routing table: %s", err)
758+
cl.logger.Printf("[ERROR] Failed to fetch the latest version of the routing table: %s", err)
709759
}
710760
}
711761
}
@@ -754,13 +804,24 @@ func NewClusterClient(addresses []string, options ...ClusterClientOption) (*Clus
754804
cl.client.Get(address)
755805
}
756806

807+
// Discover all cluster members
808+
members, err := cl.Members(ctx)
809+
if err != nil {
810+
return nil, fmt.Errorf("error while discovering the cluster members: %w", err)
811+
}
812+
for _, member := range members {
813+
cl.client.Get(member.Name)
814+
}
815+
816+
// Hash function is required to target primary owners instead of random cluster members.
757817
partitions.SetHashFunc(cc.hasher)
758818

759-
// Initial fetch.
819+
// Initial fetch. ClusterClient targets the primary owners for a smooth and quick operation.
760820
if err := cl.fetchRoutingTable(); err != nil {
761821
return nil, err
762822
}
763823

824+
// Refresh the routing table every 15 seconds.
764825
cl.wg.Add(1)
765826
go cl.fetchRoutingTablePeriodically()
766827

cluster_client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ package olric
1616

1717
import (
1818
"context"
19-
"github.com/buraksezer/olric/hasher"
2019
"log"
2120
"os"
2221
"testing"
2322
"time"
2423

2524
"github.com/buraksezer/olric/config"
25+
"github.com/buraksezer/olric/hasher"
2626
"github.com/buraksezer/olric/internal/testutil"
2727
"github.com/buraksezer/olric/stats"
2828
"github.com/stretchr/testify/require"

cluster_iterator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func (i *ClusterIterator) fetchRoutingTablePeriodically() {
273273
return
274274
case <-time.After(time.Second):
275275
if err := i.fetchRoutingTable(); err != nil {
276-
i.logger.Printf("[ERROR] Failed to fetch the latest routing table: %s", err)
276+
i.logger.Printf("[ERROR] Failed to fetch the latest version of the routing table: %s", err)
277277
}
278278
}
279279
}

embedded_client.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,37 @@ type EmbeddedDMap struct {
6161
storageEngine string
6262
}
6363

64+
// Pipeline is a mechanism to realise Redis Pipeline technique.
65+
//
66+
// Pipelining is a technique to extremely speed up processing by packing
67+
// operations into batches, sending them to Redis at once and reading the replies in a
68+
// single step.
69+
// See https://redis.io/topics/pipelining
70+
//
71+
// Note that Pipeline is not a transaction, so you can get unexpected
72+
// results in case of big pipelines and small read/write timeouts.
73+
// Redis client has retransmission logic in case of timeouts, pipeline
74+
// can be retransmitted and commands can be executed more than once.
75+
func (dm *EmbeddedDMap) Pipeline() (*DMapPipeline, error) {
76+
cc, err := NewClusterClient([]string{dm.client.db.rt.This().String()})
77+
if err != nil {
78+
return nil, err
79+
}
80+
cdm, err := cc.NewDMap(dm.name)
81+
if err != nil {
82+
return nil, err
83+
}
84+
return cdm.Pipeline()
85+
}
86+
87+
// RefreshMetadata fetches a list of available members and the latest routing
88+
// table version. It also closes stale clients, if there are any. EmbeddedClient has
89+
// this method to implement the Client interface. It doesn't need to refresh metadata manually.
90+
func (e *EmbeddedClient) RefreshMetadata(_ context.Context) error {
91+
// EmbeddedClient already has the latest metadata.
92+
return nil
93+
}
94+
6495
// Scan returns an iterator to loop over the keys.
6596
//
6697
// Available scan options:
@@ -152,7 +183,8 @@ func (dm *EmbeddedDMap) Name() string {
152183
return dm.name
153184
}
154185

155-
// GetPut atomically sets the key to value and returns the old value stored at key.
186+
// GetPut atomically sets the key to value and returns the old value stored at key. It returns nil if there is no
187+
// previous value.
156188
func (dm *EmbeddedDMap) GetPut(ctx context.Context, key string, value interface{}) (*GetResponse, error) {
157189
e, err := dm.dm.GetPut(ctx, key, value)
158190
if err != nil {

integration_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package olric
1616

1717
import (
1818
"context"
19+
"errors"
1920
"fmt"
2021
"io"
2122
"testing"
@@ -71,6 +72,12 @@ func TestIntegration_NodesJoinOrLeftDuringQuery(t *testing.T) {
7172

7273
for i := 0; i < 100000; i++ {
7374
_, err = dm.Get(context.Background(), fmt.Sprintf("mykey-%d", i))
75+
if errors.Is(err, ErrConnRefused) {
76+
// Rewind
77+
i--
78+
require.NoError(t, c.RefreshMetadata(context.Background()))
79+
continue
80+
}
7481
require.NoError(t, err)
7582
if i == 5999 {
7683
err = c.client.Close(db2.name)
@@ -411,6 +418,11 @@ func TestIntegration_Kill_Nodes_During_Operation(t *testing.T) {
411418

412419
for i := 0; i < 100000; i++ {
413420
_, err = dm.Get(context.Background(), fmt.Sprintf("mykey-%d", i))
421+
if errors.Is(err, ErrConnRefused) {
422+
i--
423+
fmt.Println(c.RefreshMetadata(context.Background()))
424+
continue
425+
}
414426
require.NoError(t, err)
415427
}
416428
}

internal/server/client.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,17 @@ func NewClient(c *config.Client) *Client {
4747
}
4848
}
4949

50+
func (c *Client) Addresses() map[string]struct{} {
51+
c.mu.RLock()
52+
defer c.mu.RUnlock()
53+
54+
addresses := make(map[string]struct{})
55+
for address, _ := range c.clients {
56+
addresses[address] = struct{}{}
57+
}
58+
return addresses
59+
}
60+
5061
func (c *Client) Get(addr string) *redis.Client {
5162
c.mu.RLock()
5263
rc, ok := c.clients[addr]
@@ -69,22 +80,26 @@ func (c *Client) Get(addr string) *redis.Client {
6980
return rc
7081
}
7182

72-
func (c *Client) Pick() (*redis.Client, error) {
83+
func (c *Client) pickNodeRoundRobin() (string, error) {
7384
c.mu.RLock()
7485
defer c.mu.RUnlock()
7586

7687
addr, err := c.roundRobin.Get()
7788
if err == roundrobin.ErrEmptyInstance {
78-
return nil, fmt.Errorf("no available client found")
89+
return "", fmt.Errorf("no available client found")
7990
}
8091
if err != nil {
81-
return nil, err
92+
return "", err
8293
}
83-
rc, ok := c.clients[addr]
84-
if !ok {
85-
return nil, fmt.Errorf("client could not be found: %s", addr)
94+
return addr, nil
95+
}
96+
97+
func (c *Client) Pick() (*redis.Client, error) {
98+
addr, err := c.pickNodeRoundRobin()
99+
if err != nil {
100+
return nil, err
86101
}
87-
return rc, nil
102+
return c.Get(addr), nil
88103
}
89104

90105
func (c *Client) Close(addr string) error {

0 commit comments

Comments
 (0)