diff --git a/README.md b/README.md index 6754ac7..231deb5 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,45 @@ When messages are received from the topic, those messages can be returned back t ```js pubsub.publish(SOMETHING_CHANGED_TOPIC, { somethingChanged: { id: "123" }}); ``` +## Resolving Subscription Callbacks +By default, MQTTPubSub will respond to subscriptions with the message in the form of a string or a JSON object, if the message can be parsed as valid JSON. +```typescript +const pubsub = new MQTTPubSub(); +const onMsg = (msg: string | Object) => { + console.log(msg); +} +pubsub.subscribe('Posts/#', onMsg); +pubsub.publish('Posts/A', 'Hello mosquitto.') +//Output: +//Hello mosquitto. +//But we don't know what topic sent the message (Posts/A, Posts/B, etc.) +``` +However, if you set the includeFullPacketInfo option to true when initializing your MQTTPubSub, the subscribtions will resolve with an object containing the full topic, the raw payload buffer, the mqtt packet info, and the message as string or JSON. This allows to have more complex subscriptions. For example, you can subscribe to the topic '#' and record all topics currently being published to the MQTT server. +```typescript +const pubsub = new MQTTPubSub({ includeFullPacketInfo: true }); +const onMsg = (mqttPacket: { topic: string, parsedMessage: string | Object, payload: Buffer, packet: Packet}) => { + console.log(`Recieved ${mqttPacket.parsedMessage} from topic ${mqttPacket.topic}`); +} +pubsub.subscribe('Posts/#', onMsg); +pubsub.publish('Posts/A', 'Hello mosquitto.') +//Output: +//Recieved Hello mosquitto. from topic Posts/A +``` +This output mode can also be used with the asyncIterator: +```javascript +export const resolvers = { + Subscription: { + somethingChanged: { + subscribe: () => pubsub.asyncIterator('topic'), + resolve: (mqttPacket) => { + // format info appropriately + return { topic: mqttPacket.topic, message: mqttPacket.parsedMessage } + } + } + } +} +``` ## Dynamically Create a Topic Based on Subscription Args Passed on the Query: ```javascript @@ -77,8 +115,14 @@ export const resolvers = { }, }, } +export const resolvers = { + Subscription: { + somethingChanged: { + subscribe: () => pubsub.asyncIterator(SOMETHING_CHANGED_TOPIC) + } + } +} ``` - ## Using Arguments and Payload to Filter Events ```javascript @@ -215,7 +259,7 @@ When `subscribe` is called like this: ```javascript const query = ` - subscription X($repoName: String!) { + subscription ($repoName: String!) { commentsAdded(repoName: $repoName) } `; diff --git a/package.json b/package.json index 07fee42..c00ef1d 100644 --- a/package.json +++ b/package.json @@ -1,11 +1,11 @@ { "name": "graphql-mqtt-subscriptions", - "version": "1.2.0", + "version": "1.3.0", "description": "A graphql-subscriptions PubSub Engine using mqtt protocol", "main": "dist/index.js", "repository": { "type": "git", - "url": "https://github.com/davidyaha/graphql-mqtt-subscriptions.git" + "url": "https://github.com/henhen724/graphql-mqtt-subscriptions.git" }, "keywords": [ "graphql", @@ -13,12 +13,12 @@ "apollo", "subscriptions" ], - "author": "David Yahalomi", + "author": "David Yahalomi and Henry Hunt", "license": "MIT", "bugs": { - "url": "https://github.com/davidyaha/graphql-mqtt-subscriptions/issues" + "url": "https://github.com/henhen724/graphql-mqtt-subscriptions/issues" }, - "homepage": "https://github.com/davidyaha/graphql-mqtt-subscriptions", + "homepage": "https://github.com/henhen724/graphql-mqtt-subscriptions", "scripts": { "compile": "tsc --noUnusedParameters --noUnusedLocals", "pretest": "npm run compile", @@ -43,7 +43,7 @@ "@types/chai-as-promised": "0.0.30", "@types/graphql": "^0.9.0", "@types/mocha": "^2.2.33", - "@types/node": "7.0.18", + "@types/node": "^14.14.25", "@types/simple-mock": "0.0.27", "chai": "^3.5.0", "chai-as-promised": "^6.0.0", @@ -53,7 +53,7 @@ "remap-istanbul": "^0.9.5", "simple-mock": "^0.7.0", "tslint": "^5.2.0", - "typescript": "^2.3.4" + "typescript": "^4.1.5" }, "typings": "dist/index.d.ts", "typescript": { diff --git a/src/mqtt-pubsub.ts b/src/mqtt-pubsub.ts index 1153d6a..e0d21c0 100644 --- a/src/mqtt-pubsub.ts +++ b/src/mqtt-pubsub.ts @@ -1,5 +1,5 @@ import { PubSubEngine } from 'graphql-subscriptions/dist/pubsub-engine'; -import { connect, Client, ISubscriptionGrant, IClientPublishOptions, IClientSubscribeOptions } from 'mqtt'; +import { connect, Client, ISubscriptionGrant, IClientPublishOptions, IClientSubscribeOptions, Packet } from 'mqtt'; import { PubSubAsyncIterator } from './pubsub-async-iterator'; export interface PubSubMQTTOptions { @@ -10,7 +10,8 @@ export interface PubSubMQTTOptions { subscribeOptions?: SubscribeOptionsResolver; onMQTTSubscribe?: (id: number, granted: ISubscriptionGrant[]) => void; triggerTransform?: TriggerTransform; - parseMessageWithEncoding?: string; + parseMessageWithEncoding?: BufferEncoding; + includeFullPacketInfo?: boolean; } export class MQTTPubSub implements PubSubEngine { @@ -23,7 +24,8 @@ export class MQTTPubSub implements PubSubEngine { private subscriptionMap: { [subId: number]: [string, Function] }; private subsRefsMap: { [trigger: string]: Array }; private currentSubscriptionId: number; - private parseMessageWithEncoding: string; + private parseMessageWithEncoding: BufferEncoding; + private includeFullPacketInfo?: boolean; private static matches(pattern: string, topic: string) { const patternSegments = pattern.split('/'); @@ -75,6 +77,7 @@ export class MQTTPubSub implements PubSubEngine { this.publishOptionsResolver = options.publishOptions || (() => Promise.resolve({} as IClientPublishOptions)); this.subscribeOptionsResolver = options.subscribeOptions || (() => Promise.resolve({} as IClientSubscribeOptions)); this.parseMessageWithEncoding = options.parseMessageWithEncoding; + this.includeFullPacketInfo = options.includeFullPacketInfo; } public publish(trigger: string, payload: any): boolean { @@ -152,18 +155,17 @@ export class MQTTPubSub implements PubSubEngine { return new PubSubAsyncIterator(this, triggers); } - private onMessage(topic: string, message: Buffer) { + private onMessage(topic: string, payload: Buffer, packet: Packet) { const subscribers = [].concat( - ...Object.keys(this.subsRefsMap) + ...Object.keys(this.subsRefsMap) .filter((key) => MQTTPubSub.matches(key, topic)) .map((key) => this.subsRefsMap[key]), ); - // Don't work for nothing.. if (!subscribers || !subscribers.length) { return; } - const messageString = message.toString(this.parseMessageWithEncoding); + const messageString = payload.toString(this.parseMessageWithEncoding); let parsedMessage; try { parsedMessage = JSON.parse(messageString); @@ -173,7 +175,11 @@ export class MQTTPubSub implements PubSubEngine { for (const subId of subscribers) { const listener = this.subscriptionMap[subId][1]; - listener(parsedMessage); + if (this.includeFullPacketInfo) { + listener({ topic, payload, packet, parsedMessage }); + } else { + listener(parsedMessage); + } } } } diff --git a/src/test/tests.ts b/src/test/tests.ts index 304cba9..634f14c 100644 --- a/src/test/tests.ts +++ b/src/test/tests.ts @@ -244,6 +244,33 @@ describe('MQTTPubSub', function () { }); + it('can subscribe with fullPacketInfo', function (done) { + const pubSubPacketInfo = new MQTTPubSub({ + includeFullPacketInfo: true, + }); + + let sub; + const onMessage = ({ topic, payload, parsedMessage }) => { + pubSubPacketInfo.unsubscribe(sub); + + try { + expect(topic).to.equals('Posts'); + expect(payload).not.to.be.an('undefined'); + expect(parsedMessage).to.equals('test'); + done(); + } catch (e) { + done(e); + } + }; + + pubSubPacketInfo.subscribe('Posts', onMessage).then(subId => { + expect(subId).to.be.a('number'); + pubSubPacketInfo.publish('Posts', 'test'); + sub = subId; + }).catch(err => done(err)); + + }); + it('allows to change encodings of messages passed through MQTT broker', function (done) { const pubsub = new MQTTPubSub({ parseMessageWithEncoding: 'base64', diff --git a/yarn.lock b/yarn.lock index 88fd2b2..a4cb74c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -24,9 +24,10 @@ version "2.2.41" resolved "https://registry.yarnpkg.com/@types/mocha/-/mocha-2.2.41.tgz#e27cf0817153eb9f2713b2d3f6c68f1e1c3ca608" -"@types/node@7.0.18": - version "7.0.18" - resolved "https://registry.yarnpkg.com/@types/node/-/node-7.0.18.tgz#cd67f27d3dc0cfb746f0bdd5e086c4c5d55be173" +"@types/node@^14.14.25": + version "14.14.25" + resolved "https://registry.yarnpkg.com/@types/node/-/node-14.14.25.tgz#15967a7b577ff81383f9b888aa6705d43fbbae93" + integrity sha512-EPpXLOVqDvisVxtlbvzfyqSsFeQxltFbluZNRndIb8tr9KiBnYNLzrc1N3pyKUCww2RNrfHDViqDWWE1LCJQtQ== "@types/simple-mock@0.0.27": version "0.0.27" @@ -1567,9 +1568,10 @@ typedarray@^0.0.6: version "0.0.6" resolved "https://registry.yarnpkg.com/typedarray/-/typedarray-0.0.6.tgz#867ac74e3864187b1d3d47d996a78ec5c8830777" -typescript@^2.3.4: - version "2.4.1" - resolved "https://registry.yarnpkg.com/typescript/-/typescript-2.4.1.tgz#c3ccb16ddaa0b2314de031e7e6fee89e5ba346bc" +typescript@^4.1.5: + version "4.1.5" + resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.1.5.tgz#123a3b214aaff3be32926f0d8f1f6e704eb89a72" + integrity sha512-6OSu9PTIzmn9TCDiovULTnET6BgXtDYL4Gg4szY+cGsc3JP1dQL8qvE8kShTRx1NIw4Q9IBHlwODjkjWEtMUyA== uglify-js@^2.6: version "2.8.29"