Skip to content

Commit 19a2519

Browse files
committed
add server side event support to UI
1 parent e38f309 commit 19a2519

File tree

6 files changed

+85
-9
lines changed

6 files changed

+85
-9
lines changed

ui/src/pages/Data.tsx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import moment from 'moment';
77

88
import { ApplicationState } from '../store';
99
import { Data } from '../client';
10-
import { fetchRequest } from '../store/data/actions';
10+
import { fetchRequest, startSubscription, endSubscription } from '../store/data/actions';
1111

1212
interface RouteInfo {
1313
key: string;
@@ -27,6 +27,10 @@ export const DataPage: React.FC<MainProps> = props => {
2727

2828
useEffect(() => {
2929
dispatch(fetchRequest(params.key));
30+
dispatch(startSubscription(params.key));
31+
return () => {
32+
dispatch(endSubscription(params.key));
33+
};
3034
}, [dispatch]);
3135

3236
// TODO tjt: move data formatting to store

ui/src/store/data/actions.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import { action } from 'typesafe-actions';
22
import { DataActionTypes } from './types';
3-
import { Stream } from '../../client';
3+
import { Data } from '../../client';
44

55
export const fetchRequest = (key: string) => action(DataActionTypes.FETCH_REQUEST, key);
6-
export const fetchSuccess = (data: Stream[]) => action(DataActionTypes.FETCH_SUCCESS, data);
6+
export const fetchSuccess = (data: Data[]) => action(DataActionTypes.FETCH_SUCCESS, data);
77
export const fetchError = (message: string) => action(DataActionTypes.FETCH_ERROR, message);
8+
export const startSubscription = (key: string) => action(DataActionTypes.START_SUBSCRIPTION, key);
9+
export const endSubscription = (key: string) => action(DataActionTypes.END_SUBSCRIPTION, key);
10+
export const newMessage = (data: Data) => action(DataActionTypes.NEW_MESSAGE, data);

ui/src/store/data/reducer.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,23 @@ export const initialState: DataState = {
1111
const reducer: Reducer<DataState> = (state = initialState, action) => {
1212
switch (action.type) {
1313
case DataActionTypes.FETCH_REQUEST: {
14-
return { ...state, loading: true, key: action.key };
14+
return { ...state, loading: true, key: action.payload };
1515
}
1616
case DataActionTypes.FETCH_SUCCESS: {
1717
return { ...state, loading: false, data: action.payload };
1818
}
1919
case DataActionTypes.FETCH_ERROR: {
2020
return { ...state, loading: false, errors: action.payload };
2121
}
22+
case DataActionTypes.NEW_MESSAGE: {
23+
return { ...state, data: [action.payload, ...state.data] };
24+
}
25+
case DataActionTypes.START_SUBSCRIPTION: {
26+
return state;
27+
}
28+
case DataActionTypes.END_SUBSCRIPTION: {
29+
return state;
30+
}
2231
default: {
2332
return state;
2433
}

ui/src/store/data/sagas.ts

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import { all, call, fork, put, takeLatest } from 'redux-saga/effects';
1+
import { all, call, fork, put, takeEvery, take, cancel } from 'redux-saga/effects';
2+
import { eventChannel, EventChannel } from 'redux-saga';
23

34
import { DataActionTypes } from './types';
4-
import { fetchError, fetchSuccess, fetchRequest } from './actions';
5+
import { fetchError, fetchSuccess, fetchRequest, newMessage } from './actions';
56
import { DataApi, Configuration } from '../../client';
7+
import { debug } from '../../utils/logger';
68

79
const api = new DataApi(new Configuration({ basePath: '/api/v1' }));
810

@@ -19,12 +21,60 @@ function* handleFetch({ payload }: ReturnType<typeof fetchRequest>) {
1921
}
2022
}
2123

24+
function createSubscription(key: string) {
25+
return new EventSource(`/api/v1/streams/${key}/subscribe`);
26+
}
27+
28+
function createSubscriptionChannel(subscription: EventSource) {
29+
return eventChannel(emit => {
30+
subscription.onmessage = event => emit(event.data);
31+
return () => {
32+
return subscription.close();
33+
};
34+
});
35+
}
36+
37+
function* handleMessages(channel: EventChannel<unknown>, key: string) {
38+
try {
39+
while (true) {
40+
const message = yield take(channel);
41+
try {
42+
const data = {
43+
payload: JSON.parse(message),
44+
created: new Date().toISOString(),
45+
updated: new Date().toISOString()
46+
};
47+
debug(`/streams/${key}/subscription ->`, message);
48+
yield put(newMessage(data));
49+
} catch (err) {
50+
debug(`/streams/${key}/subscription ->`, message);
51+
}
52+
}
53+
} finally {
54+
debug(`/streams/${key}/subscription -> closing`);
55+
channel.close();
56+
}
57+
}
58+
59+
function* watchMessages() {
60+
while (true) {
61+
const { payload: key } = yield take(DataActionTypes.START_SUBSCRIPTION);
62+
const subscription = yield call(createSubscription, key);
63+
64+
const channel = yield call(createSubscriptionChannel, subscription);
65+
const handleMessagesTask = yield fork(handleMessages, channel, key);
66+
67+
yield take(DataActionTypes.END_SUBSCRIPTION);
68+
yield cancel(handleMessagesTask);
69+
}
70+
}
71+
2272
function* watchFetchRequest() {
23-
yield takeLatest(DataActionTypes.FETCH_REQUEST, handleFetch);
73+
yield takeEvery(DataActionTypes.FETCH_REQUEST, handleFetch);
2474
}
2575

2676
function* dataSaga() {
27-
yield all([fork(watchFetchRequest)]);
77+
yield all([fork(watchFetchRequest), fork(watchMessages)]);
2878
}
2979

3080
export default dataSaga;

ui/src/store/data/types.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ import { Data } from '../../client';
33
export enum DataActionTypes {
44
FETCH_REQUEST = '@@data/FETCH_REQUEST',
55
FETCH_SUCCESS = '@@data/FETCH_SUCCESS',
6-
FETCH_ERROR = '@@data/FETCH_ERROR'
6+
FETCH_ERROR = '@@data/FETCH_ERROR',
7+
NEW_MESSAGE = '@@data/NEW_MESSAGE',
8+
START_SUBSCRIPTION = '@@data/START_SUBSCRIPTION',
9+
END_SUBSCRIPTION = '@@data/END_SUBSCRIPTION'
710
}
811

912
export interface DataState {

ui/src/utils/logger.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export function log(...args: any[]) {
2+
console.log('[dlphn]', ...args);
3+
}
4+
5+
export function debug(...args: any[]) {
6+
console.debug('[dlphn]', ...args);
7+
}

0 commit comments

Comments
 (0)