Skip to content

Commit 6f7963d

Browse files
committed
Simplified stages #245
1 parent 2de758d commit 6f7963d

20 files changed

+412
-173
lines changed

RELEASE-NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
* #239 Remove APIs deprecated in 3.0.0
44
* #244 Simplify StringConverter
5+
* #245 Simplified stages
56
* #246 Missing stage: "SourceKeysStage.CONVERT_SOURCE_ROWS"
67
* #247 Missing stage: "DeleteStage.DELETE_TARGET"
78

link-move/src/main/java/com/nhl/link/move/CreateBuilder.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import com.nhl.link.move.extractor.model.ExtractorName;
44
import com.nhl.link.move.runtime.task.create.CreateSegment;
55
import com.nhl.link.move.runtime.task.create.CreateStage;
6+
import org.dflib.DataFrame;
67

78
import java.util.function.BiConsumer;
9+
import java.util.function.Function;
810

911
/**
1012
* A builder of an {@link LmTask} that performs fast "create" synchronization without any source/target key matching.
@@ -50,9 +52,20 @@ default CreateBuilder sourceExtractor(String location) {
5052
CreateBuilder batchSize(int batchSize);
5153

5254
/**
53-
* Adds a callback invoked for each processed segment after the specified stage in the "create" processing pipeline.
55+
* Adds a callback invoked for each data segment after the specified stage in the "create" processing pipeline.
5456
*
5557
* @since 3.0.0
5658
*/
57-
CreateBuilder stage(CreateStage stageType, BiConsumer<Execution, CreateSegment> callback);
59+
CreateBuilder stage(CreateStage stage, BiConsumer<Execution, CreateSegment> callback);
60+
61+
/**
62+
* Adds a callback invoked for each date segment after the specified stage in the "create" pipeline was processed.
63+
* The result of that stage is passed to the transformer argument. The value returned from the transformer overrides
64+
* the previous result for the stage.
65+
*
66+
* @since 4.0.0
67+
*/
68+
default CreateBuilder stage(CreateStage stage, Function<DataFrame, DataFrame> transformer) {
69+
return stage(stage, (e, s) -> s.postProcess(stage, transformer));
70+
}
5871
}

link-move/src/main/java/com/nhl/link/move/CreateOrUpdateBuilder.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
import com.nhl.link.move.runtime.task.createorupdate.CreateOrUpdateSegment;
66
import com.nhl.link.move.runtime.task.createorupdate.CreateOrUpdateStage;
77
import org.apache.cayenne.exp.property.Property;
8+
import org.dflib.DataFrame;
89

910
import java.util.function.BiConsumer;
11+
import java.util.function.Function;
1012

1113
/**
1214
* A builder of an {@link LmTask} that performs create-or-update synchronization.
@@ -87,5 +89,16 @@ default CreateOrUpdateBuilder sourceExtractor(String location) {
8789
*
8890
* @since 3.0.0
8991
*/
90-
CreateOrUpdateBuilder stage(CreateOrUpdateStage stageType, BiConsumer<Execution, CreateOrUpdateSegment> callback);
92+
CreateOrUpdateBuilder stage(CreateOrUpdateStage stage, BiConsumer<Execution, CreateOrUpdateSegment> callback);
93+
94+
/**
95+
* Adds a callback invoked for each date segment after the specified stage in the "create-or-update" pipeline was processed.
96+
* The result of that stage is passed to the transformer argument. The value returned from the transformer overrides
97+
* the previous result for the stage.
98+
*
99+
* @since 4.0.0
100+
*/
101+
default CreateOrUpdateBuilder stage(CreateOrUpdateStage stage, Function<DataFrame, DataFrame> transformer) {
102+
return stage(stage, (e, s) -> s.postProcess(stage, transformer));
103+
}
91104
}

link-move/src/main/java/com/nhl/link/move/DeleteBuilder.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
import com.nhl.link.move.runtime.task.delete.DeleteStage;
77
import org.apache.cayenne.exp.Expression;
88
import org.apache.cayenne.exp.property.Property;
9+
import org.dflib.DataFrame;
910

1011
import java.util.function.BiConsumer;
12+
import java.util.function.Function;
1113

