Skip to content

Commit 17de58b

Browse files
committed
Add streaming responses (NDJSON / SSE / chunked) — 1.6.0
- New URLSessionStreamingProtocol with default URLSession conformance bridging URLSession.bytes(for:) into a Sendable AsyncThrowingStream<UInt8, Error>. - New NetworkStreaming protocol and StreamingResponse value type with bytes / lines() / ndjson(as:decoder:) helpers (CRLF-aware, blank-line skipping, resilient to multi-byte UTF-8 split across chunks). - New NetworkManager.stream(_:accessToken:) reusing the same request pipeline as send(_:): headers, Authorization, User-Agent, body, baseURL. Handles 401 via tokenRefresher (one retry before any byte is emitted), drains non-2xx body up to 1 MiB into HTTPError / errorDecoder. - New StreamingError cases: invalidResponse, errorPayloadTooLarge. - NetworkManager init gains optional streamingSession parameter; existing call sites stay source-compatible (URLSession satisfies the new protocol by default). - Internal refactor: extracted buildURLRequest(_:accessToken:) as the single source of truth for URLRequest construction, shared by send and stream. No behaviour change for send (162 existing tests pass). - 8 new tests for streaming pipeline (170 total). - Documentation: CHANGELOG, API.md / API_RU.md (new "Streaming" section), README badges and Key Features, PROJECT_STRUCTURE. - Version bumped 1.5.1 -> 1.6.0.
1 parent 73c74f4 commit 17de58b

11 files changed

Lines changed: 996 additions & 76 deletions

File tree

.github/workflows/swift.yml

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -191,50 +191,3 @@ jobs:
191191
echo "- Total lines: ${{ steps.calc.outputs.total }}" >> $GITHUB_STEP_SUMMARY
192192
echo "- Covered lines: ${{ steps.calc.outputs.covered }}" >> $GITHUB_STEP_SUMMARY
193193
echo "- Coverage: ${{ steps.calc.outputs.coverage }}%" >> $GITHUB_STEP_SUMMARY
194-
195-
release:
196-
name: Tag and Release
197-
runs-on: ubuntu-latest
198-
needs: test
199-
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
200-
permissions:
201-
contents: write
202-
203-
steps:
204-
- name: Checkout code
205-
uses: actions/checkout@v4
206-
with:
207-
fetch-depth: 0
208-
209-
- name: Read version
210-
id: version
211-
run: |
212-
VERSION=$(sed -n 's/.*EKNetworkVersionString = "\(.*\)".*/\1/p' Sources/EKNetwork/Version.swift)
213-
if [ -z "$VERSION" ]; then
214-
echo "❌ Failed to read version from Version.swift"
215-
exit 1
216-
fi
217-
echo "version=$VERSION" >> "$GITHUB_OUTPUT"
218-
echo "tag=v$VERSION" >> "$GITHUB_OUTPUT"
219-
220-
- name: Create tag
221-
id: tag
222-
run: |
223-
TAG="${{ steps.version.outputs.tag }}"
224-
if git rev-parse "$TAG" >/dev/null 2>&1; then
225-
echo "exists=true" >> "$GITHUB_OUTPUT"
226-
echo "Tag $TAG already exists, skipping tag creation."
227-
exit 0
228-
fi
229-
git config user.name "github-actions[bot]"
230-
git config user.email "github-actions[bot]@users.noreply.github.com"
231-
git tag -a "$TAG" -m "Release $TAG"
232-
git push origin "$TAG"
233-
echo "exists=false" >> "$GITHUB_OUTPUT"
234-
235-
- name: Create GitHub release
236-
if: steps.tag.outputs.exists == 'false'
237-
uses: softprops/action-gh-release@v2
238-
with:
239-
tag_name: ${{ steps.version.outputs.tag }}
240-
generate_release_notes: true

