diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java new file mode 100644 index 000000000..0d13c6711 --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2016-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.wrangler.api; + +/** + * A Null Handling specific exception used for communicating issues with Null Handling in a column. + */ +public class NullHandlingException extends Exception { + public NullHandlingException(Exception e) { + super(e); + } + + public NullHandlingException(String message) { + super(message); + } + +} diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java b/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java index 159d6d512..a09301f5a 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java @@ -30,6 +30,7 @@ import io.cdap.wrangler.api.ReportErrorAndProceed; import io.cdap.wrangler.api.Row; import io.cdap.wrangler.api.TransientVariableScope; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator; import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext; import io.cdap.wrangler.schema.TransientStoreKeys; @@ -40,6 +41,8 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; @@ -56,10 +59,13 @@ public final class RecipePipelineExecutor implements RecipePipeline directives; + private HashMap nullabilityMap; - public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context) { + public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context, + HashMap nullabilityMap) { this.context = context; this.recipeParser = recipeParser; + this.nullabilityMap = nullabilityMap; } /** diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java b/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java index a56d12504..5507f0770 100644 --- a/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java +++ b/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java @@ -89,7 +89,7 @@ public static List execute(String[] recipe, List rows, ExecutorContext String migrate = new MigrateToV2(recipe).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - return new RecipePipelineExecutor(parser, context).execute(rows); + return new RecipePipelineExecutor(parser, context, null).execute(rows); } /** @@ -112,7 +112,7 @@ public static Pair, List> executeWithErrors(String[] recipe, List String migrate = new MigrateToV2(recipe).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - RecipePipeline pipeline = new RecipePipelineExecutor(parser, context); + RecipePipeline pipeline = new RecipePipelineExecutor(parser, context, null); List results = pipeline.execute(rows); List errors = pipeline.errors(); return new Pair<>(results, errors); @@ -126,7 +126,7 @@ public static RecipePipeline execute(String[] recipe) String migrate = new MigrateToV2(recipe).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - return new RecipePipelineExecutor(parser, new TestingPipelineContext()); + return new RecipePipelineExecutor(parser, new TestingPipelineContext(), null); } public static RecipeParser parse(String[] recipe) throws DirectiveParseException, DirectiveLoadException { diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java index a77ce1f5d..ffd96e946 100644 --- a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java @@ -17,7 +17,9 @@ package io.cdap.wrangler.proto.workspace.v2; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import java.util.Collections; +import java.util.HashMap; import java.util.List; /** @@ -26,10 +28,14 @@ public class DirectiveExecutionRequest { private final List directives; private final int limit; + private final HashMap nullabilityMap; - public DirectiveExecutionRequest(List directives, int limit) { + + public DirectiveExecutionRequest(List directives, int limit, + HashMap nullabilityMap) { this.directives = directives; this.limit = limit; + this.nullabilityMap = nullabilityMap; } public int getLimit() { @@ -39,4 +45,8 @@ public int getLimit() { public List getDirectives() { return directives == null ? Collections.emptyList() : directives; } + + public HashMap getNullabilityMap() { + return nullabilityMap; + } } diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java index 16aac68f8..79d7aa2ab 100644 --- a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java @@ -20,6 +20,7 @@ import com.google.gson.JsonObject; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Objects; import javax.annotation.Nullable; @@ -38,6 +39,8 @@ public class Workspace { // this is for insights page in UI private final JsonObject insights; + private HashMap nullabilityMap; + private Workspace(String workspaceName, String workspaceId, List directives, long createdTimeMillis, long updatedTimeMillis, @Nullable SampleSpec sampleSpec, JsonObject insights) { @@ -48,6 +51,7 @@ private Workspace(String workspaceName, String workspaceId, List directi this.updatedTimeMillis = updatedTimeMillis; this.sampleSpec = sampleSpec; this.insights = insights; + this.nullabilityMap = new HashMap<>(); } public String getWorkspaceName() { @@ -79,6 +83,15 @@ public JsonObject getInsights() { return insights; } + public HashMap getColumnMappings() { + return nullabilityMap; + } + + public void setColumnMappings( + HashMap nullabilityMap) { + this.nullabilityMap = nullabilityMap; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -164,4 +177,15 @@ public Workspace build() { insights); } } + + /** + * UserDefinedAction enum. + */ + public enum UserDefinedAction { + NO_ACTION, + SKIP_ROW, + SEND_TO_ERROR_COLLECTOR, + ERROR_PIPELINE, + NULLABLE + } } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java index d7eae7960..74818e131 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java @@ -46,6 +46,9 @@ import io.cdap.wrangler.proto.workspace.ColumnValidationResult; import io.cdap.wrangler.proto.workspace.WorkspaceValidationResult; import io.cdap.wrangler.proto.workspace.v2.DirectiveExecutionResponse; +import io.cdap.wrangler.proto.workspace.v2.SampleSpec; +import io.cdap.wrangler.proto.workspace.v2.Workspace; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import io.cdap.wrangler.registry.CompositeDirectiveRegistry; import io.cdap.wrangler.registry.DirectiveRegistry; import io.cdap.wrangler.registry.SystemDirectiveRegistry; @@ -118,7 +121,8 @@ protected List executeDirectives( String namespace, List directives, List sample, - GrammarWalker.Visitor grammarVisitor) throws DirectiveParseException, E, RecipeException { + GrammarWalker.Visitor grammarVisitor, + Workspace workspace) throws DirectiveParseException, E, RecipeException { if (directives.isEmpty()) { return sample; @@ -139,8 +143,11 @@ protected List executeDirectives( new ConfigDirectiveContext(DirectiveConfig.EMPTY)); try (RecipePipelineExecutor executor = new RecipePipelineExecutor(parser, new ServicePipelineContext( - namespace, ExecutorContext.Environment.SERVICE, - getContext(), TRANSIENT_STORE))) { + namespace, + ExecutorContext.Environment.SERVICE, + getContext(), + TRANSIENT_STORE), + workspace.getColumnMappings())) { List result = executor.execute(sample); List errors = executor.errors() diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java index a73354cb0..e3b75d321 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java @@ -1095,7 +1095,7 @@ private List executeDirectives(NamespacedId id, List< // Extract rows from the workspace. List rows = fromWorkspace(workspace); return executeDirectives(id.getNamespace().getName(), directives, sample.apply(rows), - grammarVisitor); + grammarVisitor, null); }); } } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java index 27216247f..6d092a4d0 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java @@ -107,7 +107,7 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception { namespace, ExecutorContext.Environment.SERVICE, systemAppContext, - new DefaultTransientStore()))) { + new DefaultTransientStore()), null)) { rows = executor.execute(rows); List errors = executor.errors().stream() .filter(ErrorRecordBase::isShownInWrangler) diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java index 82b45521e..bb39790ea 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java @@ -67,6 +67,7 @@ import io.cdap.wrangler.proto.workspace.v2.ServiceResponse; import io.cdap.wrangler.proto.workspace.v2.StageSpec; import io.cdap.wrangler.proto.workspace.v2.Workspace; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest; import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail; import io.cdap.wrangler.proto.workspace.v2.WorkspaceId; @@ -472,6 +473,12 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId); UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector(); + HashMap nullabilityMap = executionRequest.getNullabilityMap() == null ? + new HashMap<>() : executionRequest.getNullabilityMap(); + if (!nullabilityMap.isEmpty()) { + //change nullabilityMap in Workspace Object + changeNullability(nullabilityMap, workspaceId); + } List result = executeDirectives(ns.getName(), directives, detail, userDirectivesCollector); DirectiveExecutionResponse response = generateExecutionResponse(result, @@ -484,6 +491,18 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque return response; } + private void changeNullability(HashMap columnMappings, + WorkspaceId workspaceId) throws Exception { + try { + Workspace workspace = wsStore.getWorkspace(workspaceId); + workspace.setColumnMappings(columnMappings); + wsStore.updateWorkspace(workspaceId, workspace); + } catch (Exception e) { + throw new RuntimeException("Error in setting nullabilityMap of columns ", e); + } + } + + /** * Get source specs, contains some hacky way on dealing with the csv parser */ @@ -580,7 +599,7 @@ private List executeLocally(String namespace, List(detail.getSample()), - grammarVisitor); + grammarVisitor, detail.getWorkspace()); } /** diff --git a/wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java b/wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java index da108cf1b..9a1cd9254 100644 --- a/wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java +++ b/wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java @@ -56,7 +56,7 @@ public static RecipePipeline pipeline(Class directive, Test String migrate = new MigrateToV2(recipe.toArray()).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - return new RecipePipelineExecutor(parser, null); + return new RecipePipelineExecutor(parser, null, null); } public static RecipeParser parser(Class directive, String[] recipe) diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java index d5e57ae69..0fd83dbf2 100644 --- a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java +++ b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java @@ -67,6 +67,7 @@ import io.cdap.wrangler.parser.NoOpDirectiveContext; import io.cdap.wrangler.parser.RecipeCompiler; import io.cdap.wrangler.proto.Contexts; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import io.cdap.wrangler.registry.CompositeDirectiveRegistry; import io.cdap.wrangler.registry.DirectiveInfo; import io.cdap.wrangler.registry.DirectiveRegistry; @@ -365,7 +366,7 @@ && checkPreconditionNotEmpty(false)) { try { // Create the pipeline executor with context being set. - pipeline = new RecipePipelineExecutor(recipe, ctx); + pipeline = new RecipePipelineExecutor(recipe, ctx, null); } catch (Exception e) { throw new Exception(String.format("Stage:%s - %s", getContext().getStageName(), e.getMessage()), e); }