1214
/**
1315
* since 1.3
@@ -50,5 +52,16 @@ default DeleteBuilder sourceMatchExtractor(String location) {
5052
*
5153
* @since 3.0.0
5254
*/
53-
DeleteBuilder stage(DeleteStage stageType, BiConsumer<Execution, DeleteSegment> callback);
55+
DeleteBuilder stage(DeleteStage stage, BiConsumer<Execution, DeleteSegment> callback);
56+
57+
/**
58+
* Adds a callback invoked for each date segment after the specified stage in the "delete" pipeline was processed.
59+
* The result of that stage is passed to the transformer argument. The value returned from the transformer overrides
60+
* the previous result for the stage.
61+
*
62+
* @since 4.0.0
63+
*/
64+
default DeleteBuilder stage(DeleteStage stage, Function<DataFrame, DataFrame> transformer) {
65+
return stage(stage, (e, s) -> s.postProcess(stage, transformer));
66+
}
5467
}

link-move/src/main/java/com/nhl/link/move/SourceKeysBuilder.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
import com.nhl.link.move.mapper.Mapper;
55
import com.nhl.link.move.runtime.task.sourcekeys.SourceKeysSegment;
66
import com.nhl.link.move.runtime.task.sourcekeys.SourceKeysStage;
7+
import org.dflib.DataFrame;
78

89
import java.util.function.BiConsumer;
10+
import java.util.function.Function;
911

1012
/**
1113
* A builder of an {@link LmTask} that extracts all the keys from the source
@@ -62,5 +64,16 @@ default SourceKeysBuilder sourceExtractor(String location) {
6264
*
6365
* @since 3.0.0
6466
*/
65-
SourceKeysBuilder stage(SourceKeysStage stageType, BiConsumer<Execution, SourceKeysSegment> callback);
67+
SourceKeysBuilder stage(SourceKeysStage stage, BiConsumer<Execution, SourceKeysSegment> callback);
68+
69+
/**
70+
* Adds a callback invoked for each date segment after the specified stage in the "source keys" pipeline was processed.
71+
* The result of that stage is passed to the transformer argument. The value returned from the transformer overrides
72+
* the previous result for the stage.
73+
*
74+
* @since 4.0.0
75+
*/
76+
default SourceKeysBuilder stage(SourceKeysStage stage, Function<DataFrame, DataFrame> transformer) {
77+
return stage(stage, (e, s) -> s.postProcess(stage, transformer));
78+
}
6679
}

