Skip to content

Commit 015701d

Browse files
committed
add a separate module for ssr-render-cluster
1 parent 9c5d10e commit 015701d

File tree

20 files changed

+572
-575
lines changed

20 files changed

+572
-575
lines changed

modules/demo/ssr/graph/index.js

+13-1
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,22 @@
1515
// limitations under the License.
1616

1717
const fastify = require('fastify')();
18+
const fs = require('fs');
19+
const path = require('path');
20+
21+
// create `/data` directory if it does not exist
22+
const basePath = path.join(__dirname, 'data/');
23+
fs.access(basePath, fs.constants.F_OK, (err, _) => () => {
24+
if (!err) { fs.mkdir(basePath); }
25+
});
1826

1927
fastify //
2028
.register(require('./plugins/webrtc'), require('./plugins/graph')(fastify))
2129
.register(require('fastify-static'), {root: require('path').join(__dirname, 'public')})
22-
.get('/', (req, reply) => reply.sendFile('video.html'));
30+
.register(require('fastify-multipart'))
31+
.register(require('fastify-cors'), {})
32+
.register((require('fastify-arrow')))
33+
.register(require('./plugins/api'))
34+
.get('/', (req, reply) => reply.sendFile('video.html'))
2335

2436
fastify.listen(8080).then(() => console.log('server ready'));

modules/demo/ssr/graph/package.json

