-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhance LOOKUP JOIN csv-spec tests to cover more cases and fix several bugs found #117843
Conversation
Adds several more tests to lookup-join.csv-spec, and fixes the following bugs: * FieldCaps on right hand side should ignore fieldNames method and just use "*" because we don’t specify the right hand side fields at all in LOOKUP JOIN (presumably we will in future, and then we can change this). * Stop using the lookup index in the ComputeService (so we don’t get both index data coming in from the left, and other weird behaviour). * Ignore failing SearchStats checks on fields from the right hand side in the logical planner (so it does not plan EVAL field = null for all right hand fields). This should be fixed properly with the correct updates to TransportSearchShardsAction (or rather to making multiple use of that for each branch of the execution model).
Pinging @elastic/es-analytical-engine (Team:Analytics) |
And re-enable one disabled test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. There are a number of culprits but the PR nicely avoids them while adding minimal changes.
@@ -102,15 +101,18 @@ public PhysicalPlan apply(PhysicalPlan plan) { | |||
|
|||
private static Set<Attribute> missingAttributes(PhysicalPlan p) { | |||
var missing = new LinkedHashSet<Attribute>(); | |||
var inputSet = p.inputSet(); | |||
var input = new AttributeSet(p.inputSet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why make another copy?
|
||
// TODO: We need to extract whatever fields are missing from the left hand side. | ||
// skip the lookup join since the right side is always materialized and a projection | ||
// For LOOKUP JOIN we only need field-extraction on left fields used to match, since the right side is always materialized |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FTR: we want to get field extraction on the right side as well at some point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The field extraction is implicitly done inside the LookupFromIndexService
. I agree it'd be nicer to make the field extraction visible in the physical plan; currently, that'd have no benefit, though, because we cannot really materialize fields late - all looked up fields need to be extracted inside the LookupFromIndexService
.
It'd require a fundamental change of the execution to extract additional fields after the execution of the lookup join.
private Set<String> findLookupIndexNames(PhysicalPlan physicalPlan) { | ||
Set<String> lookupIndexNames = new HashSet<>(); | ||
physicalPlan.forEachDown( | ||
LookupJoinExec.class, | ||
lookupJoinExec -> lookupJoinExec.lookup().forEachDown(EsQueryExec.class, es -> lookupIndexNames.add(es.index().name())) | ||
); | ||
physicalPlan.forEachDown( | ||
LookupJoinExec.class, | ||
lookupJoinExec -> lookupJoinExec.lookup() | ||
.forEachDown( | ||
FragmentExec.class, | ||
frag -> frag.fragment().forEachDown(EsRelation.class, esRelation -> lookupIndexNames.add(esRelation.index().name())) | ||
) | ||
); | ||
// TODO this only works for LEFT join, so we still need to support RIGHT join | ||
physicalPlan.forEachDown( | ||
FragmentExec.class, | ||
fragmentExec -> fragmentExec.fragment() | ||
.forEachDown( | ||
Join.class, | ||
join -> join.right().forEachDown(EsRelation.class, esRelation -> lookupIndexNames.add(esRelation.index().name())) | ||
) | ||
); | ||
return lookupIndexNames; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Combine the iterations into one.
💔 Backport failed
You can use sqren/backport to manually backport by running |
return termQueryList(fieldType, context, inputBlock, inputDataType); | ||
} | ||
|
||
private static void validateTypes(DataType inputDataType, MappedFieldType fieldType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not so much a bug-fix as early error generation. This could be made less strict going forward, but we thought to start strict and then open later if appropriate.
|
||
for (NamedExpression projection : projections) { | ||
// Do not use the attribute name, this can deviate from the field name for union types. | ||
if (projection instanceof FieldAttribute f && stats.exists(f.fieldName()) == false) { | ||
if (projection instanceof FieldAttribute f && stats.exists(f.fieldName()) == false && joinAttributes.contains(f) == false) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a temporary fix to the fact that SearchStats can currently only contain all stats for the main FROM index
part of the query. We need a full-stack change of context to support multiple SearchStats.
@@ -117,12 +119,17 @@ public static String[] planOriginalIndices(PhysicalPlan plan) { | |||
var indices = new LinkedHashSet<String>(); | |||
plan.forEachUp( | |||
FragmentExec.class, | |||
f -> f.fragment() | |||
.forEachUp(EsRelation.class, r -> indices.addAll(asList(Strings.commaDelimitedListToStringArray(r.index().name())))) | |||
f -> f.fragment().forEachUp(EsRelation.class, r -> addOriginalIndexIfNotLookup(indices, r.index())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Part of the fix to remove join indexes from the left-hand-side of the join (ie. stop mixing left and right)
@@ -160,9 +165,11 @@ public void execute( | |||
Map<String, OriginalIndices> clusterToConcreteIndices = transportService.getRemoteClusterService() | |||
.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new)); | |||
QueryPragmas queryPragmas = configuration.pragmas(); | |||
Set<String> lookupIndexNames = findLookupIndexNames(physicalPlan); | |||
Set<String> concreteIndexNames = selectConcreteIndices(clusterToConcreteIndices, lookupIndexNames); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Part of the fix to remove join indexes from the left-hand-side of the join (ie. stop mixing left and right)
@@ -313,7 +313,7 @@ private <T> void preAnalyze( | |||
// First resolve the lookup indices, then the main indices | |||
preAnalyzeLookupIndices( | |||
preAnalysis.lookupIndices, | |||
fieldNames, | |||
Set.of("*"), // Current LOOKUP JOIN syntax does not allow for field selection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Temporary fix to work around a bug in fieldNames for lookup-join when there are aliases before, and keep after the lookup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @craigtaverner , this is a very, very nice PR.
I added some late comments to have a reference for follow-ups.
x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec
Show resolved
Hide resolved
|
||
FROM sample_data | ||
| LOOKUP JOIN message_types_lookup ON message | ||
| KEEP @timestamp, client_ip, event_duration, message, type |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could also try a different order/subset of columns for good measure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, good idea. I did have issues with column ordering in the layout, so...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll open a PR with more tests.
ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" | ||
| EVAL client_ip = client_ip::keyword | ||
| LOOKUP JOIN clientips_lookup ON client_ip | ||
| KEEP left, client_ip, right, env |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly here.
validateTypes(request.inputDataType, fieldType); | ||
return termQueryList(fieldType, context, inputBlock, inputDataType); | ||
} | ||
|
||
private static void validateTypes(DataType inputDataType, MappedFieldType fieldType) { | ||
// TODO: consider supporting implicit type conversion as done in ENRICH for some types | ||
if (fieldType.typeName().equals(inputDataType.typeName()) == false) { | ||
throw new EsqlIllegalArgumentException( | ||
"LOOKUP JOIN match and input types are incompatible: match[" + fieldType.typeName() + "], input[" + inputDataType + "]" | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should perform this kind of validation during query planning and should return 400 not 500.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the same is true for ENRICH. And for ENRICH we have both validation during planning and at this point, and the planning one is less comprehensive because less is known at that point. We should check what is known for JOIN; and if we can move this entirely to planning.
@@ -565,6 +565,7 @@ private PhysicalOperation planHashJoin(HashJoinExec join, LocalExecutionPlannerC | |||
|
|||
private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlannerContext context) { | |||
PhysicalOperation source = plan(join.left(), context); | |||
// TODO: The source builder includes incoming fields including the ones we're going to drop |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's indeed weird but I don't think it's an incorrect layout. The physical operation represented by the LookupJoinExec
actually can only append blocks - this means that physically, pages will still contain shadowed blocks until we pass through a project operator that actually strips them.
It's the same for Eval
and Enrich
. The physical operators usually do not drop columns.
The fact that layouts are based on name ids instead of names makes it so this isn't incorrect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment was added very early on during discussions with you. We added it as a reminder. Could it be that it went stale immediately, since you did your fixes? This should be verified.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's always been stale, I just didn't understand the physical operators enough at the time :D
If you look here, you can see that the corresponding operator will only ever append blocks from the right.
I thought the layout was wrong because it would not take into account shadowing. But that's actually fine! Shadowing is a logical concept, the physical operators don't care. They only care about channels and which data is present in which. And for this, the layout constructed here appears to be very correct: it has the channels from the left hand side and additionally more channels slapped onto it from the right, containing the looked up fields.
.../org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java
Show resolved
Hide resolved
|
||
// TODO: We need to extract whatever fields are missing from the left hand side. | ||
// skip the lookup join since the right side is always materialized and a projection | ||
// For LOOKUP JOIN we only need field-extraction on left fields used to match, since the right side is always materialized |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The field extraction is implicitly done inside the LookupFromIndexService
. I agree it'd be nicer to make the field extraction visible in the physical plan; currently, that'd have no benefit, though, because we cannot really materialize fields late - all looked up fields need to be extracted inside the LookupFromIndexService
.
It'd require a fundamental change of the execution to extract additional fields after the execution of the lookup join.
|
||
// TODO: We need to extract whatever fields are missing from the left hand side. | ||
// skip the lookup join since the right side is always materialized and a projection | ||
// For LOOKUP JOIN we only need field-extraction on left fields used to match, since the right side is always materialized | ||
if (p instanceof LookupJoinExec join) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, I think this can be simplified by just using p.references
instead of having a special case for lookup joins, and otherwise doing something on p.forEacExpression(...)
.
@@ -313,7 +313,7 @@ private <T> void preAnalyze( | |||
// First resolve the lookup indices, then the main indices | |||
preAnalyzeLookupIndices( | |||
preAnalysis.lookupIndices, | |||
fieldNames, | |||
Set.of("*"), // Current LOOKUP JOIN syntax does not allow for field selection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comment is insufficient; I believe this is a good hack but should be improved later; in a query like
FROM idx | LOOKUP JOIN lookup_idx ON lookup_field | KEEP another_lookup_field
we do not have to ask for all existing field names. We just need another_lookup_field
.
In fact, I think this will make it so that we ask the LookupFromIndexService
to fetch all fields from the index, meaning that the performance of this operation will depend on the number of fields in the lookup index. In scenarios where that index has, say, 6000 fields (not completely unheard of), that may be a serious performance drag and could cause memory issues.
To be more precise, I think we'll have to augmentfieldNames
. We should be able to walk the tree from the top, stopping at the first LOOKUP JOIN
we see - and then determining that the missing fields so far are the only candidates that may come from the lookup index. This means that the output of fieldNames
shouldn't be a single Set<String>
- but, instead, a Set<String>
for the main index and a List<Set<String>>
corresponding to the missing fields we might obtain from the lookup indices.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment in the followup work is:
EsqlSession.fieldNames does not handle lookup references that are also mentioned in aliases (erases them)
So we want to fix this bug, and I have a reasonable idea how.
|
||
private Set<String> findLookupIndexNames(PhysicalPlan physicalPlan) { | ||
Set<String> lookupIndexNames = new HashSet<>(); | ||
// When planning JOIN on the coordinator node: "LookupJoinExec.lookup()->FragmentExec.fragment()->EsRelation.index()" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super useful comments!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually ran a test that checked which of the 19 queries hit which of these two patterns, just to be sure, and the trend was pretty clear.
Adds several more tests to lookup-join.csv-spec, and fixes the following bugs:
Fixes #117702