link-move/src/main/java/com/nhl/link/move/runtime/task/BaseTaskBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public BaseTaskBuilder(LmLogger logger) {
3131
/**
3232
* @since 3.0.0
3333
*/
34-
public T stage(U stageType, BiConsumer<Execution, S> callback) {
35-
stageListenersBuilder.addStageCallback(stageType, callback);
34+
public T stage(U stage, BiConsumer<Execution, S> callback) {
35+
stageListenersBuilder.addStageCallback(stage, callback);
3636
return (T) this;
3737
}
3838

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,51 @@
11
package com.nhl.link.move.runtime.task.common;
22

3-
public interface DataSegment {
3+
import org.dflib.DataFrame;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
7+
import java.util.HashMap;
8+
import java.util.Map;
9+
import java.util.function.Function;
10+
11+
public abstract class DataSegment<U extends TaskStageType> {
12+
13+
private static final Logger LOGGER = LoggerFactory.getLogger(DataSegment.class);
14+
15+
protected final Map<U, DataFrame> stageResults;
16+
17+
public DataSegment() {
18+
// while results are mutable, stages are sequential (and DataSegment is assumed to be single-threaded), so no
19+
// need for concurrency
20+
this.stageResults = new HashMap<>();
21+
}
22+
23+
/**
24+
* @since 4.0.0
25+
*/
26+
public DataFrame get(U stage) {
27+
return stageResults.get(stage);
28+
}
29+
30+
/**
31+
* @since 4.0.0
32+
*/
33+
public void set(U stage, DataFrame df) {
34+
stageResults.put(stage, df);
35+
}
36+
37+
/**
38+
* Retrieves data from the specified stage and, if it is present, applies the processor and saves the result back
39+
* replacing the previous result of this stage.
40+
*
41+
* @since 4.0.0
42+
*/
43+
public void postProcess(U stage, Function<DataFrame, DataFrame> processor) {
44+
DataFrame df = get(stage);
45+
if (df != null) {
46+
set(stage, processor.apply(df));
47+
} else {
48+
LOGGER.warn("Skipping postprocessing. Stage {} has no results", stage);
49+
}
50+
}
451
}
Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,21 @@
11
package com.nhl.link.move.runtime.task.create;
22

33
import com.nhl.link.move.RowAttribute;
4-
import org.dflib.DataFrame;
54
import com.nhl.link.move.runtime.task.common.DataSegment;
65
import org.apache.cayenne.ObjectContext;
6+
import org.dflib.DataFrame;
77

88
/**
99
* @since 2.6
1010
*/
11-
public class CreateSegment implements DataSegment {
11+
public class CreateSegment extends DataSegment<CreateStage> {
1212

1313
public static final String TARGET_COLUMN = "$lm_target";
1414
public static final String TARGET_CREATED_COLUMN = "$lm_target_created";
1515

1616
private final ObjectContext context;
1717
private final RowAttribute[] sourceRowsHeader;
1818

19-
private DataFrame sourceRows;
20-
private DataFrame sources;
21-
private DataFrame mapped;
22-
private DataFrame fksResolved;
23-
private DataFrame merged;
24-
2519
public CreateSegment(ObjectContext context, RowAttribute[] sourceRowsHeader) {
2620
this.sourceRowsHeader = sourceRowsHeader;
2721
this.context = context;
@@ -36,56 +30,56 @@ public RowAttribute[] getSourceRowsHeader() {
3630
}
3731

3832
public DataFrame getSourceRows() {
39-
return sourceRows;
33+
return get(CreateStage.EXTRACT_SOURCE_ROWS);
4034
}
4135

4236
/**
4337
* @since 2.17
4438
*/
45-
public CreateSegment setSourceRows(DataFrame sourceRows) {
46-
this.sourceRows = sourceRows;
39+
public CreateSegment setSourceRows(DataFrame df) {
40+
set(CreateStage.EXTRACT_SOURCE_ROWS, df);
4741
return this;
4842
}
4943

5044
public DataFrame getSources() {
51-
return sources;
45+
return get(CreateStage.CONVERT_SOURCE_ROWS);
5246
}
5347

54-
public CreateSegment setSources(DataFrame translatedSegment) {
55-
this.sources = translatedSegment;
56-
return this;
57-
}
58-
59-
public DataFrame getMerged() {
60-
return merged;
61-
}
62-
63-
public CreateSegment setMerged(DataFrame merged) {
64-
this.merged = merged;
48+
public CreateSegment setSources(DataFrame df) {
49+
set(CreateStage.CONVERT_SOURCE_ROWS, df);
6550
return this;
6651
}
6752

6853
public DataFrame getMapped() {
69-
return mapped;
54+
return get(CreateStage.MAP_TARGET);
7055
}
7156

72-
public CreateSegment setMapped(DataFrame mapped) {
73-
this.mapped = mapped;
57+
public CreateSegment setMapped(DataFrame df) {
58+
set(CreateStage.MAP_TARGET, df);
7459
return this;
7560
}
7661

7762
/**
7863
* @since 2.12
7964
*/
8065
public DataFrame getFksResolved() {
81-
return fksResolved;
66+
return get(CreateStage.RESOLVE_FK_VALUES);
8267
}
8368

8469
/**
8570
* @since 2.12
8671
*/
87-
public CreateSegment setFksResolved(DataFrame fksResolved) {
88-
this.fksResolved = fksResolved;
72+
public CreateSegment setFksResolved(DataFrame df) {
73+
set(CreateStage.RESOLVE_FK_VALUES, df);
74+
return this;
75+
}
76+
77+
public DataFrame getMerged() {
78+
return get(CreateStage.MERGE_TARGET);
79+
}
80+
81+
public CreateSegment setMerged(DataFrame df) {
82+
set(CreateStage.MERGE_TARGET, df);
8983
return this;
9084
}
9185
}

0 commit comments

Comments
 (0)