Skip to content

Commit 0574b56

Browse files
committed
Expose ResourceWatch providing error and event callbacks
1 parent c1d1be9 commit 0574b56

File tree

6 files changed

+130
-46
lines changed

6 files changed

+130
-46
lines changed

Sources/SwiftkubeClient/Client/ClusterScopedGenericKubernetesClient.swift

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,36 @@ public extension ClusterScopedGenericKubernetesClient where Resource: ReadableRe
5555
/// event paired with the corresponding resource as a pair to the `eventHandler`.
5656
///
5757
/// - Returns: A cancellable `HTTPClient.Task` instance, representing a streaming connetion to the API server.
58-
func watch(options: [ListOption]? = nil, eventHandler: @escaping ResourceWatch<Resource>.EventHandler) throws -> HTTPClient.Task<Void> {
59-
try super.watch(in: .allNamespaces, options: options, using: ResourceWatch<Resource>(eventHandler))
58+
func watch(
59+
options: [ListOption]? = nil,
60+
eventHandler: @escaping ResourceWatch<Resource>.EventHandler
61+
) throws -> HTTPClient.Task<Void> {
62+
let resourceWatch = ResourceWatch<Resource>(onError: nil, onEvent: eventHandler)
63+
return try watch(options: options, resourceWatch: resourceWatch)
64+
}
65+
66+
/// Watches cluster-scoped resources.
67+
///
68+
/// Watching resources opens a persistent connection to the API server. The connection is represented by a `HTTPClient.Task` instance, that acts
69+
/// as an active "subscription" to the events stream. The task can be cancelled any time to stop the watch.
70+
///
71+
/// ```swift
72+
/// let task: HTTPClient.Task<Void> = client.namespaces.watch() { (event, namespace) in
73+
/// print("\(event): \(namespace)")
74+
/// }
75+
///
76+
/// task.cancel()
77+
/// ```
78+
///
79+
/// - Parameter eventHandler: A `ResourceWatch` instance, which is used as a callback for new events. The clients sends each
80+
/// event paired with the corresponding resource as a pair to the `eventHandler`. Errors are sent to the `errorHandler`.
81+
///
82+
/// - Returns: A cancellable `HTTPClient.Task` instance, representing a streaming connetion to the API server.
83+
func watch(
84+
options: [ListOption]? = nil,
85+
resourceWatch: ResourceWatch<Resource>
86+
) throws -> HTTPClient.Task<Void> {
87+
try super.watch(in: .allNamespaces, options: options, using: resourceWatch)
6088
}
6189
}
6290

