Skip to content

Commit 003bc5c

Browse files
authored
Merge pull request #74 from opensciencegrid/copilot/sub-pr-73
Fix unbounded goroutines in EnrichRecordAsync via callback-based DNS enrichment
2 parents c3911e6 + fde6aec commit 003bc5c

2 files changed

Lines changed: 135 additions & 47 deletions

File tree

collector/correlator.go

Lines changed: 68 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -118,8 +118,8 @@ type PathInfo struct {
118118

119119
// dnsEnrichmentRequest represents a request to enrich a record with DNS lookup
120120
type dnsEnrichmentRequest struct {
121-
ip string
122-
resultChan chan dnsEnrichmentResult
121+
ip string
122+
callback func(dnsEnrichmentResult) // invoked by the worker goroutine when the lookup completes
123123
}
124124

125125
// dnsEnrichmentResult contains the result of a DNS lookup
@@ -262,11 +262,8 @@ func (c *Correlator) processDNSRequest(req dnsEnrichmentRequest) {
262262
c.dnsCache.Set(req.ip, hostname)
263263
}
264264

265-
// Send result back (non-blocking with context check)
266-
select {
267-
case req.resultChan <- result:
268-
case <-c.ctx.Done():
269-
}
265+
// Invoke the callback directly in the worker goroutine (no channel needed)
266+
req.callback(result)
270267
}
271268

272269
// performDNSLookup does the actual reverse DNS lookup
@@ -292,7 +289,7 @@ func (c *Correlator) performDNSLookup(ctx context.Context, ipStr string) string
292289

293290
// enrichWithDNSSync performs synchronous DNS enrichment for an IP address
294291
// Only returns hostname if it's already in cache (fast path)
295-
// Returns (hostname, needsAsync) where needsAsync=true means caller should use enrichWithDNSBlocking
292+
// Returns (hostname, needsAsync) where needsAsync=true means caller should use submitDNSCallback
296293
func (c *Correlator) enrichWithDNSSync(ipStr string) (string, bool) {
297294
if !c.enableDNSEnrichment || ipStr == "" {
298295
return "", false
@@ -311,36 +308,50 @@ func (c *Correlator) enrichWithDNSSync(ipStr string) (string, bool) {
311308
return "", true
312309
}
313310

314-
// enrichWithDNSBlocking sends a DNS lookup request to the worker pool and blocks until the result is ready
315-
func (c *Correlator) enrichWithDNSBlocking(ipStr string) string {
316-
if !c.enableDNSEnrichment || ipStr == "" {
317-
return ""
318-
}
319-
320-
resultChan := make(chan dnsEnrichmentResult, 1)
311+
// submitDNSCallback submits a DNS lookup to the worker pool with a callback that is
312+
// invoked by the worker goroutine when the lookup completes. It is non-blocking: if
313+
// the request channel has capacity the request is sent immediately and the function
314+
// returns; if the channel is full a short-lived goroutine is spawned that blocks only
315+
// until a slot becomes available (or the context is cancelled), but never waits for
316+
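// the DNS result itself. This keeps the number of goroutines waiting on a DNS result bounded
317+
// by the worker pool size rather than the number of in-flight records; goroutines waiting only for a queue slot are not bounded, but each exits as soon as its request is queued or the context is cancelled.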
318+
func (c *Correlator) submitDNSCallback(ipStr string, callback func(dnsEnrichmentResult)) {
321319
req := dnsEnrichmentRequest{
322-
ip: ipStr,
323-
resultChan: resultChan,
320+
ip: ipStr,
321+
callback: callback,
324322
}
325323

326-
// Send request to worker pool (with timeout).
327-
// Use NewTimer instead of time.After so the timer can be stopped
328-
// immediately when the send succeeds, avoiding timer accumulation.
329-
queueTimer := time.NewTimer(c.dnsTimeout)
330324
select {
331325
case c.dnsRequestChan <- req:
332-
queueTimer.Stop()
333-
case <-queueTimer.C:
334-
c.logger.Warnf("DNS enrichment queue timeout for %s", ipStr)
335-
return ""
336-
case <-c.ctx.Done():
337-
queueTimer.Stop()
326+
// Fast path: submitted immediately
327+
default:
328+
// Queue is full; spawn a minimal goroutine that only blocks until a slot
329+
// opens up, not until the DNS result is ready.
330+
go func() {
331+
select {
332+
case c.dnsRequestChan <- req:
333+
case <-c.ctx.Done():
334+
callback(dnsEnrichmentResult{ip: ipStr})
335+
}
336+
}()
337+
}
338+
}
339+
340+
// enrichWithDNSBlocking sends a DNS lookup request to the worker pool and blocks until the result is ready.
341+
// It is implemented on top of submitDNSCallback and is used in tests and as a convenience for
342+
// callers that can tolerate blocking (e.g. tests in the same package).
343+
func (c *Correlator) enrichWithDNSBlocking(ipStr string) string {
344+
if !c.enableDNSEnrichment || ipStr == "" {
338345
return ""
339346
}
340347

348+
resultChan := make(chan dnsEnrichmentResult, 1)
349+
c.submitDNSCallback(ipStr, func(result dnsEnrichmentResult) {
350+
resultChan <- result
351+
})
352+
341353
// Wait for result. Use 2x the DNS timeout to account for both
342354
// queuing delay and the actual DNS lookup performed by the worker.
343-
// Use NewTimer so it can be stopped as soon as the result arrives.
344355
resultTimer := time.NewTimer(c.dnsTimeout * 2)
345356
select {
346357
case result := <-resultChan:
@@ -355,35 +366,45 @@ func (c *Correlator) enrichWithDNSBlocking(ipStr string) string {
355366
}
356367
}
357368

358-
// EnrichRecordAsync enriches a record with DNS and calls the callback
359-
// This is a helper for callers who want to handle async enrichment
360-
// Spawns a goroutine, returns immediately (non-blocking)
369+
// EnrichRecordAsync enriches a record with DNS and calls the callback when complete.
370+
// It returns immediately without spawning a goroutine that waits for DNS results;
371+
// instead, callbacks are chained through the DNS worker goroutines. This keeps
372+
// concurrency bounded by the DNS worker pool size regardless of the number of
373+
// in-flight records.
361374
func (c *Correlator) EnrichRecordAsync(record *CollectorRecord, callback func(*CollectorRecord)) {
362375
if !record.needsDNSEnrichment && !record.needsServerDNS {
363376
callback(record)
364377
return
365378
}
366379

367-
go func() {
368-
if record.needsDNSEnrichment {
369-
hostname := c.enrichWithDNSBlocking(record.enrichmentIP)
370-
if hostname != "" {
371-
parts := strings.Split(hostname, ".")
372-
if len(parts) >= 2 {
373-
record.UserDomain = strings.Join(parts[len(parts)-2:], ".")
380+
// Server DNS step: submitted after user DNS completes (or directly if no user DNS needed).
381+
doServerDNS := func() {
382+
if record.needsServerDNS {
383+
c.submitDNSCallback(record.serverEnrichmentIP, func(result dnsEnrichmentResult) {
384+
if result.hostname != "" {
385+
record.ServerHostname = result.hostname
374386
}
375-
}
387+
callback(record)
388+
})
389+
} else {
390+
callback(record)
376391
}
392+
}
377393

378-
if record.needsServerDNS {
379-
hostname := c.enrichWithDNSBlocking(record.serverEnrichmentIP)
380-
if hostname != "" {
381-
record.ServerHostname = hostname
394+
// User DNS step: submitted first; on completion, triggers the server DNS step.
395+
if record.needsDNSEnrichment {
396+
c.submitDNSCallback(record.enrichmentIP, func(result dnsEnrichmentResult) {
397+
if result.hostname != "" {
398+
parts := strings.Split(result.hostname, ".")
399+
if len(parts) >= 2 {
400+
record.UserDomain = strings.Join(parts[len(parts)-2:], ".")
401+
}
382402
}
383-
}
384-
385-
callback(record)
386-
}()
403+
doServerDNS()
404+
})
405+
} else {
406+
doServerDNS()
407+
}
387408
}
388409

389410
// NeedsDNSEnrichment returns true if the record needs async DNS enrichment

collector/dns_enrichment_test.go

Lines changed: 67 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -529,3 +529,70 @@ func TestDNSEnrichment_InvalidIP(t *testing.T) {
529529
result := c.enrichWithDNSBlocking("not-an-ip")
530530
assert.Equal(t, "", result, "Invalid IP should return empty hostname")
531531
}
532+
533+
// TestEnrichRecordAsync_BoundedConcurrency submits more records than there are DNS workers
534+
// while the resolver is blocked, then verifies that every callback is still invoked once the
535+
// resolver is released. It does not measure the goroutine count directly.
536+
func TestEnrichRecordAsync_BoundedConcurrency(t *testing.T) {
537+
logger := logrus.New()
538+
logger.SetLevel(logrus.ErrorLevel)
539+
540+
const numWorkers = 3
541+
const numRecords = 50
542+
543+
// Slow resolver so workers are occupied during the burst
544+
ready := make(chan struct{})
545+
config := CorrelatorConfig{
546+
TTL: 5 * time.Minute,
547+
MaxEntries: 1000,
548+
EnableDNSEnrichment: true,
549+
DNSCacheTTL: 1 * time.Hour,
550+
DNSWorkers: numWorkers,
551+
DNSTimeout: 5 * time.Second,
552+
Logger: logger,
553+
}
554+
555+
c := NewCorrelatorWithConfig(config)
556+
defer c.Stop()
557+
558+
c.dnsResolver = &mockDNSResolver{
559+
lookupFunc: func(ctx context.Context, addr string) ([]string, error) {
560+
// Block until all records have been submitted so the queue is full
561+
// during submission, exercising the non-blocking fallback path.
562+
select {
563+
case <-ready:
564+
case <-ctx.Done():
565+
return nil, ctx.Err()
566+
}
567+
return []string{"host.example.com."}, nil
568+
},
569+
}
570+
571+
results := make(chan *CollectorRecord, numRecords)
572+
573+
for i := 0; i < numRecords; i++ {
574+
record := &CollectorRecord{
575+
needsDNSEnrichment: true,
576+
enrichmentIP: "192.0.2." + strconv.Itoa(i%200),
577+
}
578+
c.EnrichRecordAsync(record, func(r *CollectorRecord) {
579+
results <- r
580+
})
581+
}
582+
583+
// Signal the resolver to proceed
584+
close(ready)
585+
586+
// All callbacks must be invoked within a generous timeout
587+
received := 0
588+
timeout := time.After(10 * time.Second)
589+
for received < numRecords {
590+
select {
591+
case <-results:
592+
received++
593+
case <-timeout:
594+
t.Fatalf("Only %d/%d callbacks received before timeout", received, numRecords)
595+
}
596+
}
597+
assert.Equal(t, numRecords, received)
598+
}

0 commit comments

Comments
 (0)