-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcreate-kafka-topic.js
More file actions
67 lines (56 loc) · 1.66 KB
/
create-kafka-topic.js
File metadata and controls
67 lines (56 loc) · 1.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* Create Kafka Topic for OpsGuardian
*
* Usage: Configure .env file with Kafka credentials, then run:
* node create-kafka-topic.js
*/
import { Kafka } from 'kafkajs';
import dotenv from 'dotenv';
// Load variables from a local .env file into process.env before they are read below.
dotenv.config();

/**
 * KafkaJS client used by this script, configured from environment variables:
 * - KAFKA_BROKERS: comma-separated broker list (falls back to 'localhost:9092').
 * - KAFKA_SSL: TLS is enabled only when this is exactly the string 'true'.
 * - KAFKA_SASL_USERNAME / KAFKA_SASL_PASSWORD: SASL "plain" credentials,
 *   applied only when a username is set.
 */
const kafka = new Kafka({
  clientId: 'opsguardian-admin',
  // Trim each entry and drop empties so values such as "a:9092, b:9092,"
  // do not produce broker addresses with stray whitespace or empty strings.
  brokers: (process.env.KAFKA_BROKERS || 'localhost:9092')
    .split(',')
    .map((broker) => broker.trim())
    .filter(Boolean),
  ssl:
    process.env.KAFKA_SSL === 'true'
      ? {
          // NOTE(review): this disables TLS certificate verification, which
          // leaves the connection open to man-in-the-middle attacks. Kept
          // as-is to preserve existing behavior; confirm whether it is
          // required before using this against a production cluster.
          rejectUnauthorized: false,
        }
      : false,
  sasl: process.env.KAFKA_SASL_USERNAME
    ? {
        mechanism: 'plain',
        username: process.env.KAFKA_SASL_USERNAME,
        password: process.env.KAFKA_SASL_PASSWORD,
      }
    : undefined,
  connectionTimeout: 10000,
  requestTimeout: 30000,
});
/**
 * Ensure the "system-logs" topic exists on the configured Kafka cluster.
 *
 * Connects an admin client, lists the existing topics, and creates the topic
 * (3 partitions, replication factor 2) when it is not in the list. The admin
 * client is always disconnected, including when an earlier step fails. Any
 * failure is logged and the process then exits with code 1.
 *
 * @returns {Promise<void>} Resolves once the client has been disconnected.
 */
async function createTopic() {
  const admin = kafka.admin();
  const topicName = 'system-logs';
  let failed = false;

  try {
    console.log('🔌 Connecting to Kafka...');
    await admin.connect();
    console.log('✅ Connected to Kafka');

    console.log('📋 Checking existing topics...');
    const topics = await admin.listTopics();
    console.log('Existing topics:', topics);

    if (topics.includes(topicName)) {
      console.log(`✅ Topic "${topicName}" already exists`);
    } else {
      console.log(`📝 Creating topic "${topicName}"...`);
      const created = await admin.createTopics({
        topics: [
          {
            topic: topicName,
            numPartitions: 3,
            replicationFactor: 2,
          },
        ],
      });
      // createTopics resolves to false when the topic already exists, which
      // can happen if it was created between the listing and this call.
      if (created === false) {
        console.log(`✅ Topic "${topicName}" already exists`);
      } else {
        console.log(`✅ Topic "${topicName}" created successfully`);
      }
    }
  } catch (error) {
    console.error('❌ Error:', error.message);
    failed = true;
  } finally {
    // Disconnect on every path so a failure does not leave the connection open.
    try {
      await admin.disconnect();
      console.log('👋 Disconnected from Kafka');
    } catch (disconnectError) {
      console.error('❌ Error:', disconnectError.message);
      failed = true;
    }
  }

  // Exit only after the disconnect attempt so cleanup is never skipped.
  if (failed) {
    process.exit(1);
  }
}
// Run the script. createTopic() logs failures raised inside its own try
// block; this handler covers a rejection raised outside it (for example while
// creating the admin client) so the promise is never left unhandled.
createTopic().catch((error) => {
  console.error('❌ Error:', error.message);
  process.exit(1);
});