Sources/SwiftkubeClient/Client/GenericKubernetesClient.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ internal extension GenericKubernetesClient {
378378
/// - Returns: A cancellable `HTTPClient.Task` instance, representing a streaming connetion to the API server.
379379
func watch(in namespace: NamespaceSelector, options: [ListOption]? = nil, using watch: ResourceWatch<Resource>) throws -> HTTPClient.Task<Void> {
380380
let request = try makeRequest().toWatch().in(namespace).with(options: options).build()
381-
let delegate = WatchDelegate(watch: watch, logger: logger)
381+
let delegate = WatchDelegate(watcher: watch, logger: logger)
382382

383383
return httpClient.execute(request: request, delegate: delegate, logger: logger)
384384
}
@@ -407,7 +407,7 @@ internal extension GenericKubernetesClient {
407407
/// - Returns: A cancellable `HTTPClient.Task` instance, representing a streaming connetion to the API server.
408408
func follow(in namespace: NamespaceSelector, name: String, container: String?, using watch: LogWatch) throws -> HTTPClient.Task<Void> {
409409
let request = try makeRequest().toFollow(pod: name, container: container).in(namespace).build()
410-
let delegate = WatchDelegate(watch: watch, logger: logger)
410+
let delegate = WatchDelegate(watcher: watch, logger: logger)
411411

412412
return httpClient.execute(request: request, delegate: delegate, logger: logger)
413413
}

Sources/SwiftkubeClient/Client/NamespacedGenericKubernetesClient+Pod.swift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,13 @@ import SwiftkubeModel
2020

2121
public extension NamespacedGenericKubernetesClient where Resource == core.v1.Pod {
2222

23-
func follow(in namespace: NamespaceSelector? = nil, name: String, container: String?, lineHandler: @escaping LogWatch.LineHandler) throws -> HTTPClient.Task<Void> {
24-
try super.follow(in: namespace ?? .namespace(config.namespace), name: name, container: container, using: LogWatch(logger: logger, lineHandler))
23+
func follow(
24+
in namespace: NamespaceSelector? = nil,
25+
name: String, container: String?,
26+
lineHandler: @escaping LogWatch.LineHandler
27+
) throws -> HTTPClient.Task<Void> {
28+
let logWatch = LogWatch(onError: nil, onNext: lineHandler)
29+
return try super.follow(in: namespace ?? .namespace(config.namespace), name: name, container: container, using: logWatch)
2530
}
2631

2732
func status(in namespace: NamespaceSelector? = nil, name: String) throws -> EventLoopFuture<core.v1.Pod> {

Sources/SwiftkubeClient/Client/NamespacedGenericKubernetesClient.swift

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,42 @@ public extension NamespacedGenericKubernetesClient where Resource: ReadableResou
6363
/// event paired with the corresponding resource as a pair to the `eventHandler`.
6464
///
6565
/// - Returns: A cancellable `HTTPClient.Task` instance, representing a streaming connetion to the API server.
66-
func watch(in namespace: NamespaceSelector? = nil, options: [ListOption]? = nil, eventHandler: @escaping ResourceWatch<Resource>.EventHandler) throws -> HTTPClient.Task<Void> {
67-
try super.watch(in: namespace ?? .namespace(config.namespace), options: options, using: ResourceWatch<Resource>(logger: logger, eventHandler))
66+
func watch(
67+
in namespace: NamespaceSelector? = nil,
68+
options: [ListOption]? = nil,
69+
eventHandler: @escaping ResourceWatch<Resource>.EventHandler
70+
) throws -> HTTPClient.Task<Void> {
71+
let resourceWatch = ResourceWatch<Resource>(onError: nil, onEvent: eventHandler)
72+
return try watch(in: namespace, options: options, resourceWatch: resourceWatch)
73+
}
74+
75+
/// Watches the API resources in the given namespace.
76+
///
77+
/// Watching resources opens a persistent connection to the API server. The connection is represented by a `HTTPClient.Task` instance, that acts
78+
/// as an active "subscription" to the events stream. The task can be cancelled any time to stop the watch.
79+
///
80+
/// If the namespace is not specified then the default namespace defined in the `KubernetesClientConfig` will be used instead.
81+
///
82+
/// ```swift
83+
/// let watch = ResourceWatch<core.v1.Pod>(logger: logger, onError: errorHandler) { (event, pod) in
84+
/// print("\(event): \(pod)")
85+
/// }
86+
/// let task: HTTPClient.Task<Void> = client.pods.watch(in: .namespace("default"), resourceWatch: watch)
87+
/// task.cancel()
88+
/// ```
89+
///
90+
/// - Parameters:
91+
/// - namespace: The namespace for this API request.
92+
/// - resourceWatch: A `ResourceWatch` instance, which is used for error and event callbacks. The clients sends each
93+
/// event paired with the corresponding resource as a pair to the `eventHandler`. Errors are sent to the `errorHandler`.
94+
///
95+
/// - Returns: A cancellable `HTTPClient.Task` instance, representing a streaming connetion to the API server.
96+
func watch(
97+
in namespace: NamespaceSelector? = nil,
98+
options: [ListOption]? = nil,
99+
resourceWatch: ResourceWatch<Resource>
100+
) throws -> HTTPClient.Task<Void> {
101+
try super.watch(in: namespace ?? .namespace(config.namespace), options: options, using: resourceWatch)
68102
}
69103
}
70104

Sources/SwiftkubeClient/SwiftkubeClient.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public enum SwiftkubeClientError: Error {
3030
case decodingError(String)
3131
/// Thrown on all errors returned from the Kubernetes API server.
3232
case requestError(meta.v1.Status)
33+
/// Thrown when the underlying HTTPClient reports an error.
34+
case clientError(Error)
3335

3436
internal static func methodNotAllowed(_ method: HTTPMethod) -> SwiftkubeClientError {
3537
let status = sk.status {

Sources/SwiftkubeClient/Watch/ResourceWatch.swift

Lines changed: 53 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -32,28 +32,40 @@ public enum EventType: String, RawRepresentable {
3232

3333
// MARK: - Watcher
3434

35-
protocol Watcher {
36-
func handle(payload: Data)
35+
public protocol Watcher {
36+
typealias ErrorHandler = (SwiftkubeClientError) -> Void
37+
38+
func onError(error: SwiftkubeClientError)
39+
func onNext(payload: Data)
3740
}
3841

3942
// MARK: - ResourceWatch
4043

41-
final public class ResourceWatch<Resource: KubernetesAPIResource>: Watcher {
44+
open class ResourceWatch<Resource: KubernetesAPIResource>: Watcher {
4245

4346
public typealias EventHandler = (EventType, Resource) -> Void
4447

45-
private let decoder = JSONDecoder()
46-
private let handler: EventHandler
47-
private let logger: Logger
48+
private let decoder: JSONDecoder
49+
private let errorHandler: ErrorHandler?
50+
private let eventHandler: EventHandler
51+
52+
public init(
53+
decoder: JSONDecoder = JSONDecoder(),
54+
onError errorHandler: ErrorHandler? = nil,
55+
onEvent eventHandler: @escaping EventHandler
56+
) {
57+
self.decoder = decoder
58+
self.errorHandler = errorHandler
59+
self.eventHandler = eventHandler
60+
}
4861

49-
init(logger: Logger? = nil, _ handler: @escaping EventHandler) {
50-
self.handler = handler
51-
self.logger = logger ?? KubernetesClient.loggingDisabled
62+
public func onError(error: SwiftkubeClientError) {
63+
errorHandler?(error)
5264
}
5365

54-
internal func handle(payload: Data) {
66+
public func onNext(payload: Data) {
5567
guard let string = String(data: payload, encoding: .utf8) else {
56-
logger.warning("Could not deserialize payload")
68+
errorHandler?(.decodingError("Could not deserialize payload"))
5769
return
5870
}
5971

@@ -62,45 +74,52 @@ final public class ResourceWatch<Resource: KubernetesAPIResource>: Watcher {
6274
let data = line.data(using: .utf8),
6375
let event = try? self.decoder.decode(meta.v1.WatchEvent.self, from: data)
6476
else {
65-
self.logger.warning("Error decoding meta.v1.WatchEvent payload")
77+
self.errorHandler?(.decodingError("Error decoding meta.v1.WatchEvent payload"))
6678
return
6779
}
6880

6981
guard let eventType = EventType(rawValue: event.type) else {
70-
self.logger.warning("Error parsing EventType")
82+
self.errorHandler?(.decodingError("Error parsing EventType"))
7183
return
7284
}
7385

7486
guard
7587
let jsonData = try? JSONSerialization.data(withJSONObject: event.object),
7688
let resource = try? self.decoder.decode(Resource.self, from: jsonData)
7789
else {
78-
self.logger.warning("Error deserializing \(String(describing: Resource.self))")
90+
self.errorHandler?(.decodingError("Error deserializing \(String(describing: Resource.self))"))
7991
return
8092
}
8193

82-
self.handler(eventType, resource)
94+
self.eventHandler(eventType, resource)
8395
}
8496
}
8597
}
8698

8799
// MARK: - LogWatch
88100

89-
final public class LogWatch: Watcher {
101+
open class LogWatch: Watcher {
90102

91103
public typealias LineHandler = (String) -> Void
92104

93-
private let logger: Logger
105+
private let errorHandler: ErrorHandler?
94106
private let lineHandler: LineHandler
95107

96-
public init(logger: Logger? = nil, _ lineHandler: @escaping LineHandler = { line in print(line) }) {
97-
self.logger = logger ?? KubernetesClient.loggingDisabled
108+
public init(
109+
onError errorHandler: ErrorHandler? = nil,
110+
onNext lineHandler: @escaping LineHandler
111+
) {
112+
self.errorHandler = errorHandler
98113
self.lineHandler = lineHandler
99114
}
100115

101-
internal func handle(payload: Data) {
116+
public func onError(error: SwiftkubeClientError) {
117+
errorHandler?(error)
118+
}
119+
120+
public func onNext(payload: Data) {
102121
guard let string = String(data: payload, encoding: .utf8) else {
103-
logger.warning("Could not deserialize payload")
122+
errorHandler?(.decodingError("Could not deserialize payload"))
104123
return
105124
}
106125

@@ -116,35 +135,30 @@ internal class WatchDelegate: HTTPClientResponseDelegate {
116135

117136
typealias Response = Void
118137

119-
private let watch: Watcher
138+
private let watcher: Watcher
120139
private let logger: Logger
121140

122-
init(watch: Watcher, logger: Logger) {
123-
self.watch = watch
141+
init(watcher: Watcher, logger: Logger) {
142+
self.watcher = watcher
124143
self.logger = logger
125144
}
126145

127-
func didSendRequestHead(task: HTTPClient.Task<Response>, _ head: HTTPRequestHead) {
128-
logger.debug("Did send request head: \(head.headers)")
129-
}
130-
131-
func didSendRequestPart(task: HTTPClient.Task<Response>, _ part: IOData) {
132-
logger.debug("Did send request part: \(part)")
133-
}
134-
135-
func didSendRequest(task: HTTPClient.Task<Response>) {
136-
logger.debug("Did send request: \(task)")
137-
}
138-
139146
func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
140147
logger.debug("Did receive response head: \(head.headers)")
148+
switch head.status.code {
149+
case HTTPResponseStatus.badRequest.code:
150+
watcher.onError(error: .badRequest(head.status.reasonPhrase))
151+
default:
152+
watcher.onError(error: .emptyResponse)
153+
}
154+
141155
return task.eventLoop.makeSucceededFuture(())
142156
}
143157

144158
func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
145159
logger.debug("Did receive body part: \(task)")
146160
let payload = Data(buffer: buffer)
147-
watch.handle(payload: payload)
161+
watcher.onNext(payload: payload)
148162
return task.eventLoop.makeSucceededFuture(())
149163
}
150164

@@ -154,6 +168,7 @@ internal class WatchDelegate: HTTPClientResponseDelegate {
154168
}
155169

156170
func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
157-
logger.warning("Did receive error: \(error.localizedDescription)")
171+
logger.debug("Did receive error: \(error.localizedDescription)")
172+
watcher.onError(error: .clientError(error))
158173
}
159174
}

0 commit comments

Comments
 (0)