Skip to content

Commit 8259905

Browse files
committed
Updated graphql event stream functionality to watch for changes to
schema
1 parent 681a01c commit 8259905

8 files changed

+2593
-1398
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
build
2+
dumps
23
.vscode
34

5+
.DS_Store
6+
47
# Logs
58
logs
69
*.log

package.json

+7-3
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,26 @@
77
"license": "MIT",
88
"scripts": {
99
"build": "tsc",
10-
"dev": "nodemon src/index.ts"
10+
"dev": "nodemon src/index.ts",
11+
"dev:rs": "nodemon src/rs-index.ts --ignore src/schema/"
1112
},
1213
"dependencies": {
13-
"apollo-server-express": "^2.14.2",
14+
"apollo-server-express": "^2.19.1",
1415
"axios": "^0.18.1",
16+
"chokidar": "^3.5.0",
1517
"compression": "^1.7.4",
18+
"cors": "^2.8.5",
1619
"express": "^4.17.1",
1720
"graphql": "^14.2.0",
21+
"graphql-tools": "^7.0.2",
1822
"lodash": "^4.17.20"
1923
},
2024
"devDependencies": {
2125
"@types/compression": "^1.7.0",
2226
"@types/express-serve-static-core": "^4.17.7",
2327
"@types/graphql": "^14.2.0",
2428
"@types/lodash": "^4.14.123",
25-
"nodemon": "^1.18.10",
29+
"nodemon": "^2.0.7",
2630
"ts-node": "^8.0.3",
2731
"typescript": "3.4.1"
2832
},

src/index.ts

+5-3
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ import http from 'http';
22
import express, { RequestHandler } from 'express';
33
import { ApolloServer, ApolloError } from 'apollo-server-express';
44
import { typeDefs, resolvers } from './schema';
5-
import { graphqlEventStream } from './schema-observer';
65
const compression = require('compression');
6+
const cors = require('cors');
77

