Skip to content

Commit afcb658

Browse files
committed
Working write policy
1 parent e36dc32 commit afcb658

15 files changed

+559
-59
lines changed

aerospike_suite_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import (
4141
var (
4242
hosts = flag.String("hosts", "", "Comma separated Aerospike server seed hostnames or IP addresses and ports. eg: s1:3000,s2:3000,s3:3000")
4343
host = flag.String("h", "127.0.0.1", "Aerospike server seed hostnames or IP addresses")
44-
port = flag.Int("p", 3000, "Aerospike server seed hostname or IP address port number.")
44+
port = flag.Int("p", 3100, "Aerospike server seed hostname or IP address port number.")
4545
user = flag.String("U", "", "Username.")
4646
password = flag.String("P", "", "Password.")
4747
authMode = flag.String("A", "internal", "Authentication mode: internal | external")

client.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ func (clnt *Client) PutPayload(policy *WritePolicy, key *Key, payload []byte) Er
328328
// handled when the record already exists.
329329
// If the policy is nil, the default relevant policy will be used.
330330
func (clnt *Client) Put(policy *WritePolicy, key *Key, binMap BinMap) Error {
331-
policy = clnt.getUsableWritePolicy(policy)
331+
policy = clnt.getUsableWritePolicyWithConfig(policy, applyConfigToWritePolicy)
332332

333333
if policy.Txn != nil {
334334
if err := txnMonitor.addKey(clnt.cluster, policy, key); err != nil {
@@ -350,7 +350,7 @@ func (clnt *Client) Put(policy *WritePolicy, key *Key, binMap BinMap) Error {
350350
// This method avoids using the BinMap allocation and iteration and is lighter on GC.
351351
// If the policy is nil, the default relevant policy will be used.
352352
func (clnt *Client) PutBins(policy *WritePolicy, key *Key, bins ...*Bin) Error {
353-
policy = clnt.getUsableWritePolicy(policy)
353+
policy = clnt.getUsableWritePolicyWithConfig(policy, applyConfigToWritePolicy)
354354

355355
if policy.Txn != nil {
356356
if err := txnMonitor.addKey(clnt.cluster, policy, key); err != nil {
@@ -2140,7 +2140,6 @@ func (clnt *Client) getUsableBatchUDFPolicy(policy *BatchUDFPolicy) *BatchUDFPol
21402140
}
21412141

21422142
func (clnt *Client) getUsableWritePolicy(policy *WritePolicy) *WritePolicy {
2143-
21442143
if policy == nil {
21452144
if clnt.DefaultWritePolicy != nil {
21462145
return clnt.DefaultWritePolicy
@@ -2150,6 +2149,10 @@ func (clnt *Client) getUsableWritePolicy(policy *WritePolicy) *WritePolicy {
21502149
return policy
21512150
}
21522151

2152+
func (clnt *Client) getUsableWritePolicyWithConfig(policy *WritePolicy, fn func(*WritePolicy, *DynConfig) *WritePolicy) *WritePolicy {
2153+
return fn(policy, clnt.dynConfig)
2154+
}
2155+
21532156
func (clnt *Client) getUsableScanPolicy(policy *ScanPolicy) *ScanPolicy {
21542157
if policy == nil {
21552158
if clnt.DefaultScanPolicy != nil {

config.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ func register(provider *dynconfig.ConfigProvider) {
4646
}
4747

4848
func (dc *DynConfig) loadConfig() error {
49-
configProviderMu.RLock()
50-
defer configProviderMu.RUnlock()
49+
configProviderMu.Lock()
50+
defer configProviderMu.Unlock()
5151

5252
if !dc.configInitialized.Load() && configProvider != nil {
5353
logger.Logger.Debug("Initializing configuration...")
@@ -70,12 +70,13 @@ func (dc *DynConfig) providerLoadConfig() {
7070
}
7171

7272
func (dc *DynConfig) initConfig() {
73+
dc.lock.RLock()
74+
defer dc.lock.RUnlock()
75+
7376
loadedConfig := (*configProvider).LoadConfig()
74-
dc.lock.Lock()
7577
if loadedConfig != nil {
7678
dc.config = loadedConfig
7779
}
78-
dc.lock.Unlock()
7980
}
8081

8182
func (dc *DynConfig) watchConfig() {

config/dynconfig.go

+47-42
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package dynconfig
22

3+
import "time"
4+
35
type ConfigProvider interface {
46
LoadConfig() *Config
57
}
@@ -60,36 +62,37 @@ type Client struct {
6062
type Read struct {
6163
ReadModeAp *ReadModeAp `yaml:"read_mode_ap"`
6264
ReadModeSc *ReadModeSc `yaml:"read_mode_sc"`
63-
ConnectTimeout *int `yaml:"connect_timeout"`
65+
ConnectTimeout *Duration `yaml:"connect_timeout"`
6466
FailOnFilteredOut *bool `yaml:"fail_on_filtered_out"`
6567
Replica *Replica `yaml:"replica"`
66-
SleepBetweenRetries *int `yaml:"sleep_between_retries"`
67-
SocketTimeout *int `yaml:"socket_timeout"`
68+
SleepBetweenRetries *Duration `yaml:"sleep_between_retries"`
69+
SocketTimeout *Duration `yaml:"socket_timeout"`
6870
TimeoutDelay *int `yaml:"timeout_delay"`
69-
TotalTimeout *int `yaml:"total_timeout"`
71+
TotalTimeout *Duration `yaml:"total_timeout"`
7072
MaxRetries *int `yaml:"max_retries"`
7173
}
7274

7375
type Write struct {
74-
ConnectTimeout *int `yaml:"connect_timeout"`
75-
FailOnFilteredOut *bool `yaml:"fail_on_filtered_out"`
76-
Replica *Replica `yaml:"replica"`
77-
SendKey *bool `yaml:"send_key"`
78-
SleepBetweenRetries *int `yaml:"sleep_between_retries"`
79-
SocketTimeout *int `yaml:"socket_timeout"`
80-
MaxRetries *int `yaml:"max_retries"`
81-
DurableDelete *bool `yaml:"durable_delete"`
76+
ConnectTimeout *Duration `yaml:"connect_timeout"`
77+
FailOnFilteredOut *bool `yaml:"fail_on_filtered_out"`
78+
Replica *Replica `yaml:"replica"`
79+
SendKey *bool `yaml:"send_key"`
80+
SleepBetweenRetries *Duration `yaml:"sleep_between_retries"`
81+
SocketTimeout *Duration `yaml:"socket_timeout"`
82+
TotalTimeout *Duration `yaml:"total_timeout"`
83+
MaxRetries *int `yaml:"max_retries"`
84+
DurableDelete *bool `yaml:"durable_delete"`
8285
}
8386

8487
type Query struct {
8588
ReadModeAp *ReadModeAp `yaml:"read_mode_ap"`
8689
ReadModeSc *ReadModeSc `yaml:"read_mode_sc"`
8790
ConnectTimeout *int `yaml:"connect_timeout"`
8891
Replica *Replica `yaml:"replica"`
89-
SleepBetweenRetries *int `yaml:"sleep_between_retries"`
90-
SocketTimeout *int `yaml:"socket_timeout"`
92+
SleepBetweenRetries *Duration `yaml:"sleep_between_retries"`
93+
SocketTimeout *Duration `yaml:"socket_timeout"`
9194
TimeoutDelay *int `yaml:"timeout_delay"`
92-
TotalTimeout *int `yaml:"total_timeout"`
95+
TotalTimeout *Duration `yaml:"total_timeout"`
9396
MaxRetries *int `yaml:"max_retries"`
9497
IncludeBinData *bool `yaml:"include_bin_data"`
9598
InfoTimeout *int `yaml:"info_timeout"`
@@ -197,57 +200,59 @@ type Metrics struct {
197200
type ReadModeAp int
198201

199202
const (
200-
One ReadModeAp = iota
203+
ONE ReadModeAp = iota
201204
All
202205
)
203206

204207
var readModeAp = map[ReadModeAp]string{
205-
One: "one",
206-
All: "all",
208+
ONE: "ONE",
209+
All: "ALL",
207210
}
208211

209212
type ReadModeSc int
210213

211214
const (
212-
Session ReadModeSc = iota
213-
Linearize
214-
AllowReplica
215-
AllowUnavailable
215+
SESSION ReadModeSc = iota
216+
LINEARIZE
217+
ALLOWREPLICA
218+
ALLOWUNAVAILABLE
216219
)
217220

218221
var readModeSc = map[ReadModeSc]string{
219-
Session: "session",
220-
Linearize: "linearize",
221-
AllowReplica: "allow_replica",
222-
AllowUnavailable: "allow_unavailable",
222+
SESSION: "SESSION",
223+
LINEARIZE: "LINEARIZE",
224+
ALLOWREPLICA: "ALLOW_REPLICA",
225+
ALLOWUNAVAILABLE: "ALLOW_UNAVAILABLE",
223226
}
224227

225228
type Replica int
226229

227230
const (
228-
Master Replica = iota
229-
MasterProles
230-
Sequence
231-
PreferRack
231+
MASTER Replica = iota
232+
MASTER_PROLES
233+
SEQUENCE
234+
PREFER_RACK
232235
)
233236

234237
var replica = map[Replica]string{
235-
Master: "master",
236-
MasterProles: "master_proles",
237-
Sequence: "sequence",
238-
PreferRack: "prefer_rack",
238+
MASTER: "MASTER",
239+
MASTER_PROLES: "MASTER_PROLES",
240+
SEQUENCE: "SEQUENCE",
241+
PREFER_RACK: "PREFER_RACK",
239242
}
240243

241-
type Duration int
244+
type ExpectedDuration int
242245

243246
const (
244-
Long Duration = iota
245-
Short
246-
LongRelaxAp
247+
LONG ExpectedDuration = iota
248+
SHORT
249+
LONG_RELAX_AP
247250
)
248251

249-
var duration = map[Duration]string{
250-
Long: "long",
251-
Short: "short",
252-
LongRelaxAp: "long_relax_ap",
252+
var duration = map[ExpectedDuration]string{
253+
LONG: "LONG",
254+
SHORT: "SHORT",
255+
LONG_RELAX_AP: "LONG_RELAX_AP",
253256
}
257+
258+
type Duration time.Duration

config/provider/yaml-config-provider.go

+23-8
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package provider
22

33
import (
4-
"log"
4+
"fmt"
55
"os"
66
"time"
77

88
dynconfig "github.com/aerospike/aerospike-client-go/v8/config"
9-
"github.com/stretchr/testify/assert/yaml"
9+
"github.com/aerospike/aerospike-client-go/v8/logger"
10+
"gopkg.in/yaml.v3"
1011
)
1112

12-
const defaultFilePath = "~/aerospikeconfig.yaml"
13+
const defaultFilePath = "aerospikeconfig.yaml"
1314

1415
type YamlConfigProvider struct {
1516
configFilePath string
@@ -36,25 +37,39 @@ func (yc *YamlConfigProvider) LoadConfig() *dynconfig.Config {
3637
if err != nil {
3738
// handle error
3839
}
40+
if info == nil {
41+
logger.Logger.Debug("File does not exist %s . Nothing to do...", defaultFilePath)
42+
return nil
43+
}
3944

4045
modTime := info.ModTime()
4146
// Compare to previously stored modTime
4247
if modTime.After(yc.oldModTime) {
43-
// file changed
4448
yc.oldModTime = modTime
45-
// re-unmarshal your struct
4649

47-
data, err := os.ReadFile("config.yaml")
50+
data, err := os.ReadFile(defaultFilePath)
4851
if err != nil {
49-
log.Fatalf("failed to read config file: %v", err)
52+
logger.Logger.Error("Failed to read file %s. Error: %v", defaultFilePath, err)
5053
}
5154
var config dynconfig.Config
5255
if err := yaml.Unmarshal(data, &config); err != nil {
53-
log.Fatalf("failed to unmarshal yaml: %v", err)
56+
fmt.Printf("Failed to serialize file %s to object. Error: %v", defaultFilePath, err)
57+
logger.Logger.Error("Failed to serialize file %s to object. Error: %v", defaultFilePath, err)
5458
}
5559

5660
return &config
5761
}
5862

5963
return nil
6064
}
65+
66+
type Duration time.Duration
67+
68+
func (d *Duration) UnmarshalYAML(b []byte) error {
69+
var value int64
70+
if err := yaml.Unmarshal(b, &value); err != nil {
71+
return err
72+
}
73+
*d = Duration(time.Duration(value))
74+
return nil
75+
}

go.mod

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,20 @@ go 1.23.0
55
require (
66
github.com/onsi/ginkgo/v2 v2.22.2
77
github.com/onsi/gomega v1.36.2
8+
github.com/stretchr/testify v1.10.0
89
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad
910
github.com/yuin/gopher-lua v1.1.1
1011
golang.org/x/sync v0.12.0
1112
)
1213

1314
require (
15+
github.com/davecgh/go-spew v1.1.1 // indirect
1416
github.com/go-logr/logr v1.4.2 // indirect
1517
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
1618
github.com/google/go-cmp v0.6.0 // indirect
1719
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
1820
github.com/kr/pretty v0.3.1 // indirect
19-
github.com/stretchr/testify v1.10.0 // indirect
21+
github.com/pmezard/go-difflib v1.0.0 // indirect
2022
golang.org/x/net v0.37.0 // indirect
2123
golang.org/x/sys v0.31.0 // indirect
2224
golang.org/x/text v0.23.0 // indirect

policy.go

+63
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package aerospike
1616

1717
import (
1818
"time"
19+
20+
dynconfig "github.com/aerospike/aerospike-client-go/v8/config"
1921
)
2022

2123
// Policy Interface
@@ -205,3 +207,64 @@ func (p *BasePolicy) deadline() time.Time {
205207
func (p *BasePolicy) compress() bool {
206208
return p.UseCompression
207209
}
210+
211+
// copyBasePolicy creates a new BasePolicy instance and copies the values from the source BasePolicy.
212+
func copyBasePolicy(src *BasePolicy) *BasePolicy {
213+
response := NewPolicy()
214+
response.Txn = src.Txn
215+
response.FilterExpression = src.FilterExpression
216+
response.ReadModeAP = src.ReadModeAP
217+
response.ReadModeSC = src.ReadModeSC
218+
response.TotalTimeout = src.TotalTimeout
219+
response.SocketTimeout = src.SocketTimeout
220+
response.MaxRetries = src.MaxRetries
221+
response.ReadTouchTTLPercent = src.ReadTouchTTLPercent
222+
response.SleepBetweenRetries = src.SleepBetweenRetries
223+
response.SleepMultiplier = src.SleepMultiplier
224+
response.ExitFastOnExhaustedConnectionPool = src.ExitFastOnExhaustedConnectionPool
225+
response.SendKey = src.SendKey
226+
response.UseCompression = src.UseCompression
227+
response.ReplicaPolicy = src.ReplicaPolicy
228+
229+
return response
230+
}
231+
232+
// applyConfigToBasePolicy applies the dynamic configuration and generates a new policy. This function
233+
// will NOT override any custom settings in the BasePolicy.
234+
func applyConfigToBasePolicy(policy *BasePolicy, config *dynconfig.Config) *BasePolicy {
235+
if config != nil && config.Dynamic != nil && config.Dynamic.Read != nil {
236+
var responsePolicy *BasePolicy
237+
if policy != nil {
238+
// Copy the existing write policy to preserve any custom settings.
239+
responsePolicy = copyBasePolicy(policy)
240+
} else {
241+
responsePolicy = NewPolicy()
242+
}
243+
244+
if config.Dynamic.Read.ReadModeAp != nil {
245+
responsePolicy.ReadModeAP = mapReadModeAPToReadModeAP(*config.Dynamic.Read.ReadModeAp)
246+
}
247+
if config.Dynamic.Read.ReadModeSc != nil {
248+
responsePolicy.ReadModeSC = mapReadModeSCToReadModeSC(*config.Dynamic.Read.ReadModeSc)
249+
}
250+
if config.Dynamic.Read.TotalTimeout != nil {
251+
responsePolicy.TotalTimeout = time.Duration(*config.Dynamic.Read.TotalTimeout)
252+
}
253+
if config.Dynamic.Read.SocketTimeout != nil {
254+
responsePolicy.SocketTimeout = time.Duration(*config.Dynamic.Read.SocketTimeout)
255+
}
256+
if config.Dynamic.Read.MaxRetries != nil {
257+
responsePolicy.MaxRetries = *config.Dynamic.Read.MaxRetries
258+
}
259+
if config.Dynamic.Read.SleepBetweenRetries != nil {
260+
responsePolicy.SleepBetweenRetries = time.Duration(*config.Dynamic.Read.SleepBetweenRetries)
261+
}
262+
if config.Dynamic.Read.Replica != nil {
263+
responsePolicy.ReplicaPolicy = mapReplicaToReplicaPolicy(*config.Dynamic.Read.Replica)
264+
}
265+
266+
return responsePolicy
267+
} else {
268+
return policy
269+
}
270+
}

0 commit comments

Comments
 (0)