Skip to content

Conversation

joepavitt
Copy link
Contributor

@joepavitt joepavitt commented Aug 14, 2025

Description

  • Changes the default behaviour to not use backpressure.
  • Adds a configuration option to enable "backpressure" should the user want to still use this
  • Adds a second output if the "split" option is enabled which returns a "complete" message to ensure the database entries output on output 1. This is only required when results.length % pagination size is 0, as we end up with a msg.payload = undefined in a final message which is misleading and could cause problems.
  • Updates documentation to detail updated behaviour

Related Issue(s)

Closes #10

@joepavitt joepavitt requested a review from Steve-Mcl August 14, 2025 15:53
@joepavitt
Copy link
Contributor Author

joepavitt commented Aug 14, 2025

Just noticed one issue remaining which I'm investigating now, if you set rows per message higher than the number of rows in the resulting query, it returns on the second output because it (correctly) has complete: true on the message. I'll improve the logic here.

Update: done

Copy link

@Steve-Mcl Steve-Mcl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately not working as expected.

Issues noted:

  1. when "Split", "1" part, "backpresure ON" - all rows are released immediately
  2. when "Split", "1" part - msg.parts.complete is output from port2
  3. when "Split", "2" part - msg.parts.complete is output from port1 (port 2 never sends anything)
Image

Other points of note:

  • msg.parts is missing the usual len and count that the join node can use in auto mode
  • msg.id is a weird random number (in Node-RED core it is usually a crypto random string)

Final thoughts

IMHO, this should "just work" with a join node set to Automatic mode.
That would mean we can simple drop a join node on and it just works.
To achieve that, I would suggest we set the necessary parts props id, count and index and len on each message. To also be able to handle join in manual mode, set msg.parts.complete on the last msg


Demo flow I used for testing:

