
Events → BatchMode → Flush doesn't block and wait #476

Open
@skyzyx

Description


The documentation for the Events.Flush method says:

Flush gives the user a way to manually flush the queue in the foreground. This is also used by watchdog when the timer expires.

I expect "foreground" to be synonymous with "block and wait," but the client doesn't appear to behave that way. This issue is the culmination of research originally discussed in #464.
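
To make the distinction concrete, here is a rough sketch of the two possible semantics. None of these names come from the client itself; sendBatch is a stand-in for the HTTP POST to the Events API:

type event struct{ name string }

// sendBatch stands in for the HTTP POST to the Events API.
func sendBatch(batch []event) error {
	time.Sleep(200 * time.Millisecond) // simulate the round trip
	return nil
}

// What "foreground" suggests to me: the flush runs in the caller's
// goroutine, so when Flush returns, the batch has been sent.
func flushBlocking(queue []event) error {
	return sendBatch(queue)
}

// What the client appears to do: Flush merely signals a background
// worker and returns immediately, so the caller can exit before
// anything is actually sent.
func flushSignal(flushCh chan<- struct{}) error {
	flushCh <- struct{}{}
	return nil
}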

Go Version

go version go1.15.2 darwin/amd64

Current behavior

I'm reading thousands of records from one system and pushing them into New Relic via pkg/events in batch mode. The code is very similar to what was posted in #464, but it essentially boils down to something like this:

(I've stripped out some event handling and logging code, so there may be some phantom variables here.)

func main() {
	ctx := context.Background()

	cfg := config.New()
	cfg.InsightsInsertKey = os.Getenv("NEWRELIC_INSIGHTS_INSERT")
	cfg.Compression = config.Compression.Gzip
	cfg.LogLevel = "trace"

	client := events.New(cfg)
	logger := cfg.GetLogger()

	// Start batch events mode
	if err := client.BatchMode(
		ctx,
		nrAccountID,
		events.BatchConfigTimeout(batchTimeout),
	); err != nil {
		log.Fatalln("error starting batch mode:", err)
	}

	// Spawn goroutines...
	wgRawData.Add(1)
	go profileRaw(rawChannel, &client, &cfg)

	// Wait for goroutines to return...
	go func() {
		wgRawData.Wait()
		close(rawChannel)
	}()

	for rawData := range rawChannel {
		spew.Dump(rawData)
	}

	// Force-flush the events when we're done with this batch of data so that
	// the function ends and returns instead of listening indefinitely.
	if err := client.Flush(); err != nil {
		logger.Error(fmt.Sprintf("error flushing event queue: %s", err.Error()))
	}

	logger.Info("==> Events instructed to flush.")
}

// Leverages Go channels to execute the processing concurrently.
func profileRaw(c chan Response, client *events.Events, cfg *config.Config) {
	// Signal the WaitGroup so main's goroutine can close the channel
	defer wgRawData.Done()

	ctx := context.Background()
	logger := cfg.GetLogger()

	logger.Info("==> Kicking off the profiler...")

	// Run the profiler
	output, ok := profiler(profilerDuration)
	if !ok {
		logger.Error("Profiler function failed.")
		c <- resp
		return // bail out; there's no profiler output to parse
	}

	logger.Info("==> Parsing profiler data...")

	payloads := strings.Split(output, "\n\n")

	logger.Info("==> Sending data chunks to New Relic as events...")

	for i := range payloads {
		payload := payloads[i]

		// Read the JSON into the appropriate Go struct
		err := json.Unmarshal([]byte(payload), &event)
		if err != nil {
			logger.Error(fmt.Sprintf("[] error unmarshaling JSON into struct: %s %s", err.Error(), payload))
			break
		}

		// ...do stuff to transform raw profiler event data into a flattened
		// structure that the New Relic Events API wants...

		// Queue a custom event
		if err := client.EnqueueEvent(ctx, flatEvent); err != nil {
			logger.Error(fmt.Sprintf("error posting custom event: %s", err.Error()))
		}
	}

	logger.Info("==> Sending events completed.")

	// Return data to the channel, closing it
	c <- resp
}

What happens is that client.Flush() doesn't prevent the program from ending, so main() exits before the queue flush completes (or, more accurately, before it even starts).

Expected behavior

My expectation is that client.Flush() should block and wait, preventing main() from exiting until the flushed events have been sent and responses received (or, at minimum, until the requests have been dispatched).

Steps To Reproduce

  1. Create a new events client.
  2. Put it into batch mode.
  3. Set it to flush every second.
  4. Enqueue a few thousand event messages.
  5. Trigger a flush.
  6. Program ends. (A minimal sketch of these steps follows.)
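
Concretely, the repro is just the skeleton of the program above with everything else stripped away. As before, nrAccountID and batchTimeout are placeholders, and I'm assuming EnqueueEvent will take any JSON-marshalable value:

func main() {
	ctx := context.Background()

	cfg := config.New()
	cfg.InsightsInsertKey = os.Getenv("NEWRELIC_INSIGHTS_INSERT")
	client := events.New(cfg)

	// Steps 1-3: new client, batch mode, flushing on a short timer.
	if err := client.BatchMode(
		ctx,
		nrAccountID,
		events.BatchConfigTimeout(batchTimeout),
	); err != nil {
		log.Fatalln("error starting batch mode:", err)
	}

	// Step 4: enqueue a few thousand events.
	for i := 0; i < 5000; i++ {
		evt := map[string]interface{}{"eventType": "FlushRepro", "n": i}
		if err := client.EnqueueEvent(ctx, evt); err != nil {
			log.Println("error enqueueing event:", err)
		}
	}

	// Step 5: trigger the flush.
	if err := client.Flush(); err != nil {
		log.Println("error flushing event queue:", err)
	}

	// Step 6: main() returns immediately, and most (or all) of the
	// queued events never reach the Events API.
}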

Additional Context

I was seeing cases where sometimes events made it and other times they did not. The difference appeared to be related to which log level I was setting on the client. Once I narrowed down the variables, I discovered that it wasn't the log level itself so much as the time it took to write those log messages to stdout/stderr.

  • debug and trace wrote a lot more data to stdout/stderr, which slowed down the program enough for the goroutines to complete.

  • info wrote less data, so the program ran faster and the goroutines didn't have enough time to finish (or even start).

A workaround has been to sleep at the end of the main() function:

time.Sleep(5 * time.Second)

But this means the program always sleeps either too long or not long enough. Too long, and I miss other live events while waiting for the program to finish sleeping before it starts listening again. Too short, and some events I've read never make it to the Events API.
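
A slightly less arbitrary version of the same hack (still a guess, since the client exposes no way to observe its queue) is to tie the sleep to the batch timeout, so the watchdog gets at least one full window plus some margin for the HTTP round trip. This assumes batchTimeout is a time.Duration:

// Still a race, just better tuned: batchTimeout is the same value
// passed to events.BatchConfigTimeout, and the extra two seconds are
// a guess at the worst-case round trip to the Events API.
time.Sleep(batchTimeout + 2*time.Second)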

Ideally, the client should intrinsically understand what it needs to block for, then stop blocking once the queue flush has completed (for whatever definition of "completed" is appropriate: request sent, or response received).
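
For what it's worth, a conventional shape for this in Go is for Flush to hand the background worker an ack channel and wait on it. This is purely a sketch of one possible fix; none of these names reflect the client's actual internals:

type event struct{ name string }

// flushReq carries an ack channel so the caller can wait for the
// outcome of the flush it requested.
type flushReq struct {
	done chan error
}

type batchWorker struct {
	flushCh chan flushReq
	queue   []event
	post    func([]event) error // the HTTP POST to the Events API
}

// Flush blocks until the worker reports that the queue was drained,
// and returns whatever error the send produced.
func (w *batchWorker) Flush() error {
	req := flushReq{done: make(chan error, 1)}
	w.flushCh <- req
	return <-req.done
}

// run is the worker loop: on each flush request it posts the queued
// batch, clears the queue, and only then acknowledges the caller.
func (w *batchWorker) run() {
	for req := range w.flushCh {
		err := w.post(w.queue)
		w.queue = w.queue[:0]
		req.done <- err
	}
}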

References or Related Issues

  • #464 (the original discussion this issue grew out of)
