Skip to content

Conversation

gradyzhuo
Copy link
Owner

@gradyzhuo gradyzhuo commented Sep 24, 2025

PR Type

Enhancement


Description

  • Add graceful shutdown mechanism to streaming operations

  • Update UnaryStream protocol with finished callback parameter

  • Integrate beginGracefulShutdown() calls across stream usecases

  • Remove deprecated ReadAnyTarget implementation


Diagram Walkthrough

flowchart LR
  A["UnaryStream Protocol"] -- "add finished callback" --> B["Stream Operations"]
  B -- "call beginGracefulShutdown()" --> C["GRPC Client"]
  C -- "graceful connection close" --> D["Clean Termination"]
Loading

File Walkthrough

Relevant files
Enhancement
14 files
EventStoreDB.swift
Remove deprecated persistent subscription method                 
+0/-17   
ReadAnyTarget.swift
Delete entire deprecated ReadAnyTarget implementation       
+0/-67   
UnaryStream.swift
Add finished callback parameter to send method                     
+1/-1     
UnaryStream.swift
Implement graceful shutdown in perform methods                     
+6/-2     
Monitoring.Stats.swift
Add finished callback and graceful shutdown                           
+2/-1     
PersistentSubscriptions.Subscription.swift
Add onTermination callback to subscription initialization
+5/-2     
PersistentSubscriptions.AllStream.Read.swift
Integrate graceful shutdown in AllStream read                       
+3/-1     
PersistentSubscriptions.SpecifiedStream.Read.swift
Integrate graceful shutdown in SpecifiedStream read           
+3/-1     
Projections.Statistics.swift
Add finished callback and graceful shutdown                           
+2/-2     
Streams.ReadAll.swift
Add finished callback and graceful shutdown                           
+2/-1     
Streams.SubscribeAll.swift
Add termination callback for graceful shutdown                     
+4/-1     
Streams.Read.swift
Add finished callback and graceful shutdown                           
+2/-1     
Streams.Subscribe.swift
Add termination callback for graceful shutdown                     
+4/-1     
Users.Details.swift
Add finished callback and graceful shutdown                           
+2/-1     

Summary by CodeRabbit

  • Chores

    • Removed a deprecated helper for subscribing to persistent subscriptions.
  • Refactor

    • Streaming operations now support a completion callback and trigger graceful connection shutdown when streams finish, improving resource cleanup and stability.
    • Subscription instances can execute a termination hook when their underlying stream ends, enabling safer teardown behavior.

@gradyzhuo gradyzhuo requested a review from Copilot September 24, 2025 07:17
Copy link
Contributor

coderabbitai bot commented Sep 24, 2025

Walkthrough

Removes deprecated persistent-subscription APIs, adds a finished/onTermination callback across stream send paths, and wires graceful shutdown (beginGracefulShutdown) after stream completion/termination. Updates UnaryStream protocol to require a finished callback and propagates the change through monitoring, projections, streams, users, and persistent-subscription flows.

Changes

