Skip to content

Commit e7c7d83

Browse files
authored
Merge pull request #4813 from gchq/gh-2334_fix_split_depth_for_stepping
Gh 2334 fix split depth for stepping
2 parents ffb5f41 + f8c898e commit e7c7d83

File tree

6 files changed

+372
-43
lines changed

6 files changed

+372
-43
lines changed

Diff for: stroom-pipeline/src/main/java/stroom/pipeline/factory/PipelineFactory.java

+84-43
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,22 @@ public Pipeline create(final PipelineData pipelineData,
155155
throw new PipelineFactoryException("The pipeline has no source element");
156156
}
157157

158+
// Get the split depth to use with the stepping controller.
159+
int controllerSplitDepth = 1;
160+
if (controller != null) {
161+
controllerSplitDepth =
162+
getSplitDepth(elementInstances, elementTypeMap, linkSets, sourceElement.getElementId());
163+
controllerSplitDepth = Math.max(controllerSplitDepth, 1);
164+
}
165+
158166
// Link the instances.
159-
link(elementInstances, elementTypeMap, linkSets, controller, sourceElement, sourceElement.getElementId());
167+
link(elementInstances,
168+
elementTypeMap,
169+
linkSets,
170+
controller,
171+
sourceElement,
172+
sourceElement.getElementId(),
173+
controllerSplitDepth);
160174

