diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index bde2946ae6bb..33134228691e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for `_nodes/stats` URIs that work with legacy versions of Elasticsearch {pull}44307[44307] - Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553] - Remove fallback to the node limit for the `kubernetes.pod.cpu.usage.limit.pct` and `kubernetes.pod.memory.usage.limit.pct` metrics calculation +- Fixed a bug where `event.duration` could be missing from an event on Windows systems due to low-resolution clock. {pull}44440[44440] *Osquerybeat* diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 4681976f2e14..fb8811fad0a9 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -21,7 +21,7 @@ import ( "context" "errors" "fmt" - "math/rand" + "math/rand/v2" "sync" "time" @@ -205,7 +205,7 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { // Start each metricset randomly over a period of MaxDelayPeriod. if msw.module.maxStartDelay > 0 { - delay := time.Duration(rand.Int63n(int64(msw.module.maxStartDelay))) + delay := rand.N(msw.module.maxStartDelay) debugf("%v/%v will start after %v", msw.module.Name(), msw.Name(), delay) select { case <-done: @@ -320,7 +320,7 @@ func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporte reporter.Error(err) msw.stats.consecutiveFailures.Set(0) // mark module as running if metrics are partially available and display the error message - msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) + msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) default: @@ -328,7 +328,7 @@ func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporte msw.stats.consecutiveFailures.Inc() if msw.failureThreshold > 0 && msw.stats.consecutiveFailures != nil && uint(msw.stats.consecutiveFailures.Get()) >= msw.failureThreshold { // mark it as degraded for any other issue encountered - msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) + msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.Name(), err)) } logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) @@ -401,7 +401,8 @@ func (r reporterV2) Done() <-chan struct{} { return r.done } func (r reporterV2) Error(err error) bool { return r.Event(mb.Event{Error: err}) } func (r reporterV2) Event(event mb.Event) bool { if event.Took == 0 && !r.start.IsZero() { - event.Took = time.Since(r.start) + // ensure elapsed time is always > 0 + event.Took = max(time.Since(r.start), time.Microsecond) } if r.msw.periodic { event.Period = r.msw.Module().Config().Period @@ -428,7 +429,7 @@ func (r reporterV2) Event(event mb.Event) bool { if event.Namespace == "" { event.Namespace = r.msw.Registration().Namespace } - beatEvent := event.BeatEvent(r.msw.module.Name(), r.msw.MetricSet.Name(), r.msw.module.eventModifiers...) + beatEvent := event.BeatEvent(r.msw.module.Name(), r.msw.Name(), r.msw.module.eventModifiers...) if !writeEvent(r.done, r.out, beatEvent) { return false } diff --git a/metricbeat/mb/module/wrapper_test.go b/metricbeat/mb/module/wrapper_test.go index ad273f9566c8..f01fd234cce0 100644 --- a/metricbeat/mb/module/wrapper_test.go +++ b/metricbeat/mb/module/wrapper_test.go @@ -202,6 +202,30 @@ func TestPeriodIsAddedToEvent(t *testing.T) { } } +func TestDurationIsAddedToEvent(t *testing.T) { + hosts := []string{"alpha"} + config := newConfig(t, map[string]interface{}{ + "module": moduleName, + "metricsets": []string{reportingFetcherName}, + "hosts": hosts, + }) + + registry := newTestRegistry(t) + m, err := module.NewWrapper(config, registry, module.WithMetricSetInfo()) + require.NoError(t, err) + + done := make(chan struct{}) + defer close(done) + + output := m.Start(done) + + event := <-output + + fields := event.Fields.Flatten() + assert.Contains(t, fields, "event.duration", "event.duration should be present in event") + assert.Greater(t, fields["event.duration"], time.Duration(0), "event.duration should be greater than 0") +} + func TestNewWrapperForMetricSet(t *testing.T) { hosts := []string{"alpha"} c := newConfig(t, map[string]interface{}{