[PECO-1752] Refactor CloudFetch downloader (#234)
[PECO-1752]

This PR is an attempt to fix the CloudFetch error "row number N is not
contained in any arrow batch".

See also databricks/databricks-sql-python#405: basically the same
issue, the same root cause, and a similar solution.

#### The problem

In the current implementation, all the links from a single TRowSet are
added to a concurrent thread (goroutine) pool. The pool downloads them
in a random order: all the tasks have the same priority and therefore
the same chance to be executed first. To maintain the order of results,
the `startRowOffset`/`rowCount` fields of each CloudFetch link are used:
the library keeps track of the current row number and uses it to pick
the right CloudFetch link (looking for the file where the current row
falls within `[startRowOffset; startRowOffset + rowCount]`).
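
For illustration, here is a minimal Go sketch of that lookup; the types
and names are simplified stand-ins, not the driver's actual API:

```go
package cloudfetch

import "fmt"

// resultLink is a simplified stand-in for a CloudFetch result link.
type resultLink struct {
	fileLink       string
	startRowOffset int64
	rowCount       int64
}

// findLinkForRow scans all links for the one whose row range contains
// rowNumber. When the offsets get out of sync with the consumer's
// position, no link matches, producing the error this PR is fixing.
func findLinkForRow(links []resultLink, rowNumber int64) (*resultLink, error) {
	for i := range links {
		l := &links[i]
		if rowNumber >= l.startRowOffset && rowNumber < l.startRowOffset+l.rowCount {
			return l, nil
		}
	}
	return nil, fmt.Errorf("row number %d is not contained in any arrow batch", rowNumber)
}
```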

This solution has several caveats. First of all, the library allows
fetching data only sequentially, from beginning to end. With a
concurrent thread pool, you never know which file will be downloaded
first. In the worst case, while the user is waiting for the very first
file, the library may download all the other ones and keep them in
memory, because the user may need them in the future. This increases
latency (on average it will be okay, but we have no control over it)
as well as memory consumption.

Another problem with this approach is that if any file cannot be
downloaded, there is no need to download the remaining ones: the user
won't be able to process them anyway. But because files are downloaded
in arbitrary order, nobody knows how many files will be downloaded
before the user reaches the failed one.

Also, it seems that error handling wasn't done quite right, but that
part of the code was a bit unclear to me. In any case, with this fix
all errors are properly handled and propagated to the user when needed.

#### The solution

This PR changes the CloudFetch downloader to use a queue. The
downloader keeps a list of pending links (not yet scheduled) and a
queue of currently running tasks. The number of concurrent tasks is
limited, so new downloads are scheduled only as previous tasks complete
and are extracted from the queue. As the user requests the next file,
the downloader picks the first task from the queue and schedules a new
one to run in the background, keeping the queue full. Then the
downloader waits for the task it picked and returns its result to the
user. Tasks still run in parallel in the background. Also, each task is
responsible for handling its own errors (e.g. retrying failed
downloads), so by the time a task completes it has either eventually
succeeded or failed after all possible retries.
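
Here is a minimal sketch of this queue-based scheme in Go; the names
are simplified and `fetchWithRetries` is a hypothetical stand-in for
the actual HTTP download with retry logic, not the driver's real API:

```go
package cloudfetch

import "io"

type downloadResult struct {
	data []byte
	err  error
}

// downloader keeps pending (not yet scheduled) links plus a bounded
// queue of in-flight tasks, in link order.
type downloader struct {
	pending []string
	queue   []chan downloadResult
}

// newDownloader pre-fills the queue with up to maxTasks downloads.
func newDownloader(links []string, maxTasks int) *downloader {
	d := &downloader{pending: links}
	for i := 0; i < maxTasks && len(d.pending) > 0; i++ {
		d.scheduleNext()
	}
	return d
}

// scheduleNext starts downloading the next pending link in the background.
func (d *downloader) scheduleNext() {
	link := d.pending[0]
	d.pending = d.pending[1:]
	ch := make(chan downloadResult, 1)
	d.queue = append(d.queue, ch)
	go func() {
		data, err := fetchWithRetries(link)
		ch <- downloadResult{data: data, err: err}
	}()
}

// next pops the oldest task, schedules one more download to keep the
// queue full, and only then blocks on the popped task. Once the caller
// stops on an error, no further downloads are scheduled.
func (d *downloader) next() ([]byte, error) {
	if len(d.queue) == 0 {
		return nil, io.EOF
	}
	ch := d.queue[0]
	d.queue = d.queue[1:]
	if len(d.pending) > 0 {
		d.scheduleNext()
	}
	res := <-ch
	return res.data, res.err
}

// fetchWithRetries is a hypothetical stand-in for the real HTTP
// download and its retry logic; each task resolves its own errors
// before reporting a result.
func fetchWithRetries(link string) ([]byte, error) {
	return []byte("contents of " + link), nil // placeholder
}
```

Memory stays bounded because at most `maxTasks` results are buffered at
any time, and ordering is guaranteed because `next` always waits on the
oldest task, regardless of which download finishes first.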

With this approach, the proper order of files is assured automatically.
All errors are either handled in the downloader or propagated to the
user. If some file cannot be downloaded due to an error, the library
will not keep downloading the remaining ones (as it previously would).
And because new files are downloaded only as the user consumes the
previous ones, the library will not keep the whole dataset in memory.

[PECO-1752]:
https://databricks.atlassian.net/browse/PECO-1752?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

---------

Signed-off-by: Levko Kravets <levko.ne@gmail.com>
kravets-levko authored Aug 23, 2024
1 parent 93222ab commit f70e5a5
Showing 9 changed files with 573 additions and 714 deletions.
164 changes: 0 additions & 164 deletions internal/fetcher/fetcher.go

This file was deleted.

123 changes: 0 additions & 123 deletions internal/fetcher/fetcher_test.go

This file was deleted.

23 changes: 2 additions & 21 deletions internal/rows/arrowbased/arrowRecordIterator.go
```diff
@@ -163,29 +163,10 @@ func (ri *arrowRecordIterator) getBatchIterator() error {
 
 // Create a new batch iterator from a page of the result set
 func (ri *arrowRecordIterator) newBatchIterator(fr *cli_service.TFetchResultsResp) (BatchIterator, error) {
-	bl, err := ri.newBatchLoader(fr)
-	if err != nil {
-		return nil, err
-	}
-
-	bi, err := NewBatchIterator(bl)
-
-	return bi, err
-}
-
-// Create a new batch loader from a page of the result set
-func (ri *arrowRecordIterator) newBatchLoader(fr *cli_service.TFetchResultsResp) (BatchLoader, error) {
 	rowSet := fr.Results
-	var bl BatchLoader
-	var err error
 	if len(rowSet.ResultLinks) > 0 {
-		bl, err = NewCloudBatchLoader(ri.ctx, rowSet.ResultLinks, rowSet.StartRowOffset, &ri.cfg)
+		return NewCloudBatchIterator(ri.ctx, rowSet.ResultLinks, rowSet.StartRowOffset, &ri.cfg)
 	} else {
-		bl, err = NewLocalBatchLoader(ri.ctx, rowSet.ArrowBatches, rowSet.StartRowOffset, ri.arrowSchemaBytes, &ri.cfg)
+		return NewLocalBatchIterator(ri.ctx, rowSet.ArrowBatches, rowSet.StartRowOffset, ri.arrowSchemaBytes, &ri.cfg)
 	}
-	if err != nil {
-		return nil, err
-	}
-
-	return bl, nil
 }
```