161175
// We need to create a root element that will be a target for the input
162176
// stream.
@@ -236,12 +250,11 @@ public static void setProperty(final ElementRegistry pipelineElementRegistry,
236250
// stepping and have code to insert.
237251
if (controller != null) {
238252
final PipelineStepRequest request = controller.getRequest();
239-
if (request.getCode() != null && request.getCode().size() > 0) {
253+
if (request.getCode() != null && !request.getCode().isEmpty()) {
240254
final String code = request.getCode().get(id);
241255
if (code != null) {
242-
if (elementInstance instanceof SupportsCodeInjection) {
243-
final SupportsCodeInjection supportsCodeInjection =
244-
(SupportsCodeInjection) elementInstance;
256+
if (elementInstance instanceof
257+
final SupportsCodeInjection supportsCodeInjection) {
245258
supportsCodeInjection.setInjectedCode(code);
246259
}
247260
}
@@ -308,7 +321,8 @@ private void link(final Map<String, Element> elementInstances,
308321
final Map<String, Set<String>> linkSets,
309322
final SteppingController controller,
310323
final Element parentElement,
311-
final String parentElementId) {
324+
final String parentElementId,
325+
final int controllerSplitDepth) {
312326
// Get the child elements of the supplied 'from' element id that we want
313327
// to work with.
314328
final List<Element> childElements = getChildElements(parentElementId, elementInstances, elementTypeMap,
@@ -335,37 +349,71 @@ private void link(final Map<String, Element> elementInstances,
335349
addMonitor(elementId, elementType, childElement, fragment, controller);
336350

337351
} else {
338-
fragment = insertRecorder(elementId, elementType, fragment, true, controller);
339-
fragment = insertRecorder(elementId, elementType, fragment, false, controller);
352+
fragment = insertRecorder(elementId, elementType, fragment, true, controller,
353+
controllerSplitDepth);
354+
fragment = insertRecorder(elementId, elementType, fragment, false, controller,
355+
controllerSplitDepth);
340356
addMonitor(elementId, elementType, childElement, fragment, controller);
341357
fragment = insertRecordDetector(elementType, fragment, true, controller);
342358
fragment = insertRecordDetector(elementType, fragment, false, controller);
343359
}
344360
}
345361

346362
// Continue to link the children of this child.
347-
link(elementInstances, elementTypeMap, linkSets, controller, fragment.getOut(), elementId);
363+
link(elementInstances,
364+
elementTypeMap,
365+
linkSets,
366+
controller,
367+
fragment.getOut(),
368+
elementId,
369+
controllerSplitDepth);
348370

349371
// Now set the target of the parent element to be the 'wrapped'
350372
// child to complete the link.
351373
if (parentElement != null && parentElement != fragment.getIn()) {
352-
if (!(parentElement instanceof HasTargets)) {
374+
if (parentElement instanceof final HasTargets hasTargets) {
375+
if (childElement instanceof Target) {
376+
if (fragment.getIn() instanceof final Target target) {
377+
hasTargets.addTarget(target);
378+
}
379+
} else {
380+
throw new PipelineFactoryException("Attempt to link to an element that is not a target: "
381+
+ parentElement.getElementId() + " > " + elementId);
382+
}
383+
} else {
353384
throw new PipelineFactoryException(
354385
"Attempt to link from an element that cannot target another element: "
355-
+ parentElement.getElementId());
356-
}
357-
if (!(childElement instanceof Target)) {
358-
throw new PipelineFactoryException("Attempt to link to an element that is not a target: "
359-
+ parentElement.getElementId() + " > " + elementId);
386+
+ parentElement.getElementId());
360387
}
388+
}
389+
}
390+
}
361391

362-
final HasTargets hasTargets = (HasTargets) parentElement;
363-
if (fragment.getIn() instanceof Target) {
364-
final Target target = (Target) fragment.getIn();
365-
hasTargets.addTarget(target);
366-
}
392+
/**
393+
* Get the split depth for use in stepping.
394+
*/
395+
private int getSplitDepth(final Map<String, Element> elementInstances,
396+
final Map<Element, PipelineElementType> elementTypeMap,
397+
final Map<String, Set<String>> linkSets,
398+
final String parentElementId) {
399+
// Get the child elements of the supplied 'from' element id that we want
400+
// to work with.
401+
final List<Element> childElements = getChildElements(parentElementId, elementInstances, elementTypeMap,
402+
linkSets, null);
403+
404+
// Loop over the child elements and link them to the parent.
405+
for (final Element childElement : childElements) {
406+
if (childElement instanceof final SplitFilter splitFilter) {
407+
return splitFilter.getSplitDepth();
408+
}
409+
final int childSplitDepth =
410+
getSplitDepth(elementInstances, elementTypeMap, linkSets, childElement.getElementId());
411+
if (childSplitDepth != -1) {
412+
return childSplitDepth;
367413
}
368414
}
415+
416+
return -1;
369417
}
370418

371419
/**
@@ -390,7 +438,7 @@ private List<Element> getChildElements(final String fromElementId,
390438
List<Element> childElements = Collections.emptyList();
391439

392440
final Set<String> toElementIdSet = linkSets.get(fromElementId);
393-
if (toElementIdSet != null && toElementIdSet.size() > 0) {
441+
if (toElementIdSet != null && !toElementIdSet.isEmpty()) {
394442
childElements = new ArrayList<>(toElementIdSet.size());
395443

396444
for (final String toElementId : toElementIdSet) {
@@ -440,7 +488,8 @@ private Fragment insertRecorder(final String elementId,
440488
final PipelineElementType elementType,
441489
final Fragment fragment,
442490
final boolean input,
443-
final SteppingController controller) {
491+
final SteppingController controller,
492+
final int controllerSplitDepth) {
444493
Fragment result = fragment;
445494

446495
// Get any filter settings that might be applied to XML output.
@@ -458,29 +507,25 @@ private Fragment insertRecorder(final String elementId,
458507
outputRecorder.setElementId(elementId);
459508
result = new Fragment(outputRecorder);
460509

461-
} else if (in instanceof AbstractInputElement) {
462-
final AbstractInputElement filter = (AbstractInputElement) in;
510+
} else if (in instanceof final AbstractInputElement filter) {
463511
final ReaderRecorder recorder = new ReaderRecorder(errorReceiverProxy);
464512
recorder.setElementId(elementId);
465513
recorder.setTarget(filter);
466514
result = new Fragment(recorder, fragment.getOut());
467515

468-
} else if (in instanceof AbstractReaderElement) {
469-
final AbstractReaderElement filter = (AbstractReaderElement) in;
516+
} else if (in instanceof final AbstractReaderElement filter) {
470517
final ReaderRecorder recorder = new ReaderRecorder(errorReceiverProxy);
471518
recorder.setElementId(elementId);
472519
recorder.setTarget(filter);
473520
result = new Fragment(recorder, fragment.getOut());
474521

475-
} else if (in instanceof AbstractParser) {
476-
final AbstractParser parser = (AbstractParser) in;
522+
} else if (in instanceof final AbstractParser parser) {
477523
final ReaderRecorder recorder = new ReaderRecorder(errorReceiverProxy);
478524
recorder.setElementId(elementId);
479525
recorder.setTarget(parser);
480526
result = new Fragment(recorder, fragment.getOut());
481527

482-
} else if (in instanceof XMLFilter && elementType.hasRole(PipelineElementType.ROLE_MUTATOR)) {
483-
final XMLFilter filter = (XMLFilter) in;
528+
} else if (in instanceof final XMLFilter filter && elementType.hasRole(PipelineElementType.ROLE_MUTATOR)) {
484529

485530
// Create stepping filter.
486531
final SAXEventRecorder recorder = elementFactory.getElementInstance(SAXEventRecorder.class);
@@ -502,27 +547,24 @@ private Fragment insertRecorder(final String elementId,
502547
outputRecorder.setElementId(elementId);
503548
result = new Fragment(outputRecorder);
504549

505-
} else if (out instanceof AbstractInputElement) {
506-
final AbstractInputElement filter = (AbstractInputElement) out;
550+
} else if (out instanceof final AbstractInputElement filter) {
507551
final ReaderRecorder recorder = new ReaderRecorder(errorReceiverProxy);
508552
recorder.setElementId(elementId);
509553
filter.setTarget(recorder);
510554
result = new Fragment(fragment.getIn(), recorder);
511555

512-
} else if (out instanceof AbstractReaderElement) {
513-
final AbstractReaderElement filter = (AbstractReaderElement) out;
556+
} else if (out instanceof final AbstractReaderElement filter) {
514557
final ReaderRecorder recorder = new ReaderRecorder(errorReceiverProxy);
515558
recorder.setElementId(elementId);
516559
filter.setTarget(recorder);
517560
result = new Fragment(fragment.getIn(), recorder);
518561

519-
} else if (out instanceof AbstractParser) {
520-
final AbstractParser parser = (AbstractParser) out;
562+
} else if (out instanceof final AbstractParser parser) {
521563

522564
// Insert a split filter after the parser to split all XML into
523565
// single records.
524566
final SplitFilter splitFilter = elementFactory.getElementInstance(SplitFilter.class);
525-
splitFilter.setSplitDepth(1);
567+
splitFilter.setSplitDepth(controllerSplitDepth);
526568
splitFilter.setSplitCount(controller.getRequest().getStepSize());
527569
parser.setTarget(splitFilter);
528570

@@ -537,18 +579,17 @@ private Fragment insertRecorder(final String elementId,
537579

538580
result = new Fragment(fragment.getIn(), recorder);
539581

540-
} else if (out instanceof XMLFilter && elementType.hasRole(PipelineElementType.ROLE_WRITER)) {
541-
final XMLFilter filter = (XMLFilter) out;
582+
} else if (out instanceof final XMLFilter filter && elementType.hasRole(PipelineElementType.ROLE_WRITER)) {
542583

543584
final OutputRecorder outputRecorder = elementFactory.getElementInstance(OutputRecorder.class);
544585
outputRecorder.setElementId(elementId);
545586
((HasTargets) filter).setTarget(outputRecorder);
546587

547588
result = new Fragment(fragment.getIn(), outputRecorder);
548589

549-
} else if (out instanceof XMLFilter && (elementType.hasRole(PipelineElementType.ROLE_MUTATOR)
550-
|| elementType.hasRole(PipelineElementType.ROLE_VALIDATOR))) {
551-
final XMLFilter filter = (XMLFilter) out;
590+
} else if (out instanceof final XMLFilter filter &&
591+
(elementType.hasRole(PipelineElementType.ROLE_MUTATOR) ||
592+
elementType.hasRole(PipelineElementType.ROLE_VALIDATOR))) {
552593

553594
// Create stepping filter.
554595
final SAXEventRecorder recorder = elementFactory.getElementInstance(SAXEventRecorder.class);
@@ -629,7 +670,7 @@ private Fragment insertRecordDetector(final PipelineElementType elementType,
629670
if (!input) {
630671
if (elementType.hasRole(PipelineElementType.ROLE_PARSER)) {
631672
if (controller.getRecordDetector() == null
632-
|| !(controller.getRecordDetector() instanceof SAXRecordDetector)) {
673+
|| !(controller.getRecordDetector() instanceof SAXRecordDetector)) {
633674
final SAXRecordDetector recordDetector = elementFactory.getElementInstance(SAXRecordDetector.class);
634675
controller.setRecordDetector(recordDetector);
635676
((HasTargets) fragment.getOut()).setTarget(recordDetector);

Diff for: stroom-pipeline/src/main/java/stroom/pipeline/filter/SplitFilter.java

+5
Original file line numberDiff line numberDiff line change
@@ -479,4 +479,9 @@ public void setStoreLocations(final boolean storeLocations) {
479479
locationHolder.setStoreLocations(storeLocations);
480480
}
481481
}
482+
483+
public int getSplitDepth() {
484+
// Add a fudge in here to cope with legacy depth being 0 based.
485+
return splitDepth - 1;
486+
}
482487
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package stroom.pipeline.xml.converter.json;
2+
3+
import java.io.IOException;
4+
import java.io.Writer;
5+
import java.nio.file.Files;
6+
import java.nio.file.Path;
7+
8+
public class JSONTestDataCreator {
9+
10+
void writeFile(final Path path, final JsonType[][] structure) throws IOException {
11+
try (final Writer writer = Files.newBufferedWriter(path)) {
12+
final JsonType[] level = structure[0];
13+
for (int i = 0; i < level.length; i++) {
14+
final JsonType type = level[i];
15+
writeType(writer, structure, 0, type, i + 1);
16+
writer.write("\n");
17+
}
18+
}
19+
}
20+
21+
private void writeType(final Writer writer,
22+
final JsonType[][] structure,
23+
final int depth,
24+
final JsonType type,
25+
final int no) throws IOException {
26+
switch (type) {
27+
case OBJECT -> writeJsonObject(writer, structure, depth);
28+
case ARRAY -> writeJsonArray(writer, structure, depth);
29+
case VALUE -> writer.write("\"value" + no + "\"");
30+
}
31+
}
32+
33+
private void writeJsonArray(final Writer writer,
34+
final JsonType[][] structure,
35+
final int depth) throws IOException {
36+
writer.write("[\n");
37+
final int childDepth = depth + 1;
38+
if (childDepth < structure.length) {
39+
final JsonType[] types = structure[childDepth];
40+
for (int i = 0; i < types.length; i++) {
41+
writeIndent(writer, childDepth);
42+
43+
writeType(writer, structure, childDepth, types[i], i + 1);
44+
45+
if (i + 1 < types.length) {
46+
writer.write(",");
47+
}
48+
writer.write("\n");
49+
}
50+
}
51+
writeIndent(writer, depth);
52+
writer.write("]");
53+
}
54+
55+
private void writeJsonObject(final Writer writer,
56+
final JsonType[][] structure,
57+
final int depth) throws IOException {
58+
writer.write("{\n");
59+
final int childDepth = depth + 1;
60+
if (childDepth < structure.length) {
61+
final JsonType[] types = structure[childDepth];
62+
for (int i = 0; i < types.length; i++) {
63+
writeIndent(writer, childDepth);
64+
65+
writer.write("\"key" + (i + 1) + "\" : ");
66+
writeType(writer, structure, childDepth, types[i], i + 1);
67+
68+
if (i + 1 < types.length) {
69+
writer.write(",");
70+
}
71+
writer.write("\n");
72+
}
73+
}
74+
writeIndent(writer, depth);
75+
writer.write("}");
76+
}
77+
78+
private void writeIndent(final Writer writer, final int depth) throws IOException {
79+
for (int i = 0; i < depth; i++) {
80+
writer.write(" ");
81+
}
82+
}
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package stroom.pipeline.xml.converter.json;
2+
3+
public enum JsonType {
4+
OBJECT,
5+
ARRAY,
6+
VALUE
7+
}

0 commit comments

Comments
 (0)