Skip to content

Commit d62a9b4

Browse files
committed
Finished config to policy mapping changes
1 parent 7d4eff0 commit d62a9b4

30 files changed

+1211
-244
lines changed

batch_command_operate.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -246,16 +246,16 @@ func (cmd *batchCommandOperate) executeSingle(client *Client) Error {
246246
} else if len(ops) == 0 {
247247
ops = append(ops, GetOp())
248248
}
249-
res, err = client.Operate(cmd.client.getUsableBatchReadPolicyWithConfig(br.Policy, applyConfigToBatchReadPolicy).toWritePolicyWithConfig(cmd.policy, client.dynConfig), br.Key, ops...)
249+
res, err = client.Operate(cmd.client.getUsableBatchReadPolicyWithConfig(br.Policy, ApplyConfigToBatchReadPolicy).ToWritePolicyWithConfig(cmd.policy, client.dynConfig), br.Key, ops...)
250250
case *BatchWrite:
251-
policy := cmd.client.getUsableBatchWritePolicyWithConfig(br.Policy, applyConfigToBatchWritePolicy).toWritePolicyWithConfig(cmd.policy, client.dynConfig)
251+
policy := cmd.client.getUsableBatchWritePolicyWithConfig(br.Policy, ApplyConfigToBatchWritePolicy).ToWritePolicyWithConfig(cmd.policy, client.dynConfig)
252252
policy.RespondPerEachOp = true
253253
res, err = client.Operate(policy, br.Key, br.Ops...)
254254
case *BatchDelete:
255-
policy := cmd.client.getUsableBatchDeletePolicyWithConfig(br.Policy, applyConfigToBatchDeletePolicy).toWritePolicyWithConfig(cmd.policy, client.dynConfig)
255+
policy := cmd.client.getUsableBatchDeletePolicyWithConfig(br.Policy, ApplyConfigToBatchDeletePolicy).ToWritePolicyWithConfig(cmd.policy, client.dynConfig)
256256
res, err = client.Operate(policy, br.Key, DeleteOp())
257257
case *BatchUDF:
258-
policy := cmd.client.getUsableBatchUDFPolicyWithConfig(br.Policy, applyConfigToBatchUDFPolicy).toWritePolicyWithConfig(cmd.policy, client.dynConfig)
258+
policy := cmd.client.getUsableBatchUDFPolicyWithConfig(br.Policy, ApplyConfigToBatchUDFPolicy).ToWritePolicyWithConfig(cmd.policy, client.dynConfig)
259259
policy.RespondPerEachOp = true
260260
res, err = client.execute(policy, br.Key, br.PackageName, br.FunctionName, br.FunctionArgs...)
261261
}

batch_delete_policy.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (bdp *BatchDeletePolicy) toWritePolicy(bp *BatchPolicy) *WritePolicy {
7474
return wp
7575
}
7676

