Skip to content

Commit db90c3f

Browse files
Merge pull request #16286 from suuuuuuminnnnnn/feat/microservices-mqtt-per-handler-qos
feat(microservices): support per-handler qos in mqtt
2 parents 9a0ea33 + f514870 commit db90c3f

File tree

2 files changed

+105
-2
lines changed

2 files changed

+105
-2
lines changed

packages/microservices/server/server-mqtt.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,21 @@ export class ServerMqtt extends Server<MqttEvents, MqttStatus> {
9090

9191
const registeredPatterns = [...this.messageHandlers.keys()];
9292
registeredPatterns.forEach(pattern => {
93-
const { isEventHandler } = this.messageHandlers.get(pattern)!;
93+
const handler = this.messageHandlers.get(pattern)!;
94+
const { isEventHandler, extras } = handler;
95+
96+
const globalSubscribeOptions = this.getOptionsProp(
97+
this.options,
98+
'subscribeOptions',
99+
);
100+
const subscribeOptions =
101+
extras?.qos !== undefined
102+
? { ...globalSubscribeOptions, qos: extras.qos }
103+
: globalSubscribeOptions;
104+
94105
mqttClient.subscribe(
95106
isEventHandler ? pattern : this.getRequestPattern(pattern),
96-
this.getOptionsProp(this.options, 'subscribeOptions'),
107+
subscribeOptions,
97108
);
98109
});
99110
}

packages/microservices/test/server/server-mqtt.spec.ts

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,98 @@ describe('ServerMqtt', () => {
9393
server.bindEvents(mqttClient);
9494
expect(subscribeSpy.calledWith(pattern)).to.be.true;
9595
});
96+
97+
describe('per-handler QoS via extras.qos', () => {
98+
it('should use extras.qos=2 when handler specifies qos 2', () => {
99+
const pattern = 'alerts/critical';
100+
const handler = Object.assign(sinon.spy(), { extras: { qos: 2 } });
101+
untypedServer.messageHandlers = objectToMap({
102+
[pattern]: handler,
103+
});
104+
server.bindEvents(mqttClient);
105+
expect(subscribeSpy.calledOnce).to.be.true;
106+
expect(subscribeSpy.firstCall.args[0]).to.equal(pattern);
107+
expect(subscribeSpy.firstCall.args[1]).to.deep.equal({ qos: 2 });
108+
});
109+
110+
it('should use extras.qos=0 when handler specifies qos 0', () => {
111+
const pattern = 'telemetry/data';
112+
const handler = Object.assign(sinon.spy(), { extras: { qos: 0 } });
113+
untypedServer.messageHandlers = objectToMap({
114+
[pattern]: handler,
115+
});
116+
server.bindEvents(mqttClient);
117+
expect(subscribeSpy.calledOnce).to.be.true;
118+
expect(subscribeSpy.firstCall.args[0]).to.equal(pattern);
119+
expect(subscribeSpy.firstCall.args[1]).to.deep.equal({ qos: 0 });
120+
});
121+
122+
it('should use global subscribeOptions when extras.qos is undefined', () => {
123+
const globalQos = 1;
124+
const serverWithOptions = new ServerMqtt({
125+
subscribeOptions: { qos: globalQos },
126+
});
127+
const untypedServerWithOptions = serverWithOptions as any;
128+
const pattern = 'events/general';
129+
const handler = Object.assign(sinon.spy(), { extras: {} });
130+
untypedServerWithOptions.messageHandlers = objectToMap({
131+
[pattern]: handler,
132+
});
133+
serverWithOptions.bindEvents(mqttClient);
134+
expect(subscribeSpy.calledOnce).to.be.true;
135+
expect(subscribeSpy.firstCall.args[0]).to.equal(pattern);
136+
expect(subscribeSpy.firstCall.args[1]).to.deep.equal({
137+
qos: globalQos,
138+
});
139+
});
140+
141+
it('should override only qos while preserving other global subscribeOptions', () => {
142+
const serverWithOptions = new ServerMqtt({
143+
subscribeOptions: { qos: 1, nl: true, rap: false },
144+
});
145+
const untypedServerWithOptions = serverWithOptions as any;
146+
const pattern = 'commands/run';
147+
const handler = Object.assign(sinon.spy(), { extras: { qos: 2 } });
148+
untypedServerWithOptions.messageHandlers = objectToMap({
149+
[pattern]: handler,
150+
});
151+
serverWithOptions.bindEvents(mqttClient);
152+
expect(subscribeSpy.calledOnce).to.be.true;
153+
expect(subscribeSpy.firstCall.args[1]).to.deep.equal({
154+
qos: 2,
155+
nl: true,
156+
rap: false,
157+
});
158+
});
159+
160+
it('should apply different qos per handler when multiple handlers exist', () => {
161+
const serverWithOptions = new ServerMqtt({
162+
subscribeOptions: { qos: 1 },
163+
});
164+
const untypedServerWithOptions = serverWithOptions as any;
165+
166+
const handler0 = Object.assign(sinon.spy(), { extras: { qos: 0 } });
167+
const handler1 = Object.assign(sinon.spy(), { extras: {} });
168+
const handler2 = Object.assign(sinon.spy(), { extras: { qos: 2 } });
169+
170+
untypedServerWithOptions.messageHandlers = objectToMap({
171+
'telemetry/+': handler0,
172+
'events/#': handler1,
173+
'alerts/critical': handler2,
174+
});
175+
176+
serverWithOptions.bindEvents(mqttClient);
177+
178+
expect(subscribeSpy.callCount).to.equal(3);
179+
180+
const calls = subscribeSpy.getCalls();
181+
const callMap = new Map(calls.map(c => [c.args[0], c.args[1]]));
182+
183+
expect(callMap.get('telemetry/+')).to.deep.equal({ qos: 0 });
184+
expect(callMap.get('events/#')).to.deep.equal({ qos: 1 });
185+
expect(callMap.get('alerts/critical')).to.deep.equal({ qos: 2 });
186+
});
187+
});
96188
});
97189
describe('getMessageHandler', () => {
98190
it(`should return function`, () => {

0 commit comments

Comments
 (0)