Skip to content

Commit 7506eee

Browse files
committed
Merge branch '7.7' of github.com:gchq/stroom into 7.8
2 parents 88f0ba0 + bcc684b commit 7506eee

File tree

17 files changed

+291
-264
lines changed

17 files changed

+291
-264
lines changed

Diff for: stroom-annotation/stroom-annotation-pipeline/src/main/java/stroom/annotation/pipeline/AnnotationWriter.java

+23-40
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import stroom.pipeline.filter.AbstractXMLFilter;
2828
import stroom.pipeline.shared.data.PipelineElementType;
2929
import stroom.svg.shared.SvgImage;
30+
import stroom.util.CharBuffer;
3031
import stroom.util.shared.Severity;
3132

3233
import jakarta.inject.Inject;
@@ -90,20 +91,19 @@ class AnnotationWriter extends AbstractXMLFilter {
9091
private final LocationFactoryProxy locationFactory;
9192
private final ErrorReceiverProxy errorReceiverProxy;
9293
private final AnnotationCreator annotationCreator;
94+
private final CharBuffer content = new CharBuffer();
9395
private Locator locator;
9496

95-
private String lastTag = null;
96-
private Annotation currentAnnotation = null;
97+
private Annotation currentAnnotation = new Annotation();
9798
private String lastEventId = null;
9899
private String lastStreamId = null;
99100

100-
private ArrayList<EventId> currentEventIds = null;
101+
private ArrayList<EventId> currentEventIds = new ArrayList<>();
101102

102103
@Inject
103104
AnnotationWriter(final AnnotationCreator annotationCreator,
104105
final ErrorReceiverProxy errorReceiverProxy,
105-
final LocationFactoryProxy locationFactory
106-
) {
106+
final LocationFactoryProxy locationFactory) {
107107
this.errorReceiverProxy = errorReceiverProxy;
108108
this.locationFactory = locationFactory;
109109
this.annotationCreator = annotationCreator;
@@ -124,37 +124,12 @@ public void setDocumentLocator(final Locator locator) {
124124
@Override
125125
public void startElement(final String uri, final String localName, final String qName, final Attributes atts)
126126
throws SAXException {
127-
128-
lastTag = localName;
129-
130-
if (ANNOTATION_TAG.equals(localName)) {
131-
currentAnnotation = new Annotation();
132-
currentEventIds = new ArrayList<>();
133-
}
134-
127+
content.clear();
135128
super.startElement(uri, localName, qName, atts);
136129
}
137130

138-
@Override
139-
public void characters(char[] ch, int start, int length) throws SAXException {
140-
String val = new String(ch, start, length);
141-
142-
if (TITLE_TAG.equals(lastTag)) {
143-
currentAnnotation.setTitle(val);
144-
} else if (DESCRIPTION_TAG.equals(lastTag)) {
145-
currentAnnotation.setSubject(val);
146-
} else if (EVENTID_TAG.equals(lastTag)) {
147-
lastEventId = val;
148-
} else if (STREAMID_TAG.equals(lastTag)) {
149-
lastStreamId = val;
150-
}
151-
152-
super.characters(ch, start, length);
153-
}
154-
155131
@Override
156132
public void endElement(final String uri, final String localName, final String qName) throws SAXException {
157-
158133
if (EVENT_TAG.equals(localName)) {
159134
if (lastStreamId == null) {
160135
log(Severity.ERROR, "StreamId must be a long integer, but none provided ", null);
@@ -177,13 +152,7 @@ public void endElement(final String uri, final String localName, final String qN
177152
}
178153
}
179154
}
180-
} else if (ANNOTATION_TAG.equals(localName) && currentAnnotation != null) {
181-
CreateEntryRequest request = new CreateEntryRequest(
182-
currentAnnotation,
183-
Annotation.COMMENT,
184-
(StringEntryValue) null,
185-
currentEventIds);
186-
155+
} else if (ANNOTATION_TAG.equals(localName)) {
187156
try {
188157
annotationCreator.createEntry(new CreateEntryRequest(
189158
currentAnnotation,
@@ -194,13 +163,27 @@ public void endElement(final String uri, final String localName, final String qN
194163
} catch (final RuntimeException e) {
195164
log(Severity.ERROR, "Unable to create annotation " + currentAnnotation.getSubject(), e);
196165
}
197-
currentAnnotation = null;
198-
currentEventIds = null;
166+
currentAnnotation = new Annotation();
167+
currentEventIds = new ArrayList<>();
168+
} else if (TITLE_TAG.equals(localName)) {
169+
currentAnnotation.setTitle(content.toString());
170+
} else if (DESCRIPTION_TAG.equals(localName)) {
171+
currentAnnotation.setSubject(content.toString());
172+
} else if (EVENTID_TAG.equals(localName)) {
173+
lastEventId = content.toString();
174+
} else if (STREAMID_TAG.equals(localName)) {
175+
lastStreamId = content.toString();
199176
}
200177

178+
content.clear();
201179
super.endElement(uri, localName, qName);
202180
}
203181

182+
@Override
183+
public void characters(char[] ch, int start, int length) throws SAXException {
184+
content.append(ch, start, length);
185+
super.characters(ch, start, length);
186+
}
204187

205188
private void log(final Severity severity, final String message, final Exception e) {
206189
errorReceiverProxy.log(severity, locationFactory.create(locator), getElementId(), message, e);

Diff for: stroom-core-client/src/main/java/stroom/dashboard/client/DashboardPlugin.java

+20-19
Original file line numberDiff line numberDiff line change
@@ -167,25 +167,26 @@ private void reopen(final ResultStoreInfo resultStoreInfo) {
167167
final SearchRequestSource source = resultStoreInfo.getSearchRequestSource();
168168
if (source != null && SourceType.DASHBOARD_UI.equals(source.getSourceType())) {
169169
final DocRef docRef = source.getOwnerDocRef();
170-
171-
// If the item isn't already open but we are forcing it open then,
172-
// create a new presenter and register it as open.
173-
final DashboardSuperPresenter presenter = dashboardSuperPresenterProvider.get();
174-
presenter.setResultStoreInfo(resultStoreInfo);
175-
176-
// Load the document and show the tab.
177-
final CloseContentEvent.Handler closeHandler = event -> {
178-
// Tell the presenter we are closing.
179-
presenter.onClose();
180-
// Actually close the tab.
181-
event.getCallback().closeTab(true);
182-
};
183-
showDocument(docRef,
184-
presenter,
185-
closeHandler,
186-
presenter,
187-
false,
188-
new DefaultTaskMonitorFactory(this));
170+
if (docRef != null) {
171+
// If the item isn't already open but we are forcing it open then,
172+
// create a new presenter and register it as open.
173+
final DashboardSuperPresenter presenter = dashboardSuperPresenterProvider.get();
174+
presenter.setResultStoreInfo(resultStoreInfo);
175+
176+
// Load the document and show the tab.
177+
final CloseContentEvent.Handler closeHandler = event -> {
178+
// Tell the presenter we are closing.
179+
presenter.onClose();
180+
// Actually close the tab.
181+
event.getCallback().closeTab(true);
182+
};
183+
showDocument(docRef,
184+
presenter,
185+
closeHandler,
186+
presenter,
187+
false,
188+
new DefaultTaskMonitorFactory(this));
189+
}
189190
}
190191
}
191192

Diff for: stroom-dashboard/stroom-dashboard-impl/src/main/java/stroom/dashboard/impl/DashboardServiceImpl.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -365,13 +365,18 @@ private String getResultsFilename(final DownloadSearchResultsRequest request) {
365365

366366
private String getQueryFileName(final DashboardSearchRequest request) {
367367
final SearchRequestSource searchRequestSource = request.getSearchRequestSource();
368-
final DocRefInfo dashDocRefInfo = dashboardStore.info(searchRequestSource.getOwnerDocRef());
369-
final String dashboardName = NullSafe.getOrElse(
370-
dashDocRefInfo,
371-
DocRefInfo::getDocRef,
372-
DocRef::getName,
373-
searchRequestSource.getOwnerDocRef().getName());
374-
final String basename = dashboardName + "__" + searchRequestSource.getComponentId();
368+
String basename = searchRequestSource.getComponentId();
369+
if (searchRequestSource.getOwnerDocRef() != null) {
370+
final DocRefInfo dashDocRefInfo = dashboardStore.info(searchRequestSource.getOwnerDocRef());
371+
final String dashboardName = NullSafe.getOrElse(
372+
dashDocRefInfo,
373+
DocRefInfo::getDocRef,
374+
DocRef::getName,
375+
searchRequestSource.getOwnerDocRef().getName());
376+
if (dashboardName != null) {
377+
basename = dashboardName + "__" + searchRequestSource.getComponentId();
378+
}
379+
}
375380
return getFileName(basename, "json");
376381
}
377382

@@ -504,7 +509,8 @@ private void storeSearchHistory(final DashboardSearchRequest request) {
504509
final SearchRequestSource searchRequestSource = request.getSearchRequestSource();
505510
final StoredQuery storedQuery = new StoredQuery();
506511
storedQuery.setName("History");
507-
storedQuery.setDashboardUuid(searchRequestSource.getOwnerDocRef().getUuid());
512+
storedQuery.setDashboardUuid(NullSafe
513+
.get(searchRequestSource, SearchRequestSource::getOwnerDocRef, DocRef::getUuid));
508514
storedQuery.setComponentId(searchRequestSource.getComponentId());
509515
storedQuery.setQuery(query);
510516
queryService.create(storedQuery);

Diff for: stroom-kafka/stroom-kafka-impl/src/main/java/stroom/kafka/pipeline/StandardKafkaProducer.java

+45-39
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import stroom.pipeline.filter.AbstractXMLFilter;
3131
import stroom.pipeline.shared.data.PipelineElementType;
3232
import stroom.svg.shared.SvgImage;
33+
import stroom.util.CharBuffer;
3334
import stroom.util.logging.LambdaLogger;
3435
import stroom.util.logging.LambdaLoggerFactory;
3536
import stroom.util.shared.Severity;
@@ -51,6 +52,7 @@
5152
import java.time.format.DateTimeParseException;
5253
import java.util.ArrayDeque;
5354
import java.util.ArrayList;
55+
import java.util.Arrays;
5456
import java.util.List;
5557
import java.util.Queue;
5658
import java.util.concurrent.ExecutionException;
@@ -94,6 +96,7 @@ class StandardKafkaProducer extends AbstractXMLFilter {
9496
private final LocationFactoryProxy locationFactory;
9597
private final KafkaProducerFactory stroomKafkaProducerFactory;
9698
private final Queue<Future<RecordMetadata>> kafkaMetaFutures;
99+
private final CharBuffer content = new CharBuffer();
97100

98101
private Locator locator = null;
99102
private DocRef configRef = null;
@@ -143,7 +146,7 @@ public void startProcessing() {
143146

144147
kafkaProducer = sharedKafkaProducer.getKafkaProducer().orElseThrow(() -> {
145148
log(Severity.FATAL_ERROR, "No Kafka produce exists for config " + configRef, null);
146-
throw LoggedException.create("Unable to create Kafka Producer using config " + configRef);
149+
return LoggedException.create("Unable to create Kafka Producer using config " + configRef);
147150
});
148151
} catch (KafkaException ex) {
149152
log(Severity.FATAL_ERROR, "Unable to create Kafka Producer using config " + configRef.getUuid(), ex);
@@ -210,19 +213,19 @@ public void startElement(final String uri, final String localName, final String
210213
if (xmlValueDepth == -1) {
211214
final ErrorListener errorListener = new ErrorListener() {
212215
@Override
213-
public void warning(TransformerException exception) throws TransformerException {
216+
public void warning(TransformerException exception) {
214217
errorReceiverProxy.log(Severity.WARNING, locationFactory.create(locator), getElementId(),
215218
"Kafka XML value parse error", exception);
216219
}
217220

218221
@Override
219-
public void error(TransformerException exception) throws TransformerException {
222+
public void error(TransformerException exception) {
220223
errorReceiverProxy.log(Severity.ERROR, locationFactory.create(locator), getElementId(),
221224
"Kafka XML value parse error", exception);
222225
}
223226

224227
@Override
225-
public void fatalError(TransformerException exception) throws TransformerException {
228+
public void fatalError(TransformerException exception) {
226229
errorReceiverProxy.log(Severity.FATAL_ERROR, locationFactory.create(locator), getElementId(),
227230
"Kafka XML value parse error", exception);
228231
}
@@ -271,73 +274,76 @@ public void fatalError(TransformerException exception) throws TransformerExcepti
271274
}
272275
}
273276

274-
if (HEADER_ELEMENT_LOCAL_NAME.equals(localName)) {
275-
state.inHeader = true;
276-
}
277-
278277
if (state != null) {
278+
if (HEADER_ELEMENT_LOCAL_NAME.equals(localName)) {
279+
state.inHeader = true;
280+
}
279281
state.lastElement = localName;
280282
}
281283
}
282284

283-
285+
content.clear();
284286
super.startElement(uri, localName, qName, atts);
285287
}
286288

287289
@Override
288-
public void characters(char[] ch, int start, int length) throws SAXException {
289-
String val = new String(ch, start, length);
290-
String element = state.lastElement;
291-
if (KEY_ELEMENT_LOCAL_NAME.equals(element)) {
292-
if (state.inHeader) {
293-
state.headerNames.add(val);
294-
} else {
295-
state.key = val;
296-
}
297-
} else if (VALUE_ELEMENT_LOCAL_NAME.equals(element)) {
298-
if (state.inHeader) {
299-
state.headerVals.add(val);
300-
} else if (xmlValueDepth >= 0) {
301-
xmlValueHandler.characters(ch, start, length);
302-
} else {
303-
state.messageValue = val.getBytes(StandardCharsets.UTF_8);
290+
public void endElement(final String uri, final String localName, final String qName) throws SAXException {
291+
if (state != null) {
292+
if (KEY_ELEMENT_LOCAL_NAME.equals(localName)) {
293+
if (state.inHeader) {
294+
state.headerNames.add(content.toString());
295+
} else {
296+
state.key = content.toString();
297+
}
298+
} else if (VALUE_ELEMENT_LOCAL_NAME.equals(localName)) {
299+
if (state.inHeader) {
300+
state.headerVals.add(content.toString());
301+
} else if (xmlValueDepth < 0) {
302+
state.messageValue = content.toString().getBytes(StandardCharsets.UTF_8);
303+
}
304304
}
305305
}
306-
super.characters(ch, start, length);
307-
}
308-
309-
310-
@Override
311-
public void endElement(final String uri, final String localName, final String qName) throws SAXException {
312306

313307
if (xmlValueDepth == 0) {
314308
if (VALUE_ELEMENT_LOCAL_NAME.equals(localName)) {
315-
//Create the val from the contents of XML handler buffer
309+
// Create the val from the contents of XML handler buffer
316310
xmlValueHandler.endDocument();
317311
state.messageValue = outputStream.toByteArray();
318312
xmlValueDepth = -1;
319313
} else {
320314
throw new SAXException("Unexpected tag " + localName + " in kafka message value.");
321315
}
322316
} else if (xmlValueDepth > 0) {
323-
//Closing an XML value element
317+
// Closing an XML value element
324318
xmlValueHandler.endElement(uri, localName, qName);
325319
xmlValueDepth--;
326-
} else {
327-
if (state != null) {
328-
state.lastElement = null;
329-
}
330320

321+
} else if (state != null) {
322+
state.lastElement = null;
331323
if (HEADER_ELEMENT_LOCAL_NAME.equals(localName)) {
332324
state.inHeader = false;
333325
} else if (RECORD_ELEMENT_LOCAL_NAME.equals(localName)) {
334326
createKafkaMessage(state);
335327
state = null;
336328
}
337329
}
330+
331+
content.clear();
338332
super.endElement(uri, localName, qName);
339333
}
340334

335+
@Override
336+
public void characters(char[] ch, int start, int length) throws SAXException {
337+
if (state != null && VALUE_ELEMENT_LOCAL_NAME.equals(state.lastElement)) {
338+
if (!state.inHeader && xmlValueDepth >= 0) {
339+
xmlValueHandler.characters(ch, start, length);
340+
}
341+
}
342+
343+
content.append(ch, start, length);
344+
super.characters(ch, start, length);
345+
}
346+
341347
private void createKafkaMessage(KafkaMessageState state) {
342348

343349
if (state.isInvalid()) {
@@ -384,7 +390,7 @@ private void logState(final KafkaMessageState state) {
384390
}
385391
stringBuilder
386392
.append(" Value: ")
387-
.append(state.messageValue);
393+
.append(Arrays.toString(state.messageValue));
388394

389395
log(Severity.INFO, stringBuilder.toString(), null);
390396
}
@@ -400,7 +406,7 @@ public void setKafkaConfig(final DocRef configRef) {
400406
@SuppressWarnings("unused")
401407
@PipelineProperty(
402408
description = "At the end of the stream, wait for acknowledgement from the Kafka broker for all " +
403-
"the messages sent. This ensures errors are caught in the pipeline process.",
409+
"the messages sent. This ensures errors are caught in the pipeline process.",
404410
defaultValue = "true",
405411
displayPriority = 2)
406412
public void setFlushOnSend(final boolean flushOnSend) {

Diff for: stroom-pipeline/src/main/java/stroom/pipeline/filter/ResultLocator.java

-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,6 @@ public void endElement(final String uri, final String localName, final String qN
142142
@Override
143143
public void characters(final char[] ch, final int start, final int length) throws SAXException {
144144
colNo += length;
145-
146145
super.characters(ch, start, length);
147146
}
148147

0 commit comments

Comments
 (0)