// Winston 3.x transport base class (replaces winston 2.x's `winston.Transport`).
const Transport = require('winston-transport');
const _ = require('lodash');
// kafkajs client; CompressionTypes maps the legacy kafka-node numeric attributes.
const { Kafka, CompressionTypes } = require('kafkajs');
// Environment-driven settings; may provide a `dataset` key injected into log payloads.
const config = require('../envVariables');

// Fallback settings merged with caller-supplied options in KafkaDispatcher's
// constructor (falsy option values fall back to these defaults).
// NOTE(review): maxAsyncRequests was a kafka-node option that kafkajs ignores;
// it is kept only so the options contract stays backward compatible.
const defaultOptions = {
  kafkaHost: 'localhost:9092',
  maxAsyncRequests: 100,
  topic: 'local.ingestion',
  compression_type: 'none'
};
1111
/**
 * Translate a legacy kafka-node numeric compression attribute to the
 * kafkajs CompressionTypes enum.
 *
 * kafka-node convention: 0 = none, 1 = gzip, 2 = snappy. Any other value
 * (including undefined) falls back to no compression.
 */
function mapCompressionAttr(attr) {
  switch (attr) {
    case 2:
      return CompressionTypes.Snappy;
    case 1:
      return CompressionTypes.GZIP;
    default:
      return CompressionTypes.None;
  }
}
18+
19+ class KafkaDispatcher extends Transport {
1320 constructor ( options ) {
14- super ( ) ;
21+ super ( options ) ;
1522 this . name = 'kafka' ;
1623 this . options = _ . assignInWith ( defaultOptions , options , ( objValue , srcValue ) => srcValue ? srcValue : objValue ) ;
17- if ( this . options . compression_type == 'snappy' ) {
24+ if ( this . options . compression_type === 'snappy' ) {
1825 this . compression_attribute = 2 ;
19- } else if ( this . options . compression_type == 'gzip' ) {
26+ } else if ( this . options . compression_type === 'gzip' ) {
2027 this . compression_attribute = 1 ;
2128 } else {
2229 this . compression_attribute = 0 ;
2330 }
24- this . client = new kafka . KafkaClient ( {
25- kafkaHost : this . options . kafkaHost ,
26- maxAsyncRequests : this . options . maxAsyncRequests
27- } ) ;
28- this . producer = new HighLevelProducer ( this . client ) ;
29- this . producer . on ( 'ready' , ( ) => console . log ( 'kafka dispatcher is ready' ) ) ;
30- this . producer . on ( 'error' , ( err ) => console . error ( 'Unable to connect to kafka' , err ) ) ;
31+
32+ // kafkajs expects an array of broker strings
33+ const brokers = ( typeof this . options . kafkaHost === 'string' ) ? [ this . options . kafkaHost ] : this . options . kafkaHost ;
34+ this . _kafka = new Kafka ( { brokers } ) ;
35+ this . _producer = this . _kafka . producer ( ) ;
36+ this . _admin = this . _kafka . admin ( ) ;
37+ this . _producerConnected = false ;
38+
39+ // Backwards-compatible lightweight wrappers so existing code/tests that
40+ // expect producer.send(payloads, cb) and client.topicExists(topic, cb)
41+ // continue to work.
42+ this . producer = {
43+ send : ( payloads , cb ) => {
44+ // payloads is an array like [{ topic, key, messages, attributes, partition }]
45+ const topicMessages = payloads . map ( p => {
46+ const msg = { key : p . key , value : p . messages } ;
47+ if ( p . hasOwnProperty ( 'partition' ) ) msg . partition = p . partition ;
48+ return {
49+ topic : p . topic ,
50+ messages : [ msg ] ,
51+ compression : mapCompressionAttr ( p . attributes )
52+ } ;
53+ } ) ;
54+
55+ // ensure producer is connected, then send batch
56+ const sendPromise = this . _producerConnected
57+ ? Promise . resolve ( )
58+ : this . _producer . connect ( ) . then ( ( ) => { this . _producerConnected = true ; } ) ;
59+
60+ sendPromise
61+ . then ( ( ) => this . _producer . sendBatch ( { topicMessages } ) )
62+ . then ( ( ) => { if ( cb ) cb ( ) ; } )
63+ . catch ( err => { if ( cb ) cb ( err ) ; } ) ;
64+ }
65+ } ;
66+
67+ this . client = {
68+ topicExists : ( topic , cb ) => {
69+ // kafkajs admin.fetchTopicMetadata throws if topic doesn't exist
70+ this . _admin . connect ( )
71+ . then ( ( ) => this . _admin . fetchTopicMetadata ( { topics : [ topic ] } ) )
72+ . then ( ( ) => this . _admin . disconnect ( ) )
73+ . then ( ( ) => cb && cb ( null ) )
74+ . catch ( err => {
75+ // ensure disconnect
76+ this . _admin . disconnect ( ) . catch ( ( ) => { } ) ;
77+ cb && cb ( err ) ;
78+ } ) ;
79+ }
80+ } ;
81+
82+ // log basic connection info asynchronously
83+ this . _producer . connect ( )
84+ . then ( ( ) => {
85+ this . _producerConnected = true ;
86+ console . log ( 'kafka dispatcher producer connected' ) ;
87+ } )
88+ . catch ( err => console . error ( 'Unable to connect kafka producer' , err ) ) ;
89+ this . _admin . connect ( )
90+ . then ( ( ) => this . _admin . disconnect ( ) )
91+ . catch ( ( ) => { } ) ;
3192 }
32- log ( level , msg , meta , callback ) {
93+
94+ log ( info , callback ) {
95+ // Modern winston 3.x transport signature: log(info, callback)
96+ // info contains: level, message, and other metadata
97+ // msg/message is expected to be a JSON string. Inject a top-level dataset key
98+ // from configuration if provided and not already present.
99+ const msg = info . message ;
100+ let outgoing = msg ;
101+ try {
102+ if ( typeof msg === 'string' ) {
103+ const parsed = JSON . parse ( msg ) ;
104+ if ( parsed && typeof parsed === 'object' ) {
105+ if ( config . dataset && ! parsed . hasOwnProperty ( 'dataset' ) ) {
106+ parsed . dataset = config . dataset ;
107+ }
108+ outgoing = JSON . stringify ( parsed ) ;
109+ }
110+ }
111+ } catch ( e ) {
112+ // if parsing fails, leave the message as-is
113+ }
114+
33115 this . producer . send ( [ {
34116 topic : this . options . topic ,
35- key : meta . mid ,
36- messages : msg ,
117+ key : info . mid ,
118+ messages : outgoing ,
37119 attributes : this . compression_attribute
38120 } ] , callback ) ;
39121 }
122+
40123 health ( callback ) {
41124 this . client . topicExists ( this . options . topic , ( err ) => {
42125 if ( err ) callback ( false ) ;
@@ -45,6 +128,4 @@ class KafkaDispatcher extends winston.Transport {
45128 }
46129}
47130
48- winston . transports . Kafka = KafkaDispatcher ;
49-
50131module . exports = { KafkaDispatcher } ;
0 commit comments