Image
[{"id":"124e7ea9ce2b0351","type":"tables-query","z":"5f26939bbac6427a","name":"Split, 1, No bp","query":"SELECT * FROM \"table 3 ok!\" order by id asc;","split":true,"enableBackPressure":false,"rowsPerMsg":"1","outputs":2,"x":860,"y":1020,"wires":[["ce6c7cd5c70dfb5f","c57d27788f69b507"],["7c8f4e9bf63f9320","c57d27788f69b507"]]},{"id":"345e7e5b26b20fd1","type":"inject","z":"5f26939bbac6427a","name":"Do query","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":680,"y":1020,"wires":[["124e7ea9ce2b0351"]]},{"id":"ce6c7cd5c70dfb5f","type":"debug","z":"5f26939bbac6427a","name":"Test 1: Output 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1080,"y":1000,"wires":[]},{"id":"7c8f4e9bf63f9320","type":"debug","z":"5f26939bbac6427a","name":"Test 1: Output 2","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1080,"y":1040,"wires":[]},{"id":"6323dfb47660e6eb","type":"tables-query","z":"5f26939bbac6427a","name":"Split, 2, No bp","query":"SELECT * FROM \"table 3 ok!\" order by id asc;","split":true,"enableBackPressure":false,"rowsPerMsg":"2","outputs":2,"x":860,"y":1180,"wires":[["2f253a7fee86dda2","d39bcb87b834beb8"],["4e63e3b8178146b3","d39bcb87b834beb8"]]},{"id":"72bdce2eb7487a53","type":"inject","z":"5f26939bbac6427a","name":"Do query","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":680,"y":1180,"wires":[["6323dfb47660e6eb"]]},{"id":"2f253a7fee86dda2","type":"debug","z":"5f26939bbac6427a","name":"Test 2: Output 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1080,"y":1160,"wires":[]},{"id":"4e63e3b8178146b3","type":"debug","z":"5f26939bbac6427a","name":"Test 2: Output 2","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1080,"y":1200,"wires":[]},{"id":"422edbccd8c82480","type":"tables-query","z":"5f26939bbac6427a","name":"Split, 1, bp","query":"SELECT * FROM \"table 3 ok!\" order by id asc;","split":true,"enableBackPressure":true,"rowsPerMsg":"1","outputs":2,"x":850,"y":1340,"wires":[["67855d784d8c4fd2","993c669f3da8b9a5"],["9007e4e2b08bb9f8","993c669f3da8b9a5"]]},{"id":"4551b84f75746af6","type":"inject","z":"5f26939bbac6427a","name":"Do query","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":680,"y":1340,"wires":[["422edbccd8c82480"]]},{"id":"c29d62b9251d4274","type":"inject","z":"5f26939bbac6427a","name":"tick","props":[{"p":"tick","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":690,"y":1380,"wires":[["422edbccd8c82480"]]},{"id":"67855d784d8c4fd2","type":"debug","z":"5f26939bbac6427a","name":"Test 3: Output 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1080,"y":1320,"wires":[]},{"id":"9007e4e2b08bb9f8","type":"debug","z":"5f26939bbac6427a","name":"Test 3: Output 2","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1080,"y":1360,"wires":[]},{"id":"b73123f16a535308","type":"tables-query","z":"5f26939bbac6427a","name":"Split, 2, bp","query":"SELECT * FROM \"table 3 ok!\" order by id asc;","split":true,"enableBackPressure":true,"rowsPerMsg":"1","outputs":2,"x":850,"y":1500,"wires":[["05529ecc8b564ab5","c64c1f8903c7049e"],["a96568127ca33a9c","c64c1f8903c7049e"]]},{"id":"82ccf86864f0e0a7","type":"inject","z":"5f26939bbac6427a","name":"Do query","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":680,"y":1500,"wires":[["b73123f16a535308"]]},{"id":"b9b54138724a69a5","type":"inject","z":"5f26939bbac6427a","name":"tick","props":[{"p":"tick","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":690,"y":1540,"wires":[["b73123f16a535308"]]},{"id":"05529ecc8b564ab5","type":"debug","z":"5f26939bbac6427a","name":"Test 4: Output 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1080,"y":1480,"wires":[]},{"id":"a96568127ca33a9c","type":"debug","z":"5f26939bbac6427a","name":"Test 4: Output 2","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1080,"y":1520,"wires":[]},{"id":"8f639f01e675956e","type":"tables-query","z":"5f26939bbac6427a","name":"Multiple Queries, no split","query":"SELECT * FROM \"table 3 ok!\" order by id asc;\nSELECT * FROM \"table 3 ok!\" order by id asc;","split":false,"enableBackPressure":false,"rowsPerMsg":"1","outputs":1,"x":890,"y":1640,"wires":[["f8881b45ebf7b0d9"]]},{"id":"eebdeb40b3566ddf","type":"inject","z":"5f26939bbac6427a","name":"Do query","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":680,"y":1640,"wires":[["8f639f01e675956e"]]},{"id":"f8881b45ebf7b0d9","type":"debug","z":"5f26939bbac6427a","name":"Test 5","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1110,"y":1640,"wires":[]},{"id":"854fa9498276e408","type":"inject","z":"5f26939bbac6427a","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[1,10,100]","payloadType":"json","x":240,"y":1840,"wires":[["533b4206953c81eb"]]},{"id":"533b4206953c81eb","type":"split","z":"5f26939bbac6427a","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","property":"payload","x":390,"y":1840,"wires":[["2fd622941547db79"]]},{"id":"2fd622941547db79","type":"debug","z":"5f26939bbac6427a","name":"debug 21","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":560,"y":1840,"wires":[]},{"id":"c57d27788f69b507","type":"join","z":"5f26939bbac6427a","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":910,"y":1080,"wires":[["2de746d8617895ef"]]},{"id":"2de746d8617895ef","type":"debug","z":"5f26939bbac6427a","name":"Test 1: Output 3","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1080,"y":1080,"wires":[]},{"id":"d39bcb87b834beb8","type":"join","z":"5f26939bbac6427a","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":910,"y":1240,"wires":[["b736e4e0a7a0ad63"]]},{"id":"b736e4e0a7a0ad63","type":"debug","z":"5f26939bbac6427a","name":"Test 2: Output 3","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1080,"y":1240,"wires":[]},{"id":"4786046a4f7251d4","type":"debug","z":"5f26939bbac6427a","name":"Test 2: Output 3","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1080,"y":1400,"wires":[]},{"id":"993c669f3da8b9a5","type":"join","z":"5f26939bbac6427a","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":910,"y":1400,"wires":[["4786046a4f7251d4"]]},{"id":"c64c1f8903c7049e","type":"join","z":"5f26939bbac6427a","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":910,"y":1560,"wires":[["3a5823fceac5caa6"]]},{"id":"3a5823fceac5caa6","type":"debug","z":"5f26939bbac6427a","name":"Test 2: Output 3","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1080,"y":1560,"wires":[]},{"id":"8136e49bcbdf0a9a","type":"comment","z":"5f26939bbac6427a","name":"never outputs anything to \"output 2\"","info":"","x":440,"y":1180,"wires":[]},{"id":"cbf395a2d31ca1b1","type":"comment","z":"5f26939bbac6427a","name":"Does not respect \"backpressure\" setting \\n (outputs all table messages without waiting for msg.tick)","info":"","x":380,"y":1340,"wires":[]},{"id":"3abc62901e330649","type":"comment","z":"5f26939bbac6427a","name":"Does not respect \"backpressure\" setting \\n (outputs all table messages without waiting for msg.tick)","info":"","x":380,"y":1500,"wires":[]},{"id":"c6cd2e71ddb16a65","type":"comment","z":"5f26939bbac6427a","name":"parts.id is a random number instead of a GUID","info":"","x":410,"y":1020,"wires":[]},{"id":"5ebdc40a34a1041b","type":"comment","z":"5f26939bbac6427a","name":"parts.len and parts.count are not present","info":"","x":420,"y":1060,"wires":[]},{"id":"d50739c0f4af5dac","type":"inject","z":"5f26939bbac6427a","name":"","props":[{"p":"payload"},{"p":"parts","v":"{\"index\":0, \"count\":2}","vt":"json"},{"p":"parts.id","v":"abcd1234","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[1,10,100]","payloadType":"json","x":240,"y":1920,"wires":[["bc6b8fe7d92fceed"]]},{"id":"7e8c5cd60cd81fc0","type":"inject","z":"5f26939bbac6427a","name":"","props":[{"p":"payload"},{"p":"parts","v":"{\"index\":1, \"count\":2}","vt":"json"},{"p":"parts.id","v":"abcd1234","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[1,10,100]","payloadType":"json","x":241.8958282470703,"y":1968.77783203125,"wires":[["bc6b8fe7d92fceed"]]},{"id":"bc6b8fe7d92fceed","type":"join","z":"5f26939bbac6427a","name":"","mode":"auto","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":"false","timeout":"","count":"","reduceRight":false,"x":390,"y":1940,"wires":[["7861762021ad827e"]]},{"id":"7861762021ad827e","type":"debug","z":"5f26939bbac6427a","name":"debug 22","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":560,"y":1940,"wires":[]},{"id":"fb8afeff2efb4b50","type":"comment","z":"5f26939bbac6427a","name":"how to do auto joining","info":"","x":520,"y":1900,"wires":[]},{"id":"90c04f2df3df9887","type":"comment","z":"5f26939bbac6427a","name":"what core nodes output in msg.parts","info":"","x":480,"y":1800,"wires":[]},{"id":"a541c637da938ebc","type":"global-config","env":[],"modules":{"@flowfuse/nr-tables-nodes":"0.1.0"}}]
```

@joepavitt
Copy link
Contributor Author

joepavitt commented Aug 15, 2025

when "Split", "1" part - msg.parts.complete is output from port2
when "Split", "2" part - msg.parts.complete is output from port1 (port 2 never sends anything)

This is because I detail that the data will all go out of output 1.

In a situation where length % split = 0 then postgres produces an extra undefined payload, along with the complete message. When it's not equal to 0, it includes it with the data, and doesn't send the undefined message.

Not sure how best to handle that? I didn't want to send the complete message out on output 2, because then data is missing from output 1, similarly, I didn't want to duplicate the data in output 2, because then, if you're using a join, merging both outputs, data will get duplicated.

@joepavitt
Copy link
Contributor Author

msg.parts is missing the usual len and count that the join node can use in auto mode

I need to sanity check how the contrib-postgres node was doing this because when I used the fork, they're set as undefined, and so I removed them. I tested this with Join "automatic" and it worked fine.

@Steve-Mcl
Copy link

I tested this with Join "automatic" and it worked fine.

Did you inspect the payload (because of how you are sending the messages and parts, we get extra empty rows)

@Steve-Mcl
Copy link

In a situation where length % split = 0 then postgres produces an extra undefined payload, along with the complete message. When it's not equal to 0, it includes it with the data, and doesn't send the undefined message.

Not sure how best to handle that? I didn't want to send the complete message out on output 2, because then data is missing from output 1, similarly, I didn't want to duplicate the data in output 2, because then, if you're using a join, merging both outputs, data will get duplicated.

In all honesty, I think we should do what the core nodes do. 1 output. include all the necessary fields. The rest takes care of itself.

@joepavitt
Copy link
Contributor Author

joepavitt commented Aug 15, 2025

In all honesty, I think we should do what the core nodes do. 1 output. include all the necessary fields. The rest takes care of itself.

But it doesn't. The core node outputs an undefined payload at the end when length % split = 0, which is why i went down this route.

@joepavitt
Copy link
Contributor Author

we get extra empty row

That's the point I've made multiple times, this is what happens in the underlying postgres node

@joepavitt
Copy link
Contributor Author

To achieve that, I would suggest we set the necessary parts props id, count and index and len on each message.

We cannot do this as we do not know len until the very last message (which, when length % split = 0, is undefined payload)

@joepavitt
Copy link
Contributor Author

msg.id is a weird random number (in Node-RED core it is usually a crypto random string)

This hasn't changed from the contrib node, and isn't a blocker to this PR, can open an issue and do a follow up if you really must, but it's not at all essential.

@joepavitt
Copy link
Contributor Author

I suspect we need to connect on this, as async isn't working. I'll be online on Tuesday.

@joepavitt joepavitt requested a review from Steve-Mcl August 15, 2025 17:07
@Steve-Mcl
Copy link

Steve-Mcl commented Aug 23, 2025

I suspect we need to connect on this, as async isn't working. I'll be online on Tuesday.

I know we didnt get a chance to connect on this - so rather than me trying to express how I think it should be in words, here is a demo and some supporting code that works with auto-mode join node OOTB - exactly like the SPLIT-JOIN work

It achieves this by first doing a read of the first row & storing it in a peek var, then gets the next row, sends the peeked (prev) row and, gets the next, updates the peek, and so on until the last read row is not found. The last row (stored in the the peek var) is then passed in the past message along with a complete flag and the count.


CLICK ME FOR CODE

This code replaces the current section in .js file between if (node.split) { and the } else {

						if (node.split) {
							let partsIndex = 0;
							delete msg.complete;

							cursor = client.query(new Cursor(query, params));

							let peekRows = [];
							let resultMeta = null;

							// Helper to send the next message, using peekRows as the buffer
							const sendNext = (nextRows, isLast) => {
								const msg2 = Object.assign({}, msg, {
									payload: (node.rowsPerMsg || 1) > 1 ? nextRows : nextRows[0],
									pgsql: {
										command: resultMeta ? resultMeta.command : undefined,
										rowCount: resultMeta ? resultMeta.rowCount : undefined
									},
									parts: {
										id: partsId,
										type: 'array',
										index: partsIndex
									}
								});
								if (msg.parts) {
									msg2.parts.parts = msg.parts;
								}
								if (isLast) {
									msg2.parts.count = partsIndex + 1;
									msg2.complete = true;
								}
								partsIndex++;
								if (config.enableBackPressure) {
									downstreamReady = false;
								} else {
									downstreamReady = true;
								}
								send(msg2);
								if (isLast) {
									if (tickUpstreamNode) {
										tickUpstreamNode.receive({ tick: true });
									}
									if (done) {
										done();
									}
								}
							};

							// Read the first batch to prime the peek buffer
							const primePeek = () => {
								cursor.read(node.rowsPerMsg || 1, (err, rows, result) => {
									if (err) {
										handleError(err);
									} else {
										resultMeta = result;
										peekRows = rows;
										getNextRows();
									}
								});
							};

							getNextRows = () => {
								if (!downstreamReady) return;
								if (!peekRows || peekRows.length === 0) {
									// No more data, nothing to send
									return;
								}
								// Read the next batch to peek ahead
								cursor.read(node.rowsPerMsg || 1, (err, nextRows) => {
									if (err) {
										handleError(err);
									} else {
										const isLast = !nextRows || nextRows.length === 0;
										sendNext(peekRows, isLast);
										if (isLast) {
											handleDone(false);
										} else {
											peekRows = nextRows;
											getNextRows();
										}
									}
								});
							};

							primePeek();
						} else {


chrome_4CLxRawU0s

@joepavitt
Copy link
Contributor Author

Will re-review this later today and action feedback, not required for the 2.21 release

@knolleary knolleary self-requested a review October 2, 2025 13:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Split results in multiple messages not working

2 participants