Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions packages/serinus/bin/serinus.dart
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ class AppController extends Controller {
final id = context.paramAs<int?>('id');
return {'message': 'Data for id: $id'};
});
onStream(Route.get('/stream'), (RequestContext context) async* {
for (int i = 0; i < 5; i++) {
yield 'Stream message $i';
await Future.delayed(Duration(seconds: 1));
}
});
}
}

Expand Down Expand Up @@ -123,15 +129,12 @@ class MyModelProvider extends ModelProvider {
}

void main(List<String> arguments) async {
final application = await serinus.createMinimalApplication(
final application = await serinus.createApplication(
entrypoint: AppModule(),
host: InternetAddress.anyIPv4.address,
port: 3002,
logger: ConsoleLogger(prefix: 'Serinus New Logger'),
modelProvider: MyModelProvider(),
);
application.provide(TestProvider());
application.get('/', (RequestContext context) async {
return context.use<TestProvider>().counter;
});
await application.serve();
}
33 changes: 33 additions & 0 deletions packages/serinus/lib/src/adapters/serinus_http_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,39 @@ class SerinusHttpAdapter
headers[io.HttpHeaders.contentTypeHeader] = contentTypeValue;

final bodyData = body.data;
if (bodyData is Stream) {
headers[io.HttpHeaders.dateHeader] = _cachedDate;
response.toggleBuffering(false);
headers[io.HttpHeaders.contentTypeHeader] = contentTypeValue;
headers[io.HttpHeaders.transferEncodingHeader] = 'chunked';
headers['X-Content-Type-Options'] = 'nosniff';
headers['Cache-Control'] = 'no-cache, no-transform';
headers['Connection'] = 'keep-alive';
response.headers(headers, preserveHeaderCase: preserveHeaderCase);
try {
await for (final chunk in bodyData) {
if (chunk is List<int>) {
response.add(chunk);
} else if (chunk is String) {
response.write(chunk);
} else {
throw StateError('Unsupported stream data type: ${chunk.runtimeType}');
}
await response.flush();
}
} catch (e) {
// Handle errors during streaming
if (!response.isClosed) {
io.stderr.writeln('Stream error: $e');
}
} finally {
// Close the connection when the stream is done.
if (!response.isClosed) {
await response.flushAndClose();
}
}
return;
}
if (bodyData is io.File) {
final fileStat = await bodyData.stat();
final rawFileName = bodyData.uri.pathSegments.last;
Expand Down
4 changes: 4 additions & 0 deletions packages/serinus/lib/src/contexts/route_context.dart
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ class RouteContext<T extends RouteHandlerSpec> {
hooksContainer.resHooks,
);

/// The [streaming] property determines if the route is streaming or not.
final bool streaming;

/// The [RouteContext] constructor initializes the route context with the provided parameters.
RouteContext({
required this.id,
Expand All @@ -120,6 +123,7 @@ class RouteContext<T extends RouteHandlerSpec> {
required this.spec,
required this.moduleScope,
required this.hooksContainer,
this.streaming = false,
this.isStatic = false,
this.pipes = const [],
this.queryParameters = const {},
Expand Down
60 changes: 55 additions & 5 deletions packages/serinus/lib/src/core/controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,43 @@ import '../contexts/contexts.dart';
import '../http/http.dart';
import 'core.dart';

/// The [RestHandler] class is an abstract class that defines a REST handler. It takes a [RequestContext] and returns a response of type [T].
abstract class RestHandler<T, B> {

/// The handler function.
T call(RequestContext<B> context);

}

/// Shortcut for a request-response handler. It takes a [RequestContext] and returns a [Response].
class ReqResHandler<T, B> {
class ReqResHandler<T, B> extends RestHandler<Future<T>, B> {
/// The handler function.
final Future<T> Function(RequestContext<B> context) _handler;

/// Creates a request-response handler.
ReqResHandler(this._handler);

/// Calls the handler function.
@override
Future<T> call(RequestContext<B> context) {
return _handler(context);
}
}

/// Shortcut for a stream handler. It takes a [RequestContext] and returns a [Stream].
class StreamHandler<T, B> extends RestHandler<Stream<T>, B> {
final Stream<T> Function(RequestContext<B> context) _handler;

/// Creates a stream handler.
StreamHandler(this._handler);

/// Calls the handler function.
@override
Stream<T> call(RequestContext<B> context) {
return _handler(context);
}
}

typedef _RequestContextBuilder =
Future<RequestContext> Function({
required Request request,
Expand All @@ -47,21 +70,25 @@ abstract class RouteHandlerSpec<T> {
}

/// The [RestRouteHandlerSpec] class is used to define a REST route handler specification.
class RestRouteHandlerSpec<T, B> extends RouteHandlerSpec<ReqResHandler<T, B>> {
class RestRouteHandlerSpec<T, B> extends RouteHandlerSpec<RestHandler<T, B>> {
/// The [shouldValidateMultipart] property determines if multipart form data should be validated.
final bool shouldValidateMultipart;

/// The [isStatic] property determines if the route is static.
final bool isStatic;

/// The [streaming] property determines if the route is streaming.
final bool streaming;

late final _RequestContextBuilder _requestContextBuilder;

/// The [RestRouteHandlerSpec] constructor is used to create a new instance of the [RestRouteHandlerSpec] class.
RestRouteHandlerSpec(
Route route,
ReqResHandler<T, B> handler, {
RestHandler<T, B> handler, {
this.shouldValidateMultipart = false,
this.isStatic = false,
this.streaming = false,
}) : super(route, handler) {
_requestContextBuilder =
({
Expand Down Expand Up @@ -158,7 +185,7 @@ abstract class Controller {
);
}

_routes[UuidV4().generate()] = RestRouteHandlerSpec<T, B>(
_routes[UuidV4().generate()] = RestRouteHandlerSpec<Future<T>, B>(
route,
ReqResHandler<T, B>(handler),
shouldValidateMultipart: shouldValidateMultipart,
Expand All @@ -183,11 +210,34 @@ abstract class Controller {
);
}

_routes[UuidV4().generate()] = RestRouteHandlerSpec<T, dynamic>(
_routes[UuidV4().generate()] = RestRouteHandlerSpec<Future<T>, dynamic>(
route,
ReqResHandler<T, dynamic>((_) async => handler),
shouldValidateMultipart: false,
isStatic: true,
);
}

@mustCallSuper
void onStream<R, B>(
Route route,
Stream<R> Function(RequestContext<B> context) handler, {
bool shouldValidateMultipart = false,
}) {
final routeExists = _routes.values.any(
(r) => r.route.path == route.path && r.route.method == route.method,
);
if (routeExists) {
throw StateError(
'A route with the same path and method already exists. [${route.path}] [${route.method}]',
);
}

_routes[UuidV4().generate()] = RestRouteHandlerSpec<Stream<R>, B>(
route,
StreamHandler<R, B>(handler),
shouldValidateMultipart: shouldValidateMultipart,
streaming: true
);
}
Comment thread
francescovallone marked this conversation as resolved.
}
24 changes: 24 additions & 0 deletions packages/serinus/lib/src/http/internal_response.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ abstract class OutgoingMessage<T, THeaders> {
/// The [write] method is used to write data to the response without closing it.
void write(String data);

/// The [add] method is used to add data to the response without closing it.
void add(List<int> data);

/// The [flush] method is used to flush the response.
Future<void> flush();

/// The [toggleBuffering] method is used to enable or disable buffering of the response.
void toggleBuffering(bool enable);

/// The [cookies] property is used to get the cookies of the response.
List<Cookie> get cookies;

Expand Down Expand Up @@ -77,6 +86,11 @@ class InternalResponse extends OutgoingMessage<HttpResponse, HttpHeaders> {
return original.detachSocket(writeHeaders: writeHeaders);
}

@override
void toggleBuffering(bool enable) {
original.bufferOutput = enable;
}

@override
void send([List<int> data = const []]) {
original.add(data);
Expand All @@ -89,6 +103,16 @@ class InternalResponse extends OutgoingMessage<HttpResponse, HttpHeaders> {
original.write(data);
}

@override
void add(List<int> data) {
original.add(data);
}

@override
Future<void> flush() {
return original.flush();
}

@override
List<Cookie> get cookies => original.cookies;

Expand Down
12 changes: 8 additions & 4 deletions packages/serinus/lib/src/routes/route_execution_context.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class RouteExecutionContext {
});

/// The [describe] method is used to describe the route execution context.
/// It returns a [HandlerFunction] that can be used to handle the request and response.
/// It takes a [RouteContext] and optional parameters such as [errorHandler], [notFoundHandler], and [rawBody].
/// The [errorHandler] is used to handle errors that occur during the request processing.
/// The [rawBody] parameter indicates whether the body should be treated as raw binary data
Expand Down Expand Up @@ -78,7 +77,6 @@ class RouteExecutionContext {
rawBody: rawBody,
);
executionContext.attachHttpContext(requestContext);

for (int i = 0; i < context.reqHooks.length; i++) {
final hook = context.reqHooks[i];
await hook.onRequest(executionContext);
Expand Down Expand Up @@ -167,7 +165,13 @@ class RouteExecutionContext {
}
await _executeBeforeHandle(executionContext, context);
final handler = spec.handler;
final handlerResult = await handler.call(requestContext);
final handlerResult = switch (handler) {
ReqResHandler reqResHandler => await reqResHandler.call(requestContext),
StreamHandler streamHandler => streamHandler.call(requestContext),
_ => throw StateError(
'Unsupported REST handler type: ${handler.runtimeType}',
),
};
final responseData = WrappedResponse(handlerResult);
await _executeAfterHandle(executionContext, context, responseData);
await _executeOnResponse(context, executionContext, responseData);
Expand Down Expand Up @@ -340,7 +344,7 @@ class RouteExecutionContext {
ExecutionContext context,
) {
final data = result.data;
if (data == null) {
if (data == null || data is Stream) {
return result;
}

Expand Down
1 change: 1 addition & 0 deletions packages/serinus/lib/src/routes/routes_explorer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ final class RoutesExplorer {
method: routeMethod,
moduleToken: moduleToken,
isStatic: spec.isStatic,
streaming: spec.streaming,
spec: spec,
moduleScope: moduleScope,
hooksServices: mergedContainer.services,
Expand Down
38 changes: 38 additions & 0 deletions packages/serinus/test/core/controller_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,43 @@ void main() async {
expect(() => controller.onStatic(route, () => 'ok!'), throwsStateError);
},
);

test(
'when a stream route is added to a controller, then it should use a StreamHandler in RestRouteHandlerSpec',
() {
final controller = TestController();
final route = GetRoute(path: '/stream');

controller.onStream<String, dynamic>(
route,
(_) => Stream<String>.fromIterable(['a', 'b']),
);

final spec = controller.get(controller.routes.keys.first);
expect(spec, isNotNull);
expect(spec, isA<RestRouteHandlerSpec>());
expect(spec!.handler, isA<StreamHandler>());
},
);

test(
'when RestRouteHandlerSpec is created, then it should support both ReqResHandler and StreamHandler',
() {
final reqResSpec = RestRouteHandlerSpec<Future<String>, dynamic>(
GetRoute(path: '/req-res'),
ReqResHandler<String, dynamic>((_) async => 'ok'),
);
final streamSpec = RestRouteHandlerSpec<Stream<String>, dynamic>(
GetRoute(path: '/stream'),
StreamHandler<String, dynamic>(
(_) => Stream<String>.value('ok'),
),
streaming: true,
);

expect(reqResSpec.handler, isA<ReqResHandler>());
expect(streamSpec.handler, isA<StreamHandler>());
},
);
});
}
Loading