loki.source.docker: update to use shared scheduler #5026
Pull request overview
This PR refactors loki.source.docker to use a shared Scheduler pattern (previously only used by loki.source.file) to improve container scheduling performance and fix potential deadlocks during component shutdown. The scheduler and related utilities have been moved to the shared source package for reuse across components.
Key changes:
- Moved `Scheduler` from the `file` package to the shared `source` package, making it reusable by both `loki.source.file` and `loki.source.docker`
- Created new shared utilities (`Fanout`, `Consume`, `Drain`) that implement common patterns for entry forwarding and graceful shutdown
- Replaced the docker component's custom `manager`/`runner` implementation with the `Scheduler` pattern
Reviewed changes
Copilot reviewed 17 out of 19 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| internal/component/loki/source/scheduler.go | Package renamed from file to source; removed IsRunning() method; added DebugSource interface |
| internal/component/loki/source/scheduler_test.go | Package renamed from file to source |
| internal/component/loki/source/drain.go | New shared utility for draining log entries during shutdown to prevent deadlocks |
| internal/component/loki/source/consume.go | New shared utility for consuming and forwarding log entries with context cancellation |
| internal/component/common/loki/fanout.go | New shared utility for distributing log entries to multiple receivers with thread-safe updates |
| internal/component/loki/source/file/file.go | Updated to use shared scheduler, fanout, consume, and drain utilities; removed custom receiver management |
| internal/component/loki/source/file/tailer.go | Removed IsRunning() method; added DebugInfo() method to implement DebugSource interface |
| internal/component/loki/source/file/tailer_test.go | Updated test to access running field directly instead of through removed IsRunning() method |
| internal/component/loki/source/file/decompresser.go | Removed IsRunning() method; added DebugInfo() method to implement DebugSource interface |
| internal/component/loki/source/docker/docker.go | Major refactoring to use scheduler pattern; replaced manager with Scheduler; uses shared Fanout for receiver management |
| internal/component/loki/source/docker/tailer.go | Moved from internal/dockertarget package; implements Source and DebugSource interfaces; added Run() method for scheduler integration |
| internal/component/loki/source/docker/tailer_test.go | Updated tests for new tailer structure; added restart and stress tests |
| internal/component/loki/source/docker/metrics.go | Changed from exported Metrics to unexported metrics; package comment updated |
| internal/component/loki/source/docker/runner.go | Deleted (replaced by scheduler) |
| internal/component/loki/source/docker/internal/dockertarget/target.go | Deleted (moved to tailer.go) |
| internal/component/loki/source/docker/docker_test.go | Updated component tests to work with new scheduler-based implementation |
| internal/component/loki/source/docker/testdata/flog_after_restart.log | New test data file for container restart tests |
| CHANGELOG.md | Added entries for scheduling improvement and deadlock fix |
Comments suppressed due to low confidence (1)
internal/component/loki/source/scheduler.go:104
- Inconsistent type parameter naming: The generic type parameter is lowercase `k` in `DebugSource[k comparable]`, but it's uppercase `K` in all other generic types in this file (e.g., `Source[K comparable]`, `Scheduler[K comparable]`, `SourceWithRetry[K comparable]`). For consistency, it should be `DebugSource[K comparable]`.
Pull request overview
Copilot reviewed 19 out of 21 changed files in this pull request and generated 2 comments.
dehaansa
left a comment
A lot to process here! A first pass at review, but will definitely take another.
thampiotr
left a comment
It looks correct, but I do feel like it's a bit of a risky change. I know you've done manual testing, so I'm approving this, but another pair of eyes will be a good idea.

    func (s *Scheduler[K]) Stop() {
        s.cancel()
        s.running.Wait()
        s.sources = make(map[K]scheduledSource[K])
    }

    // Reset will stop all running sources and wait for them to finish and reset
    // Scheduler to a usable state.
    func (s *Scheduler[K]) Reset() {
        s.cancel()
        s.running.Wait()
        s.sources = make(map[K]scheduledSource[K])
        s.ctx, s.cancel = context.WithCancel(context.Background())
    }

These two are quite similar... Can we reduce to have just one?
Maybe I can think of a better way to do it, but we don't want to create a new context when we stop a component; that would leak resources.
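
One possible way to reduce the duplication while keeping both behaviors is to factor the shared steps into a private helper. The sketch below is only an illustration of that idea, not what the PR does: the `scheduler` type, its `string` keys, and the `start` and `stopAll` helpers are hypothetical, and like the snippet under review it is not safe for concurrent callers.

```go
package main

import (
	"context"
	"sync"
)

// scheduler is a simplified, hypothetical stand-in for Scheduler[K].
type scheduler struct {
	ctx     context.Context
	cancel  context.CancelFunc
	running sync.WaitGroup
	sources map[string]struct{}
}

func newScheduler() *scheduler {
	ctx, cancel := context.WithCancel(context.Background())
	return &scheduler{ctx: ctx, cancel: cancel, sources: make(map[string]struct{})}
}

// start registers a source that runs until the scheduler context is canceled.
func (s *scheduler) start(key string) {
	s.sources[key] = struct{}{}
	s.running.Add(1)
	go func(ctx context.Context) {
		defer s.running.Done()
		<-ctx.Done()
	}(s.ctx)
}

// stopAll holds the steps shared by Stop and Reset.
func (s *scheduler) stopAll() {
	s.cancel()
	s.running.Wait()
	s.sources = make(map[string]struct{})
}

// Stop shuts everything down and leaves the context canceled,
// so no new context is allocated for a component that is going away.
func (s *scheduler) Stop() { s.stopAll() }

// Reset shuts everything down and installs a fresh context so the
// scheduler can be used again.
func (s *scheduler) Reset() {
	s.stopAll()
	s.ctx, s.cancel = context.WithCancel(context.Background())
}
```

This keeps two entry points, so the concern about allocating a context on stop still holds; only the repeated body goes away.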

    - func (d *decompressor) IsRunning() bool {
    -     return d.running.Load()
    + func (d *decompressor) DebugInfo() any {

Another gotcha with DebugInfo functions in the past was that the lock order was not consistent and we had a deadlock. Happened at least 2x. I looked and I don't see this issue here, but the fact that we need to depend on manual review worries me here. Maybe we can refactor in the future to avoid the multiple locks sequencing issues.
Yes, that is an issue for sure.
Sure, I can wait for @dehaansa to do a review too.
Co-authored-by: Copilot <[email protected]>
PR Description
`loki.source.docker` suffered from the same issue that `loki.source.file` did, where scheduling could take some time for new targets.

In this PR I move `Scheduler` to the `source` package so it can be reused by both components. With this I also moved "target" into the docker package and renamed it to `tailer`; this tailer now implements the `Source` interface.

I also fixed an issue where stopping the component could deadlock if nothing was reading from the handler chan.
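
To make the shutdown deadlock concrete, here is a minimal sketch of a drain helper. It is an illustration only: the `Entry` type and this `Drain` signature are assumptions and may not match the function added in this PR. The idea is to keep reading from the handler channel while the stop function runs, so a producer blocked on a send can finish.

```go
package main

// Entry is a hypothetical stand-in for a log entry.
type Entry struct{ Line string }

// Drain discards entries from ch for as long as stop is running, then
// waits for the draining goroutine to exit before returning.
func Drain(ch <-chan Entry, stop func()) {
	done := make(chan struct{})
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		for {
			select {
			case _, ok := <-ch:
				if !ok {
					return // channel closed, nothing left to drain
				}
			case <-done:
				return
			}
		}
	}()
	stop()      // producers blocked on ch can now make progress
	close(done) // stop has returned, draining is no longer needed
	<-exited
}
```

Without the draining goroutine, a `stop` that waits for producers would hang whenever a producer is blocked sending on an unbuffered channel that nobody reads.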
Which issue(s) this PR fixes
Related to: #4729
Notes to the Reviewer
Moved target and metrics from the internal package to `tailer.go` and `metrics.go`; they are no longer exported.

Created a shared structure, `Fanout`, which implements the common pattern of aborting a send operation if the context is canceled.

Created a shared function, `Consume`, to run the consume loop, which aborts if the context is canceled.

Created a shared function, `Drain`, that can be used when a component is stopped.

PR Checklist