EmitPartialWindow doesn't emit on early close #190

@ruleoftwo

Hey,

There seems to be a race-condition bug in the SlidingWindow implementation.

Events are not always emitted from the first partial window even when EmitPartialWindow is set to true.

I think the issue lies in the first select statement of the emit function:
https://github.com/reugn/go-streams/blob/3872dfc1b7df7cfcd4ffd8148c2c975d90616138/flow/sliding_window.go#L219C1-L227C3

The <-sw.done case can be entered when EmitPartialWindow is true, and then the emit function will return without calling sw.dispatchWindow().

The timing for this bug to appear is very narrow, but if the input channel closes almost immediately (e.g., after sending 1 or 2 events), it seems to happen sometimes.
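
To illustrate the suspected path, here is a minimal sketch that does not use go-streams at all. The function waitThenDispatch and its parameters are hypothetical stand-ins for the first select in emit, assuming it waits on an initial-delay timer and a done channel. It only shows that when done is already closed and the timer has not fired, the function returns without dispatching.

```go
package main

import (
	"fmt"
	"time"
)

// waitThenDispatch is a hypothetical stand-in for the first select in emit.
// It waits for either the initial delay to elapse or the done signal, and
// reports whether the buffered window would have been dispatched.
func waitThenDispatch(initialDelay time.Duration, done <-chan struct{}) bool {
	timer := time.NewTimer(initialDelay)
	select {
	case <-timer.C:
		// The delay elapsed first: the window gets dispatched.
		return true
	case <-done:
		// The input closed first: return early, nothing is dispatched.
		timer.Stop()
		return false
	}
}

func main() {
	done := make(chan struct{})
	close(done) // simulate an input channel that closes almost immediately

	fmt.Println("dispatched:", waitThenDispatch(50*time.Millisecond, done))
	// prints "dispatched: false"
}
```

In the real implementation the outcome depends on which of the two events happens first, which would explain why the results vary between runs.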

I tested this on the latest release, v0.13.0:

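package main

import (
	"fmt"
	"time"

	"github.com/reugn/go-streams/flow"
)

func main() {
	// Window size and sliding interval are both 5 seconds.
	window := flow.NewSlidingWindowWithOpts(
		5*time.Second,
		5*time.Second,
		flow.SlidingWindowOpts[string]{
			EmitPartialWindow: true,
		},
	)
	// Send a few events and close the input almost immediately.
	go func() {
		window.In() <- "event1"
		window.In() <- "event2"
		window.In() <- "event3"
		close(window.In())
	}()
	// Expect the partial window; a closed output channel means it was dropped.
	timeout := time.After(6 * time.Second)
	select {
	case result, ok := <-window.Out():
		if !ok {
			fmt.Println("channel closed")
		} else {
			fmt.Printf("got window: %v\n", result)
		}
	case <-timeout:
		fmt.Println("timeout")
	}
}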

Results:

> go run .\test.go
got window: [event1 event2 event3]

> go run .\test.go
channel closed

> go run .\test.go
got window: [event1 event2 event3]

If you artificially increase the timer duration in emit, the race condition can be reproduced consistently.

The simplest fix could be something like:

case <-sw.done:
	timer.Stop()
	if sw.opts.EmitPartialWindow {
		sw.dispatchWindow()
	}
	return
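
As a rough sketch of how the proposed change would behave, the following self-contained program models the early-close path with a hypothetical window type. The type, its fields, and the helpers dispatch, awaitFirstWindow, and flushOnClose are all assumptions made for illustration, not the library's actual code. With the partial-window flag set, the buffered events are flushed when done fires first; with it unset, they are dropped.

```go
package main

import (
	"fmt"
	"time"
)

// window is a hypothetical stand-in for SlidingWindow, reduced to the
// fields needed to illustrate the early-close path.
type window struct {
	emitPartial bool
	done        chan struct{}
	buffered    []string
	out         [][]string
}

// dispatch flushes the buffered events as a single window, if there are any.
func (w *window) dispatch() {
	if len(w.buffered) > 0 {
		w.out = append(w.out, w.buffered)
		w.buffered = nil
	}
}

// awaitFirstWindow mirrors the proposed fix: when done fires before the
// initial delay, flush the partial window only if emitPartial is set.
func (w *window) awaitFirstWindow(delay time.Duration) {
	timer := time.NewTimer(delay)
	select {
	case <-timer.C:
		w.dispatch()
	case <-w.done:
		timer.Stop()
		if w.emitPartial {
			w.dispatch()
		}
	}
}

// flushOnClose closes done before the delay elapses and returns what was emitted.
func flushOnClose(emitPartial bool) [][]string {
	w := &window{
		emitPartial: emitPartial,
		done:        make(chan struct{}),
		buffered:    []string{"event1", "event2", "event3"},
	}
	close(w.done)
	w.awaitFirstWindow(time.Second)
	return w.out
}

func main() {
	fmt.Println(flushOnClose(true))  // prints "[[event1 event2 event3]]"
	fmt.Println(flushOnClose(false)) // prints "[]"
}
```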

There also seems to be a bug in the initialDelay calculation when running the previous example with EmitPartialWindow: false.

The window still sometimes emits results even though the input channel was closed immediately:

> go run .\test_emit_false.go
channel closed

> go run .\test_emit_false.go
got window: [event1 event2 event3]

> go run .\test_emit_false.go
channel closed
