-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Partially supported directives #655
base: relational-directives
Are you sure you want to change the base?
Changes from 5 commits
a6d5497
ea2547c
830346c
4c3b9f2
c0d805f
c08cdd8
900bebd
0dc5075
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,11 @@ | |
import io.cdap.cdap.api.annotation.Description; | ||
import io.cdap.cdap.api.annotation.Name; | ||
import io.cdap.cdap.api.annotation.Plugin; | ||
import io.cdap.cdap.etl.api.relational.Expression; | ||
import io.cdap.cdap.etl.api.relational.ExpressionFactory; | ||
import io.cdap.cdap.etl.api.relational.InvalidRelation; | ||
import io.cdap.cdap.etl.api.relational.Relation; | ||
import io.cdap.cdap.etl.api.relational.RelationalTranformContext; | ||
import io.cdap.wrangler.api.Arguments; | ||
import io.cdap.wrangler.api.Directive; | ||
import io.cdap.wrangler.api.DirectiveExecutionException; | ||
|
@@ -30,8 +35,12 @@ | |
import io.cdap.wrangler.api.lineage.Many; | ||
import io.cdap.wrangler.api.lineage.Mutation; | ||
import io.cdap.wrangler.api.parser.UsageDefinition; | ||
import io.cdap.wrangler.utils.SqlExpressionGenerator; | ||
|
||
import java.util.Collection; | ||
import java.util.LinkedHashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
/** | ||
* A directive for cleanses columns names. | ||
|
@@ -93,4 +102,30 @@ public Mutation lineage() { | |
.all(Many.of()) | ||
.build(); | ||
} | ||
|
||
@Override | ||
public Relation transform(RelationalTranformContext relationalTranformContext, | ||
Relation relation) { | ||
java.util.Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator | ||
.getExpressionFactory(relationalTranformContext); | ||
if (!expressionFactory.isPresent()) { | ||
return new InvalidRelation("Cannot find an Expression Factory"); | ||
} | ||
List<String> columnNames = SqlExpressionGenerator.generateListCols(relationalTranformContext); | ||
Map<String, Expression> colmap = generateCleanseColumnMap(columnNames, expressionFactory.get()); | ||
return relation.select(colmap); | ||
} | ||
|
||
public static Map<String, Expression> generateCleanseColumnMap(Collection columns, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. make private |
||
ExpressionFactory<String> factory) { | ||
Map<String, Expression> columnExpMap = new LinkedHashMap<>(); | ||
columns.forEach((colName)-> columnExpMap.put(String | ||
.format(colName | ||
.toString() | ||
.toLowerCase() | ||
.replaceAll("[^a-zA-Z0-9_]", "_")), factory | ||
.compile(colName.toString()))); | ||
return columnExpMap; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,11 @@ | |
import io.cdap.cdap.api.annotation.Description; | ||
import io.cdap.cdap.api.annotation.Name; | ||
import io.cdap.cdap.api.annotation.Plugin; | ||
import io.cdap.cdap.etl.api.relational.Expression; | ||
import io.cdap.cdap.etl.api.relational.ExpressionFactory; | ||
import io.cdap.cdap.etl.api.relational.InvalidRelation; | ||
import io.cdap.cdap.etl.api.relational.Relation; | ||
import io.cdap.cdap.etl.api.relational.RelationalTranformContext; | ||
import io.cdap.wrangler.api.Arguments; | ||
import io.cdap.wrangler.api.Directive; | ||
import io.cdap.wrangler.api.DirectiveExecutionException; | ||
|
@@ -32,9 +37,15 @@ | |
import io.cdap.wrangler.api.parser.ColumnNameList; | ||
import io.cdap.wrangler.api.parser.TokenType; | ||
import io.cdap.wrangler.api.parser.UsageDefinition; | ||
import io.cdap.wrangler.utils.SqlExpressionGenerator; | ||
|
||
import java.util.ArrayList; | ||
import java.util.LinkedHashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
|
||
/** | ||
* A directive for setting the columns obtained from wrangling. | ||
|
@@ -90,5 +101,27 @@ public Mutation lineage() { | |
.generate(Many.of(columns)) | ||
.build(); | ||
} | ||
|
||
@Override | ||
public Relation transform(RelationalTranformContext relationalTranformContext, | ||
Relation relation) { | ||
Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator | ||
.getExpressionFactory(relationalTranformContext); | ||
if (!expressionFactory.isPresent()) { | ||
return new InvalidRelation("Cannot find an Expression Factory"); | ||
} | ||
List<String> columnNames = SqlExpressionGenerator.generateListCols(relationalTranformContext); | ||
Map<String, Expression> columnExpMap = new LinkedHashMap<>(); | ||
|
||
IntStream.range(0, Math.min(columnNames.size(), columns.size())) | ||
.forEach(i -> columnExpMap.put(columns.get(i), expressionFactory.get().compile(columnNames.get(i)))); | ||
|
||
if (columnNames.size() > columns.size()) { | ||
IntStream.range(columns.size(), columnNames.size()) | ||
.forEach(i -> columnExpMap.put(columnNames.get(i), expressionFactory.get().compile(columnNames.get(i)))); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can move to a function with a simple for loop |
||
return relation.select(columnExpMap); | ||
} | ||
|
||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,10 @@ | |
import io.cdap.cdap.api.annotation.Description; | ||
import io.cdap.cdap.api.annotation.Name; | ||
import io.cdap.cdap.api.annotation.Plugin; | ||
import io.cdap.cdap.etl.api.relational.ExpressionFactory; | ||
import io.cdap.cdap.etl.api.relational.InvalidRelation; | ||
import io.cdap.cdap.etl.api.relational.Relation; | ||
import io.cdap.cdap.etl.api.relational.RelationalTranformContext; | ||
import io.cdap.wrangler.api.Arguments; | ||
import io.cdap.wrangler.api.Directive; | ||
import io.cdap.wrangler.api.DirectiveExecutionException; | ||
|
@@ -37,6 +41,7 @@ | |
import io.cdap.wrangler.expression.ELContext; | ||
import io.cdap.wrangler.expression.ELException; | ||
import io.cdap.wrangler.expression.ELResult; | ||
import io.cdap.wrangler.utils.SqlExpressionGenerator; | ||
|
||
import java.util.List; | ||
|
||
|
@@ -116,4 +121,20 @@ public List<EntityCountMetric> getCountMetrics() { | |
EntityCountMetric jexlCategoryMetric = getJexlCategoryMetric(el.getScriptParsedText()); | ||
return (jexlCategoryMetric == null) ? null : ImmutableList.of(jexlCategoryMetric); | ||
} | ||
|
||
@Override | ||
public Relation transform(RelationalTranformContext relationalTranformContext, | ||
Relation relation) { | ||
java.util.Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator | ||
.getExpressionFactory(relationalTranformContext); | ||
if (!expressionFactory.isPresent()) { | ||
return new InvalidRelation("Cannot find an Expression Factory"); | ||
} | ||
|
||
return relation.setColumn("tempColumn", expressionFactory.get().compile( | ||
String.format("if(%s, raise_error(\"Condition '%s' evaluated to true. " + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can move this to a variable |
||
"Terminating processing.\"), %s)", el.getScriptParsedText(), | ||
el.getScriptParsedText(), el.getScriptParsedText()))); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this method gets the list of input columns from the transform context, which is not really related to "generating a SQL expression". I would either suggest keeping this in a different util class or you can just rename the method to getColumnList