Cohort / File(s) Summary of Changes
Deprecated persistent-subscription API removals
Sources/EventStoreDB/EventStoreDB.swift, Sources/EventStoreDB/PersistentSubscriptions/ReadAnyTarget.swift
Removed deprecated subscribePersistentSubscription(...) helper and deprecated PersistentSubscriptions.ReadAnyTarget type and its streaming logic.
UnaryStream protocol update
Sources/GRPCEncapsulates/Usecase/UnaryStream.swift
Added finished: @sendable @escaping ()->Void parameter to UnaryStream.send signature.
Propagate finished callback across KurrentDB streaming sends
Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift, Sources/KurrentDB/Projections/Usecase/Projections.Statistics.swift, Sources/KurrentDB/Streams/Usecase/All/Streams.ReadAll.swift, Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift, Sources/KurrentDB/Streams/Usecase/Specified/Streams.Read.swift, Sources/KurrentDB/Streams/Usecase/Specified/Streams.Subscribe.swift, Sources/KurrentDB/Users/Usecase/Users.Details.swift
Updated send(...) signatures to include finished callback and invoke it when streams finish or terminate; adjusted continuation.finish() placement where applicable.
Graceful shutdown hooks after send completion
Sources/KurrentDB/Core/Additions/Usecase/UnaryStream.swift, Sources/KurrentDB/PersistentSubscriptions/Usecase/AllStream/PersistentSubscriptions.AllStream.Read.swift, Sources/KurrentDB/PersistentSubscriptions/Usecase/Specified/PersistentSubscriptions.SpecifiedStream.Read.swift
Added trailing-closure hooks to invoke connection.beginGracefulShutdown() after send/initialization; no public signatures changed in these files.
Persistent subscription termination callback
Sources/KurrentDB/PersistentSubscriptions/PersistentSubscriptions.Subscription.swift
Public initializer now accepts onTermination: @sendable ()->Void; wired to continuation.onTermination to trigger on stream termination.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Caller
  participant Usecase as Usecase (UnaryStream impl)
  participant GRPC as GRPCClient
  participant Stream as AsyncThrowingStream

  Caller->>Usecase: send(request, callOptions, finished)
  Usecase->>GRPC: start stream
  GRPC-->>Stream: yield messages
  Stream-->>Caller: iterate responses
  GRPC-->>Usecase: stream completed/closed
  Usecase->>Caller: invoke finished()
  Note over Usecase,GRPC: Optional: beginGracefulShutdown() in finished closure
Loading
sequenceDiagram
  autonumber
  participant Caller
  participant ReadOp as PersistentSubs Read (All/Specified)
  participant Sub as Subscription
  participant GRPC as GRPCClient

  Caller->>ReadOp: send(..., finished)
  ReadOp->>GRPC: establish subscription stream
  GRPC-->>Sub: deliver events
  Sub-->>Caller: expose AsyncThrowingStream
  note over Sub: onTermination -> callback
  GRPC-->>Sub: termination/close
  Sub->>Caller: onTermination()
  Sub->>GRPC: beginGracefulShutdown() (via closure)
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested reviewers

  • Kalvar
  • cattybaby723
  • abner-tungchi

Poem

I twitch my ears at streams that flow,
A finished hop, and off we go—
Shutdown graceful, burrows neat,
Subscriptions end with thumping beat.
Old trails pruned, new paths align,
Carrot logs: all green, so fine. 🥕🐇

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title "[UPDATE] add beginGracefulShutdown() to stream response usecase." accurately and concisely captures the primary change in the diff: introducing a graceful-shutdown invocation for streaming response usecases (and related send-call changes) across several modules. It is specific, short, and directly related to the modifications shown in the provided summaries.
Docstring Coverage ✅ Passed No functions found in the changes. Docstring coverage check skipped.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/connection-should-close

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds graceful shutdown functionality to streaming response use cases by adding a finished callback parameter and calling beginGracefulShutdown() on GRPC connections when streams terminate.

  • Adds finished parameter to send method signatures across streaming use cases
  • Implements graceful shutdown by calling connection.beginGracefulShutdown() when streams complete
  • Removes deprecated ReadAnyTarget functionality and related API methods

Reviewed Changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
Users.Details.swift Adds finished callback parameter and invocation after stream completion
Streams.Subscribe.swift Adds finished callback using onTermination handler
Streams.Read.swift Adds finished callback parameter and invocation after stream completion
Streams.SubscribeAll.swift Adds finished callback using onTermination handler
Streams.ReadAll.swift Adds finished callback parameter and invocation after stream completion
Projections.Statistics.swift Adds finished callback and removes duplicate continuation.finish() call
PersistentSubscriptions.SpecifiedStream.Read.swift Implements graceful shutdown in subscription initialization
PersistentSubscriptions.AllStream.Read.swift Implements graceful shutdown in subscription initialization
PersistentSubscriptions.Subscription.swift Adds onTermination parameter and callback setup
Monitoring.Stats.swift Adds finished callback parameter and invocation after stream completion
UnaryStream.swift (KurrentDB) Implements graceful shutdown in stream execution
UnaryStream.swift (GRPCEncapsulates) Updates protocol signature to include finished parameter
ReadAnyTarget.swift Complete file removal of deprecated functionality
EventStoreDB.swift Removes deprecated persistent subscription API method

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines 53 to 55
continuation.finish()
finished()
}
Copy link

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a duplicate continuation.finish() call. Line 55 shows a removed continuation.finish() that was likely misplaced, but the diff shows it as removed. The continuation should only be finished once to avoid potential issues.

