|
| 1 | +//===----------------------------------------------------------------------===// |
| 2 | +// Copyright © 2026 Apple Inc. and the Containerization project authors. |
| 3 | +// |
| 4 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | +// you may not use this file except in compliance with the License. |
| 6 | +// You may obtain a copy of the License at |
| 7 | +// |
| 8 | +// https://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +// |
| 10 | +// Unless required by applicable law or agreed to in writing, software |
| 11 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +// See the License for the specific language governing permissions and |
| 14 | +// limitations under the License. |
| 15 | +//===----------------------------------------------------------------------===// |
| 16 | + |
| 17 | +import ContainerizationError |
| 18 | +import GRPCCore |
| 19 | +import GRPCNIOTransportCore |
| 20 | +import NIOCore |
| 21 | +import NIOPosix |
| 22 | + |
| 23 | +/// Buffers incoming bytes until the full gRPC HTTP/2 pipeline is configured, then replays them. |
| 24 | +/// |
| 25 | +/// This prevents the race condition where the vminitd server's initial HTTP/2 SETTINGS frame |
| 26 | +/// arrives and is discarded before `configureGRPCClientPipeline` has finished installing |
| 27 | +/// `ClientConnectionHandler`. |
| 28 | +/// |
| 29 | +/// The handler is added via `ClientBootstrap.channelInitializer`, which runs before |
| 30 | +/// `registerAlreadyConfigured0` adds the fd to epoll/kqueue — guaranteeing it is in place |
| 31 | +/// before any bytes can arrive on the socket. |
| 32 | +/// |
| 33 | +/// When `NIOHTTP2Handler` is added to the pipeline (inside `configureGRPCClientPipeline`), its |
| 34 | +/// `handlerAdded` fires an outbound flush (the HTTP/2 client preface). We intercept that flush |
| 35 | +/// and schedule a deferred removal via the event loop. Because `configureGRPCClientPipeline` runs |
| 36 | +/// as a single synchronous event loop task, the deferred removal is guaranteed to run after that |
| 37 | +/// entire task completes — i.e., after `ClientConnectionHandler` is also in the pipeline. |
| 38 | +/// Buffered bytes are replayed atomically as part of the pipeline removal. |
| 39 | + |
| 40 | +// FIXME: This handler is needed until the swift GRPC libraries offers us a way to create a |
| 41 | +// client transport from an existing fd. Remove this type when such an API exists. |
| 42 | +public final class HTTP2ConnectBufferingHandler: ChannelDuplexHandler, RemovableChannelHandler { |
| 43 | + public typealias InboundIn = ByteBuffer |
| 44 | + public typealias InboundOut = ByteBuffer |
| 45 | + public typealias OutboundIn = ByteBuffer |
| 46 | + public typealias OutboundOut = ByteBuffer |
| 47 | + |
| 48 | + private var removalScheduled = false |
| 49 | + private var bufferedReads: [NIOAny] = [] |
| 50 | + |
| 51 | + public init() {} |
| 52 | + |
| 53 | + public func channelRead(context: ChannelHandlerContext, data: NIOAny) { |
| 54 | + bufferedReads.append(data) |
| 55 | + } |
| 56 | + |
| 57 | + public func channelReadComplete(context: ChannelHandlerContext) { |
| 58 | + // Suppress while buffering; a single readComplete is emitted after replay. |
| 59 | + } |
| 60 | + |
| 61 | + public func flush(context: ChannelHandlerContext) { |
| 62 | + if !removalScheduled { |
| 63 | + removalScheduled = true |
| 64 | + // Defer removal to the next event loop task. configureGRPCClientPipeline runs as a |
| 65 | + // single synchronous event loop task, so this deferred task is guaranteed to run |
| 66 | + // after that whole task completes (including ClientConnectionHandler being added). |
| 67 | + context.eventLoop.assumeIsolatedUnsafeUnchecked().execute { |
| 68 | + context.pipeline.syncOperations.removeHandler(self, promise: nil) |
| 69 | + } |
| 70 | + } |
| 71 | + context.flush() |
| 72 | + } |
| 73 | + |
| 74 | + public func removeHandler(context: ChannelHandlerContext, removalToken: ChannelHandlerContext.RemovalToken) { |
| 75 | + var didRead = false |
| 76 | + while !bufferedReads.isEmpty { |
| 77 | + context.fireChannelRead(bufferedReads.removeFirst()) |
| 78 | + didRead = true |
| 79 | + } |
| 80 | + if didRead { |
| 81 | + context.fireChannelReadComplete() |
| 82 | + } |
| 83 | + context.leavePipeline(removalToken: removalToken) |
| 84 | + } |
| 85 | + |
| 86 | + public func channelInactive(context: ChannelHandlerContext) { |
| 87 | + bufferedReads.removeAll() |
| 88 | + context.fireChannelInactive() |
| 89 | + } |
| 90 | +} |
0 commit comments