Skip to content

Commit

Permalink
add lastRowDs and updateLastRow() to allow separation of last row from the rest of the dataframe (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
eemhu authored May 2, 2024
1 parent cd1cdc8 commit 9603a06
Showing 1 changed file with 22 additions and 3 deletions.
25 changes: 22 additions & 3 deletions src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
public final class BatchCollect extends SortOperation {
private static final Logger LOGGER = LoggerFactory.getLogger(BatchCollect.class);
// Dataset handed back by call() and getCollectedAsDataframe(); reset to null by clear().
private Dataset<Row> savedDs = null;
// Optional dataset kept apart from savedDs and appended to it with union() when results
// are returned; assigned through updateLastRow(), null when no last row has been set.
private Dataset<Row> lastRowDs = null;
// NOTE(review): presumably set from the constructor parameter of the same name and used
// as the ordering column -- the assignment and its use are outside this excerpt; confirm.
private final String sortColumn;
// NOTE(review): presumably set from the constructor parameter of the same name and used
// as a row limit -- the assignment and its use are outside this excerpt; confirm.
private final int numberOfRows;
// Reset to null by clear(); where it is assigned is not visible in this excerpt.
private StructType inputSchema;
Expand Down Expand Up @@ -87,14 +88,21 @@ public BatchCollect(String sortColumn, int numberOfRows, List<SortByClause> list
* @return sorted dataset
*/
public Dataset<Row> call(Dataset<Row> df, Long id, boolean skipLimiting) {
    // skipLimiting selects processAggregated(df); otherwise the batch goes through collect(df, id).
    if (skipLimiting) {
        this.processAggregated(df);
    }
    else {
        this.collect(df, id);
    }

    // A leftover unconditional "return this.savedDs;" used to precede this block, which
    // made the last-row handling unreachable (a compile error in Java). The result is now
    // savedDs with lastRowDs appended via union() whenever a last row has been provided.
    final Dataset<Row> rv;
    if (this.lastRowDs != null) {
        rv = this.savedDs.union(this.lastRowDs);
    } else {
        rv = this.savedDs;
    }

    return rv;
}

public void collect(Dataset<Row> batchDF, Long batchId) {
Expand Down Expand Up @@ -158,16 +166,27 @@ private Dataset<Row> orderDataset(Dataset<Row> ds) {

// TODO: Remove
/**
 * Materializes the current result as a list of rows.
 *
 * Delegates to {@link #getCollectedAsDataframe()} and calls {@code collectAsList()} on
 * the dataset it returns. The earlier direct {@code return this.savedDs.collectAsList();}
 * that preceded the delegation has been removed, since it made the delegating return
 * unreachable.
 *
 * @return rows of the dataset returned by {@link #getCollectedAsDataframe()}
 */
public List<Row> getCollected() {
    return getCollectedAsDataframe().collectAsList();
}

/**
 * Returns the current result as a dataset.
 *
 * When a last-row dataset has been set (see {@code updateLastRow}), it is appended to
 * the saved dataset with {@code union()}; otherwise the saved dataset is returned as is.
 * A leftover unconditional {@code return this.savedDs;} at the top of the method has
 * been removed because it made the union logic unreachable.
 *
 * @return {@code savedDs}, followed by {@code lastRowDs} when the latter is non-null
 */
public Dataset<Row> getCollectedAsDataframe() {
    final Dataset<Row> rv;
    if (this.lastRowDs != null) {
        rv = this.savedDs.union(this.lastRowDs);
    } else {
        rv = this.savedDs;
    }
    return rv;
}

/**
 * Resets this collector: logs the reset and drops the references to the saved dataset,
 * the last-row dataset and the input schema by setting them to {@code null}.
 */
public void clear() {
    LOGGER.info("dpf_02 cleared");
    // The three resets are independent of each other.
    inputSchema = null;
    lastRowDs = null;
    savedDs = null;
}

/**
 * Replaces the dataset held as the last row.
 *
 * @param lastRow dataset to store in {@code lastRowDs}; stored as given, so passing
 *                {@code null} leaves no last row set
 */
public void updateLastRow(final Dataset<Row> lastRow) {
    lastRowDs = lastRow;
}
}

0 comments on commit 9603a06

Please sign in to comment.