Skip to content

Commit

Permalink
Refactor to prepare for fixed-width column support. (#219)
Browse files Browse the repository at this point in the history
* Refactor to prepare for fixed-width column support.
This commit should not introduce any behavioral changes.

* Respond to review feedback
  • Loading branch information
kosak authored Nov 5, 2024
1 parent 1afa9f3 commit 1c52d59
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 127 deletions.
109 changes: 23 additions & 86 deletions src/main/java/io/deephaven/csv/reading/CsvReader.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.deephaven.csv.reading;

import io.deephaven.csv.CsvSpecs;
import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.densestorage.DenseStorageReader;
import io.deephaven.csv.densestorage.DenseStorageWriter;
import io.deephaven.csv.parsers.DataType;
import io.deephaven.csv.parsers.Parser;
import io.deephaven.csv.reading.cells.CellGrabber;
import io.deephaven.csv.reading.cells.DelimitedCellGrabber;
import io.deephaven.csv.reading.headers.DelimitedHeaderFinder;
import io.deephaven.csv.sinks.Sink;
import io.deephaven.csv.sinks.SinkFactory;
import io.deephaven.csv.util.*;
Expand Down Expand Up @@ -54,22 +56,29 @@ private CsvReader() {}
* the CsvReader determines what the column type is, it will use the {@link SinkFactory} to create an
* appropriate Sink<T> for the type. Note that the CsvReader might guess wrong, so it might create a
* Sink, partially populate it, and then abandon it. The final set of fully-populated Sinks will be returned
* in in the CsvReader.Result. Thread safety: The {@link SinkFactory} may be invoked concurrently, therefore
* it must be thread safe.
* in the CsvReader.Result. Thread safety: The {@link SinkFactory} may be invoked concurrently, therefore it
* must be thread safe.
* @return A CsvReader.Result containing the column names, the number of columns, and the final set of
* fully-populated Sinks.
*/
public static Result read(final CsvSpecs specs, final InputStream stream, final SinkFactory sinkFactory)
        throws CsvReaderException {
    // The delimited code path is the only one this entry point dispatches to, so hand everything to it.
    final Result result = delimitedReadLogic(specs, stream, sinkFactory);
    return result;
}

private static Result delimitedReadLogic(
final CsvSpecs specs, final InputStream stream, final SinkFactory sinkFactory)
throws CsvReaderException {
// These two have already been validated by CsvSpecs to be 7-bit ASCII.
final byte quoteAsByte = (byte) specs.quote();
final byte delimiterAsByte = (byte) specs.delimiter();
final CellGrabber grabber =
new CellGrabber(stream, quoteAsByte, delimiterAsByte, specs.ignoreSurroundingSpaces(),
new DelimitedCellGrabber(stream, quoteAsByte, delimiterAsByte, specs.ignoreSurroundingSpaces(),
specs.trim());
// For an "out" parameter
final MutableObject<byte[][]> firstDataRowHolder = new MutableObject<>();
final String[] headersTemp = determineHeadersToUse(specs, grabber, firstDataRowHolder);
final String[] headersTemp = DelimitedHeaderFinder.determineHeadersToUse(specs, grabber,
firstDataRowHolder);
final byte[][] firstDataRow = firstDataRowHolder.getValue();
final int numInputCols = headersTemp.length;

Expand All @@ -85,6 +94,14 @@ public static Result read(final CsvSpecs specs, final InputStream stream, final
final int numOutputCols = headersTemp2.length;
final String[] headersToUse = canonicalizeHeaders(specs, headersTemp2);

return commonReadLogic(specs, grabber, firstDataRow, numInputCols, numOutputCols, headersToUse, sinkFactory);
}


private static Result commonReadLogic(final CsvSpecs specs, CellGrabber grabber, byte[][] optionalFirstDataRow,
int numInputCols, int numOutputCols,
String[] headersToUse, final SinkFactory sinkFactory)
throws CsvReaderException {
final String[][] nullValueLiteralsToUse = new String[numOutputCols][];
for (int ii = 0; ii < numOutputCols; ++ii) {
nullValueLiteralsToUse[ii] =
Expand Down Expand Up @@ -120,7 +137,7 @@ public static Result read(final CsvSpecs specs, final InputStream stream, final

// Start the writer.
final Future<Object> numRowsFuture = ecs.submit(() -> ParseInputToDenseStorage.doit(headersToUse,
firstDataRow, grabber, specs, nullValueLiteralsToUse, dsws));
optionalFirstDataRow, grabber, specs, nullValueLiteralsToUse, dsws));

// Start the readers, taking care to not hold a reference to the DenseStorageReader.
final ArrayList<Future<Object>> sinkFutures = new ArrayList<>();
Expand Down Expand Up @@ -199,62 +216,6 @@ private static List<String> calcNullValueLiteralsToUse(final CsvSpecs specs, fin
return specs.nullValueLiterals();
}

/**
 * Determine which headers to use. The result comes from either the first row of the file or the user-specified
 * overrides; if neither is available, synthetic names ("Column1", "Column2", ...) are generated from the width of
 * the first data row.
 *
 * @param specs The specs supplying the header-related settings (hasHeaderRow, skipHeaderRows, headers,
 *        headerForIndex).
 * @param grabber The source of cells; rows are consumed from it as needed.
 * @param firstDataRowHolder An "out" parameter. It is set to the first data row when that row had to be consumed
 *        in order to infer the column count, otherwise it is set to null.
 * @return The headers to use, one per input column.
 * @throws CsvReaderException If a header row is expected but the input is exhausted, or if there are no headers
 *         from any source and the input is empty.
 */
private static String[] determineHeadersToUse(final CsvSpecs specs,
        final CellGrabber grabber, final MutableObject<byte[][]> firstDataRowHolder)
        throws CsvReaderException {
    String[] headersToUse = null;
    if (specs.hasHeaderRow()) {
        // Read (skipHeaderRows + 1) rows; the last one read is the header row.
        long skipCount = specs.skipHeaderRows();
        byte[][] headerRow;
        while (true) {
            headerRow = tryReadOneRow(grabber);
            if (headerRow == null) {
                throw new CsvReaderException(
                        "Can't proceed because hasHeaders is set but input file is empty");
            }
            if (skipCount == 0) {
                break;
            }
            --skipCount;
        }
        // Decode explicitly as UTF-8 rather than relying on the platform default charset, so that non-ASCII
        // header names are decoded the same way on every platform.
        headersToUse = Arrays.stream(headerRow)
                .map(cell -> new String(cell, java.nio.charset.StandardCharsets.UTF_8))
                .toArray(String[]::new);
    }

    // Whether or not the input had headers, maybe override with client-specified headers.
    if (specs.headers().size() != 0) {
        headersToUse = specs.headers().toArray(new String[0]);
    }

    // If we still have nothing, try to generate synthetic column headers (works only if the file is
    // non-empty, because we need to infer the column count).
    final byte[][] firstDataRow;
    if (headersToUse == null) {
        firstDataRow = tryReadOneRow(grabber);
        if (firstDataRow == null) {
            throw new CsvReaderException(
                    "Can't proceed because input file is empty and client has not specified headers");
        }
        headersToUse = new String[firstDataRow.length];
        for (int ii = 0; ii < headersToUse.length; ++ii) {
            headersToUse[ii] = "Column" + (ii + 1);
        }
    } else {
        firstDataRow = null;
    }

    // Apply column specific overrides. The map key is used directly as an index into headersToUse.
    for (Map.Entry<Integer, String> entry : specs.headerForIndex().entrySet()) {
        headersToUse[entry.getKey()] = entry.getValue();
    }

    firstDataRowHolder.setValue(firstDataRow);
    return headersToUse;
}

private static String[] canonicalizeHeaders(CsvSpecs specs, final String[] headers) throws CsvReaderException {
final String[] legalized = specs.headerLegalizer().apply(headers);
final Set<String> unique = new HashSet<>();
Expand Down Expand Up @@ -286,30 +247,6 @@ private static String[] canonicalizeHeaders(CsvSpecs specs, final String[] heade
throw new CsvReaderException(sb.toString());
}

/**
 * Read the cells of the next row from the grabber, copying each cell into its own byte array.
 *
 * @param grabber The source of cells.
 * @return The row as a byte[][], or null if the only thing read was a single empty cell at end of input (i.e. the
 *         input was exhausted).
 */
private static byte[][] tryReadOneRow(final CellGrabber grabber) throws CsvReaderException {
    final ByteSlice cell = new ByteSlice();
    final MutableBoolean rowFinished = new MutableBoolean();
    final MutableBoolean inputFinished = new MutableBoolean();
    final List<byte[]> cells = new ArrayList<>();

    // Always grab at least one cell, then keep going until the grabber reports the end of the row.
    while (true) {
        grabber.grabNext(cell, rowFinished, inputFinished);
        // The slice is reused across calls, so take a private copy of its contents.
        final byte[] copy = new byte[cell.size()];
        cell.copyTo(copy, 0);
        cells.add(copy);
        if (rowFinished.booleanValue()) {
            break;
        }
    }

    final boolean nothingRead =
            cells.size() == 1 && cells.get(0).length == 0 && inputFinished.booleanValue();
    return nothingRead ? null : cells.toArray(new byte[0][]);
}

/** Result of {@link #read}. Represents a set of columns. */
public static final class Result implements Iterable<ResultColumn> {
private final long numRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.densestorage.DenseStorageReader;
import io.deephaven.csv.densestorage.DenseStorageWriter;
import io.deephaven.csv.reading.cells.CellGrabber;
import io.deephaven.csv.util.CsvReaderException;
import io.deephaven.csv.util.MutableBoolean;

Expand Down
93 changes: 93 additions & 0 deletions src/main/java/io/deephaven/csv/reading/ReaderUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.deephaven.csv.reading;

import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.tokenization.RangeTests;
import io.deephaven.csv.util.MutableInt;

public class ReaderUtil {
    /**
     * Build the default header names "Column1", "Column2", ..., one per column.
     *
     * @param numHeaders The number of header names to generate.
     * @return An array of length {@code numHeaders} holding the generated names.
     */
    public static String[] makeSyntheticHeaders(int numHeaders) {
        final String[] headers = new String[numHeaders];
        // Column numbering shown to the user is 1-based.
        for (int columnNumber = 1; columnNumber <= numHeaders; ++columnNumber) {
            headers[columnNumber - 1] = "Column" + columnNumber;
        }
        return headers;
    }

    /**
     * Strip leading and trailing spaces and tabs from the slice.
     *
     * @param cs The slice, narrowed in place so that it no longer starts or ends with a space or tab.
     */
    public static void trimSpacesAndTabs(final ByteSlice cs) {
        final byte[] bytes = cs.data();
        int first = cs.begin();
        int pastLast = cs.end();
        // Advance the front past any run of spaces/tabs.
        for (; first != pastLast; ++first) {
            if (!RangeTests.isSpaceOrTab(bytes[first])) {
                break;
            }
        }
        // Pull the back in past any run of spaces/tabs.
        for (; first != pastLast; --pastLast) {
            if (!RangeTests.isSpaceOrTab(bytes[pastLast - 1])) {
                break;
            }
        }
        cs.reset(bytes, first, pastLast);
    }

    /**
     * Given the first byte of a UTF-8 sequence, report both the sequence's length in bytes and the number of
     * character units (UTF-16 or UTF-32) it corresponds to.
     *
     * @param firstByte The first byte of the UTF-8 sequence.
     * @param numBytes The number of bytes remaining in the input field, counting firstByte.
     * @param useUtf32CountingConvention If true, 'charCountResult' is expressed in UTF-32 units; otherwise in
     *        UTF-16 units.
     * @param charCountResult An out parameter receiving the number of UTF-32 or UTF-16 units for the character.
     * @return The length of the UTF-8 sequence in bytes.
     * @throws RuntimeException If the sequence needs more bytes than {@code numBytes}.
     * @throws IllegalStateException If firstByte cannot start a UTF-8 sequence.
     */
    public static int getUtf8LengthAndCharLength(
            byte firstByte, int numBytes,
            boolean useUtf32CountingConvention, MutableInt charCountResult) {
        final int sequenceLength = getUtf8Length(firstByte);
        if (numBytes < sequenceLength) {
            throw new RuntimeException(String.format(
                    "The next UTF-8 character needs %d bytes but there are only %d left in the field",
                    sequenceLength, numBytes));
        }
        // Only 4-byte sequences counted in UTF-16 units take two units; everything else takes one.
        final boolean needsTwoUnits = !useUtf32CountingConvention && sequenceLength >= 4;
        if (needsTwoUnits) {
            charCountResult.setValue(2);
        } else {
            charCountResult.setValue(1);
        }
        return sequenceLength;
    }

    /**
     * Work out how many bytes a UTF-8 sequence occupies from its first byte.
     *
     * @param firstByte The first byte of the sequence.
     * @return The length of the sequence, in the range 1..4 inclusive.
     * @throws IllegalStateException If firstByte is not a valid starting byte.
     */
    private static int getUtf8Length(byte firstByte) {
        // Work with the unsigned value so the lead-byte ranges can be compared directly.
        final int lead = firstByte & 0xFF;
        if (lead < 0x80) {
            // 0xxxxxxx: 1-byte character aka ASCII. Last code point U+007F
            return 1;
        }
        if (lead >= 0xC0 && lead <= 0xDF) {
            // 110xxxxx: 2-byte character. Last code point U+07FF
            return 2;
        }
        if (lead >= 0xE0 && lead <= 0xEF) {
            // 1110xxxx: 3-byte character. Last code point U+FFFF
            return 3;
        }
        if (lead >= 0xF0 && lead <= 0xF7) {
            // 11110xxx: 4-byte character. Note: Java encodes all of these in two "char" variables.
            // Last code point U+10FFFF
            return 4;
        }
        // Anything else (10xxxxxx continuation bytes, or 11111xxx) cannot begin a sequence.
        throw new IllegalStateException(String.format("0x%x is not a valid starting byte for a UTF-8 sequence",
                firstByte));
    }
}
31 changes: 31 additions & 0 deletions src/main/java/io/deephaven/csv/reading/cells/CellGrabber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.deephaven.csv.reading.cells;

import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.util.CsvReaderException;
import io.deephaven.csv.util.MutableBoolean;

/**
* Abstraction for traversing input text and breaking it into cells for use by the calling code. An implementation
* (for example {@code DelimitedCellGrabber}) is responsible for understanding field and line delimiters and the CSV
* quoting convention as appropriate for its input format.
*/
public interface CellGrabber {
/**
* Try to grab the next cell from the input, being aware of field delimiters, line delimiters, quoting, and
* trimming.
*
* @param dest The result, as a {@link ByteSlice}. The ByteSlice is invalidated by the next call to grabNext.
* @param lastInRow An out parameter which will be set to true if the cell just read was the last cell in the row,
* otherwise it will be set to false.
* @param endOfInput An out parameter which will be set to true if the cell just read encountered the end of the
* input, otherwise it will be set to false.
* @throws CsvReaderException NOTE(review): presumably raised for malformed input or a failure reading the
* underlying input -- confirm against the implementations.
*/
void grabNext(final ByteSlice dest, final MutableBoolean lastInRow,
final MutableBoolean endOfInput) throws CsvReaderException;

/**
* Returns the "physical" row number, that is the row number of the input file. This differs from the "logical" row
* number, which is the row number of the CSV data being processed. The difference arises when, due to quotation
* marks, a single CSV row can span multiple lines of input.
*
* @return The physical row number.
*/
int physicalRowNum();
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.deephaven.csv.reading;
package io.deephaven.csv.reading.cells;

import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.containers.GrowableByteBuffer;
import io.deephaven.csv.reading.ReaderUtil;
import io.deephaven.csv.tokenization.RangeTests;
import io.deephaven.csv.util.CsvReaderException;
import io.deephaven.csv.util.MutableBoolean;
Expand All @@ -13,7 +14,7 @@
* This class is used to traverse over text from a Reader, understanding both field and line delimiters, as well as the
* CSV quoting convention, and breaking the text into cells for use by the calling code.
*/
public final class CellGrabber {
public final class DelimitedCellGrabber implements CellGrabber {
/** Size of chunks to read from the {@link InputStream}. */
public static final int BUFFER_SIZE = 65536;
/** The {@link InputStream} for the input. */
Expand Down Expand Up @@ -51,7 +52,7 @@ public final class CellGrabber {
private int physicalRowNum;

/** Constructor. */
public CellGrabber(
public DelimitedCellGrabber(
final InputStream inputStream,
final byte quoteChar,
final byte fieldDelimiter,
Expand All @@ -70,16 +71,7 @@ public CellGrabber(
this.physicalRowNum = 0;
}

/**
* Try to grab the next cell from the input, being aware of field delimiters, line delimiters, quoting, and
* trimming.
*
* @param dest The result, as a {@link ByteSlice}. The ByteSlice is invalidated by the next call to grabNext.
* @param lastInRow An out parameter which will be set to true if the cell just read was the last cell in the row,
* otherwise it will be set to false.
* @param endOfInput An out parameter which will be set to true if the cell just read encountered the end of the
* input, otherwise it will be set to false.
*/
@Override
public void grabNext(final ByteSlice dest, final MutableBoolean lastInRow,
final MutableBoolean endOfInput) throws CsvReaderException {
spillBuffer.clear();
Expand All @@ -94,12 +86,12 @@ public void grabNext(final ByteSlice dest, final MutableBoolean lastInRow,
++offset;
processQuotedMode(dest, lastInRow, endOfInput);
if (trim) {
trimWhitespace(dest);
ReaderUtil.trimSpacesAndTabs(dest);
}
} else {
processUnquotedMode(dest, lastInRow, endOfInput);
if (ignoreSurroundingSpaces) {
trimWhitespace(dest);
ReaderUtil.trimSpacesAndTabs(dest);
}
}
}
Expand Down Expand Up @@ -315,22 +307,4 @@ private void finish(final ByteSlice dest) {
public int physicalRowNum() {
return physicalRowNum;
}

/**
* Trim whitespace from the front and back of the slice.
*
* @param cs The slice, modified in-place to have whitespace (if any) removed.
*/
private static void trimWhitespace(final ByteSlice cs) {
final byte[] data = cs.data();
int begin = cs.begin();
int end = cs.end();
while (begin != end && RangeTests.isSpaceOrTab(data[begin])) {
++begin;
}
while (begin != end && RangeTests.isSpaceOrTab(data[end - 1])) {
--end;
}
cs.reset(data, begin, end);
}
}
Loading

0 comments on commit 1c52d59

Please sign in to comment.