Skip to content

Commit

Permalink
#282: Fix sourcetype IN not working (#383)
Browse files Browse the repository at this point in the history
* Fix sourcetype IN not working, clean up searchQualifier in LogicalStatementCatalyst/XML, enable sourcetype IN test in logicalOperationTest

* introduce OrColumn object with tests

* run spotless

* add archive(xml) and spark query tests for sourcetype IN

* applied spotless
  • Loading branch information
eemhu authored Oct 28, 2024
1 parent 76cda4b commit 68dba59
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 55 deletions.
92 changes: 92 additions & 0 deletions src/main/java/com/teragrep/pth10/ast/OrColumn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
* Copyright (C) 2019-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.pth10.ast;

import org.apache.spark.sql.Column;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/**
 * Combines one or more Spark {@link Column} expressions into a single expression by chaining them with
 * {@link Column#or(Column)}.
 * <p>
 * The object is an immutable value: the columns given at construction time are copied, so later changes to the
 * caller's array or list affect neither {@link #column()} nor {@link #equals(Object)} / {@link #hashCode()}.
 */
public final class OrColumn {

    private final List<Column> columns;

    /**
     * Creates an instance from the given columns.
     *
     * @param columns columns to be OR-ed together, in order; may be empty, in which case {@link #column()} throws
     */
    public OrColumn(Column ... columns) {
        this(Arrays.asList(columns));
    }

    /**
     * Creates an instance from the given list of columns.
     *
     * @param columns columns to be OR-ed together, in order; may be empty, in which case {@link #column()} throws
     * @throws NullPointerException if {@code columns} is null
     */
    public OrColumn(List<Column> columns) {
        Objects.requireNonNull(columns, "columns must not be null");
        // Snapshot the input: toArray() always returns a fresh array, so the resulting fixed-size list is
        // backed by storage that no caller holds a reference to.
        this.columns = Arrays.asList(columns.toArray(new Column[0]));
    }

    /**
     * Builds the combined column. With a single input the input itself is returned; with several inputs they are
     * folded from left to right, i.e. {@code a.or(b).or(c)}.
     *
     * @return the OR-combination of all columns given at construction time
     * @throws IllegalStateException if no columns were given
     */
    public Column column() {
        if (columns.isEmpty()) {
            throw new IllegalStateException("No columns found");
        }

        Column rv = columns.get(0);
        for (Column current : columns.subList(1, columns.size())) {
            rv = rv.or(current);
        }
        return rv;
    }

