Skip to content

Commit

Permalink
Merge branch 'sp/#350-ResultEventListener-missing-messages' into all/…
Browse files Browse the repository at this point in the history
…#tbw
  • Loading branch information
sebastian-peter committed Sep 28, 2022
2 parents 5aac7ae + 050e1d2 commit 75a79df
Showing 1 changed file with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class ResultEventListenerSpec
classOf[LineResult]
)

private val timeout = 10.seconds

// the OutputFileHierarchy
private def resultFileHierarchy(
runId: Int,
Expand Down Expand Up @@ -178,19 +180,19 @@ class ResultEventListenerSpec
)

// wait until output file exists (headers are flushed out immediately):
awaitCond(outputFile.exists(), interval = 500.millis)
awaitCond(outputFile.exists(), interval = 500.millis, max = timeout)

// stop listener so that result is flushed out
Await.ready(
gracefulStop(listenerRef, 5.seconds),
5.seconds
gracefulStop(listenerRef, timeout),
timeout
)

// wait until all lines have been written out:
awaitCond(
getFileLinesLength(outputFile) == 2,
interval = 500.millis,
max = 5.seconds
max = timeout
)

val resultFileSource = Source.fromFile(outputFile)
Expand Down Expand Up @@ -262,20 +264,21 @@ class ResultEventListenerSpec
// wait until all output files exist (headers are flushed out immediately):
awaitCond(
outputFiles.values.map(_.exists()).forall(identity),
interval = 500.millis
interval = 500.millis,
max = timeout
)

// stop listener so that result is flushed out
Await.ready(
gracefulStop(listenerRef, 5.seconds),
5.seconds
gracefulStop(listenerRef, timeout),
timeout
)

// wait until all lines have been written out:
awaitCond(
!outputFiles.values.exists(file => getFileLinesLength(file) < 2),
interval = 500.millis,
max = 5.seconds
max = timeout
)

outputFiles.foreach { case (resultRowString, outputFile) =>
Expand Down Expand Up @@ -503,8 +506,8 @@ class ResultEventListenerSpec

// stop listener so that result is flushed out
Await.ready(
gracefulStop(listener, 5.seconds),
5.seconds
gracefulStop(listener, timeout),
timeout
)

/* Await that the result is written */
Expand Down Expand Up @@ -566,8 +569,8 @@ class ResultEventListenerSpec
// this also triggers the compression of result files
import akka.pattern._
Await.ready(
gracefulStop(listenerRef, 5.seconds),
5.seconds
gracefulStop(listenerRef, timeout),
timeout
)

// shutdown the actor system
Expand All @@ -583,7 +586,7 @@ class ResultEventListenerSpec
)
)
).exists,
10.seconds
timeout
)

val resultFileSource = Source.fromInputStream(
Expand Down

0 comments on commit 75a79df

Please sign in to comment.