77-
func (bdp *BatchDeletePolicy) toWritePolicyWithConfig(bp *BatchPolicy, dynConfig *DynConfig) *WritePolicy {
77+
func (bdp *BatchDeletePolicy) ToWritePolicyWithConfig(bp *BatchPolicy, dynConfig *DynConfig) *WritePolicy {
7878
wp := bp.toWritePolicy()
7979

8080
if bdp != nil {
@@ -88,7 +88,10 @@ func (bdp *BatchDeletePolicy) toWritePolicyWithConfig(bp *BatchPolicy, dynConfig
8888
wp.SendKey = bdp.SendKey
8989
}
9090

91-
config := dynConfig.config
91+
dynConfig.lock.RLock()
92+
defer dynConfig.lock.RUnlock()
93+
94+
config := dynConfig.Config
9295
if config != nil && config.Dynamic.BatchWrite != nil {
9396
if config.Dynamic.BatchWrite.DurableDelete != nil {
9497
wp.DurableDelete = *config.Dynamic.BatchWrite.DurableDelete
@@ -122,16 +125,19 @@ func copyBatchDeletePolicy(src *BatchDeletePolicy) *BatchDeletePolicy {
122125

123126
// applyConfigToQueryPolicy applies the dynamic configuration and generates a new policy. This function
124127
// will NOT override any custom settings in the QueryPolicy.
125-
func applyConfigToBatchDeletePolicy(policy *BatchDeletePolicy, dynConfig *DynConfig) *BatchDeletePolicy {
126-
config := dynConfig.config
128+
func ApplyConfigToBatchDeletePolicy(policy *BatchDeletePolicy, dynConfig *DynConfig) *BatchDeletePolicy {
129+
config := dynConfig.Config
127130

128131
if config == nil && !dynConfig.configInitialized.Load() {
129132
// On initial load it is possible that the config is not yet loaded. This will kick things off to make sure
130133
// config is loaded.
131134
dynConfig.loadConfig()
132-
config = dynConfig.config
135+
config = dynConfig.Config
133136
}
134137

138+
dynConfig.lock.RLock()
139+
defer dynConfig.lock.RUnlock()
140+
135141
if config != nil && config.Dynamic != nil && config.Dynamic.BatchWrite != nil {
136142
var responsePolicy *BatchDeletePolicy
137143
if policy != nil {

batch_policy.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -156,16 +156,19 @@ func copyBatchPolicy(src *BatchPolicy) *BatchPolicy {
156156

157157
// applyConfigToQueryPolicy applies the dynamic configuration and generates a new policy. This function
158158
// will NOT override any custom settings in the QueryPolicy.
159-
func applyConfigToBatchPolicy(policy *BatchPolicy, dynConfig *DynConfig) *BatchPolicy {
160-
config := dynConfig.config
159+
func ApplyConfigToBatchPolicy(policy *BatchPolicy, dynConfig *DynConfig) *BatchPolicy {
160+
config := dynConfig.Config
161161

162162
if config == nil && !dynConfig.configInitialized.Load() {
163163
// On initial load it is possible that the config is not yet loaded. This will kick things off to make sure
164164
// config is loaded.
165165
dynConfig.loadConfig()
166-
config = dynConfig.config
166+
config = dynConfig.Config
167167
}
168168

169+
dynConfig.lock.RLock()
170+
defer dynConfig.lock.RUnlock()
171+
169172
if config != nil && config.Dynamic != nil && config.Dynamic.BatchRead != nil {
170173
var responsePolicy *BatchPolicy
171174
if policy != nil {
@@ -176,10 +179,10 @@ func applyConfigToBatchPolicy(policy *BatchPolicy, dynConfig *DynConfig) *BatchP
176179
}
177180

178181
if config.Dynamic.BatchRead.ReadModeAp != nil {
179-
responsePolicy.ReadModeAP = mapReadModeAPToReadModeAP(*config.Dynamic.BatchRead.ReadModeAp)
182+
responsePolicy.ReadModeAP = MapReadModeAPToReadModeAP(*config.Dynamic.BatchRead.ReadModeAp)
180183
}
181184
if config.Dynamic.BatchRead.ReadModeSc != nil {
182-
responsePolicy.ReadModeSC = mapReadModeSCToReadModeSC(*config.Dynamic.BatchRead.ReadModeSc)
185+
responsePolicy.ReadModeSC = MapReadModeSCToReadModeSC(*config.Dynamic.BatchRead.ReadModeSc)
183186
}
184187
if config.Dynamic.BatchRead.TotalTimeout != nil {
185188
responsePolicy.TotalTimeout = time.Duration(*config.Dynamic.BatchRead.TotalTimeout)

batch_read_policy.go

+15-9
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (brp *BatchReadPolicy) toWritePolicy(bp *BatchPolicy) *WritePolicy {
7171
return wp
7272
}
7373

74-
func (brp *BatchReadPolicy) toWritePolicyWithConfig(bp *BatchPolicy, dynConfig *DynConfig) *WritePolicy {
74+
func (brp *BatchReadPolicy) ToWritePolicyWithConfig(bp *BatchPolicy, dynConfig *DynConfig) *WritePolicy {
7575
wp := bp.toWritePolicy()
7676

7777
if brp != nil {
@@ -84,13 +84,16 @@ func (brp *BatchReadPolicy) toWritePolicyWithConfig(bp *BatchPolicy, dynConfig *
8484
wp.ReadTouchTTLPercent = brp.ReadTouchTTLPercent
8585
}
8686

87-
config := dynConfig.config
87+
dynConfig.lock.RLock()
88+
defer dynConfig.lock.RUnlock()
89+
90+
config := dynConfig.Config
8891
if config != nil && config.Dynamic.BatchRead != nil {
8992
if config.Dynamic.BatchRead.ReadModeAp != nil {
90-
wp.ReadModeAP = mapReadModeAPToReadModeAP(*config.Dynamic.BatchRead.ReadModeAp)
93+
wp.ReadModeAP = MapReadModeAPToReadModeAP(*config.Dynamic.BatchRead.ReadModeAp)
9194
}
9295
if config.Dynamic.BatchRead.ReadModeSc != nil {
93-
wp.ReadModeSC = mapReadModeSCToReadModeSC(*config.Dynamic.BatchRead.ReadModeSc)
96+
wp.ReadModeSC = MapReadModeSCToReadModeSC(*config.Dynamic.BatchRead.ReadModeSc)
9497
}
9598
if config.Dynamic.BatchRead.Replica != nil {
9699
wp.ReplicaPolicy = mapReplicaToReplicaPolicy(*config.Dynamic.BatchRead.Replica)
@@ -130,16 +133,19 @@ func copyBatchReadPolicy(src *BatchReadPolicy) *BatchReadPolicy {
130133

131134
// applyConfigToQueryPolicy applies the dynamic configuration and generates a new policy. This function
132135
// will NOT override any custom settings in the QueryPolicy.
133-
func applyConfigToBatchReadPolicy(policy *BatchReadPolicy, dynConfig *DynConfig) *BatchReadPolicy {
134-
config := dynConfig.config
136+
func ApplyConfigToBatchReadPolicy(policy *BatchReadPolicy, dynConfig *DynConfig) *BatchReadPolicy {
137+
config := dynConfig.Config
135138

136139
if config == nil && !dynConfig.configInitialized.Load() {
137140
// On initial load it is possible that the config is not yet loaded. This will kick things off to make sure
138141
// config is loaded.
139142
dynConfig.loadConfig()
140-
config = dynConfig.config
143+
config = dynConfig.Config
141144
}
142145

146+
dynConfig.lock.RLock()
147+
defer dynConfig.lock.RUnlock()
148+
143149
if config != nil && config.Dynamic != nil && config.Dynamic.BatchRead != nil {
144150
var responsePolicy *BatchReadPolicy
145151
if policy != nil {
@@ -150,10 +156,10 @@ func applyConfigToBatchReadPolicy(policy *BatchReadPolicy, dynConfig *DynConfig)
150156
}
151157

152158
if config.Dynamic.BatchRead.ReadModeAp != nil {
153-
responsePolicy.ReadModeAP = mapReadModeAPToReadModeAP(*config.Dynamic.BatchRead.ReadModeAp)
159+
responsePolicy.ReadModeAP = MapReadModeAPToReadModeAP(*config.Dynamic.BatchRead.ReadModeAp)
154160
}
155161
if config.Dynamic.BatchRead.ReadModeSc != nil {
156-
responsePolicy.ReadModeSC = mapReadModeSCToReadModeSC(*config.Dynamic.BatchRead.ReadModeSc)
162+
responsePolicy.ReadModeSC = MapReadModeSCToReadModeSC(*config.Dynamic.BatchRead.ReadModeSc)
157163
}
158164

159165
return responsePolicy

batch_read_policy_config_test.go

+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package aerospike_test
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
as "github.com/aerospike/aerospike-client-go/v8"
8+
dynconfig "github.com/aerospike/aerospike-client-go/v8/config"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestApplyBatchReadConfig(t *testing.T) {
13+
// Create the full config.
14+
config := &as.DynConfig{
15+
Config: &dynconfig.Config{
16+
Dynamic: &dynconfig.DynamicConfig{
17+
BatchRead: &dynconfig.BatchRead{
18+
ReadModeAp: func() *dynconfig.ReadModeAp { r := dynconfig.All; return &r }(),
19+
ReadModeSc: func() *dynconfig.ReadModeSc { r := dynconfig.ALLOWUNAVAILABLE; return &r }(),
20+
Replica: func() *dynconfig.Replica { r := dynconfig.MASTER; return &r }(),
21+
SleepBetweenRetries: func() *dynconfig.Duration { d := dynconfig.Duration(time.Second * 1); return &d }(),
22+
SocketTimeout: func() *dynconfig.Duration { d := dynconfig.Duration(time.Second * 3); return &d }(),
23+
TotalTimeout: func() *dynconfig.Duration { r := dynconfig.Duration(time.Second * 15); return &r }(),
24+
MaxRetries: func() *int { r := 5; return &r }(),
25+
MaxConcurrentThread: func() *int { r := 5; return &r }(),
26+
AllowInline: func() *bool { r := true; return &r }(),
27+
RespondAllKeys: func() *bool { r := true; return &r }(),
28+
},
29+
},
30+
},
31+
}
32+
33+
// Create an initial WritePolicy.
34+
policy := as.NewBatchReadPolicy()
35+
36+
// Check defaults
37+
require.NotNil(t, policy)
38+
require.Equal(t, as.ReadModeAPOne, policy.ReadModeAP)
39+
require.Equal(t, as.ReadModeSCSession, policy.ReadModeSC)
40+
41+
// Apply the configuration.
42+
updatedPolicy := as.ApplyConfigToBatchReadPolicy(policy, config)
43+
44+
// Validate the applied configuration.
45+
require.NotNil(t, updatedPolicy)
46+
require.Equal(t, as.ReadModeAPAll, updatedPolicy.ReadModeAP)
47+
require.Equal(t, as.ReadModeSCAllowUnavailable, updatedPolicy.ReadModeSC)
48+
}
49+
func TestApplyBatchReadConfigToWritePolicyWithConfig(t *testing.T) {
50+
// Create the full config.
51+
config := &as.DynConfig{
52+
Config: &dynconfig.Config{
53+
Dynamic: &dynconfig.DynamicConfig{
54+
BatchRead: &dynconfig.BatchRead{
55+
ReadModeAp: func() *dynconfig.ReadModeAp { r := dynconfig.All; return &r }(),
56+
ReadModeSc: func() *dynconfig.ReadModeSc { r := dynconfig.ALLOWUNAVAILABLE; return &r }(),
57+
Replica: func() *dynconfig.Replica { r := dynconfig.MASTER; return &r }(),
58+
SleepBetweenRetries: func() *dynconfig.Duration { d := dynconfig.Duration(time.Second * 1); return &d }(),
59+
SocketTimeout: func() *dynconfig.Duration { d := dynconfig.Duration(time.Second * 3); return &d }(),
60+
TotalTimeout: func() *dynconfig.Duration { r := dynconfig.Duration(time.Second * 15); return &r }(),
61+
MaxRetries: func() *int { r := 5; return &r }(),
62+
MaxConcurrentThread: func() *int { r := 5; return &r }(),
63+
AllowInline: func() *bool { r := true; return &r }(),
64+
RespondAllKeys: func() *bool { r := true; return &r }(),
65+
},
66+
},
67+
},
68+
}
69+
70+
// Check defaults
71+
batchPolicy := as.NewBatchPolicy()
72+
require.NotNil(t, batchPolicy)
73+
require.Equal(t, as.ReadModeAPOne, batchPolicy.ReadModeAP)
74+
require.Equal(t, as.ReadModeSCSession, batchPolicy.ReadModeSC)
75+
require.Equal(t, 1000, int(batchPolicy.TotalTimeout.Milliseconds()))
76+
require.Equal(t, 30, int(batchPolicy.SocketTimeout.Seconds()))
77+
require.Equal(t, 2, batchPolicy.MaxRetries)
78+
require.Equal(t, 1, int(batchPolicy.SleepBetweenRetries.Milliseconds()))
79+
require.Equal(t, as.SEQUENCE, batchPolicy.ReplicaPolicy)
80+
require.Equal(t, false, batchPolicy.SendKey)
81+
require.Equal(t, false, batchPolicy.UseCompression)
82+
83+
// Apply the configuration.
84+
updatedPolicy := as.NewBatchReadPolicy()
85+
86+
updatedWritePolicy := updatedPolicy.ToWritePolicyWithConfig(batchPolicy, config)
87+
require.Equal(t, as.ReadModeAPAll, updatedWritePolicy.ReadModeAP)
88+
require.Equal(t, as.ReadModeSCAllowUnavailable, updatedWritePolicy.ReadModeSC)
89+
require.Equal(t, as.MASTER, updatedWritePolicy.ReplicaPolicy)
90+
require.Equal(t, 15, int(updatedWritePolicy.TotalTimeout.Seconds()))
91+
require.Equal(t, 3, int(updatedWritePolicy.SocketTimeout.Seconds()))
92+
require.Equal(t, 5, updatedWritePolicy.MaxRetries)
93+
require.Equal(t, 1000, int(updatedWritePolicy.SleepBetweenRetries.Milliseconds()))
94+
require.Equal(t, false, updatedWritePolicy.SendKey)
95+
// Validate the applied configuration.
96+
require.NotNil(t, updatedPolicy)
97+
}

batch_udf_policy.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (bup *BatchUDFPolicy) toWritePolicy(bp *BatchPolicy) *WritePolicy {
8181
return wp
8282
}
8383

84-
func (bup *BatchUDFPolicy) toWritePolicyWithConfig(bp *BatchPolicy, dynConfig *DynConfig) *WritePolicy {
84+
func (bup *BatchUDFPolicy) ToWritePolicyWithConfig(bp *BatchPolicy, dynConfig *DynConfig) *WritePolicy {
8585
wp := bp.toWritePolicy()
8686

8787
if bup != nil {
@@ -94,7 +94,10 @@ func (bup *BatchUDFPolicy) toWritePolicyWithConfig(bp *BatchPolicy, dynConfig *D
9494
wp.SendKey = bup.SendKey
9595
}
9696

97-
config := dynConfig.config
97+
dynConfig.lock.RLock()
98+
defer dynConfig.lock.RUnlock()
99+
100+
config := dynConfig.Config
98101
if config != nil && config.Dynamic.BatchUdf != nil {
99102
if config.Dynamic.BatchWrite.DurableDelete != nil {
100103
wp.DurableDelete = *config.Dynamic.BatchWrite.DurableDelete
@@ -128,16 +131,19 @@ func copyBatchUDFPolicy(src *BatchUDFPolicy) *BatchUDFPolicy {
128131

129132
// applyConfigToQueryPolicy applies the dynamic configuration and generates a new policy. This function
130133
// will NOT override any custom settings in the QueryPolicy.
131-
func applyConfigToBatchUDFPolicy(policy *BatchUDFPolicy, dynConfig *DynConfig) *BatchUDFPolicy {
132-
config := dynConfig.config
134+
func ApplyConfigToBatchUDFPolicy(policy *BatchUDFPolicy, dynConfig *DynConfig) *BatchUDFPolicy {
135+
config := dynConfig.Config
133136

134137
if config == nil && !dynConfig.configInitialized.Load() {
135138
// On initial load it is possible that the config is not yet loaded. This will kick things off to make sure
136139
// config is loaded.
137140
dynConfig.loadConfig()
138-
config = dynConfig.config
141+
config = dynConfig.Config
139142
}
140143

144+
dynConfig.lock.RLock()
145+
defer dynConfig.lock.RUnlock()
146+
141147
if config != nil && config.Dynamic != nil && config.Dynamic.BatchUdf != nil {
142148
var responsePolicy *BatchUDFPolicy
143149
if policy != nil {

batch_write_policy.go

+31-5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package aerospike
1616

17+
import "time"
18+
1719
// BatchWritePolicy attributes used in batch write commands.
1820
type BatchWritePolicy struct {
1921
// FilterExpression is optional expression filter. If FilterExpression exists and evaluates to false, the specific batch key
@@ -112,7 +114,7 @@ func (bwp *BatchWritePolicy) toWritePolicy(bp *BatchPolicy) *WritePolicy {
112114
return wp
113115
}
114116

115-
func (bwp *BatchWritePolicy) toWritePolicyWithConfig(bp *BatchPolicy, dynConfig *DynConfig) *WritePolicy {
117+
func (bwp *BatchWritePolicy) ToWritePolicyWithConfig(bp *BatchPolicy, dynConfig *DynConfig) *WritePolicy {
116118
wp := bp.toWritePolicy()
117119

118120
if bwp != nil {
@@ -128,8 +130,29 @@ func (bwp *BatchWritePolicy) toWritePolicyWithConfig(bp *BatchPolicy, dynConfig
128130
wp.SendKey = bwp.SendKey
129131
}
130132

131-
config := dynConfig.config
133+
dynConfig.lock.RLock()
134+
defer dynConfig.lock.RUnlock()
135+
136+
config := dynConfig.Config
132137
if config != nil && config.Dynamic.BatchWrite != nil {
138+
if config.Dynamic.BatchWrite.Replica != nil {
139+
wp.ReplicaPolicy = mapReplicaToReplicaPolicy(*config.Dynamic.BatchWrite.Replica)
140+
}
141+
if config.Dynamic.BatchWrite.SleepBetweenRetries != nil {
142+
wp.SleepBetweenRetries = time.Duration(*config.Dynamic.BatchWrite.SleepBetweenRetries)
143+
}
144+
if config.Dynamic.BatchWrite.SocketTimeout != nil {
145+
wp.SocketTimeout = time.Duration(*config.Dynamic.BatchWrite.SocketTimeout)
146+
}
147+
if config.Dynamic.BatchWrite.TotalTimeout != nil {
148+
wp.TotalTimeout = time.Duration(*config.Dynamic.BatchWrite.TotalTimeout)
149+
}
150+
if config.Dynamic.BatchWrite.MaxRetries != nil {
151+
wp.MaxRetries = *config.Dynamic.BatchWrite.MaxRetries
152+
}
153+
if config.Dynamic.BatchWrite.RespondAllKeys != nil {
154+
wp.RespondPerEachOp = *config.Dynamic.BatchWrite.RespondAllKeys
155+
}
133156
if config.Dynamic.BatchWrite.DurableDelete != nil {
134157
wp.DurableDelete = *config.Dynamic.BatchWrite.DurableDelete
135158
}
@@ -164,16 +187,19 @@ func copyBatchWritePolicy(src *BatchWritePolicy) *BatchWritePolicy {
164187

165188
// applyConfigToQueryPolicy applies the dynamic configuration and generates a new policy. This function
166189
// will NOT override any custom settings in the QueryPolicy.
167-
func applyConfigToBatchWritePolicy(policy *BatchWritePolicy, dynConfig *DynConfig) *BatchWritePolicy {
168-
config := dynConfig.config
190+
func ApplyConfigToBatchWritePolicy(policy *BatchWritePolicy, dynConfig *DynConfig) *BatchWritePolicy {
191+
config := dynConfig.Config
169192

170193
if config == nil && !dynConfig.configInitialized.Load() {
171194
// On initial load it is possible that the config is not yet loaded. This will kick things off to make sure
172195
// config is loaded.
173196
dynConfig.loadConfig()
174-
config = dynConfig.config
197+
config = dynConfig.Config
175198
}
176199

200+
dynConfig.lock.RLock()
201+
defer dynConfig.lock.RUnlock()
202+
177203
if config != nil && config.Dynamic != nil && config.Dynamic.BatchWrite != nil {
178204
var responsePolicy *BatchWritePolicy
179205
if policy != nil {

0 commit comments

Comments
 (0)