Skip to content

Commit 066a75f

Browse files
authored
Merge pull request #6 from bonnydeal/feature/concurrent-sessions
Feature/concurrent sessions
2 parents 6870818 + a7eb224 commit 066a75f

File tree

2 files changed

+101
-76
lines changed

2 files changed

+101
-76
lines changed

neo4j.html

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
defaults: {
66
server: {value:"", type:"neo4j-bolt-server", required:true},
77
name: {value:""},
8-
query: {value: ""}
8+
query: {value: ""},
9+
sessions: {value: 10, required: true},
10+
database: {value: ""}
911
},
1012
inputs:1,
11-
outputs:2,
13+
outputs:3,
14+
outputLabels: ["single", "array", "no session available - retry later"],
1215
icon: "neo4j.png",
1316
label: function() {
1417
return this.name||"neo4j-bolt";
@@ -27,6 +30,12 @@
2730
<br/>
2831
<label for="node-input-query"><i class="icon-tag"></i>Cypher Query</label>
2932
<textarea id="node-input-query" placeholder="MATCH (o:Object {attrib: 'value'}) RETURN o" style="width: 70%;"></textarea>
33+
<br/>
34+
<label for="node-input-sessions"><i class="icon-tag"></i>Concurrent sessions</label>
35+
<input type="text" id="node-input-sessions" placeholder="10" />
36+
<br />
37+
<label for="node-input-database"><i class="icon-tag"></i> Database</label>
38+
<input type="text" id="node-input-database" placeholder="database">
3039
</div>
3140
</script>
3241

@@ -40,6 +49,7 @@
4049
<ul>
4150
<li><code>1:</code>If the query returns a single record, it is returned in output #1 as an object with the Neo4j Record (Node, Relationship, Path, Integer) fields in <code>msg.payload</code></li>
4251
<li><code>2:</code>If the query returns a multiple records, they are returned in output #2 as an array of objects in <code>msg.payload</code></li>
52+
<li><code>2:</code>If there are no available sessions, the msg is passed untouched for the calling flow to decide on what to do, e.g. retry</li>
4353
</ul>
4454
</p>
4555
</script>

neo4j.js

Lines changed: 89 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -4,105 +4,120 @@ module.exports = function (RED) {
44
RED.nodes.createNode(this, config)
55
var node = this
66

7+
var sessions = []
8+
var readySessionList = []
9+
710
node.server = RED.nodes.getNode(config.server)
811

912
const driver = node.server.driver
10-
const session = driver.session() // session for status display
1113

12-
if (session) {
14+
// set up a list of sessions for later use, to avoid session allocation error later on
15+
for( i=0; i<config.sessions; i++){
16+
try {
17+
sessions.push(driver.session({database: config.database || 'neo4j'}))
18+
readySessionList.push(i)
19+
} catch (err) {
20+
console.log(err)
21+
break
22+
}
23+
}
24+
25+
if (readySessionList.length > 0) {
1326
node.status({
1427
fill: 'green',
1528
shape: 'dot',
16-
text: 'node-red:common.status.connected'
29+
text: `connected: ${readySessionList.length} sesions`
1730
})
1831
node.on('input', function (msg) {
1932
var query = config.query || msg.query
20-
/*
21-
* Need one session per request
22-
* ref: https://stackoverflow.com/questions/62615761/queries-cannot-be-run-directly-on-a-session-with-an-open-transaction-either-run
23-
*/
24-
var boltSession = msg.database
25-
? driver.session({database: msg.database}) // use msg.database if supplied
26-
: driver.session()
27-
let params = null
28-
if (typeof (msg.params) === 'string') {
29-
params = JSON.parse(msg.params)
30-
} else {
31-
params = msg.params
32-
}
3333

34-
function processInteger (integer) {
35-
if (integer.constructor.name === 'Integer') {
36-
return integer.toNumber()
34+
if( readySessionList.length > 0 ){
35+
// remove and use a session
36+
var readySession = readySessionList.shift()
37+
var boltSession = sessions[readySession]
38+
39+
let params = null
40+
if (typeof (msg.params) === 'string') {
41+
params = JSON.parse(msg.params)
42+
} else {
43+
params = msg.params
3744
}
38-
return integer
39-
}
4045

41-
function processRecord (record) {
42-
if (record.constructor.name === 'Integer') {
43-
return record.toNumber()
46+
function processInteger (integer) {
47+
if (integer.constructor.name === 'Integer') {
48+
return integer.toNumber()
49+
}
50+
return integer
4451
}
4552

46-
if (record.constructor.name === 'Path') {
47-
record.start.identity = processInteger(record.start.identity)
48-
record.end.identity = processInteger(record.end.identity)
49-
record.segments = record.segments.map(segment => {
53+
function processRecord (record) {
54+
if (record.constructor.name === 'Integer') {
55+
return record.toNumber()
56+
}
5057

51-
segment.start.identity = processInteger(segment.start.identity)
52-
segment.end.identity = processInteger(segment.end.identity)
58+
if (record.constructor.name === 'Path') {
59+
record.start.identity = processInteger(record.start.identity)
60+
record.end.identity = processInteger(record.end.identity)
61+
record.segments = record.segments.map(segment => {
5362

54-
segment.relationship.identity = processInteger(segment.relationship.identity)
55-
segment.relationship.start = processInteger(segment.relationship.start)
56-
segment.relationship.end = processInteger(segment.relationship.end)
63+
segment.start.identity = processInteger(segment.start.identity)
64+
segment.end.identity = processInteger(segment.end.identity)
5765

58-
return segment
59-
})
60-
return record
61-
}
66+
segment.relationship.identity = processInteger(segment.relationship.identity)
67+
segment.relationship.start = processInteger(segment.relationship.start)
68+
segment.relationship.end = processInteger(segment.relationship.end)
6269

63-
if (record.constructor.name === 'Relationship') {
64-
record.identity = processInteger(record.identity)
65-
record.start = processInteger(record.start)
66-
record.end = processInteger(record.end)
67-
return record
68-
}
70+
return segment
71+
})
72+
return record
73+
}
74+
75+
if (record.constructor.name === 'Relationship') {
76+
record.identity = processInteger(record.identity)
77+
record.start = processInteger(record.start)
78+
record.end = processInteger(record.end)
79+
return record
80+
}
81+
82+
if (record.constructor.name === 'Node') {
83+
record.identity = processInteger(record.identity)
84+
return record
85+
}
6986

70-
if (record.constructor.name === 'Node') {
71-
record.identity = processInteger(record.identity)
7287
return record
7388
}
74-
75-
return record
76-
}
77-
78-
boltSession.run(query, params).then(result => {
79-
if (result.records.length > 1) {
80-
msg.payload = [];
81-
result.records.forEach(function (record, index, array) {
89+
90+
boltSession.run(query, params).then(result => {
91+
if (result.records.length > 1) {
92+
msg.payload = [];
93+
result.records.forEach(function (record, index, array) {
94+
let itm = {};
95+
record.forEach(function (item, index, array) {
96+
itm[index] = processRecord(item)
97+
})
98+
msg.payload.push(itm)
99+
})
100+
node.send([null, msg, null])
101+
} else if (result.records.length == 1) {
82102
let itm = {};
83-
record.forEach(function (item, index, array) {
103+
result.records[0].forEach(function (item, index, array) {
84104
itm[index] = processRecord(item)
85105
})
86-
msg.payload.push(itm)
87-
})
88-
node.send([null, msg])
89-
} else if (result.records.length == 1) {
90-
let itm = {};
91-
result.records[0].forEach(function (item, index, array) {
92-
itm[index] = processRecord(item)
93-
})
94-
msg.payload = itm
95-
node.send([msg, null])
96-
} else {
97-
msg.payload = null
98-
node.send([msg, null])
99-
}
100-
boltSession.close()
101-
})
102-
.catch(err => {
103-
boltSession.close()
106+
msg.payload = itm
107+
node.send([msg, null, null])
108+
} else {
109+
node.send([null, null, null])
110+
}
111+
})
112+
.catch(err => {
104113
node.error(err, msg);
105-
})
114+
}).finally(() => {
115+
readySessionList.push(readySession)
116+
})
117+
} else {
118+
// no sessions available, send the message out on the third port for further processing bty caller
119+
node.send([null, null, msg])
120+
}
106121
})
107122
} else {
108123
node.status({
@@ -113,7 +128,7 @@ module.exports = function (RED) {
113128
}
114129

115130
node.on('close', function () {
116-
session.close()
131+
sessions.map(s => s.close())
117132
driver.close()
118133
})
119134
}

0 commit comments

Comments
 (0)