Skip to content

Commit ce585a9

Browse files
committed
Merge branch 'main' into add_mqtt
2 parents 496ce7b + ac5d799 commit ce585a9

File tree

23 files changed

+603
-1298
lines changed

23 files changed

+603
-1298
lines changed

docs/generators/channels.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ export default {
1212
preset: 'channels',
1313
outputPath: './src/__gen__/',
1414
language: 'typescript',
15-
protocols: ['nats']
15+
protocols: ['nats', 'kafka']
1616
}
1717
]
1818
};
@@ -26,7 +26,7 @@ This is supported through the following inputs: [`asyncapi`](#inputs)
2626

2727
It supports the following languages; [`typescript`](#typescript)
2828

29-
It supports the following protocols; [`nats`](../protocols/nats.md), [`mqtt`](../protocols/mqtt.md)
29+
It supports the following protocols; [`nats`](../protocols/nats.md), [`kafka`](../protocols/kafka.md), [`mqtt`](../protocols/mqtt.md)
3030

3131
## Options
3232
These are the available options for the `channels` generator;
@@ -41,6 +41,7 @@ These are the available options for the `channels` generator;
4141

4242
Depending on which protocol, these are the dependencies:
4343
- `NATS`: https://github.com/nats-io/nats.js v2
44+
- `Kafka`: https://github.com/tulios/kafkajs v2
4445
- `MQTT`: https://github.com/mqttjs/MQTT.js v5
4546

4647
For TypeScript what is generated is a single file that include functions to help easier interact with AsyncAPI channels. For example;

docs/protocols/kafka.md

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
---
2+
sidebar_position: 99
3+
---
4+
5+
# Kafka
6+
Kafka is currently supported through the following generators ([channels](../generators/channels.md)):
7+
8+
| **Languages** | Publish | Subscribe
9+
|---|---|---|
10+
| TypeScript | ✔️ | ✔️ |
11+
12+
All of this is available through [AsyncAPI](../inputs/asyncapi.md). If you use
13+
14+
## Channels
15+
Read more about the [channels](../generators/channels.md) generator here before continuing.
16+
17+
This generator provides support functions for each resource ensuring you the right payload and parameter are used.
18+
<table>
19+
<thead>
20+
<tr>
21+
<th>Input (AsyncAPI)</th>
22+
<th>Using the code</th>
23+
</tr>
24+
</thead>
25+
<tbody>
26+
<tr>
27+
<td>
28+
29+
```yaml
30+
asyncapi: 3.0.0
31+
info:
32+
title: Account Service
33+
version: 1.0.0
34+
description: This service is in charge of processing user signups
35+
channels:
36+
userSignups:
37+
address: user/signedup
38+
messages:
39+
userSignedup:
40+
$ref: '#/components/messages/UserSignedUp'
41+
operations:
42+
publishUserSignups:
43+
action: send
44+
channel:
45+
$ref: '#/channels/userSignups'
46+
consumeUserSignups:
47+
action: receive
48+
channel:
49+
$ref: '#/channels/userSignups'
50+
components:
51+
messages:
52+
UserSignedUp:
53+
payload:
54+
type: object
55+
properties:
56+
displayName:
57+
type: string
58+
description: Name of the user
59+
email:
60+
type: string
61+
format: email
62+
description: Email of the user
63+
64+
```
65+
</td>
66+
<td>
67+
68+
```ts
69+
import { Kafka } from 'kafkajs';
70+
// Location depends on the payload generator configurations
71+
import { UserSignedup } from './__gen__/payloads/UserSignedup';
72+
// Location depends on the channel generator configurations
73+
import { Protocols } from './__gen__/channels';
74+
const { kafka } = Protocols;
75+
const { consumeFromConsumeUserSignups, produceToPublishUserSignups } = kafka;
76+
77+
/**
78+
* Setup the regular client
79+
*/
80+
const kafkaClient = new Kafka({
81+
clientId: 'test',
82+
brokers: ['localhost:9093'],
83+
});
84+
85+
const myPayload = new UserSignedup({displayName: 'test', email: 'test@test.dk'});
86+
const myParameters = new UserSignedUpParameters({userId: 'test'});
87+
88+
// Consume the messages with the generated channel function
89+
const consumerCallback = async (
90+
err,
91+
msg: UserSignedUp | undefined,
92+
parameters: UserSignedUpParameters | undefined,
93+
kafkaMsg: EachMessagePayload | undefined
94+
) => {
95+
// Do stuff once you consumer from the topic
96+
};
97+
const consumer = await consumeFromConsumeUserSignups(
98+
consumerCallback,
99+
myParameters,
100+
kafkaClient,
101+
{
102+
fromBeginning: true,
103+
groupId: 'testId1'
104+
}
105+
);
106+
107+
// Produce the messages with the generated channel function
108+
const producer = await produceToPublishUserSignups(myPayload, myParameters, kafkaClient);
109+
```
110+
</td>
111+
</tr>
112+
</tbody>
113+
</table>

package.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,21 @@
115115
"runtime:typescript": "npm run runtime:typescript:setup && npm run runtime:typescript:test",
116116
"runtime:typescript:setup": "npm run runtime:prepare && npm run runtime:services:start && cd test/runtime/typescript && npm ci && npm run generate",
117117
"runtime:typescript:test": "cd test/runtime/typescript && npm run test",
118+
<<<<<<< HEAD
118119
"runtime:services:start": "npm run runtime:nats:start && npm run runtime:mqtt:start",
119120
"runtime:services:stop": "npm run runtime:nats:stop && npm run runtime:mqtt:stop",
120121
"runtime:nats:start": "cd test/runtime && docker compose -f ./docker-compose-nats.yml up -d",
121122
"runtime:nats:stop": "cd test/runtime && docker compose -f ./docker-compose-nats.yml down",
122123
"runtime:mqtt:start": "cd test/runtime && docker compose -f ./docker-compose-mqtt.yml up -d",
123124
"runtime:mqtt:stop": "cd test/runtime && docker compose -f ./docker-compose-mqtt.yml down",
125+
=======
126+
"runtime:services:start": "npm run runtime:nats:start && npm run runtime:kafka:start",
127+
"runtime:services:stop": "npm run runtime:nats:stop && npm run runtime:kafka:stop",
128+
"runtime:nats:start": "cd test/runtime && docker compose -f ./docker-compose-nats.yml up -d",
129+
"runtime:nats:stop": "cd test/runtime && docker compose -f ./docker-compose-nats.yml down",
130+
"runtime:kafka:start": "cd test/runtime && docker compose -f ./docker-compose-kafka.yml up -d",
131+
"runtime:kafka:stop": "cd test/runtime && docker compose -f ./docker-compose-kafka.yml down",
132+
>>>>>>> main
124133
"test:blackbox": "concurrently --group -n typescript \"npm run test:blackbox:typescript\"",
125134
"test:blackbox:typescript": "cross-env CI=true jest ./test/blackbox/typescript.spec.ts"
126135
},

src/codegen/generators/helpers/payloads.ts

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,24 +159,21 @@ export function findReplyId(
159159
reply: OperationReplyInterface,
160160
channel: ChannelInterface
161161
) {
162-
return (
163-
(reply.json() as any)?.id ??
164-
`${findOperationId(operation, reply.channel() ?? channel)}_reply`
165-
);
162+
return `${findOperationId(operation, reply.channel() ?? channel)}_reply`;
166163
}
167164
export function findOperationId(
168165
operation: OperationInterface,
169166
channel: ChannelInterface
170167
) {
171-
let operationId = operation.id();
172-
operationId = operation.hasOperationId()
173-
? operation.operationId()
174-
: operationId;
175168
const userSpecificName = findExtensionObject(operation)
176169
? findExtensionObject(operation)['channelName']
177170
: undefined;
178171
if (userSpecificName) {
179172
return userSpecificName;
180173
}
174+
let operationId = operation.id();
175+
operationId = operation.hasOperationId()
176+
? operation.operationId()
177+
: operationId;
181178
return operationId ?? channel.id();
182179
}

src/codegen/generators/typescript/channels/asyncapi.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ const sendingFunctionTypes = [
55
ChannelFunctionTypes.NATS_JETSTREAM_PUBLISH,
66
ChannelFunctionTypes.NATS_PUBLISH,
77
ChannelFunctionTypes.NATS_REQUEST,
8-
ChannelFunctionTypes.MQTT_PUBLISH
8+
ChannelFunctionTypes.MQTT_PUBLISH,
9+
ChannelFunctionTypes.KAFKA_PUBLISH
910
];
1011
const receivingFunctionTypes = [
1112
ChannelFunctionTypes.NATS_JETSTREAM_PULL_SUBSCRIBE,
1213
ChannelFunctionTypes.NATS_JETSTREAM_PUSH_SUBSCRIBE,
1314
ChannelFunctionTypes.NATS_REPLY,
14-
ChannelFunctionTypes.NATS_SUBSCRIBE
15+
ChannelFunctionTypes.NATS_SUBSCRIBE,
16+
ChannelFunctionTypes.KAFKA_SUBSCRIBE
1517
];
1618

1719
// eslint-disable-next-line sonarjs/cognitive-complexity

src/codegen/generators/typescript/channels/index.ts

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import {
4242
addPayloadsToDependencies,
4343
getMessageTypeAndModule
4444
} from './utils';
45+
import * as KafkaRenderer from './protocols/kafka';
4546
export {
4647
renderedFunctionType,
4748
TypeScriptChannelRenderType,
@@ -367,7 +368,111 @@ export async function generateTypeScriptChannels(
367368
dependencies.push(...(new Set(renderedDependencies) as any));
368369
break;
369370
}
371+
case 'kafka': {
372+
// AsyncAPI v2 explicitly say to use RFC 6570 URI template, Kafka does not support '/' subject separation, so lets replace it
373+
let topic = simpleContext.topic;
374+
if (topic.startsWith('/')) {
375+
topic = topic.slice(1);
376+
}
377+
topic = topic.replace(/\//g, generator.kafkaTopicSeparator);
378+
let kafkaContext: RenderRegularParameters = {
379+
...simpleContext,
380+
topic,
381+
messageType: ''
382+
};
383+
const renders = [];
384+
const payload = payloads.channelModels[channel.id()];
385+
if (payload === undefined) {
386+
throw new Error(
387+
`Could not find payload for ${channel.id()} for channel typescript generator`
388+
);
389+
}
390+
const {messageModule, messageType} =
391+
getMessageTypeAndModule(payload);
392+
kafkaContext = {...kafkaContext, messageType, messageModule};
393+
const operations = channel.operations().all();
394+
if (operations.length > 0 && !ignoreOperation) {
395+
for (const operation of operations) {
396+
const payloadId = findOperationId(operation, channel);
397+
const payload = payloads.operationModels[payloadId];
398+
if (payload === undefined) {
399+
throw new Error(
400+
`Could not find payload for ${payloadId} for channel typescript generator ${JSON.stringify(payloads.operationModels, null, 4)}`
401+
);
402+
}
403+
const {messageModule, messageType} =
404+
getMessageTypeAndModule(payload);
405+
kafkaContext = {
406+
...kafkaContext,
407+
messageType,
408+
messageModule,
409+
subName: findNameFromOperation(operation, channel)
410+
};
411+
const action = operation.action();
412+
if (
413+
shouldRenderFunctionType(
414+
functionTypeMapping,
415+
ChannelFunctionTypes.KAFKA_PUBLISH,
416+
action,
417+
generator.asyncapiReverseOperations
418+
)
419+
) {
420+
renders.push(KafkaRenderer.renderPublish(kafkaContext));
421+
}
422+
if (
423+
shouldRenderFunctionType(
424+
functionTypeMapping,
425+
ChannelFunctionTypes.KAFKA_SUBSCRIBE,
426+
action,
427+
generator.asyncapiReverseOperations
428+
)
429+
) {
430+
renders.push(KafkaRenderer.renderSubscribe(kafkaContext));
431+
}
432+
}
433+
} else {
434+
if (
435+
shouldRenderFunctionType(
436+
functionTypeMapping,
437+
ChannelFunctionTypes.KAFKA_PUBLISH,
438+
'send',
439+
generator.asyncapiReverseOperations
440+
)
441+
) {
442+
renders.push(KafkaRenderer.renderPublish(kafkaContext));
443+
}
444+
if (
445+
shouldRenderFunctionType(
446+
functionTypeMapping,
447+
ChannelFunctionTypes.KAFKA_SUBSCRIBE,
448+
'receive',
449+
generator.asyncapiReverseOperations
450+
)
451+
) {
452+
renders.push(KafkaRenderer.renderSubscribe(kafkaContext));
453+
}
454+
}
455+
protocolCodeFunctions[protocol].push(
456+
...renders.map((value) => value.code)
457+
);
370458

459+
externalProtocolFunctionInformation[protocol].push(
460+
...renders.map((value) => {
461+
return {
462+
functionType: value.functionType,
463+
functionName: value.functionName,
464+
messageType: value.messageType,
465+
replyType: value.replyType,
466+
parameterType: parameter?.model?.type
467+
};
468+
})
469+
);
470+
const renderedDependencies = renders
471+
.map((value) => value.dependencies)
472+
.flat(Infinity);
473+
dependencies.push(...(new Set(renderedDependencies) as any));
474+
break;
475+
}
371476
case 'mqtt': {
372477
const topic = simpleContext.topic;
373478
let natsContext: RenderRegularParameters = {
@@ -440,7 +545,7 @@ export async function generateTypeScriptChannels(
440545
messageType: value.messageType,
441546
replyType: value.replyType,
442547
parameterType: parameter?.model?.type
443-
} as renderedFunctionType;
548+
};
444549
})
445550
);
446551
const renderedDependencies = renders
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export {renderPublish} from './publish';
2+
export {renderSubscribe} from './subscribe';

0 commit comments

Comments
 (0)