API.md

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Complete API documentation for EKNetwork library.
1212
- [RetryPolicy](#retrypolicy)
1313
- [NetworkProgress](#networkprogress)
1414
- [TokenRefreshProvider](#tokenrefreshprovider)
15+
- [Streaming (NDJSON / SSE)](#streaming-ndjson--sse)
1516
- [Error Types](#error-types)
1617
- [Response Types](#response-types)
1718
- [UserAgentConfiguration](#useragentconfiguration)
@@ -28,6 +29,7 @@ The main class for managing network requests.
2829
public init(
2930
baseURL: @escaping (() -> URL),
3031
session: URLSessionProtocol = URLSession.shared,
32+
streamingSession: URLSessionStreamingProtocol? = nil,
3133
loggerSubsystem: String = "com.yourapp.networking",
3234
userAgentConfiguration: UserAgentConfiguration? = nil,
3335
responseDecoderProvider: (() -> JSONDecoder)? = nil
@@ -37,6 +39,7 @@ public init(
3739
**Parameters:**
3840
- `baseURL`: Closure that returns the base URL for each request. Use `{ myURL }` for a fixed URL or a closure that reads from config/environment for dynamic base URL (avoids race conditions when switching environments).
3941
- `session`: The `URLSessionProtocol` to use for making requests (defaults to `URLSession.shared`)
42+
- `streamingSession`: Optional session used by `stream(_:accessToken:)` for byte-stream responses (NDJSON / SSE / chunked transfer). When `nil` (default) the manager reuses `session` if it conforms to `URLSessionStreamingProtocol` (default `URLSession` does), otherwise falls back to `URLSession.shared`. Added in 1.6.0; existing call sites stay source-compatible.
4043
- `loggerSubsystem`: The subsystem identifier for the `Logger` instance
4144
- `userAgentConfiguration`: Optional User-Agent configuration
4245
- `responseDecoderProvider`: Optional global JSON decoder provider for responses (overrides per-request decoding when enabled)
@@ -407,6 +410,113 @@ class TokenManager: TokenRefreshProvider {
407410

408411
---
409412

413+
## Streaming (NDJSON / SSE)
414+
415+
> Available since **1.6.0**.
416+
417+
`send(_:accessToken:)` is designed for endpoints that return a complete `Decodable` body. For endpoints that emit data incrementally — newline-delimited JSON, Server-Sent Events, chunked log/inference streams — use `stream(_:accessToken:)`. The streaming entry point reuses the **exact same** request-construction pipeline (headers, `Authorization`, `User-Agent`, body, base URL), so app-level code never has to build a `URLRequest` by hand and risk dropping required headers like `X-Device-ID` or custom auth.
418+
419+
### NetworkStreaming protocol
420+
421+
```swift
422+
public protocol NetworkStreaming: AnyObject {
423+
func stream<T: NetworkRequest>(
424+
_ request: T,
425+
accessToken: (() -> String?)?
426+
) async throws -> StreamingResponse
427+
}
428+
```
429+
430+
`NetworkManager` conforms to both `NetworkManaging` and `NetworkStreaming`. Existing mocks of `NetworkManaging` are unaffected.
431+
432+
### StreamingResponse
433+
434+
```swift
435+
public struct StreamingResponse: Sendable {
436+
public let statusCode: Int
437+
public let headers: [String: String]
438+
public let bytes: AsyncThrowingStream<UInt8, Error>
439+
440+
public func lines() -> AsyncThrowingStream<String, Error>
441+
public func ndjson<Item: Decodable & Sendable>(
442+
as itemType: Item.Type,
443+
decoder: JSONDecoder = JSONDecoder()
444+
) -> AsyncThrowingStream<Item, Error>
445+
}
446+
```
447+
448+
- `bytes` — raw octet stream, one `UInt8` per element, in arrival order.
449+
- `lines()` — UTF-8 lines split on `\n`, trailing `\r` trimmed (CRLF-aware), blank lines skipped. Resilient to multi-byte sequences split across TCP segments.
450+
- `ndjson(as:decoder:)` — one `Decodable` record per non-empty line. A bad line throws and finishes the stream.
451+
452+
Cancellation propagates automatically: breaking out of iteration or cancelling the surrounding `Task` cancels the underlying network task.
453+
454+
### URLSessionStreamingProtocol
455+
456+
```swift
457+
public protocol URLSessionStreamingProtocol: Sendable {
458+
func byteStream(for request: URLRequest) async throws -> (AsyncThrowingStream<UInt8, Error>, URLResponse)
459+
}
460+
```
461+
462+
`URLSession` conforms by default (bridges `URLSession.bytes(for:)` into a fully `Sendable` stream). Implement this protocol for unit-testing the streaming pipeline without hitting the network.
463+
464+
### Behaviour summary
465+
466+
| Concern | `send(_:)` | `stream(_:)` |
467+
|---|---|---|
468+
| Headers, body, auth | `buildURLRequest` | `buildURLRequest` (same path) |
469+
| 401 → token refresh + retry | once, when `allowsRetry == true` | once, before any body byte is emitted |
470+
| Mid-stream 401 | n/a | not retried (body has already started) |
471+
| Non-2xx error | `HTTPError` / `errorDecoder` | drain ≤1 MiB, then `HTTPError` / `errorDecoder` |
472+
| Retry policy (`RetryPolicy`) | applied | not applied (streams cannot be replayed deterministically) |
473+
| Progress (`NetworkProgress`) | applied | not applied |
474+
475+
### StreamingError
476+
477+
```swift
478+
public enum StreamingError: Error, Equatable {
479+
case invalidResponse // response was not an HTTPURLResponse
480+
case errorPayloadTooLarge(limitBytes: Int) // non-2xx body exceeded 1 MiB cap
481+
}
482+
```
483+
484+
### Example: NDJSON search
485+
486+
```swift
487+
struct PlayerSearchRequest: NetworkRequest {
488+
typealias Response = EmptyResponse // unused for streaming
489+
var path: String { "/api/v1/players/search" }
490+
var method: HTTPMethod { .get }
491+
var queryParameters: [String: String]? { ["q": query, "stream": "true"] }
492+
var headers: [String: String]? { DeviceHeaders.current() }
493+
let query: String
494+
}
495+
496+
let response = try await manager.stream(
497+
PlayerSearchRequest(query: "Bobr"),
498+
accessToken: { TokenStore.shared.accessToken }
499+
)
500+
501+
for try await item in response.ndjson(as: SearchEvent.self) {
502+
handle(item) // render incrementally as items arrive
503+
if case .end = item { break }
504+
}
505+
```
506+
507+
### Example: Server-Sent Events
508+
509+
```swift
510+
let response = try await manager.stream(MyEventsRequest(), accessToken: nil)
511+
for try await line in response.lines() {
512+
guard line.hasPrefix("data:") else { continue }
513+
let payload = line.dropFirst("data:".count).trimmingCharacters(in: .whitespaces)
514+
process(payload)
515+
}
516+
```
517+
518+
---
519+
410520
## Error Types
411521

412522
### NonRetriableError

API_RU.md

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
- [RetryPolicy](#retrypolicy)
1313
- [NetworkProgress](#networkprogress)
1414
- [TokenRefreshProvider](#tokenrefreshprovider)
15+
- [Стриминг (NDJSON / SSE)](#стриминг-ndjson--sse)
1516
- [Типы ошибок](#типы-ошибок)
1617
- [Типы ответов](#типы-ответов)
1718
- [UserAgentConfiguration](#useragentconfiguration)
@@ -28,6 +29,7 @@
2829
public init(
2930
baseURL: @escaping (() -> URL),
3031
session: URLSessionProtocol = URLSession.shared,
32+
streamingSession: URLSessionStreamingProtocol? = nil,
3133
loggerSubsystem: String = "com.yourapp.networking",
3234
userAgentConfiguration: UserAgentConfiguration? = nil,
3335
responseDecoderProvider: (() -> JSONDecoder)? = nil
@@ -37,6 +39,7 @@ public init(
3739
**Параметры:**
3840
- `baseURL`: Замыкание, возвращающее базовый URL для каждого запроса. Используйте `{ myURL }` для фиксированного URL или замыкание, читающее из конфига/окружения, для динамического базового URL (без гонок при переключении окружений).
3941
- `session`: `URLSessionProtocol` для выполнения запросов (по умолчанию `URLSession.shared`)
42+
- `streamingSession`: Опциональная сессия для `stream(_:accessToken:)` (NDJSON / SSE / chunked transfer). Если не передана, менеджер использует `session`, если та поддерживает `URLSessionStreamingProtocol` (`URLSession` поддерживает по умолчанию), иначе — `URLSession.shared`. Добавлено в 1.6.0; существующие вызовы инициализатора остаются совместимыми.
4043
- `loggerSubsystem`: Идентификатор подсистемы для экземпляра `Logger`
4144
- `userAgentConfiguration`: Опциональная конфигурация User-Agent
4245
- `responseDecoderProvider`: Опциональный глобальный JSON-декодер для ответов (может переопределять декодирование запросов)
@@ -404,6 +407,113 @@ class TokenManager: TokenRefreshProvider {
404407

405408
---
406409

410+
## Стриминг (NDJSON / SSE)
411+
412+
> Доступно начиная с **1.6.0**.
413+
414+
`send(_:accessToken:)` рассчитан на эндпоинты, отдающие тело целиком одним `Decodable`-объектом. Для эндпоинтов, которые отдают данные постепенно — newline-delimited JSON, Server-Sent Events, chunked-логи / inference-стримы — используйте `stream(_:accessToken:)`. Стриминг переиспользует **тот же самый** пайплайн построения запроса (заголовки, `Authorization`, `User-Agent`, тело, baseURL), что и `send(_:)`. То есть прикладному коду никогда не нужно собирать `URLRequest` вручную и рисковать потерять обязательные заголовки вроде `X-Device-ID` или кастомной авторизации.
415+
416+
### Протокол NetworkStreaming
417+
418+
```swift
419+
public protocol NetworkStreaming: AnyObject {
420+
func stream<T: NetworkRequest>(
421+
_ request: T,
422+
accessToken: (() -> String?)?
423+
) async throws -> StreamingResponse
424+
}
425+
```
426+
427+
`NetworkManager` соответствует и `NetworkManaging`, и `NetworkStreaming`. Существующие моки `NetworkManaging` остаются рабочими.
428+
429+
### StreamingResponse
430+
431+
```swift
432+
public struct StreamingResponse: Sendable {
433+
public let statusCode: Int
434+
public let headers: [String: String]
435+
public let bytes: AsyncThrowingStream<UInt8, Error>
436+
437+
public func lines() -> AsyncThrowingStream<String, Error>
438+
public func ndjson<Item: Decodable & Sendable>(
439+
as itemType: Item.Type,
440+
decoder: JSONDecoder = JSONDecoder()
441+
) -> AsyncThrowingStream<Item, Error>
442+
}
443+
```
444+
445+
- `bytes` — поток сырых байт (по одному `UInt8`), в порядке прихода.
446+
- `lines()` — UTF-8 строки, разделённые `\n`, с обрезкой `\r` (CRLF-aware), пустые строки пропускаются. Корректно собирает многобайтовые UTF-8 последовательности, разрезанные TCP-сегментами.
447+
- `ndjson(as:decoder:)` — по одному `Decodable`-объекту на каждую непустую строку. Битая строка — стрим завершается ошибкой.
448+
449+
Отмена пробрасывается автоматически: выход из `for try await` или отмена внешнего `Task` отменяет сетевую задачу.
450+
451+
### URLSessionStreamingProtocol
452+
453+
```swift
454+
public protocol URLSessionStreamingProtocol: Sendable {
455+
func byteStream(for request: URLRequest) async throws -> (AsyncThrowingStream<UInt8, Error>, URLResponse)
456+
}
457+
```
458+
459+
`URLSession` соответствует протоколу по умолчанию (мостит `URLSession.bytes(for:)` в полностью `Sendable`-стрим). Реализуйте этот протокол в моках, если хотите тестировать стриминговый пайплайн без сети.
460+
461+
### Поведение в сравнении с `send(_:)`
462+
463+
| Аспект | `send(_:)` | `stream(_:)` |
464+
|---|---|---|
465+
| Заголовки, тело, авторизация | `buildURLRequest` | `buildURLRequest` (та же точка) |
466+
| 401 → refresh + retry | один раз, если `allowsRetry == true` | один раз, до того как пришёл хоть один байт тела |
467+
| 401 в середине стрима | n/a | не ретраится (тело уже начали отдавать) |
468+
| Не-2xx ошибка | `HTTPError` / `errorDecoder` | drain ≤1 МиБ, затем `HTTPError` / `errorDecoder` |
469+
| RetryPolicy | применяется | не применяется (стрим нельзя детерминированно проиграть заново) |
470+
| NetworkProgress | применяется | не применяется |
471+
472+
### StreamingError
473+
474+
```swift
475+
public enum StreamingError: Error, Equatable {
476+
case invalidResponse // ответ не HTTPURLResponse
477+
case errorPayloadTooLarge(limitBytes: Int) // тело не-2xx ответа превысило 1 МиБ
478+
}
479+
```
480+
481+
### Пример: NDJSON-поиск
482+
483+
```swift
484+
struct PlayerSearchRequest: NetworkRequest {
485+
typealias Response = EmptyResponse // не используется в стриминге
486+
var path: String { "/api/v1/players/search" }
487+
var method: HTTPMethod { .get }
488+
var queryParameters: [String: String]? { ["q": query, "stream": "true"] }
489+
var headers: [String: String]? { DeviceHeaders.current() }
490+
let query: String
491+
}
492+
493+
let response = try await manager.stream(
494+
PlayerSearchRequest(query: "Бобр"),
495+
accessToken: { TokenStore.shared.accessToken }
496+
)
497+
498+
for try await item in response.ndjson(as: SearchEvent.self) {
499+
handle(item) // отрисовка по мере поступления
500+
if case .end = item { break }
501+
}
502+
```
503+
504+
### Пример: Server-Sent Events
505+
506+
```swift
507+
let response = try await manager.stream(MyEventsRequest(), accessToken: nil)
508+
for try await line in response.lines() {
509+
guard line.hasPrefix("data:") else { continue }
510+
let payload = line.dropFirst("data:".count).trimmingCharacters(in: .whitespaces)
511+
process(payload)
512+
}
513+
```
514+
515+
---
516+
407517
## Типы ошибок
408518

409519
### NetworkError

CHANGELOG.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
### Changed
1313

14+
## [1.6.0] - 2026-05-09
15+
16+
### Added
17+
- **Streaming responses** — first-class support for endpoints that emit data as it is produced (NDJSON, Server-Sent Events, chunked transfer):
18+
- New `URLSessionStreamingProtocol` with default `URLSession` conformance (bridges `URLSession.bytes(for:)` into a `Sendable` `AsyncThrowingStream<UInt8, Error>`).
19+
- New `NetworkStreaming` protocol exposing `stream(_:accessToken:)`.
20+
- New `StreamingResponse` value type with `bytes`, `lines()` (UTF-8 lines, CRLF/LF aware, blank lines skipped) and `ndjson(as:decoder:)` helpers.
21+
- New `StreamingError` cases: `invalidResponse`, `errorPayloadTooLarge`.
22+
- `NetworkManager.stream(_:accessToken:)` reuses the same request-construction pipeline as `send(_:)` (headers, access token, body, User-Agent, base URL) — no duplication of header logic in app code.
23+
- Streaming requests handle 401 by calling `tokenRefresher` and retrying once when `request.allowsRetry == true` (mid-stream 401s are not retried).
24+
- Non-2xx streaming responses drain up to 1 MiB into `HTTPError` (or run `request.errorDecoder` if provided), so error handling matches `send(_:)`.
25+
- **`NetworkManager` initializer** gained a new optional `streamingSession: URLSessionStreamingProtocol? = nil` parameter. When omitted, the manager reuses the regular `session` if it conforms to streaming (default `URLSession` does), otherwise falls back to `URLSession.shared`. Existing initializer call sites stay source-compatible.
26+
- **Tests**: 8 new tests covering streaming pipeline, NDJSON decoding across chunk boundaries, CRLF handling, 401 + token refresh, custom `errorDecoder`, and default-session resolution. Total **170 tests**.
27+
28+
### Changed
29+
- **Internal refactor (no behaviour change)**: extracted `NetworkManager.buildURLRequest(_:accessToken:)` so both `send(_:)` and `stream(_:accessToken:)` share a single `URLRequest`-construction routine. Existing test suite (162 tests) passes unchanged.
30+
31+
### Migration (1.5.x → 1.6.0)
32+
- **No source changes required**. All previous code keeps compiling and behaving identically.
33+
- To stream a response, call `manager.stream(MyRequest(), accessToken: { token })` instead of `manager.send(...)` and consume `response.ndjson(as: Item.self)` / `response.lines()` / `response.bytes`. Headers and authentication are applied via the same `NetworkRequest.headers` and `accessToken` you already use.
34+
1435
## [1.5.0] - 2026-02-11
1536

1637
### Added

0 commit comments

Comments
 (0)