Skip to content

Commit 60ec04f

Browse files
authored
Merge pull request #75 from opensciencegrid/copilot/sub-pr-73
Address PR review feedback: config validation, test race safety, and documentation
2 parents 003bc5c + 1422f23 commit 60ec04f

6 files changed

Lines changed: 59 additions & 32 deletions

File tree

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,12 @@ See [config-collector.yaml](config/config-collector.yaml) for a complete example
189189
* `SHOVELER_INPUT_FOLLOW` - Follow file mode like tail: `true` or `false`
190190

191191
**State Management (Collector Mode):**
192-
* `SHOVELER_STATE_ENTRY_TTL` - TTL for state entries in seconds (default: `300`)
193-
* `SHOVELER_STATE_MAX_ENTRIES` - Maximum state entries, 0 for unlimited (default: `0`)
192+
* `COLLECTOR_STATE_ENTRY_TTL` - TTL for state entries in seconds (default: `300`)
193+
* `COLLECTOR_STATE_MAX_ENTRIES` - Maximum state entries, 0 for unlimited (default: `0`)
194+
* `COLLECTOR_STATE_ENABLE_DNS_ENRICHMENT` - Enable DNS enrichment of monitoring records: `true` or `false` (default: `false`)
195+
* `COLLECTOR_STATE_DNS_CACHE_TTL` - DNS cache time-to-live in seconds (default: `3600`)
196+
* `COLLECTOR_STATE_DNS_WORKERS` - Number of concurrent DNS worker routines (default: `5`)
197+
* `COLLECTOR_STATE_DNS_TIMEOUT` - DNS query timeout in seconds (default: `2`)
194198

195199
**Output Configuration (Collector Mode):**
196200
* `SHOVELER_OUTPUT_TYPE` - Output destination: `mq`, `file`, or `both` (default: `mq`)

collector/correlator.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -187,14 +187,14 @@ func NewCorrelatorWithConfig(config CorrelatorConfig) *Correlator {
187187
config.Logger = logrus.New()
188188
}
189189

