Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion .website/controllers.md
Original file line number Diff line number Diff line change
Expand Up @@ -331,4 +331,25 @@ class UserController extends Controller {
}
```

As you can see the `onStatic` method takes the same parameters as the `on` method, but the handler **is** the value that will be sent to the client.
As you can see the `onStatic` method takes the same parameters as the `on` method, but the handler **is** the value that will be sent to the client.

## Stream Routes

You can define stream routes using the `onStream` method. Stream routes are routes that return a stream of data to the client.

```dart
import 'package:serinus/serinus.dart';

class UserController extends Controller {
UserController(): super('/users') {
onStream(Route.get('/stream'), streamUsers);
}

Stream<User> streamUsers(RequestContext context) async* {
final users = await context.use<UsersService>().getUsers();
for (final user in users) {
yield user;
}
}
}
```
33 changes: 27 additions & 6 deletions packages/serinus/bin/serinus.dart
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ class AppController extends Controller {
final id = context.paramAs<int?>('id');
return {'message': 'Data for id: $id'};
});
onStream(Route.get('/stream'), (RequestContext context) async* {
for (int i = 0; i < 5; i++) {
yield 'Stream message $i';
await Future.delayed(Duration(seconds: 1));
}
});
}
}

Expand All @@ -89,7 +95,7 @@ class AppModule extends Module {
: super(
imports: [Test2Module(), TestModule()],
providers: [Provider.forValue('AppModuleValue', name: 'appValue')],
controllers: [AppController()],
controllers: [AppController(), UserController()],
exports: [],
);
}
Expand Down Expand Up @@ -122,16 +128,31 @@ class MyModelProvider extends ModelProvider {
};
}

class UserController extends Controller {
UserController(): super('/users') {
onStream(Route.get('/stream'), streamUsers);
}

Stream<MyObject> streamUsers(RequestContext context) async* {
final users = [
MyObject('Alice', 30),
MyObject('Bob', 25),
MyObject('Charlie', 35),
];
for (final user in users) {
yield user;
await Future.delayed(Duration(seconds: 1));
}
}
}

void main(List<String> arguments) async {
final application = await serinus.createMinimalApplication(
final application = await serinus.createApplication(
entrypoint: AppModule(),
host: InternetAddress.anyIPv4.address,
port: 3002,
logger: ConsoleLogger(prefix: 'Serinus New Logger'),
modelProvider: MyModelProvider(),
);
application.provide(TestProvider());
application.get('/', (RequestContext context) async {
return context.use<TestProvider>().counter;
});
await application.serve();
}
33 changes: 33 additions & 0 deletions packages/serinus/lib/src/adapters/serinus_http_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,39 @@ class SerinusHttpAdapter
headers[io.HttpHeaders.contentTypeHeader] = contentTypeValue;

final bodyData = body.data;
if (bodyData is Stream) {
headers[io.HttpHeaders.dateHeader] = _cachedDate;
response.toggleBuffering(false);
headers[io.HttpHeaders.contentTypeHeader] = contentTypeValue;
headers[io.HttpHeaders.transferEncodingHeader] = 'chunked';
headers['X-Content-Type-Options'] = 'nosniff';
headers['Cache-Control'] = 'no-cache, no-transform';
headers['Connection'] = 'keep-alive';
response.headers(headers, preserveHeaderCase: preserveHeaderCase);
try {
await for (final chunk in bodyData) {
if (chunk is List<int>) {
response.add(chunk);
} else if (chunk is String) {
response.write(chunk);
} else {
throw StateError('Unsupported stream data type: ${chunk.runtimeType}');
}
await response.flush();
}
} catch (e) {
// Handle errors during streaming
if (!response.isClosed) {
io.stderr.writeln('Stream error: $e');
}
} finally {
// Close the connection when the stream is done.
if (!response.isClosed) {
await response.flushAndClose();
}
}
return;
}
if (bodyData is io.File) {
final fileStat = await bodyData.stat();
final rawFileName = bodyData.uri.pathSegments.last;
Expand Down
4 changes: 4 additions & 0 deletions packages/serinus/lib/src/contexts/route_context.dart
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ class RouteContext<T extends RouteHandlerSpec> {
hooksContainer.resHooks,
);

/// The [streaming] property determines if the route is streaming or not.
final bool streaming;

/// The [RouteContext] constructor initializes the route context with the provided parameters.
RouteContext({
required this.id,
Expand All @@ -120,6 +123,7 @@ class RouteContext<T extends RouteHandlerSpec> {
required this.spec,
required this.moduleScope,
required this.hooksContainer,
this.streaming = false,
this.isStatic = false,
this.pipes = const [],
this.queryParameters = const {},
Expand Down
83 changes: 64 additions & 19 deletions packages/serinus/lib/src/core/controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,43 @@ import '../contexts/contexts.dart';
import '../http/http.dart';
import 'core.dart';

/// The [RestHandler] class is an abstract class that defines a REST handler. It takes a [RequestContext] and returns a response of type [T].
abstract class RestHandler<T, B> {

/// The handler function.
T call(RequestContext<B> context);

}

/// Shortcut for a request-response handler. It takes a [RequestContext] and returns a [Response].
class ReqResHandler<T, B> {
class ReqResHandler<T, B> extends RestHandler<Future<T>, B> {
/// The handler function.
final Future<T> Function(RequestContext<B> context) _handler;

/// Creates a request-response handler.
ReqResHandler(this._handler);

/// Calls the handler function.
@override
Future<T> call(RequestContext<B> context) {
return _handler(context);
}
}

/// Shortcut for a stream handler. It takes a [RequestContext] and returns a [Stream].
class StreamHandler<T, B> extends RestHandler<Stream<T>, B> {
final Stream<T> Function(RequestContext<B> context) _handler;

/// Creates a stream handler.
StreamHandler(this._handler);

/// Calls the handler function.
@override
Stream<T> call(RequestContext<B> context) {
return _handler(context);
}
}

typedef _RequestContextBuilder =
Future<RequestContext> Function({
required Request request,
Expand All @@ -47,21 +70,25 @@ abstract class RouteHandlerSpec<T> {
}

/// The [RestRouteHandlerSpec] class is used to define a REST route handler specification.
class RestRouteHandlerSpec<T, B> extends RouteHandlerSpec<ReqResHandler<T, B>> {
class RestRouteHandlerSpec<T, B> extends RouteHandlerSpec<RestHandler<T, B>> {
/// The [shouldValidateMultipart] property determines if multipart form data should be validated.
final bool shouldValidateMultipart;

/// The [isStatic] property determines if the route is static.
final bool isStatic;

/// The [streaming] property determines if the route is streaming.
final bool streaming;

late final _RequestContextBuilder _requestContextBuilder;

/// The [RestRouteHandlerSpec] constructor is used to create a new instance of the [RestRouteHandlerSpec] class.
RestRouteHandlerSpec(
Route route,
ReqResHandler<T, B> handler, {
RestHandler<T, B> handler, {
this.shouldValidateMultipart = false,
this.isStatic = false,
this.streaming = false,
}) : super(route, handler) {
_requestContextBuilder =
({
Expand Down Expand Up @@ -149,16 +176,9 @@ abstract class Controller {
Future<T> Function(RequestContext<B> context) handler, {
bool shouldValidateMultipart = false,
}) {
final routeExists = _routes.values.any(
(r) => r.route.path == route.path && r.route.method == route.method,
);
if (routeExists) {
throw StateError(
'A route with the same path and method already exists. [${route.path}] [${route.method}]',
);
}
_ensureRouteDoesNotExist(route);

_routes[UuidV4().generate()] = RestRouteHandlerSpec<T, B>(
_routes[UuidV4().generate()] = RestRouteHandlerSpec<Future<T>, B>(
route,
ReqResHandler<T, B>(handler),
shouldValidateMultipart: shouldValidateMultipart,
Expand All @@ -174,6 +194,37 @@ abstract class Controller {
if (handler is Function) {
throw StateError('The handler must be a static value');
}
_ensureRouteDoesNotExist(route);

_routes[UuidV4().generate()] = RestRouteHandlerSpec<Future<T>, dynamic>(
route,
ReqResHandler<T, dynamic>((_) async => handler),
shouldValidateMultipart: false,
isStatic: true,
);
}

/// The [onStream] method is used to register a streaming route.
/// It takes a [Route] and a [Stream] handler function.
/// The handler function receives a [RequestContext] and returns a [Stream] of type [R].
/// It should not be overridden.
@mustCallSuper
void onStream<R, B>(
Route route,
Stream<R> Function(RequestContext<B> context) handler, {
bool shouldValidateMultipart = false,
}) {
_ensureRouteDoesNotExist(route);

_routes[UuidV4().generate()] = RestRouteHandlerSpec<Stream<R>, B>(
route,
StreamHandler<R, B>(handler),
shouldValidateMultipart: shouldValidateMultipart,
streaming: true
);
}

void _ensureRouteDoesNotExist(Route route) {
final routeExists = _routes.values.any(
(r) => r.route.path == route.path && r.route.method == route.method,
);
Expand All @@ -182,12 +233,6 @@ abstract class Controller {
'A route with the same path and method already exists. [${route.path}] [${route.method}]',
);
}

_routes[UuidV4().generate()] = RestRouteHandlerSpec<T, dynamic>(
route,
ReqResHandler<T, dynamic>((_) async => handler),
shouldValidateMultipart: false,
isStatic: true,
);
}
Comment thread
francescovallone marked this conversation as resolved.

}
24 changes: 24 additions & 0 deletions packages/serinus/lib/src/http/internal_response.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ abstract class OutgoingMessage<T, THeaders> {
/// The [write] method is used to write data to the response without closing it.
void write(String data);

/// The [add] method is used to add data to the response without closing it.
void add(List<int> data);

/// The [flush] method is used to flush the response.
Future<void> flush();

/// The [toggleBuffering] method is used to enable or disable buffering of the response.
void toggleBuffering(bool enable);

/// The [cookies] property is used to get the cookies of the response.
List<Cookie> get cookies;

Expand Down Expand Up @@ -77,6 +86,11 @@ class InternalResponse extends OutgoingMessage<HttpResponse, HttpHeaders> {
return original.detachSocket(writeHeaders: writeHeaders);
}

@override
void toggleBuffering(bool enable) {
original.bufferOutput = enable;
}

@override
void send([List<int> data = const []]) {
original.add(data);
Expand All @@ -89,6 +103,16 @@ class InternalResponse extends OutgoingMessage<HttpResponse, HttpHeaders> {
original.write(data);
}

@override
void add(List<int> data) {
original.add(data);
}

@override
Future<void> flush() {
return original.flush();
}

@override
List<Cookie> get cookies => original.cookies;

Expand Down
Loading
Loading