Skip to content

Commit fd5b7de

Browse files
committed
feat: add initial implementation of event mechanism
1 parent 4a5e4ca commit fd5b7de

33 files changed

+996
-116
lines changed

cluster.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,11 @@ func (db *Olric) clusterMembersCommandHandler(conn redcon.Conn, cmd redcon.Comma
161161
members := db.rt.Discovery().GetMembers()
162162
conn.WriteArray(len(members))
163163
for _, member := range members {
164-
conn.WriteArray(4)
164+
conn.WriteArray(3)
165165
conn.WriteBulkString(member.Name)
166-
conn.WriteUint64(member.ID)
166+
// go-redis/redis package cannot handle uint64. At the time of this writing,
167+
// there is no solution for this, and I don't want to use a soft fork to repair it.
168+
//conn.WriteUint64(member.ID)
167169
conn.WriteInt64(member.Birthdate)
168170
if coordinator.CompareByID(member) {
169171
conn.WriteBulkString("true")

cluster_client.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"encoding/json"
2020
"fmt"
21+
"github.com/buraksezer/olric/internal/discovery"
2122
"log"
2223
"os"
2324
"time"
@@ -491,16 +492,13 @@ func (cl *ClusterClient) Members(ctx context.Context) ([]Member, error) {
491492
m := Member{}
492493
item := rawItem.([]interface{})
493494
m.Name = item[0].(string)
495+
m.Birthdate = item[1].(int64)
494496

495-
switch id := item[1].(type) {
496-
case uint64:
497-
m.ID = id
498-
case int64:
499-
m.ID = uint64(id)
500-
}
497+
// go-redis/redis package cannot handle uint64 type. At the time of this writing,
498+
// there is no solution for this, and I don't want to use a soft fork to repair it.
499+
m.ID = discovery.MemberID(m.Name, m.Birthdate)
501500

502-
m.Birthdate = item[2].(int64)
503-
if item[3] == "true" {
501+
if item[2] == "true" {
504502
m.Coordinator = true
505503
}
506504
members = append(members, m)

cmd/olricd/olricd.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ olricd:
4242
memberCountQuorum: 1
4343
routingTablePushInterval: 1m
4444

45+
enableClusterEventsChannel: true
46+
4547
storageEngines:
4648
config:
4749
kvstore:
@@ -186,7 +188,7 @@ dmaps:
186188
engine:
187189
name: kvstore
188190
config:
189-
tableSize: 4096
191+
tableSize: 1048576
190192
# checkEmptyFragmentsInterval: 1m
191193
# triggerCompactionInterval: 10m
192194
# numEvictionWorkers: 1

config/config.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ type Config struct {
225225
// load for a server in the cluster. Keep it small.
226226
LoadFactor float64
227227

228+
EnableClusterEventsChannel bool
229+
228230
// Default hasher is github.com/cespare/xxhash/v2
229231
Hasher hasher.Hasher
230232

@@ -370,6 +372,8 @@ func (c *Config) Sanitize() error {
370372

371373
if c.Logger == nil {
372374
c.Logger = log.New(c.LogOutput, "", log.LstdFlags)
375+
} else {
376+
c.Logger.SetOutput(c.LogOutput)
373377
}
374378

375379
if c.Hasher == nil {
@@ -443,8 +447,10 @@ func (c *Config) Sanitize() error {
443447
return nil
444448
}
445449

446-
// New returns a Config with sane defaults.
447-
// It takes an env parameter used by memberlist: local, lan and wan.
450+
// New returns a Config with sane defaults. If you change a configuration parameter,
451+
// please run Sanitize and Validate functions respectively.
452+
//
453+
// New takes an env parameter used by memberlist: local, lan and wan.
448454
//
449455
// local:
450456
//

config/internal/loader/loader.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,24 @@ package loader
1717
import "gopkg.in/yaml.v2"
1818

1919
type olricd struct {
20-
Name string `yaml:"name"`
21-
BindAddr string `yaml:"bindAddr"`
22-
BindPort int `yaml:"bindPort"`
23-
Interface string `yaml:"interface"`
24-
ReplicationMode int `yaml:"replicationMode"`
25-
PartitionCount uint64 `yaml:"partitionCount"`
26-
LoadFactor float64 `yaml:"loadFactor"`
27-
KeepAlivePeriod string `yaml:"keepAlivePeriod"`
28-
BootstrapTimeout string `yaml:"bootstrapTimeout"`
29-
ReplicaCount int `yaml:"replicaCount"`
30-
WriteQuorum int `yaml:"writeQuorum"`
31-
ReadQuorum int `yaml:"readQuorum"`
32-
ReadRepair bool `yaml:"readRepair"`
33-
MemberCountQuorum int32 `yaml:"memberCountQuorum"`
34-
RoutingTablePushInterval string `yaml:"routingTablePushInterval"`
35-
TriggerBalancerInterval string `yaml:"triggerBalancerInterval"`
36-
LeaveTimeout string `yaml:"leaveTimeout"`
20+
Name string `yaml:"name"`
21+
BindAddr string `yaml:"bindAddr"`
22+
BindPort int `yaml:"bindPort"`
23+
Interface string `yaml:"interface"`
24+
ReplicationMode int `yaml:"replicationMode"`
25+
PartitionCount uint64 `yaml:"partitionCount"`
26+
LoadFactor float64 `yaml:"loadFactor"`
27+
KeepAlivePeriod string `yaml:"keepAlivePeriod"`
28+
BootstrapTimeout string `yaml:"bootstrapTimeout"`
29+
ReplicaCount int `yaml:"replicaCount"`
30+
WriteQuorum int `yaml:"writeQuorum"`
31+
ReadQuorum int `yaml:"readQuorum"`
32+
ReadRepair bool `yaml:"readRepair"`
33+
MemberCountQuorum int32 `yaml:"memberCountQuorum"`
34+
RoutingTablePushInterval string `yaml:"routingTablePushInterval"`
35+
TriggerBalancerInterval string `yaml:"triggerBalancerInterval"`
36+
LeaveTimeout string `yaml:"leaveTimeout"`
37+
EnableClusterEventsChannel bool `yaml:"enableClusterEventsChannel"`
3738
}
3839

3940
type client struct {

config/load.go

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -364,35 +364,36 @@ func Load(filename string) (*Config, error) {
364364
}
365365

366366
cfg := &Config{
367-
BindAddr: c.Olricd.BindAddr,
368-
BindPort: c.Olricd.BindPort,
369-
Interface: c.Olricd.Interface,
370-
ServiceDiscovery: c.ServiceDiscovery,
371-
MemberlistInterface: c.Memberlist.Interface,
372-
MemberlistConfig: memberlistConfig,
373-
Client: &clientConfig,
374-
LogLevel: c.Logging.Level,
375-
JoinRetryInterval: joinRetryInterval,
376-
RoutingTablePushInterval: routingTablePushInterval,
377-
TriggerBalancerInterval: triggerBalancerInterval,
378-
MaxJoinAttempts: c.Memberlist.MaxJoinAttempts,
379-
Peers: c.Memberlist.Peers,
380-
PartitionCount: c.Olricd.PartitionCount,
381-
ReplicaCount: c.Olricd.ReplicaCount,
382-
WriteQuorum: c.Olricd.WriteQuorum,
383-
ReadQuorum: c.Olricd.ReadQuorum,
384-
ReplicationMode: c.Olricd.ReplicationMode,
385-
ReadRepair: c.Olricd.ReadRepair,
386-
LoadFactor: c.Olricd.LoadFactor,
387-
MemberCountQuorum: c.Olricd.MemberCountQuorum,
388-
Logger: log.New(logOutput, "", log.LstdFlags),
389-
LogOutput: logOutput,
390-
LogVerbosity: c.Logging.Verbosity,
391-
Hasher: hasher.NewDefaultHasher(),
392-
KeepAlivePeriod: keepAlivePeriod,
393-
BootstrapTimeout: bootstrapTimeout,
394-
LeaveTimeout: leaveTimeout,
395-
DMaps: dmapConfig,
367+
BindAddr: c.Olricd.BindAddr,
368+
BindPort: c.Olricd.BindPort,
369+
Interface: c.Olricd.Interface,
370+
ServiceDiscovery: c.ServiceDiscovery,
371+
MemberlistInterface: c.Memberlist.Interface,
372+
MemberlistConfig: memberlistConfig,
373+
Client: &clientConfig,
374+
LogLevel: c.Logging.Level,
375+
JoinRetryInterval: joinRetryInterval,
376+
RoutingTablePushInterval: routingTablePushInterval,
377+
TriggerBalancerInterval: triggerBalancerInterval,
378+
EnableClusterEventsChannel: c.Olricd.EnableClusterEventsChannel,
379+
MaxJoinAttempts: c.Memberlist.MaxJoinAttempts,
380+
Peers: c.Memberlist.Peers,
381+
PartitionCount: c.Olricd.PartitionCount,
382+
ReplicaCount: c.Olricd.ReplicaCount,
383+
WriteQuorum: c.Olricd.WriteQuorum,
384+
ReadQuorum: c.Olricd.ReadQuorum,
385+
ReplicationMode: c.Olricd.ReplicationMode,
386+
ReadRepair: c.Olricd.ReadRepair,
387+
LoadFactor: c.Olricd.LoadFactor,
388+
MemberCountQuorum: c.Olricd.MemberCountQuorum,
389+
Logger: log.New(logOutput, "", log.LstdFlags),
390+
LogOutput: logOutput,
391+
LogVerbosity: c.Logging.Verbosity,
392+
Hasher: hasher.NewDefaultHasher(),
393+
KeepAlivePeriod: keepAlivePeriod,
394+
BootstrapTimeout: bootstrapTimeout,
395+
LeaveTimeout: leaveTimeout,
396+
DMaps: dmapConfig,
396397
}
397398

398399
if err := cfg.Sanitize(); err != nil {

events.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package olric
2+
3+
import "time"
4+
5+
type ClusterTopologyEvent struct {
6+
Event string
7+
Name string
8+
Timestamp time.Time
9+
}

0 commit comments

Comments
 (0)