Skip to content

Commit

Permalink
Fix AsyncIterator race condition when first element in source raises (#…
Browse files Browse the repository at this point in the history
…74)

* Fix AsyncIterator race condition when first element in source raises
  • Loading branch information
clintval authored Dec 27, 2021
1 parent b851fd3 commit 86d8ca6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,23 @@ class AsyncIterator[T](private val source: Iterator[T], bufferSize: Option[Int]

// Get the next item, or wait until the underlying thread is done and there are no more items in the queue
while (buffer.isEmpty && !(this.done && this.queue.isEmpty)) {
checkAndRaise() // check if hte underlying thread raised an exception
checkAndRaise() // check if the underlying thread raised an exception
tryAndModifyInterruptedException("Interrupted waiting on taking from the queue.") {
buffer = Option(this.queue.poll(50, TimeUnit.MILLISECONDS))
}
}

// Did we get an item from the buffer?
// If there are no more elements in the source iterator then await the signal for final completion of the thread's
// execution work. We must await the finished countdown latch first because a race condition may occur when the
// source iterator raises an exception but we have not yet had a chance to save the exception message before
// finishing the final call to `hasNext()`. Blocking and awaiting the final countdown latch signal guarantees
// we will see the exception message if it is present. If it is present, then we get a final chance to re-raise it.
// If we did not handle exceptions this way, then the iterator may be silently cut short and data truncated.
// Issue: https://github.com/fulcrumgenomics/commons/pull/74
if (buffer.isEmpty) {
awaitDone()
checkAndRaise()
}
buffer.nonEmpty
}

Expand All @@ -86,4 +96,4 @@ class AsyncIterator[T](private val source: Iterator[T], bufferSize: Option[Int]
item
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ class AsyncIteratorTest extends UnitSpec with OptionValues {
}
}

it should s"correctly propagate an exception that originates from within the source iterator" in {
// Issue: https://github.com/fulcrumgenomics/commons/pull/74
var exceptionNotRaisedInTime = false
def raise(num: Int): Int = throw new IllegalArgumentException(num.toString)
val source = Range(1, 10000).iterator.map { num =>
if (num > 1) exceptionNotRaisedInTime = true // We do this because we can't successfully raise in this context.
raise(num)
}
an[IllegalArgumentException] shouldBe thrownBy { new AsyncIterator(source = source).start().toSeq }
withClue("Failed because the illegal argument exception was not caught in time:") { exceptionNotRaisedInTime shouldBe false }
}

"AsyncIterator.apply" should "start a daemon thread via apply" in {
val iter = AsyncIterator[String](Iterator("hello world"))
iter.hasNext() shouldBe true
Expand Down

0 comments on commit 86d8ca6

Please sign in to comment.