Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,22 @@ This node allows you to write and run queries against database tables that are m

## Outputs

### Single Message Output

The response (rows) is provided in `msg.payload` as an array.

An exception is if the *Split results* option is enabled and the *Number of rows per message* is set to **1**,
then `msg.payload` is not an array but the single-row response.
### Split Results

In the case where the _"split results"_ option is enabled, then the rows are split
into multiple messages, defined by the _"number of rows per message"_ option.

Additional information is provided as `msg.pgsql.rowCount` and `msg.pgsql.command`.
See the [underlying documentation](https://node-postgres.com/apis/result) for details.
If the _"number of rows per message"_ is set to 1, then the response is provided in
`msg.payload` as a single object, otherwise it is an array.

In the case of multiple queries, then `msg.pgsql` is an array.
A second output is also added to the node when the _"split results"_ option is enabled.
This second output emits a "complete" message after the last message of the sequence is emitted,
which contains details about the total number of rows, and contains the data in order to join your
separate messages into a single array again using a "join" node.

## Inputs

Expand Down
28 changes: 15 additions & 13 deletions nodes/locales/en-US/query.html
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
<script type="text/x-red" data-help-name="tables-query">
<script type="text/html" data-help-name="tables-query">
<p>
This node allows you to write and run queries against database tables that are managed by FlowFuse Tables.
</p>

<h3>Outputs</h3>
<h4>Single Message Output</h4>
<p>The response (rows) is provided in <code>msg.payload</code> as an array.</p>
<h4>Split Results</h4>
<p>
An exception is if the <em>Split results</em> option is enabled and the <em>Number of rows per message</em> is set to 1,
then <code>msg.payload</code> is not an array but the single-row response.
In the case where the <em>"split results"</em> option is enabled, then the rows are split
into multiple messages, defined by the <em>"number of rows per message"</em> option.
</p>
<p>
Additional information is provided as <code>msg.pgsql.rowCount</code> and <code>msg.pgsql.command</code>.
See the <a href="https://node-postgres.com/apis/result">underlying documentation</a> for details.
If the <em>"number of rows per message"</em> is set to 1, then the response is provided in
<code>msg.payload</code> as a single object, otherwise it is an array.
</p>
<p>
A second output is also added to the node when the <em>"split results"</em> option is enabled.
This second output emits a "complete" message after the last message of the sequence is emitted,
which contains details about the total number of rows, and contains the data in order to join your
separate messages into a single array again using a "join" node.
</p>
<p>In the case of multiple queries, then <code>msg.pgsql</code> is an array.</p>

<h3>Inputs</h3>
<h4>SQL query template</h4>
Expand Down Expand Up @@ -64,17 +71,12 @@ <h4>Parameterized queries</h4>
<h3>Backpressure</h3>
<p>
This node supports <em>backpressure</em> / <em>flow control</em>:
when the <em>Split results</em> option is enabled, it waits for a <em>tick</em> before releasing the next batch of lines,
when the <em>Split Results</em> and <em>Enable Backpressure</em> options are enabled, it waits for a <em>tick</em> before releasing the next batch of lines,
to make sure the rest of your Node-RED flow is ready to process more data
(instead of risking an out-of-memory condition), and also conveys this information upstream.
</p><p>
So when the <em>Split results</em> option is enabled, this node will only output one message at first,
So when the <em>Split Results</em> and <em>Enable Backpressure</em> options are enabled, this node will only output one message at first,
and then awaits a message containing a truthy <code>msg.tick</code> before releasing the next message.
</p><p>
To make this behaviour potentially automatic (avoiding manual wires),
this node declares its ability by exposing a truthy <code>node.tickConsumer</code> for downstream nodes to detect this feature,
and a truthy <code>node.tickProvider</code> for upstream nodes.
Likewise, this node detects upstream nodes using the same back-pressure convention, and automatically sends ticks.
</p>

<h3>Sequences for split results</h3>
Expand Down
3 changes: 2 additions & 1 deletion nodes/locales/en-US/query.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"name": "Name",
"query": "Query",
"split": "Split results in multiple messages",
"rowsPerMsg": "Rows per message"
"rowsPerMsg": "Rows per Message:",
"enableBackPressure": "Enable Backpressure:"
}
}
}
51 changes: 44 additions & 7 deletions nodes/query.html
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@
split: {
value: false,
},
enableBackPressure: {
value: false,
},
rowsPerMsg: {
value: 1,
}
},
outputs: {
value: 1,
},
},
inputs: 1,
outputs: 1,
Expand All @@ -34,11 +40,15 @@

