
feat(ingest): parallel batches #2504


Open — wants to merge 4 commits into base: main

Conversation

hekike
Contributor

@hekike hekike commented Mar 21, 2025

Send events to Kafka in parallel chunks.

Summary by CodeRabbit

  • New Features
    • Introduced an enhanced ingestion service supporting configurable batch sizes and concurrent event processing.
    • Added a new interface for event ingestion with improved method signatures.
    • Added a new field for specifying process batch size in the configuration.
  • Bug Fixes
    • Enforced batch-size validation with a sensible default to guarantee a working batch configuration.
  • Refactor
    • Streamlined service initialization and dependency management for a more robust event ingestion workflow.
    • Updated event processing logic to enhance concurrency and encapsulation of service state.

@hekike hekike added area/ingest release-note/feature Release note: Exciting New Features labels Mar 21, 2025
@hekike hekike requested a review from a team as a code owner March 21, 2025 15:57

coderabbitai bot commented Mar 21, 2025

📝 Walkthrough

Walkthrough

The changes introduce a new ingest service mechanism. A constructor function is added to create this service with configurable batch size support. The ingestion configuration is enhanced with a new field and validation logic. The dependency injection setup in the server’s wiring is updated to include the new ingest service, and related references in the server’s main package are simplified. In the ingest package, a new Service interface is defined, and its implementation is refactored to process events concurrently in batches. Test cases have been modified to use the new service constructor with an additional configuration parameter.

Changes

File(s) Change Summary
app/common/openmeter_server.go Added NewIngestService function that wraps a call to the ingest service constructor.
app/config/ingest.go Added ProcessBatchSize field in IngestConfiguration, with validation and default value in the configuration.
cmd/server/main.go Removed ingestion package import and local instantiation; updated to use the integrated app.IngestService.
cmd/server/wire.go, cmd/server/wire_gen.go Added IngestService field to the Application struct and initialized it via a call to common.NewIngestService.
openmeter/ingest/ingest.go Introduced a new Service interface defining IngestEvents method.
openmeter/ingest/ingestdriver/http_transport_test.go Updated tests to instantiate the service using NewService with a batch configuration parameter instead of a struct literal.
openmeter/ingest/service.go Refactored service implementation to use a private struct with collector, logger, and batchSize fields; modified IngestEvents and event processing for concurrent batched execution.
app/config/config_test.go Added ProcessBatchSize field in IngestConfiguration within the TestComplete function to reflect changes in expected configuration structure.

Possibly related PRs

  • refactor: ingest telemetry #2413: Related because both PRs change how services in the ingest package are constructed and configured, including the handling of parameters such as the logger and tracer; this PR additionally introduces NewIngestService, which uses ProcessBatchSize from IngestConfiguration.

@tothandras tothandras assigned chrisgacsal and unassigned tothandras Mar 21, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (4)
app/config/ingest.go (1)

16-17: Add documentation for the new field.

Consider adding a documentation comment for the ProcessBatchSize field to explain its purpose, expected values, and impact on performance, similar to other documented fields in this file.

 type IngestConfiguration struct {
 	Kafka            KafkaIngestConfiguration
-	ProcessBatchSize int
+	// ProcessBatchSize defines the number of events to process in parallel
+	// when ingesting events. Higher values may improve throughput at the cost
+	// of increased memory usage.
+	ProcessBatchSize int
 }
openmeter/ingest/ingestdriver/http_transport_test.go (3)

27-31: Consider making batch size a test constant.

Repeatedly passing 1000 to NewService may clutter the tests if more references surface. Extracting it into a named constant (e.g., const testBatchSize = 1000) can simplify updates and improve clarity.


78-82: Optional test refactor for clarity.

Same suggestion as above: unify 1000 into a shared test constant or variable to maintain readability and consistency across tests.


105-109: Repeated batch size usage in test.

Again, for maintainability, consider referencing a single constant for the batch size instead of hardcoding 1000.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6d00527 and 17f0ffc.

📒 Files selected for processing (8)
  • app/common/openmeter_server.go (1 hunks)
  • app/config/ingest.go (2 hunks)
  • cmd/server/main.go (1 hunks)
  • cmd/server/wire.go (2 hunks)
  • cmd/server/wire_gen.go (3 hunks)
  • openmeter/ingest/ingest.go (1 hunks)
  • openmeter/ingest/ingestdriver/http_transport_test.go (3 hunks)
  • openmeter/ingest/service.go (2 hunks)
🧰 Additional context used
🧬 Code Definitions (3)
openmeter/ingest/ingest.go (1)
openmeter/ingest/service.go (1)
  • IngestEventsRequest (32-35)
cmd/server/wire.go (2)
openmeter/ingest/ingest.go (1)
  • Service (17-19)
app/common/openmeter_server.go (1)
  • NewIngestService (23-25)
openmeter/ingest/ingestdriver/http_transport_test.go (1)
openmeter/ingest/service.go (2)
  • service (25-29)
  • NewService (16-22)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: Test
  • GitHub Check: E2E
  • GitHub Check: CI
🔇 Additional comments (19)
openmeter/ingest/ingest.go (1)

16-19: Clean and well-defined interface.

The new Service interface is concise and clearly defines the contract for event ingestion with a single method IngestEvents. This follows Go's idiomatic interface design pattern of defining small, focused interfaces that serve a specific purpose.

app/common/openmeter_server.go (1)

23-25: Function correctly implements the factory pattern.

The NewIngestService function properly constructs the ingest service by delegating to ingest.NewService and passing the appropriate parameters, including the new batch size configuration. This function adheres to the factory pattern used throughout the codebase.

cmd/server/wire.go (2)

53-53: Good addition to the Application struct.

The IngestService field is properly added to the Application struct, following the same pattern as other service dependencies.


96-96: Correct wire provider integration.

The common.NewIngestService provider is properly added to the wire.Build set, ensuring it will be injected correctly during application initialization.

app/config/ingest.go (2)

24-26: Proper validation check.

The validation logic correctly ensures that the batch size is greater than zero, which is essential for the parallel processing feature to work correctly.


170-170: Good default batch size.

Setting a default batch size of 20 is reasonable. It provides parallelism benefits without excessive resource consumption, striking a good balance for most use cases.
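The pattern this review describes — validate that the batch size is positive, and default it to 20 when unset — might look roughly like this. The struct and method names are guesses at the shape of app/config/ingest.go, not the exact upstream code.

```go
package main

import (
	"errors"
	"fmt"
)

// IngestConfiguration mirrors the field described in the review;
// the real struct also embeds Kafka settings.
type IngestConfiguration struct {
	ProcessBatchSize int
}

// Validate rejects non-positive batch sizes, which would break the
// parallel chunking logic downstream.
func (c IngestConfiguration) Validate() error {
	if c.ProcessBatchSize < 1 {
		return errors.New("process batch size must be greater than zero")
	}
	return nil
}

// applyDefaults fills in the default batch size of 20 when unset.
func applyDefaults(c *IngestConfiguration) {
	if c.ProcessBatchSize == 0 {
		c.ProcessBatchSize = 20
	}
}

func main() {
	var cfg IngestConfiguration
	applyDefaults(&cfg)
	fmt.Println(cfg.ProcessBatchSize, cfg.Validate())
}
```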

cmd/server/wire_gen.go (2)

337-337: Ensure that ingestConfiguration is validated.

Instantiating the ingest service here looks fine, but confirm that ingestConfiguration has already been validated (especially the batch size) to avoid potential runtime misconfigurations.


429-429: Approved addition of IngestService field.

This integrates the new ingest service into the application’s DI setup in a clean, consistent manner.

cmd/server/main.go (1)

109-109: Direct usage of app.IngestService.IngestEvents.

Referencing IngestService injected from wire_gen ensures consistent configuration without extra instantiation. The approach is sound and keeps the code DRY.

openmeter/ingest/service.go (10)

5-5: New "errors" import.

Introduced for leveraging errors.Join(...). This addition cleanly handles aggregated errors later in the code.


8-8: New "sync" import.

Necessary for concurrency features (WaitGroup). Approved for usage in chunked event processing.


12-12: New import "github.com/samber/lo".

Used to chunk events. This is a concise approach for splitting arrays.


15-22: New NewService constructor introduced.

Encapsulating collector, logger, and batch size in one place is a clear design choice. Ensure upstream config validation keeps batchSize > 0 to prevent edge cases.


25-29: Private service struct.

Switching from a public to private struct improves encapsulation. The defined fields match the constructor usage nicely.


31-31: Doc comment addition.

Adding a short doc comment for IngestEventsRequest clarifies intent.


37-41: Early return for single-event ingestion.

Processing single events through processEvent directly keeps it simple. No concerns here.


43-45: Using lo.Chunk for batch segmentation.

Neatly splits events into chunks of batchSize. This is a straightforward solution.


77-79: Improved logging context for processEvent.

This is a nice touch, including event details and namespace in log messages for better traceability.


94-94: Collector ingestion call.

Forwarding the current event to s.collector.Ingest is straightforward. The attached error logging is appropriate.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
openmeter/ingest/service.go (1)

37-76: Consider adding context cancellation handling.

While the concurrent implementation is solid, it might be beneficial to respect context cancellation to abort processing if the parent context is canceled.

func (s service) IngestEvents(ctx context.Context, request IngestEventsRequest) (bool, error) {
    if len(request.Events) == 1 {
        return true, s.processEvent(ctx, request.Events[0], request.Namespace)
    }

    // Split events into chunks of size s.batchSize and process events in chunks in parallel
    chunks := lo.Chunk(request.Events, s.batchSize)

    for _, chunk := range chunks {
+       // Check if context is canceled before processing chunk
+       if ctx.Err() != nil {
+           return false, ctx.Err()
+       }
        
        wg := sync.WaitGroup{}
        wg.Add(len(chunk))
        chErr := make(chan error, len(chunk))

        for _, ev := range chunk {
            go func(ev event.Event, wg *sync.WaitGroup) {
                defer wg.Done()

+               // Check if context is canceled before processing event
+               if ctx.Err() != nil {
+                   chErr <- ctx.Err()
+                   return
+               }
                
                err := s.processEvent(ctx, ev, request.Namespace)
                if err != nil {
                    chErr <- err
                }
            }(ev, &wg)
        }

        wg.Wait()
        close(chErr)

        var errs []error

        for err := range chErr {
            errs = append(errs, err)
        }

        if len(errs) > 0 {
            return false, errors.Join(errs...)
        }
    }

    return true, nil
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 17f0ffc and 02d78b6.

📒 Files selected for processing (1)
  • openmeter/ingest/service.go (2 hunks)
🔇 Additional comments (8)
openmeter/ingest/service.go (8)

5-12: Import additions align with new functionality.

The new imports support the parallel processing feature:

  • errors for joining multiple errors with errors.Join
  • sync for coordination with sync.WaitGroup
  • github.com/samber/lo for chunking the events with lo.Chunk

All these imports are properly utilized in the implementation.


15-22: Good constructor pattern implementation.

The constructor function follows best practices by:

  • Returning an interface type rather than concrete implementation
  • Properly initializing all required fields
  • Having clear parameter naming
  • Including appropriate documentation

This enables better dependency injection and testability.


25-29: Good encapsulation through private implementation.

Converting the public Service struct to a private service struct is a good practice that:

  • Hides implementation details
  • Forces consumers to use the constructor
  • Provides flexibility to change implementation without breaking API

The new batchSize field is properly integrated in the struct.


37-41: Optimization for single events.

Good optimization to bypass the chunking and parallelization overhead when there's only one event to process.


43-45: Clear chunking implementation.

Using lo.Chunk is an efficient way to split the events into manageable batches. The comment clearly explains the intent of this section.


46-73: Well-implemented concurrent processing with proper error handling.

The concurrent processing logic is well structured:

  1. Creates appropriate synchronization primitives
  2. Properly handles goroutine lifecycle with wg.Done()
  3. Efficiently collects errors through a channel
  4. Aggregates errors using errors.Join

The previous deadlock issue has been correctly addressed with defer wg.Done().


79-107: Method appropriately updated for new struct.

The processEvent method has been correctly updated to match the private struct implementation while maintaining its functionality.


1-108:

✅ Verification successful

Service interface not visible in the file.

While the code mentions a Service interface type in the constructor return value, the interface definition isn't visible in this file. Ensure it's properly defined elsewhere in the codebase.


🏁 Script executed:

#!/bin/bash
# Check for Service interface definition in the codebase
grep -r "type Service interface" --include="*.go" .

Length of output: 808


Verified: The Service interface is defined in the codebase.

After checking, the Service interface is indeed declared in openmeter/ingest/ingest.go (among other implementations). This confirms that the implementation in openmeter/ingest/service.go correctly returns a concrete type that complies with the interface. No changes are required here.

@hekike hekike enabled auto-merge (squash) March 21, 2025 19:17

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
openmeter/ingest/ingestdriver/http_transport_test.go (3)

28-32: Updated service initialization to use the new constructor with batch size.

The service instantiation has been updated to use ingest.NewService with the new batch size parameter of 1000, which supports the parallel batch processing feature.

Consider extracting the hardcoded batch size (1000) to a constant or variable at the package level for better maintainability.

+const defaultTestBatchSize = 1000

 func TestIngestEvents(t *testing.T) {
 	collector := ingest.NewInMemoryCollector()

 	service := ingest.NewService(
 		collector,
 		slog.Default(),
-		1000,
+		defaultTestBatchSize,
 	)

79-83: Updated service initialization in invalid event test.

The service instantiation in this test has been consistently updated to match the other test functions.

Use the same constant suggested earlier for the batch size value here as well.


106-110: Updated service initialization in batch handler test.

The service instantiation has been updated consistently with the new constructor pattern.

Use the same constant suggested earlier for the batch size value here as well.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 02d78b6 and 76b843d.

📒 Files selected for processing (2)
  • app/config/config_test.go (1 hunks)
  • openmeter/ingest/ingestdriver/http_transport_test.go (5 hunks)
🔇 Additional comments (6)
app/config/config_test.go (1)

91-91: Configuration update for parallel batch processing looks good.

The addition of ProcessBatchSize: 20 to the IngestConfiguration struct correctly aligns with the PR objective of implementing parallel batches for event ingestion. This test now ensures that the configuration system properly handles this new parameter.

openmeter/ingest/ingestdriver/http_transport_test.go (5)

15-15: New import added for event processing.

The lo library is now used to create a map of events by their ID to handle non-deterministic ordering in batch processing.


147-150: Improved event validation to handle non-deterministic ordering.

Great improvement! The comment clearly explains why we're switching to ID-based lookup, and the lo.KeyBy function provides a clean implementation for creating a map of events by their IDs.
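The ID-keyed lookup can be illustrated with a plain map. The test itself uses lo.KeyBy from github.com/samber/lo, and the Event type here is a stand-in for the CloudEvents type the real test asserts on.

```go
package main

import "fmt"

// Event is a simplified stand-in for the ingested event type.
type Event struct {
	ID   string
	Type string
}

// keyByID builds a lookup map, equivalent in spirit to
// lo.KeyBy(events, func(e Event) string { return e.ID }).
func keyByID(events []Event) map[string]Event {
	m := make(map[string]Event, len(events))
	for _, e := range events {
		m[e.ID] = e
	}
	return m
}

func main() {
	expected := keyByID([]Event{{ID: "1", Type: "a"}, {ID: "2", Type: "b"}})
	// After parallel processing, received events may arrive in any order,
	// so assertions look each one up by ID instead of by index.
	received := []Event{{ID: "2", Type: "b"}, {ID: "1", Type: "a"}}
	for _, got := range received {
		want, ok := expected[got.ID]
		fmt.Println(got.ID, ok && want == got)
	}
}
```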


155-158: Updated assertion logic to verify events by ID.

The updated assertion logic properly handles the non-deterministic ordering of events that may result from parallel batch processing. This is more robust than the previous approach that relied on preserved order.


160-163: Updated assertion statements to use expected event lookup.

The assertion statements have been correctly updated to compare against the expected event retrieved by ID from the map.


1-166:

❓ Verification inconclusive

Request verification of batch size impact on test behavior.

While the test changes to support the new parallel batch processing are good, I don't see any tests that specifically verify how different batch sizes affect behavior.

Consider adding a test case that verifies the behavior with different batch sizes (e.g., very small batch size vs. large batch size) to ensure the batch size parameter is properly utilized by the service.


🏁 Script executed:

#!/bin/bash
# Let's check the service implementation to verify how batch size is used
rg -A 20 "func NewService" --type go

Length of output: 4241


Action Required: Enhance batch size test coverage

While the changes to support new parallel batch processing are solid, the current tests (in openmeter/ingest/ingestdriver/http_transport_test.go) use a fixed batch size (set to 1000) and do not verify how different batch sizes impact behavior. In the service implementation (see openmeter/ingest/service.go), the batch size parameter is simply stored without any in-depth testing of its effects. Please consider adding test cases that vary the batch size—for example, testing with very small values (such as 1 or 2) versus much larger ones—to ensure that the batch processing logic correctly respects and behaves according to the configured batch size.

@chrisgacsal
Contributor

@hekike Is this something we still want to merge?

@chrisgacsal chrisgacsal disabled auto-merge March 31, 2025 08:27