+9-3
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,24 @@
1717
"@rapidsai/cuda": "0.0.1",
1818
"@rapidsai/deck.gl": "0.0.1",
1919
"@rapidsai/jsdom": "0.0.1",
20+
"@rapidsai/ssr-render-cluster": "0.0.1",
2021
"fastify-plugin": "3.0.0",
2122
"fastify-socket.io": "2.0.0",
2223
"fastify-static": "4.4.1",
2324
"fastify": "3.20.2",
24-
"nanoid": "3.1.31",
25+
"fastify-multipart": "5.0.2",
26+
"fastify-cors": "6.0.2",
27+
"fastify-arrow": "0.1.0",
28+
"apache-arrow": "^4.0.0",
29+
"nanoid": "3.1.25",
2530
"rxjs": "6.6.7",
2631
"shm-typed-array": "0.0.13",
2732
"simple-peer": "9.11.0",
28-
"socket.io": "4.1.3"
33+
"socket.io": "4.1.3",
34+
"glob": "7.2.0",
35+
"sharp": "0.29.2"
2936
},
3037
"files": [
31-
"render",
3238
"public",
3339
"plugins",
3440
"index.js",
+165
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
// Copyright (c) 2021, NVIDIA CORPORATION.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
const {graphs, clients} = require('../graph');
16+
const fs = require('fs')
17+
const util = require('util')
18+
const {pipeline} = require('stream')
19+
const pump = util.promisify(pipeline)
20+
const glob = require('glob');
21+
const {Graph} = require('@rapidsai/cugraph');
22+
const {loadEdges, loadNodes} = require('../graph/loader');
23+
const {RecordBatchStreamWriter} = require('apache-arrow');
24+
const path = require('path');
25+
const {readDataFrame, getNodesForGraph, getEdgesForGraph, getPaginatedRows} = require('./utils');
26+
27+
module.exports = function(fastify, opts, done) {
28+
fastify.addHook('preValidation', (request, reply, done) => {
29+
// handle upload validation after reading request.file() in the route function itself
30+
if (request.url == '/datasets/upload') {
31+
done();
32+
} else {
33+
request.query.id =
34+
(request.method == 'POST') ? `${request.body.id}:video` : `${request.query.id}:video`;
35+
if (request.query.id in fastify[clients]) {
36+
done();
37+
} else {
38+
reply.code(500).send('client handshake not established');
39+
}
40+
}
41+
});
42+
43+
async function renderGraph(id, data) {
44+
const asDeviceMemory = (buf) => new (buf[Symbol.species])(buf);
45+
const src = data.edges.dataframe.get(data.edges.src);
46+
const dst = data.edges.dataframe.get(data.edges.dst);
47+
const graph = Graph.fromEdgeList(src, dst, {directedEdges: true});
48+
fastify[graphs][id] = {
49+
refCount: 0,
50+
nodes: await getNodesForGraph(asDeviceMemory, data.nodes, graph.numNodes),
51+
edges: await getEdgesForGraph(asDeviceMemory, data.edges),
52+
graph: graph,
53+
};
54+
55+
++fastify[graphs][id].refCount;
56+
57+
return {
58+
gravity: 0.0,
59+
linLogMode: false,
60+
scalingRatio: 5.0,
61+
barnesHutTheta: 0.0,
62+
jitterTolerance: 0.05,
63+
strongGravityMode: false,
64+
outboundAttraction: false,
65+
graph: fastify[graphs][id].graph,
66+
nodes: {
67+
...fastify[graphs][id].nodes,
68+
length: fastify[graphs][id].graph.numNodes,
69+
},
70+
edges: {
71+
...fastify[graphs][id].edges,
72+
length: fastify[graphs][id].graph.numEdges,
73+
},
74+
};
75+
}
76+
77+
fastify.post('/datasets/upload', async function(req, reply) {
78+
const data = await req.file();
79+
const id = `${data.fields.id.value}:video`;
80+
if (id in fastify[clients]) {
81+
const basePath = `${__dirname}/../../data/`;
82+
const filepath = path.join(basePath, data.filename);
83+
const target = fs.createWriteStream(filepath);
84+
try {
85+
await pump(data.file, target);
86+
} catch (err) { console.log(err); }
87+
reply.send();
88+
} else {
89+
reply.code(500).send('client handshake not established');
90+
}
91+
});
92+
93+
fastify.get('/datasets', async (request, reply) => {
94+
glob(`*.{csv,parquet}`,
95+
{cwd: `${__dirname}/../../data/`},
96+
(er, files) => { reply.send(JSON.stringify(files.concat(['defaultExample']))); });
97+
});
98+
99+
fastify.post('/dataframe/load', async (request, reply) => {
100+
const filePath = `${__dirname}/../../data/`
101+
if (fs.existsSync(`${filePath}${request.body.nodes}`) &&
102+
fs.existsSync(`${filePath}${request.body.edges}`)) {
103+
fastify[clients][request.query.id].data.nodes.dataframe =
104+
await readDataFrame(`${filePath}${request.body.nodes}`);
105+
106+
fastify[clients][request.query.id].data.edges.dataframe =
107+
await readDataFrame(`${filePath}${request.body.edges}`);
108+
}
109+
else {
110+
fastify[clients][request.query.id].data.nodes.dataframe = await loadNodes();
111+
fastify[clients][request.query.id].data.edges.dataframe = await loadEdges();
112+
}
113+
if (fastify[clients][request.query.id].data.nodes.dataframe.numRows == 0) {
114+
reply.code(500).send('no dataframe loaded');
115+
}
116+
reply.send(JSON.stringify({
117+
'nodes': fastify[clients][request.query.id].data.nodes.dataframe.numRows,
118+
'edges': fastify[clients][request.query.id].data.edges.dataframe.numRows
119+
}));
120+
})
121+
122+
fastify.get('/dataframe/columnNames/read', async (request, reply) => {
123+
reply.send(JSON.stringify({
124+
nodesParams: fastify[clients][request.query.id].data.nodes.dataframe.names.concat([null]),
125+
edgesParams: fastify[clients][request.query.id].data.edges.dataframe.names.concat([null])
126+
}));
127+
});
128+
129+
fastify.post('/dataframe/columnNames/update', async (request, reply) => {
130+
try {
131+
Object.assign(fastify[clients][request.query.id].data.nodes, request.body.nodes);
132+
Object.assign(fastify[clients][request.query.id].data.edges, request.body.edges);
133+
reply.code(200).send('successfully updated columnNames');
134+
} catch (err) { reply.code(500).send(err); }
135+
});
136+
137+
fastify.post('/graph/render', async (request, reply) => {
138+
try {
139+
fastify[clients][request.query.id].graph =
140+
await renderGraph('default', fastify[clients][request.query.id].data);
141+
reply.code(200).send('successfully rendered graph');
142+
} catch (err) { reply.code(500).send(err); }
143+
})
144+
145+
fastify.get('/dataframe/read', async (request, reply) => {
146+
try {
147+
const pageIndex = parseInt(request.query.pageIndex);
148+
const pageSize = parseInt(request.query.pageSize);
149+
const dataframe = request.query.dataframe; //{'nodes', 'edges'}
150+
const [arrowTable, numRows] =
151+
await getPaginatedRows(fastify[clients][request.query.id].data[dataframe].dataframe,
152+
pageIndex,
153+
pageSize,
154+
fastify[clients][request.query.id].state.selectedInfo[dataframe]);
155+
156+
arrowTable.schema.metadata.set('numRows', numRows);
157+
RecordBatchStreamWriter.writeAll(arrowTable).pipe(reply.stream());
158+
} catch (err) {
159+
request.log.error({err}, '/run_query error');
160+
reply.code(500).send(err);
161+
}
162+
});
163+
164+
done();
165+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
const {DataFrame, Series, Int32, Uint8, Uint32, Uint64} = require('@rapidsai/cudf');
2+
const {Float32Buffer} = require('@rapidsai/cuda');
3+
4+
function readDataFrame(path) {
5+
if (path.indexOf('.csv', path.length - 4) !== -1) {
6+
// csv file
7+
return DataFrame.readCSV({sources: [path], header: 0, sourceType: 'files'});
8+
} else if (path.indexOf('.parquet', path.length - 8) !== -1) {
9+
// csv file
10+
return DataFrame.readParquet({sources: [path]});
11+
}
12+
// if (df.names.includes('Unnamed: 0')) { df = df.cast({'Unnamed: 0': new Uint32}); }
13+
return new DataFrame({});
14+
}
15+
16+
async function getNodesForGraph(asDeviceMemory, nodes, numNodes) {
17+
let nodesRes = {};
18+
const pos = new Float32Buffer(Array.from(
19+
{length: numNodes * 2},
20+
() => Math.random() * 1000 * (Math.random() < 0.5 ? -1 : 1),
21+
));
22+
23+
if (nodes.x in nodes.dataframe.names) {
24+
nodesRes.nodeXPositions = asDeviceMemory(nodes.dataframe.get(node.x).data);
25+
} else {
26+
nodesRes.nodeXPositions = pos.subarray(0, pos.length / 2);
27+
}
28+
if (nodes.y in nodes.dataframe.names) {
29+
nodesRes.nodeYPositions = asDeviceMemory(nodes.dataframe.get(node.y).data);
30+
} else {
31+
nodesRes.nodeYPositions = pos.subarray(pos.length / 2);
32+
}
33+
if (nodes.dataframe.names.includes(nodes.size)) {
34+
nodesRes.nodeRadius = asDeviceMemory(nodes.dataframe.get(nodes.size).cast(new Uint8).data);
35+
}
36+
if (nodes.dataframe.names.includes(nodes.color)) {
37+
nodesRes.nodeFillColors =
38+
asDeviceMemory(nodes.dataframe.get(nodes.color).cast(new Uint32).data);
39+
}
40+
if (nodes.dataframe.names.includes(nodes.id)) {
41+
nodesRes.nodeElementIndices =
42+
asDeviceMemory(nodes.dataframe.get(nodes.id).cast(new Uint32).data);
43+
}
44+
return nodesRes;
45+
}
46+
47+
async function getEdgesForGraph(asDeviceMemory, edges) {
48+
let edgesRes = {};
49+
50+
if (edges.dataframe.names.includes(edges.color)) {
51+
edgesRes.edgeColors = asDeviceMemory(edges.dataframe.get(edges.color).data);
52+
} else {
53+
edgesRes.edgeColors = asDeviceMemory(
54+
Series
55+
.sequence(
56+
{type: new Uint64, size: edges.dataframe.numRows, init: 18443486512814075489n, step: 0})
57+
.data);
58+
}
59+
if (edges.dataframe.names.includes(edges.id)) {
60+
edgesRes.edgeList = asDeviceMemory(edges.dataframe.get(edges.id).cast(new Uint64).data);
61+
}
62+
if (edges.dataframe.names.includes(edges.bundle)) {
63+
edgesRes.edgeBundles = asDeviceMemory(edges.dataframe.get(edges.bundle).data);
64+
}
65+
return edgesRes;
66+
}
67+
68+
async function getPaginatedRows(df, pageIndex = 0, pageSize = 400, selected = []) {
69+
if (selected.length != 0) {
70+
const selectedSeries = Series.new({type: new Int32, data: selected}).unique(true);
71+
const updatedDF = df.gather(selectedSeries);
72+
const idxs = Series.sequence({
73+
type: new Int32,
74+
init: (pageIndex - 1) * pageSize,
75+
size: Math.min(pageSize, updatedDF.numRows),
76+
step: 1
77+
});
78+
return [updatedDF.gather(idxs).toArrow(), updatedDF.numRows];
79+
} else {
80+
const idxs = Series.sequence({
81+
type: new Int32,
82+
init: (pageIndex - 1) * pageSize,
83+
size: Math.min(pageSize, df.numRows),
84+
step: 1
85+
});
86+
return [df.gather(idxs).toArrow(), df.numRows];
87+
}
88+
}
89+
90+
module.exports = {
91+
readDataFrame,
92+
getNodesForGraph,
93+
getEdgesForGraph,
94+
getPaginatedRows
95+
}

0 commit comments

Comments
 (0)