190-
// Set DNS enrichment defaults
191-
if config.DNSCacheTTL == 0 {
190+
// Set DNS enrichment defaults (treat non-positive values as unset)
191+
if config.DNSCacheTTL <= 0 {
192192
config.DNSCacheTTL = 1 * time.Hour // Default 1 hour cache
193193
}
194-
if config.DNSWorkers == 0 {
194+
if config.DNSWorkers <= 0 {
195195
config.DNSWorkers = 5 // Default 5 concurrent DNS lookups
196196
}
197-
if config.DNSTimeout == 0 {
197+
if config.DNSTimeout <= 0 {
198198
config.DNSTimeout = 2 * time.Second // Default 2 second timeout
199199
}
200200

collector/dns_enrichment_test.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"strconv"
7+
"sync/atomic"
78
"testing"
89
"time"
910

@@ -15,11 +16,11 @@ import (
1516
// mockDNSResolver implements DNSResolver for testing
1617
type mockDNSResolver struct {
1718
lookupFunc func(ctx context.Context, addr string) ([]string, error)
18-
lookupCount int
19+
lookupCount atomic.Int64
1920
}
2021

2122
func (m *mockDNSResolver) LookupAddr(ctx context.Context, addr string) ([]string, error) {
22-
m.lookupCount++
23+
m.lookupCount.Add(1)
2324
if m.lookupFunc != nil {
2425
return m.lookupFunc(ctx, addr)
2526
}
@@ -84,7 +85,7 @@ func TestDNSEnrichment_CacheHit(t *testing.T) {
8485
assert.False(t, needsAsync, "Should not need async on cache hit")
8586

8687
// Verify no DNS lookup was performed (cache hit)
87-
assert.Equal(t, 0, mockResolver.lookupCount, "DNS lookup should not be called on cache hit")
88+
assert.Equal(t, int64(0), mockResolver.lookupCount.Load(), "DNS lookup should not be called on cache hit")
8889
}
8990

9091
// TestDNSEnrichment_CacheMiss_Success tests cache miss with successful lookup
@@ -122,19 +123,19 @@ func TestDNSEnrichment_CacheMiss_Success(t *testing.T) {
122123
assert.Equal(t, "resolved.example.com", result, "Should return resolved hostname")
123124

124125
// Verify DNS lookup was performed
125-
assert.Equal(t, 1, mockResolver.lookupCount, "DNS lookup should be called on cache miss")
126+
assert.Equal(t, int64(1), mockResolver.lookupCount.Load(), "DNS lookup should be called on cache miss")
126127

127128
// Verify result was cached
128129
val, exists := c.dnsCache.Get("192.0.2.2")
129130
assert.True(t, exists, "Result should be cached")
130131
assert.Equal(t, "resolved.example.com", val.(string), "Cached value should match resolved hostname")
131132

132133
// Second call should hit cache (no additional lookup)
133-
mockResolver.lookupCount = 0
134+
mockResolver.lookupCount.Store(0)
134135
hostname2, needsAsync2 := c.enrichWithDNSSync("192.0.2.2")
135136
assert.Equal(t, "resolved.example.com", hostname2, "Should return cached hostname")
136137
assert.False(t, needsAsync2, "Should not need async on cache hit")
137-
assert.Equal(t, 0, mockResolver.lookupCount, "Second call should hit cache")
138+
assert.Equal(t, int64(0), mockResolver.lookupCount.Load(), "Second call should hit cache")
138139
}
139140

140141
// TestDNSEnrichment_CacheMiss_Timeout tests cache miss with timeout
@@ -173,7 +174,7 @@ func TestDNSEnrichment_CacheMiss_Timeout(t *testing.T) {
173174
assert.Equal(t, "", result, "Should return empty on timeout")
174175

175176
// Verify DNS lookup was attempted
176-
assert.Equal(t, 1, mockResolver.lookupCount, "DNS lookup should be attempted")
177+
assert.Equal(t, int64(1), mockResolver.lookupCount.Load(), "DNS lookup should be attempted")
177178
}
178179

179180
// TestDNSEnrichment_CacheMiss_Failure tests cache miss with DNS failure
@@ -206,7 +207,7 @@ func TestDNSEnrichment_CacheMiss_Failure(t *testing.T) {
206207
assert.Equal(t, "", result, "Should return empty on DNS failure")
207208

208209
// Verify DNS lookup was attempted
209-
assert.Equal(t, 1, mockResolver.lookupCount, "DNS lookup should be attempted")
210+
assert.Equal(t, int64(1), mockResolver.lookupCount.Load(), "DNS lookup should be attempted")
210211
}
211212

212213
// TestDNSEnrichment_CacheTTL tests that cache entries expire after TTL
@@ -239,22 +240,27 @@ func TestDNSEnrichment_CacheTTL(t *testing.T) {
239240
// First lookup - cache miss (blocking)
240241
result1 := c.enrichWithDNSBlocking("192.0.2.5")
241242
assert.Equal(t, "ttl-test.example.com", result1)
242-
assert.Equal(t, 1, mockResolver.lookupCount)
243+
assert.Equal(t, int64(1), mockResolver.lookupCount.Load())
243244

244245
// Immediate second lookup - cache hit
245-
mockResolver.lookupCount = 0
246+
mockResolver.lookupCount.Store(0)
246247
hostname2, needsAsync2 := c.enrichWithDNSSync("192.0.2.5")
247248
assert.Equal(t, "ttl-test.example.com", hostname2)
248249
assert.False(t, needsAsync2, "Should hit cache")
249-
assert.Equal(t, 0, mockResolver.lookupCount, "Should hit cache")
250+
assert.Equal(t, int64(0), mockResolver.lookupCount.Load(), "Should hit cache")
250251

251-
// Wait for cache entry to expire
252-
time.Sleep(600 * time.Millisecond)
252+
// Wait for cache entry to expire and verify a new lookup occurs
253+
assert.Eventually(t, func() bool {
254+
mockResolver.lookupCount.Store(0)
255+
_, needsAsync := c.enrichWithDNSSync("192.0.2.5")
256+
return needsAsync
257+
}, 5*time.Second, 100*time.Millisecond, "Cache entry should eventually expire")
253258

254-
// Third lookup after expiry - cache miss again
259+
// Now do a blocking lookup to get fresh result
260+
mockResolver.lookupCount.Store(0)
255261
result3 := c.enrichWithDNSBlocking("192.0.2.5")
256262
assert.Equal(t, "ttl-test.example.com", result3)
257-
assert.Equal(t, 1, mockResolver.lookupCount, "Should do new lookup after TTL expiry")
263+
assert.Equal(t, int64(1), mockResolver.lookupCount.Load(), "Should do new lookup after TTL expiry")
258264
}
259265

260266
// TestDNSEnrichment_Concurrency tests multiple concurrent DNS lookups
@@ -306,7 +312,7 @@ func TestDNSEnrichment_Concurrency(t *testing.T) {
306312

307313
// All requests should succeed (workers handle them concurrently)
308314
assert.Equal(t, numRequests, successCount, "All concurrent lookups should succeed")
309-
assert.GreaterOrEqual(t, mockResolver.lookupCount, numRequests, "All lookups should be performed")
315+
assert.GreaterOrEqual(t, mockResolver.lookupCount.Load(), int64(numRequests), "All lookups should be performed")
310316
}
311317

312318
// TestDNSEnrichment_Integration tests full integration with record creation
@@ -426,7 +432,7 @@ func TestDNSEnrichment_Integration(t *testing.T) {
426432
assert.Equal(t, "university.edu", record.UserDomain, "User domain should be extracted from enriched hostname")
427433

428434
// Verify DNS lookup was performed (once for user IP, once for server IP)
429-
assert.Equal(t, 2, mockResolver.lookupCount, "DNS lookup should be performed for user and server IPs")
435+
assert.Equal(t, int64(2), mockResolver.lookupCount.Load(), "DNS lookup should be performed for user and server IPs")
430436

431437
// Verify result was cached for user IP
432438
val, exists := c.dnsCache.Get("192.0.2.10")

collector/integration_dns_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package collector
33
import (
44
"context"
55
"net"
6-
"sync"
76
"testing"
87
"time"
98

@@ -136,26 +135,30 @@ func TestEndToEnd_DNSEnrichmentFlow(t *testing.T) {
136135
// Verify the record needs DNS enrichment
137136
assert.True(t, record.NeedsDNSEnrichment(), "Record should need DNS enrichment")
138137

139-
// Simulate what the main.go publish loop does
140-
var wg sync.WaitGroup
141-
wg.Add(1)
138+
// Simulate what the main.go publish loop does with a timeout to avoid hanging
139+
doneCh := make(chan struct{})
142140

143141
var enrichedRecord *CollectorRecord
144142
correlator.EnrichRecordAsync(record, func(r *CollectorRecord) {
145143
enrichedRecord = r
146-
wg.Done()
144+
close(doneCh)
147145
})
148146

149-
// Wait for enrichment to complete
150-
wg.Wait()
147+
// Wait for enrichment to complete with timeout
148+
select {
149+
case <-doneCh:
150+
// Success
151+
case <-time.After(10 * time.Second):
152+
t.Fatal("Timeout waiting for DNS enrichment to complete")
153+
}
151154

152155
// Verify the enriched record has the user domain set
153156
assert.NotNil(t, enrichedRecord)
154157
assert.Equal(t, "example.com", enrichedRecord.UserDomain, "UserDomain should be extracted from DNS result")
155158
assert.Equal(t, "server.example.com", enrichedRecord.ServerHostname, "ServerHostname should be resolved via DNS")
156159

157160
// Verify DNS was called for both user IP and server IP
158-
assert.Equal(t, 2, mockResolver.lookupCount, "DNS resolver should have been called twice")
161+
assert.Equal(t, int64(2), mockResolver.lookupCount.Load(), "DNS resolver should have been called twice")
159162
}
160163

161164
// TestEndToEnd_DNSEnrichmentCacheHit tests that cached DNS results don't trigger async enrichment
@@ -280,5 +283,5 @@ func TestEndToEnd_DNSEnrichmentCacheHit(t *testing.T) {
280283
assert.Equal(t, "server.example.com", record.ServerHostname, "ServerHostname should be set from cache")
281284

282285
// Verify DNS was NOT called (cache hit)
283-
assert.Equal(t, 0, mockResolver.lookupCount, "DNS resolver should NOT have been called (cache hit)")
286+
assert.Equal(t, int64(0), mockResolver.lookupCount.Load(), "DNS resolver should NOT have been called (cache hit)")
284287
}

config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,19 @@ func (c *Config) ReadConfigWithPathAndPrefix(configPath string, envPrefix string
145145
c.State.EnableDNSEnrichment = viper.GetBool("state.enable_dns_enrichment")
146146
viper.SetDefault("state.dns_cache_ttl", 3600) // 1 hour default
147147
c.State.DNSCacheTTL = viper.GetInt("state.dns_cache_ttl")
148+
if c.State.DNSCacheTTL <= 0 {
149+
c.State.DNSCacheTTL = 3600
150+
}
148151
viper.SetDefault("state.dns_workers", 5) // 5 workers default
149152
c.State.DNSWorkers = viper.GetInt("state.dns_workers")
153+
if c.State.DNSWorkers <= 0 {
154+
c.State.DNSWorkers = 5
155+
}
150156
viper.SetDefault("state.dns_timeout", 2) // 2 seconds default
151157
c.State.DNSTimeout = viper.GetInt("state.dns_timeout")
158+
if c.State.DNSTimeout <= 0 {
159+
c.State.DNSTimeout = 2
160+
}
152161

153162
// Output configuration (for collector mode)
154163
viper.SetDefault("output.type", "mq") // message queue by default

config/config-collector.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ state:
2222
entry_ttl: 300
2323
# Maximum number of state entries (0 for unlimited)
2424
max_entries: 10000
25+
# DNS enrichment settings (replaces reverse DNS lookup)
26+
# enable_dns_enrichment: false
27+
# dns_cache_ttl: 3600 # Time-to-live for DNS cache entries in seconds
28+
# dns_workers: 5 # Number of worker goroutines for DNS lookups
29+
# dns_timeout: 2 # DNS lookup timeout in seconds
2530

2631
# Select which protocol to use in order to connect to the MQ
2732
# mq: amqp/stomp

0 commit comments

Comments
 (0)