Skip to content

Commit 24212a6

Browse files
authored
Merge pull request #264 from siberiacancode/prototype/sse
prototype/sse 🧊 add sse + move file handing into data handler
2 parents 98f3540 + 6d1a953 commit 24212a6

19 files changed

Lines changed: 361 additions & 436 deletions

File tree

.husky/pre-commit

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
pnpm run -r lint-staged
1+
pnpm -r --workspace-concurrency=1 run lint-staged

packages/playground/src/core/createDatabaseRoutes/storages/File/FileStorage.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import { APP_PATH } from '@/shared/constants';
88
import { isIndex } from '../../helpers';
99
import { FileWriter } from './FileWriter';
1010

11-
export class FileStorage<
12-
Data extends Record<StorageIndex, any> = Record<StorageIndex, any>
13-
> implements Storage {
11+
export class FileStorage<Data extends Record<StorageIndex, any> = Record<StorageIndex, any>>
12+
implements Storage
13+
{
1414
private readonly fileWriter: FileWriter;
1515

1616
private readonly data: Data;

packages/playground/src/core/createDatabaseRoutes/storages/Memory/MemoryStorage.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ import type { Storage, StorageIndex } from '@/shared/types';
22

33
import { isIndex } from '../../helpers';
44

5-
export class MemoryStorage<
6-
Data extends Record<StorageIndex, any> = Record<StorageIndex, any>
7-
> implements Storage {
5+
export class MemoryStorage<Data extends Record<StorageIndex, any> = Record<StorageIndex, any>>
6+
implements Storage
7+
{
88
private readonly data: Data;
99

1010
public constructor(initialData: Data) {

packages/server/README.md

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -498,15 +498,9 @@ export default flatMockServerConfig;
498498

499499
> If the file path is absolute, then this path will be used as is. If the file path is relative, it will be appended to the current working directory.
500500
501-
If the file exists, response interceptors will receive `file descriptor` as the `data` argument:
501+
If the file exists, response interceptors will receive file content `Buffer` as the `data` argument.
502502

503-
`File descriptor` is an object with `path` and `file` fields that describe file location and file content.
504-
505-
- `path` `string` path to the file. Same as `file` passed in route
506-
- `file` `Buffer` file content as binary buffer
507-
508-
> Note to return file descriptor from interceptor. Server will send a buffer from `data.file` with corresponding `Content-Type` and `Content-Disposition` headers.
509-
> If you return invalid file descriptor, server will send it as json data.
503+
`Content-Type` and `Content-Disposition` headers are set before response interceptors are called.
510504

511505
```javascript
512506
/** @type {import('mock-config-server').FlatMockServerConfig} */
@@ -523,11 +517,14 @@ const flatMockServerConfig = [
523517
{
524518
file: "./settings.json",
525519
interceptors: {
526-
response: (data) => {
527-
const { file, path } = data;
528-
const buffer = file; // some logic with buffer
529-
fs.writeFileSync(path, buffer); // rewrite ./settings.json file on disk with new content
530-
return { path, file: buffer };
520+
response: (data, { setHeader, getResponseHeader }) => {
521+
// Content-Type and Content-Disposition are available before response interceptor runs.
522+
console.log("Content-Type:", getResponseHeader("Content-Type"));
523+
console.log("Content-Disposition:", getResponseHeader("Content-Disposition"));
524+
525+
const buffer = data;
526+
const updatedBuffer = Buffer.from(buffer.toString("utf-8").replace("name", "title"));
527+
return updatedBuffer;
531528
},
532529
},
533530
},
@@ -540,10 +537,6 @@ const flatMockServerConfig = [
540537
export default flatMockServerConfig;
541538
```
542539

543-
> Any changes to the data will not affect the file on disk unless you manually rewrite it.
544-
545-
> If you return a new `path` from interceptor, server will send file corresponding to this path or 404 error otherwise.
546-
547540
#### Polling
548541

549542
Routes support polling for data. To add polling for data, you must specify the `polling setting` and use `queue` property instead of `data` or `file`.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import fs from 'node:fs';
2+
import path from 'node:path';
3+
4+
import type { RestDataResponse, RestFileResponse, RestMethod } from '@/utils/types';
5+
6+
import { isFilePathValid } from '@/utils/helpers';
7+
8+
export const createFileHandler =
9+
<Method extends RestMethod>(filePath: RestFileResponse): RestDataResponse<Method> =>
10+
({ response, setHeader, setStatusCode }) => {
11+
if (!isFilePathValid(filePath)) {
12+
// TODO: what should we do?
13+
setStatusCode(404);
14+
response.send('Not Found');
15+
return null;
16+
}
17+
18+
const buffer = fs.readFileSync(path.resolve(filePath));
19+
const fileName = filePath.replaceAll('\\', '/').split('/').at(-1)!;
20+
const fileExtension = fileName.split('.').at(-1)!;
21+
22+
response.type(fileExtension);
23+
setHeader('Content-Disposition', `attachment; filename="${fileName}"`);
24+
25+
return buffer;
26+
};
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { describe, expect, it } from 'vitest';
2+
3+
import { formatSsePayload } from './formatSsePayload';
4+
5+
describe('formatSsePayload', () => {
6+
it('Should format payload without meta', () => {
7+
const payload = formatSsePayload('hello');
8+
9+
expect(payload).toBe('data: hello\n\n');
10+
});
11+
12+
it('Should include normalized meta fields before data', () => {
13+
const payload = formatSsePayload('message', {
14+
id: 'id\r\n123',
15+
event: 'user\ncreated\r',
16+
retry: 1500
17+
});
18+
19+
expect(payload).toBe('id: id123\nevent: usercreated\nretry: 1500\ndata: message\n\n');
20+
});
21+
22+
it('Should split multiline payload into multiple data lines', () => {
23+
const payload = formatSsePayload('line1\r\nline2\nline3\rline4');
24+
25+
expect(payload).toBe('data: line1\ndata: line2\ndata: line3\ndata: line4\n\n');
26+
});
27+
28+
it('Should throw if meta is invalid', () => {
29+
expect(() => formatSsePayload('message', { retry: -1 })).toThrow(
30+
'Invalid SSE meta: Number must be greater than or equal to 0'
31+
);
32+
});
33+
});
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import z from 'zod';
2+
3+
const sseMetaSchema = z
4+
.object({
5+
event: z.string().optional(),
6+
id: z.string().optional(),
7+
retry: z.number().int().nonnegative().optional()
8+
})
9+
.optional();
10+
11+
// ✅ important:
12+
// SSE is a line-based protocol. `id` and `event` must be single-line values.
13+
// Strip CR/LF so metadata cannot break frame structure.
14+
const normalizeSseMetaValue = (value: string) => value.replaceAll('\r', '').replaceAll('\n', '');
15+
16+
export const formatSsePayload = (
17+
data: string,
18+
meta?: { event?: string; id?: string; retry?: number }
19+
) => {
20+
const parseMetaResult = sseMetaSchema.safeParse(meta);
21+
if (!parseMetaResult.success) {
22+
throw new Error(`Invalid SSE meta: ${parseMetaResult.error.issues[0]?.message}`);
23+
}
24+
25+
const parsedMeta = parseMetaResult.data;
26+
const lines: string[] = [];
27+
28+
if (parsedMeta?.id) {
29+
lines.push(`id: ${normalizeSseMetaValue(parsedMeta.id)}`);
30+
}
31+
32+
if (parsedMeta?.event) {
33+
lines.push(`event: ${normalizeSseMetaValue(parsedMeta.event)}`);
34+
}
35+
36+
if (parsedMeta?.retry != null) {
37+
lines.push(`retry: ${parsedMeta.retry}`);
38+
}
39+
40+
// ✅ important:
41+
// Multiline payloads are encoded as multiple `data:` lines.
42+
// SSE clients concatenate consecutive `data:` lines with '\n' into one message.
43+
data.split(/\r\n|\r|\n/).forEach((line) => {
44+
lines.push(`data: ${line}`);
45+
});
46+
47+
return `${lines.join('\n')}\n\n`;
48+
};
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from './createFileHandler';
2+
export * from './formatSsePayload/formatSsePayload';
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './rest';

packages/server/src/core/functions/rest/rest.test.ts

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ describe('rest', () => {
6060
path: '/users',
6161
routes: [
6262
{
63-
file: '/tmp/user.json',
63+
data: expect.any(Function),
6464
entities: {
6565
headers: {
6666
key: 'value'
@@ -139,7 +139,7 @@ describe('rest', () => {
139139
queue: [
140140
{ data: queueHandler, time: 100 },
141141
{ data: { ok: 'response' }, time: 200 },
142-
{ file: '/tmp/user.json', time: 300 }
142+
{ data: expect.any(Function), time: 300 }
143143
],
144144
entities: {
145145
headers: {
@@ -191,20 +191,92 @@ describe('rest', () => {
191191
});
192192
});
193193

194+
it('Should build config for SSE request', () => {
195+
const result = rest.sse('/users/stream', () => undefined);
196+
197+
expect(result).toStrictEqual({
198+
method: 'get',
199+
path: '/users/stream',
200+
routes: [
201+
{
202+
data: expect.any(Function),
203+
settings: { polling: false }
204+
}
205+
]
206+
});
207+
});
208+
209+
it('Should build config for stream request', () => {
210+
const result = rest.stream('/users/stream', () => undefined);
211+
212+
expect(result).toStrictEqual({
213+
method: 'post',
214+
path: '/users/stream',
215+
routes: [
216+
{
217+
data: expect.any(Function),
218+
settings: { polling: false }
219+
}
220+
]
221+
});
222+
});
223+
224+
it('Should send SSE payload and close stream client', async () => {
225+
const result = rest.sse('/users/stream', ({ client }) => {
226+
client.send('hello');
227+
client.close();
228+
});
229+
230+
const [route] = result.routes as [{ data: (params: any) => unknown }];
231+
const routeData = route.data;
232+
const setHeader = vi.fn();
233+
const write = vi.fn();
234+
const end = vi.fn();
235+
236+
await routeData({
237+
setHeader,
238+
response: { write, end }
239+
});
240+
241+
expect(setHeader).toHaveBeenCalledTimes(3);
242+
expect(setHeader).toHaveBeenNthCalledWith(1, 'connection', 'keep-alive');
243+
expect(setHeader).toHaveBeenNthCalledWith(2, 'content-type', 'text/event-stream');
244+
expect(setHeader).toHaveBeenNthCalledWith(3, 'cache-control', 'no-cache');
245+
expect(write).toHaveBeenCalledWith('data: hello\n\n');
246+
expect(end).toHaveBeenCalledTimes(1);
247+
});
248+
249+
it('Should send SSE payload with meta fields', async () => {
250+
const result = rest.sse('/users/stream', ({ client }) => {
251+
client.send('msg', {
252+
id: 'id-1',
253+
event: 'user.created',
254+
retry: 1500
255+
});
256+
client.close();
257+
});
258+
259+
const [route] = result.routes as [{ data: (params: any) => unknown }];
260+
const routeData = route.data;
261+
const setHeader = vi.fn();
262+
const write = vi.fn();
263+
const end = vi.fn();
264+
265+
await routeData({
266+
setHeader,
267+
response: { write, end }
268+
});
269+
270+
expect(write).toHaveBeenCalledWith('id: id-1\nevent: user.created\nretry: 1500\ndata: msg\n\n');
271+
});
272+
194273
it('Should type handler params with all typed fields', () => {
195274
const result = rest.post<{
196275
query: { query: string };
197276
body: { body: string };
198277
params: { params: string };
199278
response: { response: string };
200-
}>('/users/:id', (params) => {
201-
const query = params.request.query.query;
202-
const body = params.request.body.body;
203-
const path = params.request.params.params;
204-
console.log(query, body, path);
205-
206-
return { response: 'value' };
207-
});
279+
}>('/users/:id', () => ({ response: 'value' }));
208280

209281
expect(result).toStrictEqual({
210282
method: 'post',

0 commit comments

Comments
 (0)