Commit 8c5c9e0

Implement concurrency in data retrieval and synchronization
Introduced goroutines, coordinated with a sync.WaitGroup and guarded by a sync.Mutex, to parallelize lookups on previous owners and replicas and the write-back step of read repair in DMap operations. Fetching from and synchronizing with multiple nodes concurrently, instead of one node at a time, makes these multi-node operations more efficient.
1 parent 40ffe2f commit 8c5c9e0
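
For illustration, the sketch below shows the fan-out pattern this commit applies, in isolation: one goroutine per node, coordinated by a sync.WaitGroup, with a sync.Mutex serializing appends to a shared result slice. The identifiers fetchFromNode, nodes, and results are hypothetical placeholders rather than names from the Olric codebase, and the real functions also handle protocol errors and logging.

package main

import (
	"fmt"
	"sync"
)

// fetchFromNode is a hypothetical stand-in for a per-node lookup such as
// lookupOnPreviousOwner or a replica GET in the real code.
func fetchFromNode(node string) (string, error) {
	return "value-from-" + node, nil
}

func main() {
	nodes := []string{"node-a", "node-b", "node-c"}

	var (
		wg      sync.WaitGroup
		mtx     sync.Mutex
		results []string
	)

	for _, node := range nodes {
		wg.Add(1)
		// Pass the loop variable as an argument so each goroutine
		// works on its own copy.
		go func(n string) {
			defer wg.Done()

			v, err := fetchFromNode(n)
			if err != nil {
				// Failed nodes are simply skipped; the caller works
				// with whatever results did arrive.
				return
			}

			// results is shared by all goroutines, so appends must be
			// guarded by the mutex.
			mtx.Lock()
			results = append(results, v)
			mtx.Unlock()
		}(node)
	}

	// Block until every lookup has finished before reading results.
	wg.Wait()
	fmt.Println(results)
}

Appending under a mutex mirrors the approach taken in the diff; collecting results over a buffered channel and draining it after wg.Wait() would be an equally idiomatic alternative.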

File tree

1 file changed (+112, -72 lines)

internal/dmap/get.go

Lines changed: 112 additions & 72 deletions
@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"sort"
+	"sync"

 	"github.com/olric-data/olric/config"
 	"github.com/olric-data/olric/internal/cluster/partitions"
@@ -153,25 +154,40 @@ func (dm *DMap) lookupOnOwners(hkey uint64, key string) []*version {
 		panic("partition owners list cannot be empty")
 	}

-	var versions []*version
+	var (
+		wg       sync.WaitGroup
+		mtx      sync.Mutex
+		versions []*version
+	)
 	versions = append(versions, dm.lookupOnThisNode(hkey, key))

 	// Run a query on the previous owners.
 	// Traverse in reverse order. Except from the latest host, this one.
 	for i := len(owners) - 2; i >= 0; i-- {
 		owner := owners[i]
-		v, err := dm.lookupOnPreviousOwner(&owner, key)
-		if err != nil {
-			if dm.s.log.V(6).Ok() {
-				dm.s.log.V(6).Printf("[ERROR] Failed to call get on a previous "+
-					"primary owner: %s: %v", owner, err)
+
+		wg.Add(1)
+		go func(member *discovery.Member) {
+			defer wg.Done()
+
+			v, err := dm.lookupOnPreviousOwner(member, key)
+			if err != nil {
+				if dm.s.log.V(6).Ok() {
+					dm.s.log.V(6).Printf("[ERROR] Failed to call get on a previous "+
+						"primary owner: %s: %v", member, err)
+				}
+				return
 			}
-			continue
-		}
-		// Ignore failed owners. The data on those hosts will be wiped out
-		// by the balancer.
-		versions = append(versions, v)
+
+			mtx.Lock()
+			// Ignore failed owners. The balancer will wipe out
+			// the data on those hosts.
+			versions = append(versions, v)
+			mtx.Unlock()
+		}(&owner)
 	}
+
+	wg.Wait()
 	return versions
 }

@@ -204,85 +220,108 @@ func (dm *DMap) sanitizeAndSortVersions(versions []*version) []*version {
 // lookupOnReplicas retrieves data from replica nodes for the given hash key and
 // key, returning a list of versioned entries.
 func (dm *DMap) lookupOnReplicas(hkey uint64, key string) []*version {
-	// Check backup.
-	backups := dm.s.backup.PartitionOwnersByHKey(hkey)
-	versions := make([]*version, 0, len(backups))
-	for _, replica := range backups {
-		host := replica
-		cmd := protocol.NewGetEntry(dm.name, key).SetReplica().Command(dm.s.ctx)
-		rc := dm.s.client.Get(host.String())
-		err := rc.Process(dm.s.ctx, cmd)
-		err = protocol.ConvertError(err)
-		if err != nil {
-			if dm.s.log.V(6).Ok() {
-				dm.s.log.V(6).Printf("[DEBUG] Failed to call get on"+
-					" a replica owner: %s: %v", host, err)
+	// Check replicas
+	var (
+		wg  sync.WaitGroup
+		mtx sync.Mutex
+	)
+
+	replicas := dm.s.backup.PartitionOwnersByHKey(hkey)
+	versions := make([]*version, 0, len(replicas))
+	for _, replica := range replicas {
+		wg.Add(1)
+		go func(host *discovery.Member) {
+			defer wg.Done()
+
+			cmd := protocol.NewGetEntry(dm.name, key).SetReplica().Command(dm.s.ctx)
+			rc := dm.s.client.Get(host.String())
+			err := rc.Process(dm.s.ctx, cmd)
+			err = protocol.ConvertError(err)
+			if err != nil {
+				if dm.s.log.V(6).Ok() {
+					dm.s.log.V(6).Printf("[DEBUG] Failed to call get on"+
+						" a replica owner: %s: %v", host, err)
+				}
+				return
 			}
-			continue
-		}

-		value, err := cmd.Bytes()
-		err = protocol.ConvertError(err)
-		if err != nil {
-			if dm.s.log.V(6).Ok() {
-				dm.s.log.V(6).Printf("[DEBUG] Failed to call get on"+
-					" a replica owner: %s: %v", host, err)
+			value, err := cmd.Bytes()
+			err = protocol.ConvertError(err)
+			if err != nil {
+				if dm.s.log.V(6).Ok() {
+					dm.s.log.V(6).Printf("[DEBUG] Failed to call get on"+
+						" a replica owner: %s: %v", host, err)
+				}
+				return
 			}
-		}

-		v := &version{host: &host}
-		e := dm.engine.NewEntry()
-		e.Decode(value)
-		v.entry = e
-		versions = append(versions, v)
+			v := &version{host: host}
+			e := dm.engine.NewEntry()
+			e.Decode(value)
+			v.entry = e
+
+			mtx.Lock()
+			versions = append(versions, v)
+			mtx.Unlock()
+		}(&replica)
 	}
+	wg.Wait()
 	return versions
 }

 // readRepair performs synchronization of inconsistent replicas by applying the
 // winning version to out-of-sync nodes.
 func (dm *DMap) readRepair(winner *version, versions []*version) {
+	var wg sync.WaitGroup
 	for _, value := range versions {
+
+		// Check the timestamp first, we apply the "last write wins" rule here.
 		if value.entry != nil && winner.entry.Timestamp() == value.entry.Timestamp() {
			continue
 		}

-		// Sync
-		tmp := *value.host
-		if tmp.CompareByID(dm.s.rt.This()) {
-			hkey := partitions.HKey(dm.name, winner.entry.Key())
-			part := dm.getPartitionByHKey(hkey, partitions.PRIMARY)
-			f, err := dm.loadOrCreateFragment(part)
-			if err != nil {
-				dm.s.log.V(3).Printf("[ERROR] Failed to get or create the fragment for: %s on %s: %v",
-					winner.entry.Key(), dm.name, err)
-				return
-			}
-
-			f.Lock()
-			e := newEnv(context.Background())
-			e.hkey = hkey
-			e.fragment = f
-			err = dm.putEntryOnFragment(e, winner.entry)
-			if err != nil {
-				dm.s.log.V(3).Printf("[ERROR] Failed to synchronize with replica: %v", err)
-			}
-			f.Unlock()
-		} else {
-			// If readRepair is enabled, this function is called by every GET request.
-			cmd := protocol.NewPutEntry(dm.name, winner.entry.Key(), winner.entry.Encode()).Command(dm.s.ctx)
-			rc := dm.s.client.Get(value.host.String())
-			err := rc.Process(dm.s.ctx, cmd)
-			if err != nil {
-				dm.s.log.V(3).Printf("[ERROR] Failed to synchronize replica %s: %v", value.host, err)
-				continue
+		wg.Add(1)
+		go func(v *version) {
+			defer wg.Done()
+
+			// Sync
+			tmp := *v.host
+			if tmp.CompareByID(dm.s.rt.This()) {
+				hkey := partitions.HKey(dm.name, winner.entry.Key())
+				part := dm.getPartitionByHKey(hkey, partitions.PRIMARY)
+				f, err := dm.loadOrCreateFragment(part)
+				if err != nil {
+					dm.s.log.V(3).Printf("[ERROR] Failed to get or create the fragment for: %s on %s: %v",
+						winner.entry.Key(), dm.name, err)
+					return
+				}

+				f.Lock()
+				e := newEnv(context.Background())
+				e.hkey = hkey
+				e.fragment = f
+				err = dm.putEntryOnFragment(e, winner.entry)
+				if err != nil {
+					dm.s.log.V(3).Printf("[ERROR] Failed to synchronize with replica: %v", err)
+				}
+				f.Unlock()
+			} else {
+				// If readRepair is enabled, this function is called by every GET request.
+				cmd := protocol.NewPutEntry(dm.name, winner.entry.Key(), winner.entry.Encode()).Command(dm.s.ctx)
+				rc := dm.s.client.Get(v.host.String())
+				err := rc.Process(dm.s.ctx, cmd)
+				if err != nil {
+					dm.s.log.V(3).Printf("[ERROR] Failed to synchronize replica %s: %v", v.host, err)
+					return
+				}
+				err = cmd.Err()
+				if err != nil {
+					dm.s.log.V(3).Printf("[ERROR] Failed to synchronize replica %s: %v", v.host, err)
+				}
 			}
-			err = cmd.Err()
-			if err != nil {
-				dm.s.log.V(3).Printf("[ERROR] Failed to synchronize replica %s: %v", value.host, err)
-			}
-		}
+		}(value)
 	}
+	wg.Wait()
 }

 // getOnCluster retrieves the storage.Entry for a given hashed key and key string
@@ -332,6 +371,7 @@ func (dm *DMap) getOnCluster(hkey uint64, key string) (storage.Entry, error) {
 func (dm *DMap) Get(ctx context.Context, key string) (storage.Entry, error) {
 	hkey := partitions.HKey(dm.name, key)
 	member := dm.s.primary.PartitionByHKey(hkey).Owner()
+
 	// We are on the partition owner
 	if member.CompareByName(dm.s.rt.This()) {
 		entry, err := dm.getOnCluster(hkey, key)