    /**
     * Two instances are equal when their column lists are equal, which in turn delegates to the
     * {@code equals} implementation of each contained {@link Column}.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        OrColumn orColumn = (OrColumn) o;
        return Objects.equals(columns, orColumn.columns);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(columns);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -424,23 +424,17 @@ public Node visitSubindexStatement(DPLParser.SubindexStatementContext ctx) {
*/
@Override
public Node visitSearchQualifier(DPLParser.SearchQualifierContext ctx) {
// LOGGER.info("visitSearchQualifier: ");
Column sQualifier = null;
String value = null;
Column sQualifier;
String value;

TerminalNode left = (TerminalNode) ctx.getChild(0);
TerminalNode operation = (TerminalNode) ctx.getChild(1);

List<String> listOfIndices = new ArrayList<>();

// Default clause used in WHERE-part
String columnName = null;
final String columnName;
// HOST and SOURCETYPE qualifier stored as additional list and used for Kafka content filtering
if (left.getSymbol().getType() == DPLLexer.INDEX_IN) {
value = "";
ctx.indexStringType().forEach(str -> {
listOfIndices.add(new UnquotedText(new TextString(str.getText().toLowerCase())).read());
});
columnName = "index";
}
else if (left.getSymbol().getType() == DPLLexer.HOST) {
Expand All @@ -467,16 +461,24 @@ else if (left.getSymbol().getType() == DPLLexer.SOURCETYPE) {

}
else if (left.getSymbol().getType() == DPLLexer.INDEX_IN) {
for (String index : listOfIndices) {
String rlikeStatement = glob2rlike(index);
if (sQualifier == null) {
sQualifier = col.rlike(rlikeStatement);
}
else {
sQualifier = sQualifier.or(col.rlike(rlikeStatement));
}

}
OrColumn orColumn = new OrColumn(
ctx
.indexStringType()
.stream()
.map(st -> col.rlike(glob2rlike(new UnquotedText(new TextString(st.getText().toLowerCase())).read()))).collect(Collectors.toList())
);

sQualifier = orColumn.column();
}
else if (left.getSymbol().getType() == DPLLexer.SOURCETYPE && operation.getSymbol().getType() == DPLLexer.IN) {
OrColumn orColumn = new OrColumn(
ctx
.stringType()
.stream()
.map(st -> col.rlike(glob2rlike(new UnquotedText(new TextString(st.getText().toLowerCase())).read()))).collect(Collectors.toList())
);

sQualifier = orColumn.column();
}
else {
String rlikeStatement = glob2rlike(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,10 @@ public Node visitSublogicalStatement(DPLParser.SublogicalStatementContext ctx) {
*/
@Override
public Node visitSearchQualifier(DPLParser.SearchQualifierContext ctx) {
// LOGGER.info("visitSearchQualifier: ");
Token comparisonToken;
Element el = null;
String value = null;
List<String> listOfIndices = new ArrayList<>();
String field;
final List<String> values = new ArrayList<>();

String qualifier;
TerminalNode left = (TerminalNode) ctx.getChild(0);
TerminalNode operation = (TerminalNode) ctx.getChild(1);

Expand All @@ -400,52 +397,56 @@ public Node visitSearchQualifier(DPLParser.SearchQualifierContext ctx) {
}

// Default clause used in WHERE-part
qualifier = left.getText() + " " + operation.getSymbol().getText() + " ";
final String qualifier = left.getText() + " " + operation.getSymbol().getText() + " ";

// HOST and SOURCETYPE qualifier stored as additional list and used for Kafka content filtering
if (left.getSymbol().getType() == DPLLexer.INDEX_IN) {
value = "";
ctx.indexStringType().forEach(str -> {
listOfIndices.add(new UnquotedText(new TextString(str.getText().toLowerCase())).read());
values.add(new UnquotedText(new TextString(str.getText().toLowerCase())).read());
});
comparisonToken = new Token(Type.EQUALS); // INDEX IN is equals
el = doc.createElement("index");
field = "index";
}
else if (left.getSymbol().getType() == DPLLexer.HOST) {
value = new UnquotedText(new TextString(ctx.getChild(2).getText().toLowerCase())).read();
el = doc.createElement("host");
values.add(new UnquotedText(new TextString(ctx.getChild(2).getText().toLowerCase())).read());
field = "host";
}
else if (left.getSymbol().getType() == DPLLexer.SOURCETYPE && operation.getSymbol().getType() == DPLLexer.IN) {
ctx.stringType().forEach(str -> {
values.add(new UnquotedText(new TextString(str.getText().toLowerCase())).read());
});
comparisonToken = new Token(Type.EQUALS); // SOURCETYPE IN is equals
field = "sourcetype";
}
else if (left.getSymbol().getType() == DPLLexer.SOURCETYPE) {
value = new UnquotedText(new TextString(ctx.getChild(2).getText().toLowerCase())).read();
el = doc.createElement("sourcetype");
LOGGER.debug("qualifier=<{}> sourcetype=<{}>", qualifier, value);
values.add(new UnquotedText(new TextString(ctx.getChild(2).getText().toLowerCase())).read());
field = "sourcetype";
LOGGER.debug("qualifier=<{}> sourcetype=<{}>", qualifier, values.get(0));
}
else {
// other column=value qualifier
value = new UnquotedText(new TextString(ctx.getChild(2).getText().toLowerCase())).read();
el = doc.createElement(ctx.getChild(0).getText().toLowerCase());
values.add(new UnquotedText(new TextString(ctx.getChild(2).getText().toLowerCase())).read());
field = ctx.getChild(0).getText().toLowerCase();
LOGGER
.debug("custom qualifier: field=<{}> = value=<{}>", ctx.getChild(0).getText(), ctx.getChild(2).getText());
}

if (listOfIndices.isEmpty()) {
Element el;
if (values.size() < 2) {
el = doc.createElement(field);
el.setAttribute("operation", comparisonToken.toString());
el.setAttribute("value", value);
el.setAttribute("value", values.get(0));
}
else {
// build xml string for index IN ( 1 2 )
el = doc.createElement("OR");

for (int i = 0; i < listOfIndices.size(); i++) {
Element indexElem = doc.createElement("index");
for (int i = 0; i < values.size(); i++) {
Element indexElem = doc.createElement(field);
indexElem.setAttribute("operation", comparisonToken.toString());
indexElem.setAttribute("value", listOfIndices.get(i));
indexElem.setAttribute("value", values.get(i));

if (listOfIndices.size() == 1) {
el = indexElem;
break;
}
else if (i < 2) {
if (i < 2) {
el.appendChild(indexElem);
}
else {
Expand All @@ -458,8 +459,7 @@ else if (i < 2) {
}
}

Node rv = new ElementNode(el);
return rv;
return new ElementNode(el);
}

/**
Expand Down
90 changes: 90 additions & 0 deletions src/test/java/com/teragrep/pth10/OrColumnTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
* Copyright (C) 2019-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.pth10;

import com.teragrep.pth10.ast.OrColumn;
import org.apache.spark.sql.Column;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link OrColumn}: combining columns with OR, the single-column and empty cases, and the
 * equality contract.
 */
public class OrColumnTest {

    /** Two columns are expected to be joined into a single {@code a OR b} expression. */
    @Test
    void testOrColumn() {
        final Column expected = new Column("a").or(new Column("b"));
        final Column actual = new OrColumn(new Column("a"), new Column("b")).column();

        Assertions.assertEquals(expected, actual);
    }

    /** A single column is expected to come back as-is, without any OR applied. */
    @Test
    void testOrColumnWithOneColumn() {
        final Column actual = new OrColumn(new Column("a")).column();

        Assertions.assertEquals(new Column("a"), actual);
    }

    /** Asking for the combined column without any inputs is expected to fail with a descriptive message. */
    @Test
    void testOrColumnWithNoColumn() {
        final OrColumn empty = new OrColumn();

        final IllegalStateException thrown = Assertions
                .assertThrows(IllegalStateException.class, () -> empty.column());
        Assertions.assertEquals("No columns found", thrown.getMessage());
    }

    /** Two instances built from the same (empty) input are expected to be equal. */
    @Test
    void testEquals() {
        Assertions.assertEquals(new OrColumn(), new OrColumn());
    }

    /** Instances built from different inputs are expected to be unequal. */
    @Test
    void testNotEquals() {
        final OrColumn withoutColumns = new OrColumn();
        final OrColumn withOneColumn = new OrColumn(new Column("a"));

        Assertions.assertNotEquals(withoutColumns, withOneColumn);
    }
}
Loading

0 comments on commit 68dba59

Please sign in to comment.