Skip to content

Commit f62d151

Browse files
committed
modifying execute API to get column nullability state
1 parent 5790e4b commit f62d151

File tree

10 files changed

+96
-12
lines changed

10 files changed

+96
-12
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.cdap.wrangler.api;
2+
3+
/**
 * A checked exception used for communicating issues with null handling in a column.
 */
public class NullHandlingException extends Exception {

  /**
   * Wraps an underlying failure without adding a message of its own.
   *
   * @param e the underlying failure, kept as the cause of this exception
   */
  public NullHandlingException(Exception e) {
    super(e);
  }

  /**
   * Creates an exception that carries only a description of the problem.
   *
   * @param message description of the null handling problem
   */
  public NullHandlingException(String message) {
    super(message);
  }

  /**
   * Creates an exception that carries both a description and the underlying failure, so callers
   * can add context without losing the original cause.
   *
   * @param message description of the null handling problem
   * @param cause the underlying failure, kept as the cause of this exception
   */
  public NullHandlingException(String message, Throwable cause) {
    super(message, cause);
  }
}

wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.cdap.wrangler.api.ReportErrorAndProceed;
3131
import io.cdap.wrangler.api.Row;
3232
import io.cdap.wrangler.api.TransientVariableScope;
33+
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
3334
import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator;
3435
import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext;
3536
import io.cdap.wrangler.schema.TransientStoreKeys;
@@ -40,6 +41,8 @@
4041
import org.slf4j.LoggerFactory;
4142

4243
import java.util.ArrayList;
44+
import java.util.HashMap;
45+
import java.util.Iterator;
4346
import java.util.List;
4447
import javax.annotation.Nullable;
4548

@@ -56,10 +59,13 @@ public final class RecipePipelineExecutor implements RecipePipeline<Row, Structu
5659
private final RecipeParser recipeParser;
5760
private final ExecutorContext context;
5861
private List<Directive> directives;
62+
private HashMap<String, UserDefinedAction> nullabilityMap;
5963

60-
public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context) {
64+
public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context,
65+
HashMap<String, UserDefinedAction> nullabilityMap) {
6166
this.context = context;
6267
this.recipeParser = recipeParser;
68+
this.nullabilityMap = nullabilityMap;
6369
}
6470

6571
/**

wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public static List<Row> execute(String[] recipe, List<Row> rows, ExecutorContext
8989

9090
String migrate = new MigrateToV2(recipe).migrate();
9191
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
92-
return new RecipePipelineExecutor(parser, context).execute(rows);
92+
return new RecipePipelineExecutor(parser, context, null).execute(rows);
9393
}
9494

9595
/**
@@ -112,7 +112,7 @@ public static Pair<List<Row>, List<Row>> executeWithErrors(String[] recipe, List
112112

113113
String migrate = new MigrateToV2(recipe).migrate();
114114
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
115-
RecipePipeline pipeline = new RecipePipelineExecutor(parser, context);
115+
RecipePipeline pipeline = new RecipePipelineExecutor(parser, context, null);
116116
List<Row> results = pipeline.execute(rows);
117117
List<Row> errors = pipeline.errors();
118118
return new Pair<>(results, errors);
@@ -126,7 +126,7 @@ public static RecipePipeline execute(String[] recipe)
126126

127127
String migrate = new MigrateToV2(recipe).migrate();
128128
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
129-
return new RecipePipelineExecutor(parser, new TestingPipelineContext());
129+
return new RecipePipelineExecutor(parser, new TestingPipelineContext(), null);
130130
}
131131

132132
public static RecipeParser parse(String[] recipe) throws DirectiveParseException, DirectiveLoadException {

wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package io.cdap.wrangler.proto.workspace.v2;
1919

20+
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
2021
import java.util.Collections;
22+
import java.util.HashMap;
2123
import java.util.List;
2224

2325
/**
@@ -26,10 +28,14 @@
2628
public class DirectiveExecutionRequest {
2729
private final List<String> directives;
2830
private final int limit;
31+
private final HashMap<String, UserDefinedAction> nullabilityMap;
2932

30-
public DirectiveExecutionRequest(List<String> directives, int limit) {
33+
34+
public DirectiveExecutionRequest(List<String> directives, int limit,
35+
HashMap<String, UserDefinedAction> nullabilityMap) {
3136
this.directives = directives;
3237
this.limit = limit;
38+
this.nullabilityMap = nullabilityMap;
3339
}
3440

3541
public int getLimit() {
@@ -39,4 +45,8 @@ public int getLimit() {
3945
public List<String> getDirectives() {
4046
return directives == null ? Collections.emptyList() : directives;
4147
}
48+
49+
public HashMap<String, UserDefinedAction> getNullabilityMap() {
50+
return nullabilityMap;
51+
}
4252
}

wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.gson.JsonObject;
2121

2222
import java.util.ArrayList;
23+
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Objects;
2526
import javax.annotation.Nullable;
@@ -38,6 +39,8 @@ public class Workspace {
3839
// this is for insights page in UI
3940
private final JsonObject insights;
4041

42+
private HashMap<String, UserDefinedAction> nullabilityMap;
43+
4144
private Workspace(String workspaceName, String workspaceId, List<String> directives,
4245
long createdTimeMillis, long updatedTimeMillis, @Nullable SampleSpec sampleSpec,
4346
JsonObject insights) {
@@ -48,6 +51,7 @@ private Workspace(String workspaceName, String workspaceId, List<String> directi
4851
this.updatedTimeMillis = updatedTimeMillis;
4952
this.sampleSpec = sampleSpec;
5053
this.insights = insights;
54+
this.nullabilityMap = new HashMap<>();
5155
}
5256

5357
public String getWorkspaceName() {
@@ -79,6 +83,15 @@ public JsonObject getInsights() {
7983
return insights;
8084
}
8185

86+
public HashMap<String, UserDefinedAction> getColumnMappings() {
87+
return nullabilityMap;
88+
}
89+
90+
public void setColumnMappings(
91+
HashMap<String, UserDefinedAction> nullabilityMap) {
92+
this.nullabilityMap = nullabilityMap;
93+
}
94+
8295
@Override
8396
public boolean equals(Object o) {
8497
if (this == o) {
@@ -164,4 +177,15 @@ public Workspace build() {
164177
insights);
165178
}
166179
}
180+
181+
/**
182+
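* Action configured for a column in a nullability map; presumably selects how null values in that column are handled (confirm against the executor).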
183+
*/
184+
public enum UserDefinedAction {
185+
NO_ACTION,
186+
SKIP_ROW,
187+
SEND_TO_ERROR_COLLECTOR,
188+
ERROR_PIPELINE,
189+
NULLABLE
190+
}
167191
}

wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@
4646
import io.cdap.wrangler.proto.workspace.ColumnValidationResult;
4747
import io.cdap.wrangler.proto.workspace.WorkspaceValidationResult;
4848
import io.cdap.wrangler.proto.workspace.v2.DirectiveExecutionResponse;
49+
import io.cdap.wrangler.proto.workspace.v2.SampleSpec;
50+
import io.cdap.wrangler.proto.workspace.v2.Workspace;
51+
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
4952
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
5053
import io.cdap.wrangler.registry.DirectiveRegistry;
5154
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
@@ -118,7 +121,11 @@ protected <E extends Exception> List<Row> executeDirectives(
118121
String namespace,
119122
List<String> directives,
120123
List<Row> sample,
121-
GrammarWalker.Visitor<E> grammarVisitor) throws DirectiveParseException, E, RecipeException {
124+
GrammarWalker.Visitor<E> grammarVisitor,
125+
Workspace workspace) throws DirectiveParseException, E, RecipeException {
126+
127+
HashMap<String, UserDefinedAction> nullabilityMap = workspace.getColumnMappings().isEmpty() ?
128+
new HashMap<>() : workspace.getColumnMappings();
122129

123130
if (directives.isEmpty()) {
124131
return sample;
@@ -139,8 +146,11 @@ protected <E extends Exception> List<Row> executeDirectives(
139146
new ConfigDirectiveContext(DirectiveConfig.EMPTY));
140147
try (RecipePipelineExecutor executor = new RecipePipelineExecutor(parser,
141148
new ServicePipelineContext(
142-
namespace, ExecutorContext.Environment.SERVICE,
143-
getContext(), TRANSIENT_STORE))) {
149+
namespace,
150+
ExecutorContext.Environment.SERVICE,
151+
getContext(),
152+
TRANSIENT_STORE),
153+
nullabilityMap)) {
144154
List<Row> result = executor.execute(sample);
145155

146156
List<ErrorRecordBase> errors = executor.errors()

wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1095,7 +1095,7 @@ private <E extends Exception> List<Row> executeDirectives(NamespacedId id, List<
10951095
// Extract rows from the workspace.
10961096
List<Row> rows = fromWorkspace(workspace);
10971097
return executeDirectives(id.getNamespace().getName(), directives, sample.apply(rows),
1098-
grammarVisitor);
1098+
grammarVisitor, null);
10991099
});
11001100
}
11011101
}

wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception {
107107
namespace,
108108
ExecutorContext.Environment.SERVICE,
109109
systemAppContext,
110-
new DefaultTransientStore()))) {
110+
new DefaultTransientStore()), null)) {
111111
rows = executor.execute(rows);
112112
List<ErrorRecordBase> errors = executor.errors().stream()
113113
.filter(ErrorRecordBase::isShownInWrangler)

wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import io.cdap.wrangler.proto.workspace.v2.ServiceResponse;
6868
import io.cdap.wrangler.proto.workspace.v2.StageSpec;
6969
import io.cdap.wrangler.proto.workspace.v2.Workspace;
70+
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
7071
import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest;
7172
import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail;
7273
import io.cdap.wrangler.proto.workspace.v2.WorkspaceId;
@@ -472,6 +473,12 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque
472473

473474
WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId);
474475
UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector();
476+
HashMap<String, UserDefinedAction> nullabilityMap = executionRequest.getNullabilityMap() == null ?
477+
new HashMap<>() : executionRequest.getNullabilityMap();
478+
if (!nullabilityMap.isEmpty()) {
479+
// Persist the requested nullability map on the stored workspace before the directives run.
480+
changeNullability(nullabilityMap, workspaceId);
481+
}
475482
List<Row> result = executeDirectives(ns.getName(), directives, detail,
476483
userDirectivesCollector);
477484
DirectiveExecutionResponse response = generateExecutionResponse(result,
@@ -484,6 +491,18 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque
484491
return response;
485492
}
486493

494+
private void changeNullability(HashMap<String, UserDefinedAction> columnMappings,
495+
WorkspaceId workspaceId) throws Exception {
496+
try {
497+
Workspace workspace = wsStore.getWorkspace(workspaceId);
498+
workspace.setColumnMappings(columnMappings);
499+
wsStore.updateWorkspace(workspaceId, workspace);
500+
} catch (Exception e) {
501+
throw new RuntimeException("Error in setting nullabilityMap of columns ", e);
502+
}
503+
}
504+
505+
487506
/**
488507
* Get source specs, contains some hacky way on dealing with the csv parser
489508
*/
@@ -580,7 +599,7 @@ private <E extends Exception> List<Row> executeLocally(String namespace, List<St
580599
// load the udd
581600
composite.reload(namespace);
582601
return executeDirectives(namespace, directives, new ArrayList<>(detail.getSample()),
583-
grammarVisitor);
602+
grammarVisitor, detail.getWorkspace());
584603
}
585604

586605
/**

wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public static RecipePipeline pipeline(Class<? extends Directive> directive, Test
5656

5757
String migrate = new MigrateToV2(recipe.toArray()).migrate();
5858
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
59-
return new RecipePipelineExecutor(parser, null);
59+
return new RecipePipelineExecutor(parser, null, null);
6060
}
6161

6262
public static RecipeParser parser(Class<? extends Directive> directive, String[] recipe)

0 commit comments

Comments
 (0)