Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added uploading dataset and drag & drop support #317

Open
wants to merge 39 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
d126f34
added uploading dataset and drag & drop support
AjayThorve Oct 18, 2021
0a0616e
fix client-server bug
AjayThorve Oct 18, 2021
19c189a
added server side pagination API
AjayThorve Oct 19, 2021
e0a39c0
create dir if not exists
AjayThorve Oct 19, 2021
69d5ce9
Merge branch 'main' into demo/viz-app/improvements
AjayThorve Oct 21, 2021
f0fa54d
added redbird node based reverse-proxying
AjayThorve Oct 22, 2021
90e5eb3
apply suggestions
AjayThorve Oct 22, 2021
6a919aa
update readme instructions
AjayThorve Oct 22, 2021
ce3cdaa
typo
AjayThorve Oct 22, 2021
5dc3eda
change front-end app port
AjayThorve Oct 22, 2021
59d279a
add missing license text
AjayThorve Oct 22, 2021
8058756
update rest api routes as per suggestions
AjayThorve Oct 27, 2021
df6f994
fix route name
AjayThorve Oct 27, 2021
0625236
removed unnecessary condition
AjayThorve Oct 27, 2021
f7f0d74
bug fixes and improvements
AjayThorve Oct 28, 2021
8ba7977
sending arrow tables to the client directly
AjayThorve Oct 28, 2021
0bac16f
update route
AjayThorve Oct 28, 2021
e28eb5a
typo fix route
AjayThorve Oct 28, 2021
f589db7
update apache-arrow version
AjayThorve Oct 28, 2021
6420370
revert apache arrow version update
AjayThorve Oct 28, 2021
cba5b22
add apache-arrow based streaming
AjayThorve Oct 29, 2021
5881444
add updated viz to html front-end as well
AjayThorve Oct 29, 2021
5c504ea
handle cases where numRows is less than pageIndex*pageSize
AjayThorve Oct 29, 2021
8174c67
Update modules/demo/viz-app/pages/demo/graph.jsx
AjayThorve Nov 11, 2021
765e61e
Merge branch 'main' into demo/viz-app/improvements
AjayThorve Jan 19, 2022
eb95097
Merge branch 'main' of github.com:rapidsai/node into demo/viz-app/imp…
trxcllnt Jan 20, 2022
bd535a5
update math.gl version
trxcllnt Jan 20, 2022
b7fde93
Merge branch 'main' of github.com:rapidsai/node into demo/viz-app/imp…
trxcllnt Jan 24, 2022
b4dc58a
Merge branch 'main' of github.com:rapidsai/node into demo/viz-app/imp…
trxcllnt Jan 27, 2022
5718e16
add missing parameters
trxcllnt Jan 27, 2022
47de288
add missing const and script type
trxcllnt Jan 27, 2022
48de9ca
fix graph viz column dtypes
trxcllnt Jan 27, 2022
b59926c
Update yarn.lock
trxcllnt Jan 27, 2022
b6a27f3
separate makeDeck from the render class, and generalize the render mo…
AjayThorve Feb 4, 2022
e3ec1a0
Merge branch 'main' into demo/viz-app/improvements
AjayThorve Feb 4, 2022
27bb83f
Merge branch 'main' into demo/viz-app/improvements
AjayThorve Feb 28, 2022
e23304f
fix graph api
AjayThorve Feb 28, 2022
9ca9a98
update forceatlas2 calculations
AjayThorve Feb 28, 2022
d3be34c
fix positions
AjayThorve Feb 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions modules/demo/client-server/components/server/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ readMortgageData() {
sources: [path.resolve('./public', 'data/mortgage.csv')],
dataTypes: {
index: new Int16,
zip: new Int32,
zip: new Uint32,
dti: new Float32,
current_actual_upb: new Float32,
borrower_credit_score: new Int16,
load_id: new Int32,
load_id: new Uint32,
delinquency_12_prediction: new Float32,
seller_name: new Int16
}
Expand Down
2 changes: 1 addition & 1 deletion modules/demo/graph/src/app.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ function getNodeLabels({ x, y, coordinate, nodeId, props, layer }) {
return labels;
}

function getEdgeLabels({ x, y, coordinate, edgeId, props, layer }) {
function getEdgeLabels({ x, y, coordinate, edgeId, props, layer, sourceNodeId, targetNodeId }) {
let size = 14;
const color = [255, 255, 255];
const labels = [{
Expand Down
14 changes: 13 additions & 1 deletion modules/demo/ssr/graph/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,22 @@
// limitations under the License.

const fastify = require('fastify')();
const fs = require('fs');
const path = require('path');

// Create the `data/` directory (upload target for the /datasets routes) if it
// does not already exist.
// BUG FIX: the previous version passed `(err, _) => () => {...}` to fs.access —
// the callback returned a nested arrow function, so its body never ran — and
// tested `!err`, which would have attempted mkdir only when the directory
// already existed. `fs.mkdir` with `recursive: true` is a no-op when the
// directory is present, so no existence probe is needed at all.
const basePath = path.join(__dirname, 'data/');
fs.mkdir(basePath, {recursive: true}, (err) => {
  if (err) { console.error(`failed to create ${basePath}:`, err); }
});

fastify //
.register(require('./plugins/webrtc'), require('./plugins/graph')(fastify))
.register(require('fastify-static'), {root: require('path').join(__dirname, 'public')})
.get('/', (req, reply) => reply.sendFile('video.html'));
.register(require('fastify-multipart'))
.register(require('fastify-cors'), {})
.register((require('fastify-arrow')))
.register(require('./plugins/api'))
.get('/', (req, reply) => reply.sendFile('video.html'))

fastify.listen(8080).then(() => console.log('server ready'));
8 changes: 7 additions & 1 deletion modules/demo/ssr/graph/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@
"fastify-socket.io": "2.0.0",
"fastify-static": "4.4.1",
"fastify": "3.20.2",
"fastify-multipart": "5.0.2",
"fastify-cors": "6.0.2",
"fastify-arrow": "0.1.0",
"apache-arrow": "^4.0.0",
"nanoid": "3.1.31",
"rxjs": "6.6.7",
"shm-typed-array": "0.0.13",
"simple-peer": "9.11.0",
"socket.io": "4.1.3"
"socket.io": "4.1.3",
"glob": "7.2.0",
"sharp": "0.29.2"
},
"files": [
"render",
Expand Down
254 changes: 254 additions & 0 deletions modules/demo/ssr/graph/plugins/api/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
// Copyright (c) 2021, NVIDIA CORPORATION.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

const {graphs, clients} = require('../graph');
const fs = require('fs')
const util = require('util')
const {pipeline} = require('stream')
const pump = util.promisify(pipeline)
const glob = require('glob');
const {Float32Buffer} = require('@rapidsai/cuda');
const {Graph} = require('@rapidsai/cugraph');
const {DataFrame, Series, Int32, Uint8, Uint32, Uint64, Float32, Float64} =
require('@rapidsai/cudf');
const {loadEdges, loadNodes} = require('../graph/loader');
const {RecordBatchStreamWriter} = require('apache-arrow');
const path = require('path');

/**
 * Load a cuDF DataFrame from a file on disk, dispatching on file extension.
 * (Note: the `path` parameter is a file-path string and shadows the `path`
 * module imported at the top of this file; the module is not used here.)
 *
 * @param {string} path absolute path to a `.csv` or `.parquet` file
 * @returns {DataFrame} the loaded frame, or an empty DataFrame when the
 *   extension is not recognized (callers detect this via `numRows == 0`).
 */
function readDataFrame(path) {
  if (path.endsWith('.csv')) {
    // csv file
    return DataFrame.readCSV({sources: [path], header: 0, sourceType: 'files'});
  } else if (path.endsWith('.parquet')) {
    // parquet file (the original comment here incorrectly said "csv file")
    return DataFrame.readParquet({sources: [path]});
  }
  // Unrecognized extension: return an empty frame rather than throwing.
  return new DataFrame({});
}

/**
 * Build the node attribute buffers consumed by the graph renderer.
 *
 * @param asDeviceMemory function converting a host buffer to device memory
 * @param nodes client node config: `{dataframe, x, y, size, color, id}` where
 *   the non-dataframe fields are column-name strings (possibly absent)
 * @param {number} numNodes node count, used to size the fallback layout
 * @returns an object with any of nodeXPositions / nodeYPositions / nodeRadius /
 *   nodeFillColors / nodeElementIndices that could be derived
 */
async function getNodesForGraph(asDeviceMemory, nodes, numNodes) {
  const nodesRes = {};
  // Fallback random layout: first half of `pos` is X, second half is Y.
  const pos = new Float32Buffer(Array.from(
    {length: numNodes * 2},
    () => Math.random() * 1000 * (Math.random() < 0.5 ? -1 : 1),
    ));

  // BUG FIX: the original wrote `nodes.x in nodes.dataframe.names` — `in` on an
  // array tests *indices*, not values, so the check was effectively always
  // false — and then referenced the undefined variable `node` (`node.x`),
  // which would have thrown a ReferenceError had the branch ever been taken.
  // Use `.includes()` and `nodes`, matching the size/color/id checks below.
  if (nodes.dataframe.names.includes(nodes.x)) {
    nodesRes.nodeXPositions = asDeviceMemory(nodes.dataframe.get(nodes.x).data);
  } else {
    nodesRes.nodeXPositions = pos.subarray(0, pos.length / 2);
  }
  if (nodes.dataframe.names.includes(nodes.y)) {
    nodesRes.nodeYPositions = asDeviceMemory(nodes.dataframe.get(nodes.y).data);
  } else {
    nodesRes.nodeYPositions = pos.subarray(pos.length / 2);
  }
  if (nodes.dataframe.names.includes(nodes.size)) {
    nodesRes.nodeRadius = asDeviceMemory(nodes.dataframe.get(nodes.size).cast(new Uint8).data);
  }
  if (nodes.dataframe.names.includes(nodes.color)) {
    nodesRes.nodeFillColors =
      asDeviceMemory(nodes.dataframe.get(nodes.color).cast(new Uint32).data);
  }
  if (nodes.dataframe.names.includes(nodes.id)) {
    nodesRes.nodeElementIndices =
      asDeviceMemory(nodes.dataframe.get(nodes.id).cast(new Uint32).data);
  }
  return nodesRes;
}

/**
 * Build the edge attribute buffers consumed by the graph renderer.
 *
 * @param asDeviceMemory function converting a host buffer to device memory
 * @param edges client edge config: `{dataframe, color, id, bundle}` where the
 *   non-dataframe fields are column-name strings (possibly absent)
 * @returns an object with edgeColors (always set) plus edgeList / edgeBundles
 *   when the corresponding columns exist
 */
async function getEdgesForGraph(asDeviceMemory, edges) {
  const edgesRes = {};
  const columns = edges.dataframe.names;

  if (columns.includes(edges.color)) {
    edgesRes.edgeColors = asDeviceMemory(edges.dataframe.get(edges.color).data);
  } else {
    // No color column: fill every edge with the same packed default color.
    const defaultColors = Series.sequence({
      type: new Uint64,
      size: edges.dataframe.numRows,
      init: 18443486512814075489n,
      step: 0,
    });
    edgesRes.edgeColors = asDeviceMemory(defaultColors.data);
  }
  if (columns.includes(edges.id)) {
    edgesRes.edgeList = asDeviceMemory(edges.dataframe.get(edges.id).cast(new Uint64).data);
  }
  if (columns.includes(edges.bundle)) {
    edgesRes.edgeBundles = asDeviceMemory(edges.dataframe.get(edges.bundle).data);
  }
  return edgesRes;
}

/**
 * Slice one page of rows out of `df` and return it as an Arrow table.
 * Pages are 1-based (`pageIndex = 1` is the first page).
 *
 * @param {DataFrame} df source frame
 * @param {number} pageIndex 1-based page number from the client
 * @param {number} pageSize rows per page
 * @param {number[]} selected when non-empty, row indices of the client's
 *   current selection; pagination is applied within the selection
 * @returns {[Table, number]} the page as an Arrow table plus the total row
 *   count (of the selection, when one is active)
 */
async function getPaginatedRows(df, pageIndex = 0, pageSize = 400, selected = []) {
  // When the client has an active selection, page within those rows only.
  let source = df;
  if (selected.length != 0) {
    const selectedSeries = Series.new({type: new Int32, data: selected}).unique(true);
    source = df.gather(selectedSeries);
  }
  // BUG FIX: the original used `init: (pageIndex - 1) * pageSize` unclamped
  // (negative for the 0 default) and `size: Math.min(pageSize, numRows)`,
  // which ignores the page offset — so requesting a page past the end (the
  // numRows < pageIndex*pageSize case) gathered out-of-range indices.
  // Clamp the offset to >= 0 and the count to the rows actually remaining.
  const offset = Math.max(0, (pageIndex - 1) * pageSize);
  const count = Math.max(0, Math.min(pageSize, source.numRows - offset));
  const idxs = Series.sequence({type: new Int32, init: offset, size: count, step: 1});
  return [source.gather(idxs).toArrow(), source.numRows];
}

// Fastify plugin exposing the dataset REST API: upload, listing, load,
// column-name read/update, render trigger, and paginated Arrow reads.
// `clients` and `graphs` are decorator symbols supplied by the sibling
// `../graph` plugin (required at the top of this file); `fastify[clients]`
// maps a session id to per-client state.
module.exports = function(fastify, opts, done) {
  // Gate every route (except upload, which reads its id from multipart fields)
  // on an established client session keyed as `<id>:video`.
  fastify.addHook('preValidation', (request, reply, done) => {
    // handle upload validation after reading request.file() in the route function itself
    if (request.url == '/datasets/upload') {
      done();
    } else {
      // Normalize the id (from body for POST, query otherwise) to the
      // `<id>:video` session-key form before the lookup.
      request.query.id =
        (request.method == 'POST') ? `${request.body.id}:video` : `${request.query.id}:video`;
      if (request.query.id in fastify[clients]) {
        done();
      } else {
        reply.code(500).send('client handshake not established');
      }
    }
  });

  // Build a cuGraph graph from the client's edge list, cache it (with its
  // node/edge device buffers) under `fastify[graphs][id]`, and return the
  // force-atlas layout parameters + buffers the renderer consumes.
  async function renderGraph(id, data) {
    // Re-wrap a buffer as its own species to get a device-memory view.
    const asDeviceMemory = (buf) => new (buf[Symbol.species])(buf);
    const src = data.edges.dataframe.get(data.edges.src);
    const dst = data.edges.dataframe.get(data.edges.dst);
    const graph = Graph.fromEdgeList(src, dst);
    fastify[graphs][id] = {
      refCount: 0,
      nodes: await getNodesForGraph(asDeviceMemory, data.nodes, graph.numNodes),
      edges: await getEdgesForGraph(asDeviceMemory, data.edges),
      graph: graph,
    };

    ++fastify[graphs][id].refCount;

    // Layout/physics parameters consumed by the renderer (force-atlas style).
    return {
      gravity: 0.0,
      linLogMode: false,
      scalingRatio: 5.0,
      barnesHutTheta: 0.0,
      jitterTolerance: 0.05,
      strongGravityMode: false,
      outboundAttraction: false,
      graph: fastify[graphs][id].graph,
      nodes: {
        ...fastify[graphs][id].nodes,
        length: fastify[graphs][id].graph.numNodes,
      },
      edges: {
        ...fastify[graphs][id].edges,
        length: fastify[graphs][id].graph.numEdges,
      },
    };
  }

  // Multipart upload: stream the posted file into `data/`, but only for an
  // established client session (validated here, not in the preValidation hook).
  fastify.post('/datasets/upload', async function(req, reply) {
    const data = await req.file();
    const id = `${data.fields.id.value}:video`;
    if (id in fastify[clients]) {
      const basePath = `${__dirname}/../../data/`;
      const filepath = path.join(basePath, data.filename);
      const target = fs.createWriteStream(filepath);
      try {
        // pump = promisified stream.pipeline; waits for the write to finish.
        await pump(data.file, target);
      } catch (err) { console.log(err); }  // NOTE(review): upload errors are logged but still reply 200
      reply.send();
    } else {
      reply.code(500).send('client handshake not established');
    }
  });

  // List uploaded csv/parquet datasets, plus the built-in 'defaultExample'.
  fastify.get('/datasets', async (request, reply) => {
    glob(`*.{csv,parquet}`,
         {cwd: `${__dirname}/../../data/`},
         (er, files) => { reply.send(JSON.stringify(files.concat(['defaultExample']))); });
  });

  // Load node/edge dataframes for this session: from the named uploaded files
  // when both exist, otherwise fall back to the bundled default dataset.
  fastify.post('/dataframe/load', async (request, reply) => {
    const filePath = `${__dirname}/../../data/`
    if (fs.existsSync(`${filePath}${request.body.nodes}`) &&
        fs.existsSync(`${filePath}${request.body.edges}`)) {
      fastify[clients][request.query.id].data.nodes.dataframe =
        await readDataFrame(`${filePath}${request.body.nodes}`);

      fastify[clients][request.query.id].data.edges.dataframe =
        await readDataFrame(`${filePath}${request.body.edges}`);
    }
    else {
      fastify[clients][request.query.id].data.nodes.dataframe = await loadNodes();
      fastify[clients][request.query.id].data.edges.dataframe = await loadEdges();
    }
    // An empty frame means readDataFrame didn't recognize the file.
    // NOTE(review): execution falls through to the reply.send below even after
    // the 500 — confirm whether an early return is intended here.
    if (fastify[clients][request.query.id].data.nodes.dataframe.numRows == 0) {
      reply.code(500).send('no dataframe loaded');
    }
    reply.send(JSON.stringify({
      'nodes': fastify[clients][request.query.id].data.nodes.dataframe.numRows,
      'edges': fastify[clients][request.query.id].data.edges.dataframe.numRows
    }));
  })

  // Column names for both frames; a trailing null entry is appended
  // (presumably a "none selected" option for the client UI — verify).
  fastify.get('/dataframe/columnNames/read', async (request, reply) => {
    reply.send(JSON.stringify({
      nodesParams: fastify[clients][request.query.id].data.nodes.dataframe.names.concat([null]),
      edgesParams: fastify[clients][request.query.id].data.edges.dataframe.names.concat([null])
    }));
  });

  // Merge client-chosen column mappings (x/y/size/color/id/src/dst/…) into the
  // session's node/edge config objects.
  fastify.post('/dataframe/columnNames/update', async (request, reply) => {
    try {
      Object.assign(fastify[clients][request.query.id].data.nodes, request.body.nodes);
      Object.assign(fastify[clients][request.query.id].data.edges, request.body.edges);
      reply.code(200).send('successfully updated columnNames');
    } catch (err) { reply.code(500).send(err); }
  });

  // Build the renderable graph from the loaded data and stash it on the session.
  fastify.post('/graph/render', async (request, reply) => {
    try {
      fastify[clients][request.query.id].graph =
        await renderGraph('default', fastify[clients][request.query.id].data);
      reply.code(200).send('successfully rendered graph');
    } catch (err) { reply.code(500).send(err); }
  })

  // Stream one page of the 'nodes' or 'edges' frame to the client as an Arrow
  // record-batch stream; total row count travels in the schema metadata.
  fastify.get('/dataframe/read', async (request, reply) => {
    try {
      const pageIndex = parseInt(request.query.pageIndex);
      const pageSize = parseInt(request.query.pageSize);
      const dataframe = request.query.dataframe; //{'nodes', 'edges'}
      const [arrowTable, numRows] =
        await getPaginatedRows(fastify[clients][request.query.id].data[dataframe].dataframe,
                               pageIndex,
                               pageSize,
                               fastify[clients][request.query.id].state.selectedInfo[dataframe]);

      arrowTable.schema.metadata.set('numRows', numRows);
      // reply.stream() is provided by fastify-arrow (registered in index.js).
      RecordBatchStreamWriter.writeAll(arrowTable).pipe(reply.stream());
    } catch (err) {
      request.log.error({err}, '/run_query error');
      reply.code(500).send(err);
    }
  });

  done();
}
Loading