88
const server = new ApolloServer({
99
typeDefs,
@@ -23,17 +23,19 @@ const server = new ApolloServer({
2323
});
2424
const app = express();
2525
app.use(compression());
26+
app.use(cors());
2627
app.use((req, res, next) => {
2728
// console.log(req.headers);
2829
return next();
2930
});
30-
app.use(graphqlEventStream());
31+
// app.use(graphqlEventStream());
32+
3133
server.applyMiddleware({ app, path: '/graphql' });
3234
const httpServer = http.createServer(app);
3335
server.installSubscriptionHandlers(httpServer);
3436
app.all('/test', (req, res) => {
3537
// console.log(req.headers);
36-
res.send('Hi');
38+
res.send('');
3739
});
3840
const PORT = process.env.__PORT__ || 5400;
3941
httpServer.listen(PORT, () => {

src/rs-index.ts

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import http from 'http';
2+
import { EventEmitter } from 'events';
3+
import express, { RequestHandler, Express } from 'express';
4+
import { ApolloServer, ApolloError } from 'apollo-server-express';
5+
import { graphqlEventStream } from './schema-observer';
6+
import { execute } from 'graphql';
7+
import { makeExecutableSchema } from 'graphql-tools';
8+
import { createWatcher } from './watcher';
9+
10+
const compression = require('compression');
11+
const cors = require('cors');
12+
13+
const getSchema = () => {
14+
// First clear require cache for the module
15+
delete require.cache[require.resolve('./schema/rs-schema')];
16+
const { typeDefs, resolvers } = require('./schema/rs-schema');
17+
return makeExecutableSchema({
18+
typeDefs,
19+
resolvers,
20+
})
21+
};
22+
23+
const setupServer = async() => {
24+
const watcher = createWatcher();
25+
const emitter = new EventEmitter();
26+
27+
const server = new ApolloServer({
28+
subscriptions: false,
29+
gateway: {
30+
load: async() => {
31+
const schema = getSchema();
32+
return {
33+
schema,
34+
executor: args => {
35+
return execute({
36+
...args,
37+
schema,
38+
})
39+
}
40+
}
41+
},
42+
executor: execute,
43+
onSchemaChange: cb => {
44+
watcher.on('all', () => {
45+
console.log('updating schema..');
46+
const schema = getSchema();
47+
cb(schema);
48+
emitter.emit('schema:updated', schema);
49+
});
50+
51+
return () => watcher.close();
52+
},
53+
},
54+
context({ req }) {
55+
// throw new ApolloError('NOT_FOUND', '404');
56+
return { req };
57+
},
58+
formatError(error) {
59+
if (!process.env.NODE_ENV || process.env.NODE_ENV === 'development') {
60+
// logging the errors can help in development
61+
// tslint:disable-next-line
62+
console.log(error);
63+
}
64+
return error;
65+
}
66+
});
67+
const app = express();
68+
69+
// Compression doesn't work well with server side events
70+
// app.use(compression());
71+
app.use(cors());
72+
app.use((req, res, next) => {
73+
// console.log(req.headers);
74+
return next();
75+
});
76+
graphqlEventStream({
77+
app,
78+
emitter,
79+
streamPath: '/stream',
80+
});
81+
server.applyMiddleware({ app, path: '/graphql' });
82+
const httpServer = http.createServer(app);
83+
// subscriptions are not supported in apollo gateway
84+
// server.installSubscriptionHandlers(httpServer);
85+
86+
app.all('/test', (req, res) => {
87+
// console.log(req.headers);
88+
res.send('');
89+
});
90+
const PORT = process.env.__PORT__ || 5400;
91+
httpServer.listen(PORT, () => {
92+
// tslint:disable-next-line
93+
console.log(`Server running at http://localhost:${PORT}${server.graphqlPath}`);
94+
// tslint:disable-next-line
95+
// console.log(`Subscription running at ws://localhost:${PORT}${server.subscriptionsPath}`);
96+
});
97+
};
98+
99+
setupServer();

src/schema-observer.ts

+39-43
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,45 @@
1-
import { Request, Response } from 'express';
1+
import { Express } from 'express';
2+
import { EventEmitter } from 'events';
23

3-
export const graphqlEventStream = ({ streamPath = '/stream' } = {}) => {
4-
return (req: Request, res: Response) => {
5-
const sse = (str = '') => {
6-
res.write(str);
7-
res.flushHeaders();
8-
};
9-
const cleanup = () => {
10-
// tslint:disable-next-line
11-
console.log('Cleaning up...');
12-
res.end();
13-
};
14-
15-
// If current request is for the stream, setup the stream.
16-
if (req.path === streamPath) {
17-
if (req.headers.accept !== 'text/event-stream') {
18-
res.statusCode = 405;
19-
res.end();
20-
return;
21-
}
22-
23-
// tslint:disable-next-line
24-
console.log('stream requested.');
25-
// Making sure these options are set.
26-
req.socket.setTimeout(0);
27-
req.socket.setNoDelay(true);
28-
req.socket.setKeepAlive(true);
29-
30-
// Set headers for Server-Sent Events.
31-
res.statusCode = 200;
32-
res.setHeader('Content-Type', 'text/event-stream');
33-
res.setHeader('Cache-Control', 'no-cache');
34-
res.setHeader('Connection', 'keep-alive');
35-
36-
sse('event: open\n\n');
4+
export const graphqlEventStream = ({
5+
streamPath = '/stream',
6+
emitter,
7+
app,
8+
}: { app: Express, streamPath: string, emitter: EventEmitter }) => {
9+
app.use((req, res) => {
10+
res.setHeader('X-GraphQL-Event-Stream', streamPath);
11+
if (req.next) {
12+
req.next();
13+
}
14+
});
3715

38-
req.on('close', cleanup);
39-
req.on('finish', cleanup);
40-
req.on('error', cleanup);
16+
app.get(streamPath, (req, res) => {
17+
if (req.headers.accept !== 'text/event-stream') {
18+
res.statusCode = 405;
19+
res.end();
4120
return;
4221
}
4322

44-
res.setHeader('X-GraphQL-Event-Stream', streamPath);
45-
if (req.next) {
46-
return req.next();
47-
}
48-
};
23+
res.setHeader('Cache-Control', 'no-cache');
24+
res.setHeader('Content-Type', 'text/event-stream');
25+
res.setHeader('Connection', 'keep-alive');
26+
res.setHeader('Access-Control-Allow-Origin', '*');
27+
res.flushHeaders(); // flush the headers to establish SSE with client
28+
29+
console.log('connected to client.');
30+
let counter = 0;
31+
const updateClients = () => {
32+
counter++;
33+
console.log('sending update to client..');
34+
res.write(`data: ${JSON.stringify({num: counter})}\n\n`); // res.write() instead of res.send()
35+
};
36+
emitter.on('schema:updated', updateClients);
37+
38+
// If client closes connection, stop sending events
39+
res.on('close', () => {
40+
console.log('client dropped me.');
41+
emitter.off('schema:updated', updateClients);
42+
res.end();
43+
});
44+
});
4945
};

src/schema/rs-schema.ts

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import { gql } from 'apollo-server-express';
2+
import { merge } from 'lodash';
3+
4+
import * as GOTCharacter from './GOTCharacter';
5+
import * as GOTBook from './GOTBook';
6+
import * as GOTHouse from './GOTHouse';
7+
import * as HNUser from './HNUser';
8+
import * as Message from './Message';
9+
10+
export const rootTypeDef = gql`
11+
type Query {
12+
hello: String
13+
bye: Boolean
14+
}
15+
type Mutation
16+
type Subscription
17+
`;
18+
export const rootResolvers = {
19+
Query: {
20+
hello: () => 'Hello world',
21+
bye: () => true,
22+
}
23+
};
24+
25+
export const typeDefs = [
26+
rootTypeDef,
27+
GOTCharacter.typeDef,
28+
GOTBook.typeDef,
29+
GOTHouse.typeDef,
30+
HNUser.typeDef,
31+
Message.typeDef,
32+
// for some reason file Upload type is causing problems
33+
// File.typeDef,
34+
];
35+
36+
export const resolvers = merge(
37+
rootResolvers,
38+
GOTCharacter.resolvers,
39+
GOTBook.resolvers,
40+
GOTHouse.resolvers,
41+
HNUser.resolvers,
42+
Message.resolvers,
43+
// File.resolvers,
44+
);

src/watcher.ts

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import path from 'path';
2+
import chokidar from 'chokidar';
3+
4+
export const createWatcher = () => {
5+
return chokidar.watch(path.resolve(__dirname, './schema'));
6+
};

0 commit comments

Comments
 (0)