|
7 | 7 | "io" |
8 | 8 | "log" |
9 | 9 | "net/http" |
| 10 | + "time" |
10 | 11 |
|
11 | 12 | "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" |
12 | 13 | "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/logging" |
@@ -61,7 +62,9 @@ func InitDmsgWithFlags(ctx context.Context, dlog *logging.Logger, pk cipher.PubK |
61 | 62 | dlog.Infof("Received response from dmsg-discovery server %s/health:\n%s", flags.DmsgDiscURL, string(body)) |
62 | 63 | } |
63 | 64 |
|
64 | | - return StartDmsg(ctx, dlog, pk, sk, httpClient, flags.DmsgDiscURL, flags.DmsgSessions) |
| 65 | + // Use direct client with synthetic entries for discovery server and all dmsg servers |
| 66 | + // This allows dialing the discovery server which doesn't register itself |
| 67 | + return StartDmsgWithDirectClient(ctx, dlog, pk, sk, flags.DmsgSessions) |
65 | 68 | } |
66 | 69 |
|
67 | 70 | // Default dmsghttp mode |
@@ -111,7 +114,61 @@ func InitDmsgWithFlags(ctx context.Context, dlog *logging.Logger, pk cipher.PubK |
111 | 114 | dlog.Infof("Received response from dmsg-discovery server %s/health:\n%s", flags.DmsgDiscAddr, string(body)) |
112 | 115 | } |
113 | 116 |
|
114 | | - return StartDmsg(ctx, dlog, pk, sk, dmsgHTTP, flags.DmsgDiscAddr, flags.DmsgSessions) |
| 117 | + return StartDmsgWithSyntheticDiscovery(ctx, dlog, pk, sk, dmsgHTTP, flags.DmsgDiscAddr, flags.DmsgSessions) |
| 118 | +} |
| 119 | + |
| 120 | +// StartDmsgWithSyntheticDiscovery starts dmsg with a synthetic discovery entry for the discovery server itself |
| 121 | +func StartDmsgWithSyntheticDiscovery(ctx context.Context, dlog *logging.Logger, pk cipher.PubKey, sk cipher.SecKey, httpClient *http.Client, dmsgDisc string, dmsgSessions int) (dmsgC *dmsg.Client, stop func(), err error) { |
| 122 | + if dlog == nil { |
| 123 | + return nil, nil, fmt.Errorf("nil logger") |
| 124 | + } |
| 125 | + |
| 126 | + // Create base discovery client |
| 127 | + baseDiscClient := disc.NewHTTP(dmsgDisc, httpClient, dlog) |
| 128 | + |
| 129 | + // Wrap with caching client that includes synthetic entry for discovery server |
| 130 | + discPK := dmsg.ExtractPKFromDmsgAddr(dmsgDisc) |
| 131 | + if discPK != "" { |
| 132 | + var discoveryPK cipher.PubKey |
| 133 | + if err := discoveryPK.UnmarshalText([]byte(discPK)); err == nil { |
| 134 | + // Get all available dmsg servers as delegated servers |
| 135 | + var delegatedServers []cipher.PubKey |
| 136 | + for _, server := range dmsg.Prod.DmsgServers { |
| 137 | + delegatedServers = append(delegatedServers, server.Static) |
| 138 | + } |
| 139 | + syntheticEntry := &disc.Entry{ |
| 140 | + Static: discoveryPK, |
| 141 | + Client: &disc.Client{ |
| 142 | + DelegatedServers: delegatedServers, |
| 143 | + }, |
| 144 | + } |
| 145 | + baseDiscClient = newCachingDiscClient(baseDiscClient, syntheticEntry, dlog) |
| 146 | + dlog.Debug("Created synthetic discovery entry for dialing") |
| 147 | + } |
| 148 | + } |
| 149 | + |
| 150 | + dmsgC = dmsg.NewClient(pk, sk, baseDiscClient, &dmsg.Config{MinSessions: dmsgSessions}) |
| 151 | + dlog.Debug("Created dmsg client.") |
| 152 | + |
| 153 | + go dmsgC.Serve(ctx) |
| 154 | + dlog.Debug("dmsgclient.Serve(ctx)") |
| 155 | + |
| 156 | + stop = func() { |
| 157 | + err := dmsgC.Close() |
| 158 | + dlog.WithError(err).Debug("Disconnected from dmsg network.\n") |
| 159 | + log.Println() |
| 160 | + } |
| 161 | + dlog.WithField("dmsg_disc", dmsgDisc).Debug("Connecting to dmsg network...\n") |
| 162 | + dlog.WithField("client public_key", pk.String()).Debug("\n") |
| 163 | + select { |
| 164 | + case <-ctx.Done(): |
| 165 | + stop() |
| 166 | + return nil, nil, ctx.Err() |
| 167 | + |
| 168 | + case <-dmsgC.Ready(): |
| 169 | + dlog.Debug("Dmsg network ready.") |
| 170 | + return dmsgC, stop, nil |
| 171 | + } |
115 | 172 | } |
116 | 173 |
|
117 | 174 | // StartDmsg starts dmsg returns a dmsg client for the given dmsg discovery |
@@ -203,18 +260,159 @@ func StartDmsgDirectWithServers(ctx context.Context, dlog *logging.Logger, pk ci |
203 | 260 | } |
204 | 261 | if dmsgDiscAddr != "" { |
205 | 262 | // Validate that we can access discovery over DMSG |
| 263 | + // Retry with exponential backoff to handle session initialization timing |
206 | 264 | dmsgHTTP := &http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} |
207 | | - resp, err := dmsgHTTP.Get(dmsgDiscAddr + "/health") |
| 265 | + var resp *http.Response |
| 266 | + maxRetries := 5 |
| 267 | + for i := 0; i < maxRetries; i++ { |
| 268 | + resp, err = dmsgHTTP.Get(dmsgDiscAddr + "/health") |
| 269 | + if err == nil { |
| 270 | + resp.Body.Close() //nolint |
| 271 | + break |
| 272 | + } |
| 273 | + if i < maxRetries-1 { |
| 274 | + backoff := time.Duration(200*(i+1)) * time.Millisecond |
| 275 | + dlog.WithError(err).Debugf("Failed to reach discovery, retrying in %v (attempt %d/%d)", backoff, i+1, maxRetries) |
| 276 | + time.Sleep(backoff) |
| 277 | + } |
| 278 | + } |
208 | 279 | if err != nil { |
209 | 280 | stop() // Cleanup if validation fails |
210 | 281 | return nil, nil, fmt.Errorf("failed to reach discovery server via DMSG: %w", err) |
211 | 282 | } |
212 | | - resp.Body.Close() //nolint |
213 | 283 | } |
214 | 284 |
|
215 | 285 | return dmsgC, stop, nil |
216 | 286 | } |
217 | 287 |
|
| 288 | +// StartDmsgWithDirectClient starts dmsg with a direct client that includes synthetic entries |
| 289 | +// This allows dialing any client including the discovery server which doesn't register itself |
| 290 | +func StartDmsgWithDirectClient(ctx context.Context, dlog *logging.Logger, pk cipher.PubKey, sk cipher.SecKey, dmsgSessions int) (dmsgC *dmsg.Client, stop func(), err error) { |
| 291 | + if dlog == nil { |
| 292 | + return nil, nil, fmt.Errorf("nil logger") |
| 293 | + } |
| 294 | + |
| 295 | + // Build entries for all dmsg servers |
| 296 | + var entries []*disc.Entry |
| 297 | + for _, server := range dmsg.Prod.DmsgServers { |
| 298 | + entries = append(entries, &server) |
| 299 | + } |
| 300 | + |
| 301 | + // Add synthetic entry for discovery server |
| 302 | + discPK := dmsg.ExtractPKFromDmsgAddr(flags.DmsgDiscAddr) |
| 303 | + if discPK != "" { |
| 304 | + var discoveryPK cipher.PubKey |
| 305 | + if err := discoveryPK.UnmarshalText([]byte(discPK)); err == nil { |
| 306 | + var delegatedServers []cipher.PubKey |
| 307 | + for _, server := range dmsg.Prod.DmsgServers { |
| 308 | + delegatedServers = append(delegatedServers, server.Static) |
| 309 | + } |
| 310 | + discoveryEntry := &disc.Entry{ |
| 311 | + Static: discoveryPK, |
| 312 | + Client: &disc.Client{ |
| 313 | + DelegatedServers: delegatedServers, |
| 314 | + }, |
| 315 | + } |
| 316 | + entries = append(entries, discoveryEntry) |
| 317 | + dlog.Debug("Added synthetic discovery entry to direct client") |
| 318 | + } |
| 319 | + } |
| 320 | + |
| 321 | + // Add synthetic entry for our own client |
| 322 | + var delegatedServers []cipher.PubKey |
| 323 | + for _, server := range dmsg.Prod.DmsgServers { |
| 324 | + delegatedServers = append(delegatedServers, server.Static) |
| 325 | + } |
| 326 | + clientEntry := &disc.Entry{ |
| 327 | + Static: pk, |
| 328 | + Client: &disc.Client{ |
| 329 | + DelegatedServers: delegatedServers, |
| 330 | + }, |
| 331 | + } |
| 332 | + entries = append(entries, clientEntry) |
| 333 | + |
| 334 | + // Create direct client with all entries |
| 335 | + directClient := direct.NewClient(entries, dlog) |
| 336 | + |
| 337 | + dmsgC = dmsg.NewClient(pk, sk, directClient, &dmsg.Config{MinSessions: dmsgSessions}) |
| 338 | + dlog.Debug("Created dmsg client with direct client.") |
| 339 | + |
| 340 | + go dmsgC.Serve(ctx) |
| 341 | + dlog.Debug("dmsgclient.Serve(ctx)") |
| 342 | + |
| 343 | + stop = func() { |
| 344 | + err := dmsgC.Close() |
| 345 | + dlog.WithError(err).Debug("Disconnected from dmsg network.\n") |
| 346 | + log.Println() |
| 347 | + } |
| 348 | + dlog.Debug("Connecting to dmsg network...\n") |
| 349 | + dlog.WithField("client public_key", pk.String()).Debug("\n") |
| 350 | + select { |
| 351 | + case <-ctx.Done(): |
| 352 | + stop() |
| 353 | + return nil, nil, ctx.Err() |
| 354 | + |
| 355 | + case <-dmsgC.Ready(): |
| 356 | + dlog.Debug("Dmsg network ready.") |
| 357 | + return dmsgC, stop, nil |
| 358 | + } |
| 359 | +} |
| 360 | + |
| 361 | +// cachingDiscClient wraps a discovery client and caches a synthetic entry |
| 362 | +type cachingDiscClient struct { |
| 363 | + base disc.APIClient |
| 364 | + syntheticEntry *disc.Entry |
| 365 | + log *logging.Logger |
| 366 | +} |
| 367 | + |
| 368 | +// newCachingDiscClient creates a discovery client that caches a synthetic entry |
| 369 | +func newCachingDiscClient(base disc.APIClient, syntheticEntry *disc.Entry, log *logging.Logger) disc.APIClient { |
| 370 | + return &cachingDiscClient{ |
| 371 | + base: base, |
| 372 | + syntheticEntry: syntheticEntry, |
| 373 | + log: log, |
| 374 | + } |
| 375 | +} |
| 376 | + |
| 377 | +// Entry returns the synthetic entry if PK matches, otherwise queries base client |
| 378 | +func (c *cachingDiscClient) Entry(ctx context.Context, pk cipher.PubKey) (*disc.Entry, error) { |
| 379 | + if c.syntheticEntry != nil && c.syntheticEntry.Static == pk { |
| 380 | + c.log.WithField("pk", pk.String()).Debug("Returning synthetic discovery entry") |
| 381 | + return c.syntheticEntry, nil |
| 382 | + } |
| 383 | + return c.base.Entry(ctx, pk) |
| 384 | +} |
| 385 | + |
| 386 | +// PostEntry delegates to base client |
| 387 | +func (c *cachingDiscClient) PostEntry(ctx context.Context, entry *disc.Entry) error { |
| 388 | + return c.base.PostEntry(ctx, entry) |
| 389 | +} |
| 390 | + |
| 391 | +// PutEntry delegates to base client |
| 392 | +func (c *cachingDiscClient) PutEntry(ctx context.Context, sk cipher.SecKey, entry *disc.Entry) error { |
| 393 | + return c.base.PutEntry(ctx, sk, entry) |
| 394 | +} |
| 395 | + |
| 396 | +// DelEntry delegates to base client |
| 397 | +func (c *cachingDiscClient) DelEntry(ctx context.Context, entry *disc.Entry) error { |
| 398 | + return c.base.DelEntry(ctx, entry) |
| 399 | +} |
| 400 | + |
| 401 | +// AvailableServers delegates to base client |
| 402 | +func (c *cachingDiscClient) AvailableServers(ctx context.Context) ([]*disc.Entry, error) { |
| 403 | + return c.base.AvailableServers(ctx) |
| 404 | +} |
| 405 | + |
| 406 | +// AllServers delegates to base client |
| 407 | +func (c *cachingDiscClient) AllServers(ctx context.Context) ([]*disc.Entry, error) { |
| 408 | + return c.base.AllServers(ctx) |
| 409 | +} |
| 410 | + |
| 411 | +// AllEntries delegates to base client |
| 412 | +func (c *cachingDiscClient) AllEntries(ctx context.Context) ([]string, error) { |
| 413 | + return c.base.AllEntries(ctx) |
| 414 | +} |
| 415 | + |
218 | 416 | // FallbackRoundTripper tries multiple DMSG transports until one succeeds. |
219 | 417 | type FallbackRoundTripper struct { |
220 | 418 | ctx context.Context |
|
0 commit comments