Skip to content

Commit 25f2ccd

Browse files
authored
Merge pull request #199 from mrpalide/feat/server-specific-dmsg-server
Selected DMSG Server
2 parents d2a53ff + e11aabc commit 25f2ccd

File tree

1 file changed

+34
-10
lines changed

1 file changed

+34
-10
lines changed

pkg/dmsg/client.go

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -155,16 +155,36 @@ func (ce *Client) Serve(ctx context.Context) {
155155
if isClosed(ce.done) {
156156
return
157157
}
158-
158+
var entries []*disc.Entry
159+
var err error
159160
ce.log.Debug("Discovering dmsg servers...")
160-
entries, err := ce.discoverServers(cancellabelCtx)
161-
if err != nil {
162-
ce.log.WithError(err).Warn("Failed to discover dmsg servers.")
163-
if err == context.Canceled || err == context.DeadlineExceeded {
164-
return
161+
if ctx.Value("dmsgServer") != nil {
162+
entries, err = ce.discoverServers(cancellabelCtx, true)
163+
if err != nil {
164+
ce.log.WithError(err).Warn("Failed to discover dmsg servers.")
165+
if err == context.Canceled || err == context.DeadlineExceeded {
166+
return
167+
}
168+
ce.serveWait()
169+
continue
170+
}
171+
172+
for ind, entry := range entries {
173+
if entry.Static.Hex() == ctx.Value("dmsgServer").(string) {
174+
entries = entries[ind : ind+1]
175+
}
176+
}
177+
} else {
178+
entries, err = ce.discoverServers(cancellabelCtx, false)
179+
180+
if err != nil {
181+
ce.log.WithError(err).Warn("Failed to discover dmsg servers.")
182+
if err == context.Canceled || err == context.DeadlineExceeded {
183+
return
184+
}
185+
ce.serveWait()
186+
continue
165187
}
166-
ce.serveWait()
167-
continue
168188
}
169189
if len(entries) == 0 {
170190
ce.log.Warnf("No entries found. Retrying after %s...", ce.bo.String())
@@ -228,9 +248,13 @@ func (ce *Client) Ready() <-chan struct{} {
228248
return ce.ready
229249
}
230250

231-
func (ce *Client) discoverServers(ctx context.Context) (entries []*disc.Entry, err error) {
251+
func (ce *Client) discoverServers(ctx context.Context, all bool) (entries []*disc.Entry, err error) {
232252
err = netutil.NewDefaultRetrier(ce.log).Do(ctx, func() error {
233-
entries, err = ce.dc.AvailableServers(ctx)
253+
if all {
254+
entries, err = ce.dc.AllServers(ctx)
255+
} else {
256+
entries, err = ce.dc.AvailableServers(ctx)
257+
}
234258
return err
235259
})
236260
return entries, err

0 commit comments

Comments
 (0)