30
30
import stroom .pipeline .filter .AbstractXMLFilter ;
31
31
import stroom .pipeline .shared .data .PipelineElementType ;
32
32
import stroom .svg .shared .SvgImage ;
33
+ import stroom .util .CharBuffer ;
33
34
import stroom .util .logging .LambdaLogger ;
34
35
import stroom .util .logging .LambdaLoggerFactory ;
35
36
import stroom .util .shared .Severity ;
51
52
import java .time .format .DateTimeParseException ;
52
53
import java .util .ArrayDeque ;
53
54
import java .util .ArrayList ;
55
+ import java .util .Arrays ;
54
56
import java .util .List ;
55
57
import java .util .Queue ;
56
58
import java .util .concurrent .ExecutionException ;
@@ -94,6 +96,7 @@ class StandardKafkaProducer extends AbstractXMLFilter {
94
96
private final LocationFactoryProxy locationFactory ;
95
97
private final KafkaProducerFactory stroomKafkaProducerFactory ;
96
98
private final Queue <Future <RecordMetadata >> kafkaMetaFutures ;
99
+ private final CharBuffer content = new CharBuffer ();
97
100
98
101
private Locator locator = null ;
99
102
private DocRef configRef = null ;
@@ -143,7 +146,7 @@ public void startProcessing() {
143
146
144
147
kafkaProducer = sharedKafkaProducer .getKafkaProducer ().orElseThrow (() -> {
145
148
log (Severity .FATAL_ERROR , "No Kafka produce exists for config " + configRef , null );
146
- throw LoggedException .create ("Unable to create Kafka Producer using config " + configRef );
149
+ return LoggedException .create ("Unable to create Kafka Producer using config " + configRef );
147
150
});
148
151
} catch (KafkaException ex ) {
149
152
log (Severity .FATAL_ERROR , "Unable to create Kafka Producer using config " + configRef .getUuid (), ex );
@@ -210,19 +213,19 @@ public void startElement(final String uri, final String localName, final String
210
213
if (xmlValueDepth == -1 ) {
211
214
final ErrorListener errorListener = new ErrorListener () {
212
215
@ Override
213
- public void warning (TransformerException exception ) throws TransformerException {
216
+ public void warning (TransformerException exception ) {
214
217
errorReceiverProxy .log (Severity .WARNING , locationFactory .create (locator ), getElementId (),
215
218
"Kafka XML value parse error" , exception );
216
219
}
217
220
218
221
@ Override
219
- public void error (TransformerException exception ) throws TransformerException {
222
+ public void error (TransformerException exception ) {
220
223
errorReceiverProxy .log (Severity .ERROR , locationFactory .create (locator ), getElementId (),
221
224
"Kafka XML value parse error" , exception );
222
225
}
223
226
224
227
@ Override
225
- public void fatalError (TransformerException exception ) throws TransformerException {
228
+ public void fatalError (TransformerException exception ) {
226
229
errorReceiverProxy .log (Severity .FATAL_ERROR , locationFactory .create (locator ), getElementId (),
227
230
"Kafka XML value parse error" , exception );
228
231
}
@@ -271,73 +274,76 @@ public void fatalError(TransformerException exception) throws TransformerExcepti
271
274
}
272
275
}
273
276
274
- if (HEADER_ELEMENT_LOCAL_NAME .equals (localName )) {
275
- state .inHeader = true ;
276
- }
277
-
278
277
if (state != null ) {
278
+ if (HEADER_ELEMENT_LOCAL_NAME .equals (localName )) {
279
+ state .inHeader = true ;
280
+ }
279
281
state .lastElement = localName ;
280
282
}
281
283
}
282
284
283
-
285
+ content . clear ();
284
286
super .startElement (uri , localName , qName , atts );
285
287
}
286
288
287
289
@ Override
288
- public void characters (char [] ch , int start , int length ) throws SAXException {
289
- String val = new String (ch , start , length );
290
- String element = state .lastElement ;
291
- if (KEY_ELEMENT_LOCAL_NAME .equals (element )) {
292
- if (state .inHeader ) {
293
- state .headerNames .add (val );
294
- } else {
295
- state .key = val ;
296
- }
297
- } else if (VALUE_ELEMENT_LOCAL_NAME .equals (element )) {
298
- if (state .inHeader ) {
299
- state .headerVals .add (val );
300
- } else if (xmlValueDepth >= 0 ) {
301
- xmlValueHandler .characters (ch , start , length );
302
- } else {
303
- state .messageValue = val .getBytes (StandardCharsets .UTF_8 );
290
+ public void endElement (final String uri , final String localName , final String qName ) throws SAXException {
291
+ if (state != null ) {
292
+ if (KEY_ELEMENT_LOCAL_NAME .equals (localName )) {
293
+ if (state .inHeader ) {
294
+ state .headerNames .add (content .toString ());
295
+ } else {
296
+ state .key = content .toString ();
297
+ }
298
+ } else if (VALUE_ELEMENT_LOCAL_NAME .equals (localName )) {
299
+ if (state .inHeader ) {
300
+ state .headerVals .add (content .toString ());
301
+ } else if (xmlValueDepth < 0 ) {
302
+ state .messageValue = content .toString ().getBytes (StandardCharsets .UTF_8 );
303
+ }
304
304
}
305
305
}
306
- super .characters (ch , start , length );
307
- }
308
-
309
-
310
- @ Override
311
- public void endElement (final String uri , final String localName , final String qName ) throws SAXException {
312
306
313
307
if (xmlValueDepth == 0 ) {
314
308
if (VALUE_ELEMENT_LOCAL_NAME .equals (localName )) {
315
- //Create the val from the contents of XML handler buffer
309
+ // Create the val from the contents of XML handler buffer
316
310
xmlValueHandler .endDocument ();
317
311
state .messageValue = outputStream .toByteArray ();
318
312
xmlValueDepth = -1 ;
319
313
} else {
320
314
throw new SAXException ("Unexpected tag " + localName + " in kafka message value." );
321
315
}
322
316
} else if (xmlValueDepth > 0 ) {
323
- //Closing an XML value element
317
+ // Closing an XML value element
324
318
xmlValueHandler .endElement (uri , localName , qName );
325
319
xmlValueDepth --;
326
- } else {
327
- if (state != null ) {
328
- state .lastElement = null ;
329
- }
330
320
321
+ } else if (state != null ) {
322
+ state .lastElement = null ;
331
323
if (HEADER_ELEMENT_LOCAL_NAME .equals (localName )) {
332
324
state .inHeader = false ;
333
325
} else if (RECORD_ELEMENT_LOCAL_NAME .equals (localName )) {
334
326
createKafkaMessage (state );
335
327
state = null ;
336
328
}
337
329
}
330
+
331
+ content .clear ();
338
332
super .endElement (uri , localName , qName );
339
333
}
340
334
335
+ @ Override
336
+ public void characters (char [] ch , int start , int length ) throws SAXException {
337
+ if (state != null && VALUE_ELEMENT_LOCAL_NAME .equals (state .lastElement )) {
338
+ if (!state .inHeader && xmlValueDepth >= 0 ) {
339
+ xmlValueHandler .characters (ch , start , length );
340
+ }
341
+ }
342
+
343
+ content .append (ch , start , length );
344
+ super .characters (ch , start , length );
345
+ }
346
+
341
347
private void createKafkaMessage (KafkaMessageState state ) {
342
348
343
349
if (state .isInvalid ()) {
@@ -384,7 +390,7 @@ private void logState(final KafkaMessageState state) {
384
390
}
385
391
stringBuilder
386
392
.append (" Value: " )
387
- .append (state .messageValue );
393
+ .append (Arrays . toString ( state .messageValue ) );
388
394
389
395
log (Severity .INFO , stringBuilder .toString (), null );
390
396
}
@@ -400,7 +406,7 @@ public void setKafkaConfig(final DocRef configRef) {
400
406
@ SuppressWarnings ("unused" )
401
407
@ PipelineProperty (
402
408
description = "At the end of the stream, wait for acknowledgement from the Kafka broker for all " +
403
- "the messages sent. This ensures errors are caught in the pipeline process." ,
409
+ "the messages sent. This ensures errors are caught in the pipeline process." ,
404
410
defaultValue = "true" ,
405
411
displayPriority = 2 )
406
412
public void setFlushOnSend (final boolean flushOnSend ) {
0 commit comments