// when node-input-split is modified, react:
$('#node-input-split').on('change', function () {
// if checked, show the rowsPerMsg input
// if checked, show the rowsPerMsg input and set outputs to 2
if ($('#node-input-split').is(':checked')) {
$('#node-input-rowsPerMsg-container').show();
// Set number of outputs to 2 when split is enabled
$('#node-input-outputs').val(2);
} else {
$('#node-input-rowsPerMsg-container').hide();
// Set number of outputs back to 1 when split is disabled
$('#node-input-outputs').val(1);
}
});

Expand All @@ -52,12 +62,14 @@
},
oneditsave: function () {
$('#node-input-query').val(this.editor.getValue());
const split = $('#node-input-split').is(':checked');
$('#node-input-outputs').val(split ? 2 : 1);
delete this.editor;
},
});
</script>

<script type="text/x-red" data-template-name="tables-query">
<script type="text/html" data-template-name="tables-query">
<div class="form-row">
<label for="node-input-name">
<i class="icon-tag"></i>
Expand All @@ -77,15 +89,40 @@
</div>
<h4 style="margin-bottom: 0.5rem;">Output</h4>
<div class="form-row">
<input type="hidden" id="node-input-outputs" value="1">
<input type="checkbox" id="node-input-split" style="display: inline-block; width: auto; vertical-align: top;" />
<label for="node-input-split" style="width: auto;">
<span data-i18n="tables-query.label.split"></span>
</label>
</div>
<div class="form-row" id="node-input-rowsPerMsg-container">
<label for="node-input-rowsPerMsg">
<span data-i18n="tables-query.label.rowsPerMsg"></span>
</label>
<input type="number" id="node-input-rowsPerMsg" placeholder="1" value="1" min="1" />
<div class="form-flex-row">
<label for="node-input-rowsPerMsg" style="width: auto;">
<span data-i18n="tables-query.label.rowsPerMsg"></span>
</label>
<input type="number" id="node-input-rowsPerMsg" placeholder="1" value="1" min="1" style="width: 100px; flex-grow: 1;" />
</div>
<div>
<input type="checkbox" id="node-input-enableBackPressure" style="display: inline-block; width: auto; vertical-align: top;" />
<label for="node-input-enableBackPressure" style="width: auto;">
<span data-i18n="tables-query.label.enableBackPressure"></span>
</label>
</div>
</div>
</script>

<style>
#node-input-rowsPerMsg-container {
padding-left: 24px;
display: flex;
flex-direction: column;
gap: 6px;
margin-top: -6px;
}
.form-flex-row {
display: flex;
flex-direction: row;
gap: 6px;
align-items: center;
}
</style>
19 changes: 15 additions & 4 deletions nodes/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ module.exports = function (RED) {

cursor = client.query(new Cursor(query, params));

const cursorcallback = (err, rows, result) => {
const cursorCallback = (err, rows, result) => {
if (err) {
handleError(err);
} else {
Expand Down Expand Up @@ -240,8 +240,19 @@ module.exports = function (RED) {
msg2.complete = true;
}
partsIndex++;
downstreamReady = false;
send(msg2);
if (config.enableBackPressure) {
// await msg.tick before sending further messages
downstreamReady = false;
} else {
// send all of the messages as quick as possible
downstreamReady = true;
}
if (typeof msg2.payload === 'undefined' && msg2.complete) {
// this contains no data, and is just a "complete" message
send([null, msg2]);
} else {
send([msg2, null]);
}
if (complete) {
if (tickUpstreamNode) {
tickUpstreamNode.receive({ tick: true });
Expand All @@ -257,7 +268,7 @@ module.exports = function (RED) {

getNextRows = () => {
if (downstreamReady) {
cursor.read(node.rowsPerMsg || 1, cursorcallback);
cursor.read(node.rowsPerMsg || 1, cursorCallback);
}
};
} else {
Expand Down