Copilot uses AI. Check for mistakes.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is for connection close.

Copy link

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

API Change

The UnaryStream protocol now requires a 'finished' callback in 'send'. Verify all conforming types were updated and external callers are adapted to the new signature to avoid compile breaks or missed shutdowns.

package protocol UnaryStream: Usecase, UnaryRequestBuildable, StreamResponseHandlable {
    func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses
Double Shutdown Risk

'finished()' is invoked via continuation.onTermination. Ensure that 'onTermination' won’t be called multiple times (e.g., cancellation + normal finish) or guard against multiple invocations to avoid repeated beginGracefulShutdown() calls.

continuation.onTermination = { _ in
    finished()
}
Task {
    let client = ServiceClient(wrapping: connection)
Finished Timing

'finished()' is called immediately after 'continuation.finish()' inside the response handler. Confirm that no more callbacks are pending and that shutdown won’t race with in-flight gRPC operations.

try await client.statistics(request: request, options: callOptions) {
    for try await message in $0.messages {
        try continuation.yield(handle(message: message))
    }
    continuation.finish()
    finished()
}

Copy link

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
High-level
Consider connection lifecycle for subscriptions

The current implementation ties the GRPC connection lifecycle to a single
stream's termination, forcing a new connection for each operation. It is
suggested to manage the connection at a higher level to allow reuse across
multiple operations for efficiency.

Examples:

Sources/KurrentDB/Core/Additions/Usecase/UnaryStream.swift [22-24]
            return try await send(connection: client, request: request, callOptions: callOptions){
                client.beginGracefulShutdown()
            }
Sources/KurrentDB/Core/Additions/Usecase/UnaryStream.swift [59-61]
            return try await send(connection: client, request: request, callOptions: callOptions){
                client.beginGracefulShutdown()
            }

Solution Walkthrough:

Before:

// File: Sources/KurrentDB/Core/Additions/Usecase/UnaryStream.swift

package extension UnaryStream {
    func perform(node: Node, ...) async throws -> Responses {
        // A new client (and connection) is created for each operation.
        let client = GRPCClient(...)
        Task { try await client.runConnections() }
        
        // The operation is tied to a closure that shuts down the client upon completion.
        return try await send(connection: client, ...) {
            client.beginGracefulShutdown()
        }
    }
}

After:

// Suggestion implies a broader architectural change where connection is managed externally.

// The client/connection is passed in, not created by the use case.
package extension UnaryStream {
    func perform(on client: GRPCClient, ...) async throws -> Responses {
        // No client creation or shutdown logic.
        // The `send` method would no longer need a `finished` callback for shutdown.
        return try await send(connection: client, ...)
    }
}

// Example usage:
let clientManager = ClientManager() // Manages shared connections
let client = await clientManager.getClient()

// Multiple operations can reuse the same client.
let subscription = try await subscribeUsecase.perform(on: client)
let otherData = try await readUsecase.perform(on: client)

// Shutdown is handled explicitly by the application.
await clientManager.shutdown()
Suggestion importance[1-10]: 9

__

Why: This suggestion correctly identifies a critical design flaw where each streaming operation creates and then destroys its own connection, which is inefficient and limits the client's ability to perform concurrent or sequential operations.

High
Possible issue
Ensure graceful shutdown on error

To prevent resource leaks, ensure the finished() closure is called even when the
client.statistics gRPC call fails by using a do-catch block for proper error
handling.

Sources/KurrentDB/Projections/Usecase/Projections.Statistics.swift [45-58]

 package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses {
     try await withThrowingTaskGroup(of: Void.self) { _ in
         let (stream, continuation) = AsyncThrowingStream.makeStream(of: Response.self)
         let client = ServiceClient(wrapping: connection)
-        try await client.statistics(request: request, options: callOptions) {
-            for try await message in $0.messages {
-                try continuation.yield(handle(message: message))
+        do {
+            try await client.statistics(request: request, options: callOptions) {
+                for try await message in $0.messages {
+                    try continuation.yield(handle(message: message))
+                }
+                continuation.finish()
+                finished()
             }
-            continuation.finish()
+        } catch {
             finished()
+            throw error
         }
         return stream
     }
 }
  • Apply / Chat
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a potential resource leak where the finished() closure, intended for graceful connection shutdown, is not called if the client.statistics call fails. This is a valid bug in the error handling logic of the new feature.

Medium
  • More

@gradyzhuo gradyzhuo self-assigned this Sep 24, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (4)
Sources/KurrentDB/Users/Usecase/Users.Details.swift (1)

32-32: Call finished on stream termination instead of only after finish.

To also cover consumer cancellation and avoid possible double-calls, use continuation.onTermination and remove the direct finished() call.

-        package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses {
+        package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses {
             try await withThrowingTaskGroup(of: Void.self) { _ in
                 let (stream, continuation) = AsyncThrowingStream.makeStream(of: UserDetails.self)
+                continuation.onTermination = { _ in
+                    finished()
+                }
                 let client = ServiceClient(wrapping: connection)
                 try await client.details(request: request, options: callOptions) {
                     for try await message in $0.messages {
                         let response = try handle(message: message)
                         continuation.yield(response.userDetails)
                     }
                     continuation.finish()
-                    finished()
                 }
 
                 return stream
             }
         }

Also applies to: 42-42

Sources/KurrentDB/Streams/Usecase/Specified/Streams.Subscribe.swift (1)

38-42: Also cancel the underlying RPC when the consumer terminates.

Wire onTermination to cancel the read task; otherwise the RPC may outlive the stream and leak resources.

-        package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses {
+        package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses {
             let (stream, continuation) = AsyncThrowingStream.makeStream(of: UnderlyingResponse.self)
-            continuation.onTermination = { _ in
-                finished()
-            }
-            Task {
+            var rpcTask: Task<Void, Never>?
+            continuation.onTermination = { _ in
+                rpcTask?.cancel()
+                finished()
+            }
+            rpcTask = Task {
                 let client = ServiceClient(wrapping: connection)
                 try await client.read(request: request, options: callOptions) {
                     for try await message in $0.messages {
                         continuation.yield(message)
                     }
                 }
             }
             return try await .init(messages: stream)
         }
Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift (1)

35-39: Mirror cancellation-on-termination to avoid lingering RPC.

Same as Specified.Subscribe: cancel the RPC when the stream terminates.

-        package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses {
+        package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses {
             let (stream, continuation) = AsyncThrowingStream.makeStream(of: UnderlyingResponse.self)
-            continuation.onTermination = { _ in
-                finished()
-            }
-            Task {
+            var rpcTask: Task<Void, Never>?
+            continuation.onTermination = { _ in
+                rpcTask?.cancel()
+                finished()
+            }
+            rpcTask = Task {
                 let client = ServiceClient(wrapping: connection)
                 try await client.read(request: request, options: callOptions) {
                     for try await message in $0.messages {
                         continuation.yield(message)
                     }
                 }
             }
             return try await .init(messages: stream)
         }
Sources/KurrentDB/Projections/Usecase/Projections.Statistics.swift (1)

45-45: Prefer onTermination callback and remove direct finished() call; avoid blocking with TaskGroup.

Align with other modules: call finished() via continuation.onTermination and return the stream immediately using Task {}.

-        package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses {
-            try await withThrowingTaskGroup(of: Void.self) { _ in
-                let (stream, continuation) = AsyncThrowingStream.makeStream(of: Response.self)
-                let client = ServiceClient(wrapping: connection)
-                try await client.statistics(request: request, options: callOptions) {
-                    for try await message in $0.messages {
-                        try continuation.yield(handle(message: message))
-                    }
-                    continuation.finish()
-                    finished()
-                }
-                return stream
-            }
-        }
+        package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses {
+            let (stream, continuation) = AsyncThrowingStream.makeStream(of: Response.self)
+            continuation.onTermination = { _ in
+                finished()
+            }
+            Task {
+                let client = ServiceClient(wrapping: connection)
+                try await client.statistics(request: request, options: callOptions) {
+                    for try await message in $0.messages {
+                        try continuation.yield(handle(message: message))
+                    }
+                    continuation.finish()
+                }
+            }
+            return stream
+        }

Also applies to: 54-55

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0eee269 and 6958cb0.

📒 Files selected for processing (14)
  • Sources/EventStoreDB/EventStoreDB.swift (0 hunks)
  • Sources/EventStoreDB/PersistentSubscriptions/ReadAnyTarget.swift (0 hunks)
  • Sources/GRPCEncapsulates/Usecase/UnaryStream.swift (1 hunks)
  • Sources/KurrentDB/Core/Additions/Usecase/UnaryStream.swift (2 hunks)
  • Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift (2 hunks)
  • Sources/KurrentDB/PersistentSubscriptions/PersistentSubscriptions.Subscription.swift (3 hunks)
  • Sources/KurrentDB/PersistentSubscriptions/Usecase/AllStream/PersistentSubscriptions.AllStream.Read.swift (1 hunks)
  • Sources/KurrentDB/PersistentSubscriptions/Usecase/Specified/PersistentSubscriptions.SpecifiedStream.Read.swift (1 hunks)
  • Sources/KurrentDB/Projections/Usecase/Projections.Statistics.swift (2 hunks)
  • Sources/KurrentDB/Streams/Usecase/All/Streams.ReadAll.swift (2 hunks)
  • Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift (1 hunks)
  • Sources/KurrentDB/Streams/Usecase/Specified/Streams.Read.swift (2 hunks)
  • Sources/KurrentDB/Streams/Usecase/Specified/Streams.Subscribe.swift (1 hunks)
  • Sources/KurrentDB/Users/Usecase/Users.Details.swift (2 hunks)
💤 Files with no reviewable changes (2)
  • Sources/EventStoreDB/EventStoreDB.swift
  • Sources/EventStoreDB/PersistentSubscriptions/ReadAnyTarget.swift
🧰 Additional context used
🧬 Code graph analysis (12)
Sources/KurrentDB/PersistentSubscriptions/Usecase/AllStream/PersistentSubscriptions.AllStream.Read.swift (2)
Sources/KurrentDB/PersistentSubscriptions/Usecase/AllStream/PersistentSubscriptions.AllStream.Update.swift (1)
  • send (43-48)
Sources/EventStoreDB/PersistentSubscriptions/ReadAnyTarget.swift (1)
  • send (47-65)
Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift (2)
Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift (1)
  • send (33-46)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Subscribe.swift (1)
  • send (38-52)
Sources/GRPCEncapsulates/Usecase/UnaryStream.swift (7)
Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift (1)
  • send (33-46)
Sources/KurrentDB/Projections/Usecase/Projections.Statistics.swift (1)
  • send (45-58)
Sources/KurrentDB/Streams/Usecase/All/Streams.ReadAll.swift (1)
  • send (31-44)
Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift (1)
  • send (35-49)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Read.swift (1)
  • send (34-47)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Subscribe.swift (1)
  • send (38-52)
Sources/KurrentDB/Users/Usecase/Users.Details.swift (1)
  • send (32-47)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Subscribe.swift (3)
Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift (1)
  • send (33-46)
Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift (1)
  • send (35-49)
Sources/KurrentDB/Streams/Streams.Subscription.swift (2)
  • events (12-51)
  • terminate (89-91)
Sources/KurrentDB/Users/Usecase/Users.Details.swift (6)
Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift (1)
  • send (33-46)
Sources/KurrentDB/Projections/Usecase/Projections.Statistics.swift (1)
  • send (45-58)
Sources/KurrentDB/Streams/Usecase/All/Streams.ReadAll.swift (1)
  • send (31-44)
Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift (1)
  • send (35-49)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Read.swift (1)
  • send (34-47)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Subscribe.swift (1)
  • send (38-52)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Read.swift (8)
Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift (1)
  • send (33-46)
Sources/KurrentDB/Projections/Usecase/Projections.Statistics.swift (1)
  • send (45-58)
Sources/KurrentDB/Streams/Usecase/All/Streams.ReadAll.swift (1)
  • send (31-44)
Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift (1)
  • send (35-49)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Subscribe.swift (1)
  • send (38-52)
Sources/KurrentDB/Users/Usecase/Users.Details.swift (1)
  • send (32-47)
Sources/KurrentDB/Core/Additions/Usecase/StreamStream.swift (1)
  • Transport (12-39)
Sources/GRPCEncapsulates/Usecase/StreamStream.swift (1)
  • send (10-12)
Sources/KurrentDB/Projections/Usecase/Projections.Statistics.swift (6)
Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift (1)
  • send (33-46)
Sources/KurrentDB/Streams/Usecase/All/Streams.ReadAll.swift (1)
  • send (31-44)
Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift (1)
  • send (35-49)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Read.swift (1)
  • send (34-47)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Subscribe.swift (1)
  • send (38-52)
Sources/KurrentDB/Users/Usecase/Users.Details.swift (1)
  • send (32-47)
Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift (6)
Sources/KurrentDB/Projections/Usecase/Projections.Statistics.swift (1)
  • send (45-58)
Sources/KurrentDB/Streams/Usecase/All/Streams.ReadAll.swift (1)
  • send (31-44)
Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift (1)
  • send (35-49)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Read.swift (1)
  • send (34-47)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Subscribe.swift (1)
  • send (38-52)
Sources/KurrentDB/Users/Usecase/Users.Details.swift (1)
  • send (32-47)
Sources/KurrentDB/Core/Additions/Usecase/UnaryStream.swift (5)
Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift (1)
  • send (33-46)
Sources/KurrentDB/PersistentSubscriptions/Usecase/AllStream/PersistentSubscriptions.AllStream.Read.swift (1)
  • send (49-69)
Sources/KurrentDB/Streams/Usecase/All/Streams.ReadAll.swift (1)
  • send (31-44)
Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift (1)
  • send (35-49)
Sources/KurrentDB/Core/Additions/Usecase/StreamUnary.swift (2)
  • send (13-15)
  • Transport (12-46)
Sources/KurrentDB/Streams/Usecase/All/Streams.ReadAll.swift (6)
Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift (1)
  • send (33-46)
Sources/KurrentDB/Projections/Usecase/Projections.Statistics.swift (1)
  • send (45-58)
Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift (1)
  • send (35-49)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Read.swift (1)
  • send (34-47)
Sources/KurrentDB/Streams/Usecase/Specified/Streams.Subscribe.swift (1)
  • send (38-52)
Sources/KurrentDB/Users/Usecase/Users.Details.swift (1)
  • send (32-47)
Sources/KurrentDB/PersistentSubscriptions/PersistentSubscriptions.Subscription.swift (2)
Sources/KurrentDB/KurrentDBClient+PersistentSubscriptions.swift (3)
  • createPersistentSubscription (8-106)
  • restartPersistentSubscriptionSubsystem (73-75)
  • updatePersistentSubscriptionToAllStream (30-35)
Sources/KurrentDB/Streams/Streams.Subscription.swift (3)
  • events (12-51)
  • terminate (89-91)
  • events (31-50)
Sources/KurrentDB/PersistentSubscriptions/Usecase/Specified/PersistentSubscriptions.SpecifiedStream.Read.swift (2)
Sources/EventStoreDB/PersistentSubscriptions/ReadAnyTarget.swift (2)
  • deprecated (15-66)
  • Target (14-67)
Sources/KurrentDB/Streams/Streams.Subscription.swift (1)
  • events (12-51)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Swift 6.0 on ubuntu-latest
  • GitHub Check: Swift 6.1 on ubuntu-latest
🔇 Additional comments (9)
Sources/KurrentDB/Core/Additions/Usecase/UnaryStream.swift (1)

22-24: Also shut down on error paths (avoid orphaned runConnections task).

If send(...) throws before finished runs, client.beginGracefulShutdown() is never called — wrap the call in do/catch, call beginGracefulShutdown() on failure, then rethrow.

Apply this diff:

-            return try await send(connection: client, request: request, callOptions: callOptions){
-                client.beginGracefulShutdown()
-            }
+            do {
+                return try await send(connection: client, request: request, callOptions: callOptions) {
+                    client.beginGracefulShutdown()
+                }
+            } catch {
+                client.beginGracefulShutdown()
+                throw error
+            }

Verify that GRPCClient<HTTP2ClientTransport.Posix> is Sendable (or otherwise safely captured) for the @Sendable finished closure — its declaration lives in the GRPCCore dependency, not in this repo.

Sources/KurrentDB/PersistentSubscriptions/Usecase/AllStream/PersistentSubscriptions.AllStream.Read.swift (1)

66-68: Graceful shutdown hook added — LGTM, but confirm termination is reachable.

Hooking beginGracefulShutdown() via the Subscription init’s termination callback looks right. Ensure the events stream actually finishes or terminates so this closure fires (see Subscription initializer review about missing continuation.finish()).

Sources/KurrentDB/PersistentSubscriptions/Usecase/Specified/PersistentSubscriptions.SpecifiedStream.Read.swift (1)

66-68: Graceful shutdown hook added — LGTM, but ensure event stream completes.

The closure will only run when the Subscription’s event stream terminates; see Subscription initializer comment to add continuation.finish()/error propagation so this can actually trigger.

Sources/KurrentDB/PersistentSubscriptions/PersistentSubscriptions.Subscription.swift (1)

46-51: Confirm first-message assumption (confirmation).

If the server ever sends a non-confirmation first, that message is dropped. Confirm the protocol guarantees confirmation-first ordering; otherwise buffer and replay the first non-confirmation.

Sources/GRPCEncapsulates/Usecase/UnaryStream.swift (1)

11-11: Protocol signature change — verified: all UnaryStream conformers updated

Verified that the send(...) signatures include the new finished: @sendable @escaping ()->Void parameter in: Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift, Sources/KurrentDB/Projections/Usecase/Projections.Statistics.swift, Sources/KurrentDB/Streams/Usecase/All/Streams.ReadAll.swift, Sources/KurrentDB/Streams/Usecase/All/Streams.SubscribeAll.swift, Sources/KurrentDB/Streams/Usecase/Specified/Streams.Read.swift, Sources/KurrentDB/Streams/Usecase/Specified/Streams.Subscribe.swift, Sources/KurrentDB/Users/Usecase/Users.Details.swift.

Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift (1)

33-33: LGTM! Consistent completion callback integration.

The addition of the finished parameter and its invocation after stream completion aligns with the broader pattern across streaming use cases in this PR. The implementation correctly calls finished() after continuation.finish() to ensure proper cleanup ordering.

Also applies to: 42-42

Sources/KurrentDB/Streams/Usecase/All/Streams.ReadAll.swift (1)

31-31: LGTM! Proper completion callback integration.

The implementation correctly adds the finished parameter and invokes it after stream completion, maintaining consistency with other streaming use cases in the codebase. The callback is properly placed after continuation.finish() to ensure correct cleanup ordering.

Also applies to: 40-40

Sources/KurrentDB/Streams/Usecase/Specified/Streams.Read.swift (2)

34-34: LGTM! Completion callback properly integrated.

The changes consistently follow the pattern established across other streaming use cases in this PR. The finished callback is correctly invoked after continuation.finish(), ensuring proper cleanup sequence.

Also applies to: 43-43


34-47: UnaryStream signature updated to include finished

Sources/GRPCEncapsulates/Usecase/UnaryStream.swift declares func send(..., finished: @Sendable @escaping ()->Void) — implementations match the updated requirement.

Comment on lines +59 to +61
return try await send(connection: client, request: request, callOptions: callOptions){
client.beginGracefulShutdown()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Mirror the error-safe shutdown here too.

Same issue as above: shutdown only happens via finished. Ensure shutdown is triggered when send(...) throws.

Apply this diff:

-            return try await send(connection: client, request: request, callOptions: callOptions){
-                client.beginGracefulShutdown()
-            }
+            do {
+                return try await send(connection: client, request: request, callOptions: callOptions) {
+                    client.beginGracefulShutdown()
+                }
+            } catch {
+                client.beginGracefulShutdown()
+                throw error
+            }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return try await send(connection: client, request: request, callOptions: callOptions){
client.beginGracefulShutdown()
}
do {
return try await send(connection: client, request: request, callOptions: callOptions) {
client.beginGracefulShutdown()
}
} catch {
client.beginGracefulShutdown()
throw error
}
🤖 Prompt for AI Agents
In Sources/KurrentDB/Core/Additions/Usecase/UnaryStream.swift around lines
59–61, the client is only shut down when the completion handler runs, so if
send(...) throws the graceful shutdown never occurs; modify the call to ensure
shutdown runs on both success and error by arranging a defer (or equivalent
ensure/finally) that invokes client.beginGracefulShutdown() before awaiting send
so the shutdown is always executed even when send throws.

/// - reader: An asynchronous stream of responses from the subscription service.
/// - Throws: An error if the initialization process fails, such as when the response stream cannot be processed.
package init(requests writer: Writer = .init(), responses reader: AsyncThrowingStream<PersistentSubscriptions.ReadResponse, any Error>) async throws {
package init(requests writer: Writer = .init(), responses reader: AsyncThrowingStream<PersistentSubscriptions.ReadResponse, any Error>, onTermination: @Sendable @escaping ()->Void) async throws {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Events stream never finishes and errors aren’t propagated.

Without continuation.finish() (or finish(throwing:)), the events stream can hang and the termination callback (used to trigger graceful shutdown) may never run. Propagate reader errors too.

         let (stream, continuation) = AsyncThrowingStream.makeStream(of: PersistentSubscription.EventResult.self)
         continuation.onTermination = { _ in
             onTermination()
         }
-        Task {
-            while let response = try await iterator.next() {
-                if case let .readEvent(event, retryCount) = response {
-                    continuation.yield(.init(event: event, retryCount: retryCount))
-                }
-            }
-        }
+        Task {
+            do {
+                while let response = try await iterator.next() {
+                    if case let .readEvent(event, retryCount) = response {
+                        continuation.yield(.init(event: event, retryCount: retryCount))
+                    }
+                }
+                continuation.finish()
+            } catch {
+                continuation.finish(throwing: error)
+            }
+        }
         events = stream

Also applies to: 53-55

🤖 Prompt for AI Agents
In
Sources/KurrentDB/PersistentSubscriptions/PersistentSubscriptions.Subscription.swift
around lines 42 and 53–55, the AsyncThrowingStream continuation is never
finished and reader errors aren’t propagated; modify the stream consumption to
catch errors from the reader, call continuation.finish() on normal completion
and continuation.finish(throwing: error) when the reader throws, and ensure the
onTermination callback is invoked in both success and failure paths so the
events stream can complete and termination is triggered for graceful shutdown.

@gradyzhuo gradyzhuo merged commit d08b115 into main Sep 24, 2025
3 checks passed
@gradyzhuo gradyzhuo deleted the fix/connection-should-close branch September 24, 2025 07:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant