Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

modifying execute API to get column nullability state #686

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,18 @@
import io.cdap.wrangler.api.ReportErrorAndProceed;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction;
import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator;
import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext;
import io.cdap.wrangler.schema.TransientStoreKeys;
import io.cdap.wrangler.utils.RecordConvertor;
import io.cdap.wrangler.utils.RecordConvertorException;
import io.cdap.wrangler.utils.SchemaConverter;
import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Map;
import java.util.List;
import javax.annotation.Nullable;

Expand All @@ -56,10 +58,13 @@ public final class RecipePipelineExecutor implements RecipePipeline<Row, Structu
private final RecipeParser recipeParser;
private final ExecutorContext context;
private List<Directive> directives;
private final Map<String, UserDefinedAction> nullabilityMap;

public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context) {
public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context,
Map<String, UserDefinedAction> nullabilityMap) {
this.context = context;
this.recipeParser = recipeParser;
this.nullabilityMap = new HashMap<>(nullabilityMap);
}

/**
Expand Down
7 changes: 4 additions & 3 deletions wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
import io.cdap.wrangler.schema.TransientStoreKeys;
import java.util.Collections;
import org.junit.Assert;

import java.util.Iterator;
Expand Down Expand Up @@ -89,7 +90,7 @@ public static List<Row> execute(String[] recipe, List<Row> rows, ExecutorContext

String migrate = new MigrateToV2(recipe).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
return new RecipePipelineExecutor(parser, context).execute(rows);
return new RecipePipelineExecutor(parser, context, Collections.emptyMap()).execute(rows);
}

/**
Expand All @@ -112,7 +113,7 @@ public static Pair<List<Row>, List<Row>> executeWithErrors(String[] recipe, List

String migrate = new MigrateToV2(recipe).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
RecipePipeline pipeline = new RecipePipelineExecutor(parser, context);
RecipePipeline pipeline = new RecipePipelineExecutor(parser, context, null);
List<Row> results = pipeline.execute(rows);
List<Row> errors = pipeline.errors();
return new Pair<>(results, errors);
Expand All @@ -126,7 +127,7 @@ public static RecipePipeline execute(String[] recipe)

String migrate = new MigrateToV2(recipe).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
return new RecipePipelineExecutor(parser, new TestingPipelineContext());
return new RecipePipelineExecutor(parser, new TestingPipelineContext(), null);
}

public static RecipeParser parse(String[] recipe) throws DirectiveParseException, DirectiveLoadException {
Expand Down
harshdeeppruthi marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,26 @@

package io.cdap.wrangler.proto.workspace.v2;

import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Directive execution request for v2 endpoint
*/
public class DirectiveExecutionRequest {
private final List<String> directives;
private final int limit;
private final Map<String, UserDefinedAction> nullabilityMap;

public DirectiveExecutionRequest(List<String> directives, int limit) {

public DirectiveExecutionRequest(List<String> directives, int limit,
Map<String, UserDefinedAction> nullabilityMap) {
this.directives = directives;
this.limit = limit;
this.nullabilityMap = nullabilityMap;
}

public int getLimit() {
Expand All @@ -39,4 +46,9 @@ public int getLimit() {
public List<String> getDirectives() {
return directives == null ? Collections.emptyList() : directives;
}

public Map<String, UserDefinedAction> getNullabilityMap() {

return nullabilityMap == null ? Collections.emptyMap() : nullabilityMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.cdap.wrangler.proto.workspace.v2;

/**
* UserDefinedAction enum.
*/
public enum UserDefinedAction {
FILTER,
SEND_TO_ERROR_COLLECTOR,
ERROR,
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import com.google.gson.JsonObject;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;

Expand All @@ -38,16 +41,19 @@ public class Workspace {
// this is for insights page in UI
private final JsonObject insights;

private final Map<String, UserDefinedAction> nullabilityMap;

private Workspace(String workspaceName, String workspaceId, List<String> directives,
long createdTimeMillis, long updatedTimeMillis, @Nullable SampleSpec sampleSpec,
JsonObject insights) {
JsonObject insights, Map<String, UserDefinedAction> nullabilityMap) {
this.workspaceName = workspaceName;
this.workspaceId = workspaceId;
this.directives = directives;
this.createdTimeMillis = createdTimeMillis;
this.updatedTimeMillis = updatedTimeMillis;
this.sampleSpec = sampleSpec;
this.insights = insights;
this.nullabilityMap = Collections.unmodifiableMap(new HashMap(nullabilityMap));
}

public String getWorkspaceName() {
Expand Down Expand Up @@ -79,6 +85,10 @@ public JsonObject getInsights() {
return insights;
}

public Map<String, UserDefinedAction> getNullabilityMap() {
return nullabilityMap;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -111,7 +121,8 @@ public static Builder builder(Workspace existing) {
.setCreatedTimeMillis(existing.getCreatedTimeMillis())
.setUpdatedTimeMillis(existing.getUpdatedTimeMillis())
.setSampleSpec(existing.getSampleSpec())
.setInsights(existing.getInsights());
.setInsights(existing.getInsights())
.setNullabilityMap(existing.getNullabilityMap());
}

/**
Expand All @@ -125,6 +136,7 @@ public static class Builder {
private long updatedTimeMillis;
private SampleSpec sampleSpec;
private JsonObject insights;
private Map<String, UserDefinedAction> nullabilityMap;

Builder(String name, String workspaceId) {
this.workspaceName = name;
Expand Down Expand Up @@ -159,9 +171,14 @@ public Builder setInsights(JsonObject insights) {
return this;
}

public Builder setNullabilityMap(Map<String, UserDefinedAction> nullabilityMap) {
this.nullabilityMap = nullabilityMap;
return this;
}

public Workspace build() {
return new Workspace(workspaceName, workspaceId, directives, createdTimeMillis, updatedTimeMillis, sampleSpec,
insights);
insights, nullabilityMap);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import io.cdap.wrangler.proto.workspace.ColumnValidationResult;
import io.cdap.wrangler.proto.workspace.WorkspaceValidationResult;
import io.cdap.wrangler.proto.workspace.v2.DirectiveExecutionResponse;
import io.cdap.wrangler.proto.workspace.v2.SampleSpec;
import io.cdap.wrangler.proto.workspace.v2.Workspace;
import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction;
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
import io.cdap.wrangler.registry.DirectiveRegistry;
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
Expand Down Expand Up @@ -118,7 +121,9 @@ protected <E extends Exception> List<Row> executeDirectives(
String namespace,
List<String> directives,
List<Row> sample,
GrammarWalker.Visitor<E> grammarVisitor) throws DirectiveParseException, E, RecipeException {
GrammarWalker.Visitor<E> grammarVisitor,
Map<String, UserDefinedAction> nullabilityMap)
throws DirectiveParseException, E, RecipeException {

if (directives.isEmpty()) {
return sample;
Expand All @@ -139,8 +144,11 @@ protected <E extends Exception> List<Row> executeDirectives(
new ConfigDirectiveContext(DirectiveConfig.EMPTY));
try (RecipePipelineExecutor executor = new RecipePipelineExecutor(parser,
new ServicePipelineContext(
namespace, ExecutorContext.Environment.SERVICE,
getContext(), TRANSIENT_STORE))) {
namespace,
ExecutorContext.Environment.SERVICE,
getContext(),
TRANSIENT_STORE),
nullabilityMap)) {
List<Row> result = executor.execute(sample);

List<ErrorRecordBase> errors = executor.errors()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ private <E extends Exception> List<Row> executeDirectives(NamespacedId id, List<
// Extract rows from the workspace.
List<Row> rows = fromWorkspace(workspace);
return executeDirectives(id.getNamespace().getName(), directives, sample.apply(rows),
grammarVisitor);
grammarVisitor, null);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception {
namespace,
ExecutorContext.Environment.SERVICE,
systemAppContext,
new DefaultTransientStore()))) {
new DefaultTransientStore()), null)) {
harshdeeppruthi marked this conversation as resolved.
Show resolved Hide resolved
rows = executor.execute(rows);
List<ErrorRecordBase> errors = executor.errors().stream()
.filter(ErrorRecordBase::isShownInWrangler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import io.cdap.wrangler.proto.workspace.v2.ServiceResponse;
import io.cdap.wrangler.proto.workspace.v2.StageSpec;
import io.cdap.wrangler.proto.workspace.v2.Workspace;
import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail;
import io.cdap.wrangler.proto.workspace.v2.WorkspaceId;
Expand Down Expand Up @@ -169,7 +170,8 @@ public void createWorkspace(HttpServiceRequest request, HttpServiceResponder res
long now = System.currentTimeMillis();
Workspace workspace = Workspace.builder(generateWorkspaceName(wsId, creationRequest.getSampleRequest().getPath()),
wsId.getWorkspaceId())
.setCreatedTimeMillis(now).setUpdatedTimeMillis(now).setSampleSpec(spec).build();
.setCreatedTimeMillis(now).setUpdatedTimeMillis(now)
.setSampleSpec(spec).setNullabilityMap(new HashMap<>()).build();
wsStore.saveWorkspace(wsId, new WorkspaceDetail(workspace, rows));
responder.sendJson(wsId.getWorkspaceId());
});
Expand Down Expand Up @@ -472,6 +474,11 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque

WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId);
UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector();
Map<String, UserDefinedAction> nullabilityMap = executionRequest.getNullabilityMap();
if (!nullabilityMap.isEmpty()) {
//create new workspace object with the new nullabilityMap
changeNullability(nullabilityMap, workspaceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should update the workspace after executing the directives (this is already happening), not before. Otherwise the execution can fail and now there's a partially updated workspace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the updated nullabilityMap before directives are executed.

}
List<Row> result = executeDirectives(ns.getName(), directives, detail,
userDirectivesCollector);
DirectiveExecutionResponse response = generateExecutionResponse(result,
Expand All @@ -484,6 +491,20 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque
return response;
}

private void changeNullability(Map<String, UserDefinedAction> nullabilityMap,
WorkspaceId workspaceId) throws Exception {
try {
Workspace workspace = wsStore.getWorkspace(workspaceId);
Workspace newWorkspace = Workspace.builder(workspace)
.setUpdatedTimeMillis(System.currentTimeMillis())
.setNullabilityMap(nullabilityMap).build();
wsStore.updateWorkspace(workspaceId, newWorkspace);
} catch (Exception e) {
throw new RuntimeException("Error in setting nullabilityMap of columns ", e);
}
}


/**
* Get source specs, contains some hacky way on dealing with the csv parser
*/
Expand Down Expand Up @@ -580,7 +601,7 @@ private <E extends Exception> List<Row> executeLocally(String namespace, List<St
// load the udd
composite.reload(namespace);
return executeDirectives(namespace, directives, new ArrayList<>(detail.getSample()),
grammarVisitor);
grammarVisitor, detail.getWorkspace().getNullabilityMap());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static RecipePipeline pipeline(Class<? extends Directive> directive, Test

String migrate = new MigrateToV2(recipe.toArray()).migrate();
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry);
return new RecipePipelineExecutor(parser, null);
return new RecipePipelineExecutor(parser, null, null);
}

public static RecipeParser parser(Class<? extends Directive> directive, String[] recipe)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import io.cdap.wrangler.parser.NoOpDirectiveContext;
import io.cdap.wrangler.parser.RecipeCompiler;
import io.cdap.wrangler.proto.Contexts;
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction;
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
import io.cdap.wrangler.registry.DirectiveInfo;
import io.cdap.wrangler.registry.DirectiveRegistry;
Expand Down Expand Up @@ -365,7 +366,7 @@ && checkPreconditionNotEmpty(false)) {

try {
// Create the pipeline executor with context being set.
pipeline = new RecipePipelineExecutor(recipe, ctx);
pipeline = new RecipePipelineExecutor(recipe, ctx, null);
} catch (Exception e) {
throw new Exception(String.format("Stage:%s - %s", getContext().getStageName(), e.getMessage()), e);
}
Expand Down
Loading