Loader: limit the total time spent retrying a failed load (close #1251)
istreeter committed May 17, 2023
1 parent 6bd28b7 commit da18b21
Showing 10 changed files with 86 additions and 31 deletions.
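Judging by the application.conf and ConfigSpec changes in this commit, the new `cumulativeBound` default surfaces in the loader's retry configuration (presumably the `retries` block, alongside the existing `readyCheck` and `initRetries` blocks). A minimal sketch of how that block might look with the new setting; the individual keys and values are taken from the hunks below, while the block name and exact layout are assumptions:

```hocon
# Sketch only: values mirror the defaults visible in this commit's application.conf diff.
"retries": {
  "backoff": "30 seconds",
  "strategy": "EXPONENTIAL",
  "attempts": 3,
  # New in this commit: give up once 20 minutes in total have been spent retrying a failed load.
  "cumulativeBound": "20 minutes"
}
```

As the `getRetryPolicy` changes below suggest, setting `attempts` to 0 still disables retrying entirely (the policy becomes `alwaysGiveUp`), and omitting `cumulativeBound` keeps the previous unbounded behaviour.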
@@ -84,7 +84,7 @@ class ConfigSpec extends Specification {
password = StorageTarget.PasswordConfig.PlainText("Supersecret1")
)
val cloud = Config.Cloud.AWS(RegionSpec.DefaultTestRegion, exampleMessageQueue.copy(region = Some(RegionSpec.DefaultTestRegion)))
val retries = exampleRetries.copy(cumulativeBound = None)
val retries = exampleRetries.copy(cumulativeBound = Some(20.minutes))
val readyCheck = exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds)
val initRetries = exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes))
val expected = Config(
@@ -110,7 +110,7 @@ class ConfigSpec extends Specification {
catalog = None,
password = StorageTarget.PasswordConfig.PlainText("Supersecret1")
)
val retries = exampleRetries.copy(cumulativeBound = None)
val retries = exampleRetries.copy(cumulativeBound = Some(20.minutes))
val readyCheck = exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds)
val initRetries = exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes))
val expected = Config(
1 change: 1 addition & 0 deletions modules/loader/src/main/resources/application.conf
@@ -19,6 +19,7 @@
"backoff": "30 seconds",
"strategy": "EXPONENTIAL",
"attempts": 3
"cumulativeBound": "20 minutes"
},
"readyCheck": {
"backoff": "15 seconds",
@@ -16,7 +16,7 @@ import scala.concurrent.duration._
import cats.{Apply, Monad, MonadThrow}
import cats.implicits._
import cats.effect.{Async, Clock}
import retry._
import retry.RetryDetails
import fs2.Stream
import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
@@ -103,7 +103,7 @@ object Loader {
val periodicMetrics: Stream[F, Unit] =
Monitoring[F].periodicMetrics.report

def initRetry(f: F[Unit]) = retryingOnAllErrors(Retry.getRetryPolicy[F](config.initRetries), initRetryLog[F])(f)
def initRetry(f: F[Unit]) = Retry.retryingOnAllErrors(config.initRetries, initRetryLog[F], f)

val blockUntilReady = initRetry(TargetCheck.prepareTarget[F, C]).onError { case t: Throwable =>
Monitoring[F].alert(Alert.FailedInitialConnection(t))
@@ -14,7 +14,8 @@ package com.snowplowanalytics.snowplow.rdbloader.dsl

import cats.{MonadThrow, ~>}
import cats.implicits._
import retry._
import cats.effect.Clock
import retry.{RetryDetails, Sleep}

import com.snowplowanalytics.snowplow.rdbloader.config.Config
import com.snowplowanalytics.snowplow.rdbloader.loading.Retry
@@ -24,29 +25,27 @@ import com.snowplowanalytics.snowplow.rdbloader.transactors.RetryingTransactor
object RetryingTransaction {

/** A Transaction-handler that retries the io if there is an exception */
def wrap[F[_]: MonadThrow: Logging: Sleep, C[_]](
def wrap[F[_]: MonadThrow: Logging: Clock: Sleep, C[_]](
retries: Config.Retries,
inner: Transaction[F, C]
): Transaction[F, C] = {
val policy = Retry.getRetryPolicy[F](retries)
): Transaction[F, C] =
new Transaction[F, C] {

def transact[A](io: C[A]): F[A] =
withErrorAdaption(policy) {
withErrorAdaption(retries) {
inner.transact(io)
}

def run[A](io: C[A]): F[A] =
withErrorAdaption(policy) {
withErrorAdaption(retries) {
inner.run(io)
}

def arrowBack: F ~> C = inner.arrowBack
}
}

private def withErrorAdaption[F[_]: MonadThrow: Sleep: Logging, A](policy: RetryPolicy[F])(io: F[A]): F[A] =
retryingOnSomeErrors(policy, isWorthRetry.andThen(_.pure[F]), onError[F](_, _))(io)
private def withErrorAdaption[F[_]: MonadThrow: Clock: Sleep: Logging, A](retries: Config.Retries)(io: F[A]): F[A] =
Retry.retryingOnSomeErrors(retries, isWorthRetry.andThen(_.pure[F]), onError[F](_, _), io)

private val isWorthRetry: Throwable => Boolean = {
case RetryingTransactor.ExceededRetriesException() =>
@@ -12,11 +12,13 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.loading

import cats.{Applicative, Show}
import cats.{Applicative, MonadThrow, Show}
import cats.effect.Clock
import cats.implicits._
import com.snowplowanalytics.snowplow.rdbloader.config.Config.{Retries, Strategy}
import retry.{RetryDetails, RetryPolicies, RetryPolicy}
import retry._
import scala.concurrent.duration.{Duration, FiniteDuration}

/**
 * A module responsible for retrying a transaction. Unlike `discovery.Retries`, it's all about
@@ -47,9 +49,30 @@ object Retry {
e => e.toString.toLowerCase.contains("cannot decode sql row: table comment is not valid schemakey, invalid_igluuri")
)

def retryingOnSomeErrors[F[_]: MonadThrow: Clock: Sleep, A](
retries: Retries,
isWorthRetrying: Throwable => F[Boolean],
onError: (Throwable, RetryDetails) => F[Unit],
f: F[A]
): F[A] =
for {
policy <- getRetryPolicy[F](retries)
result <- retry.retryingOnSomeErrors(policy, isWorthRetrying, onError)(f)
} yield result

def retryingOnAllErrors[F[_]: MonadThrow: Clock: Sleep, A](
retries: Retries,
onError: (Throwable, RetryDetails) => F[Unit],
f: F[A]
): F[A] =
for {
policy <- getRetryPolicy[F](retries)
result <- retry.retryingOnAllErrors(policy, onError)(f)
} yield result

/** Build a cats-retry-specific retry policy from Loader's config */
def getRetryPolicy[F[_]: Applicative](retries: Retries): RetryPolicy[F] =
if (retries.attempts.contains(0)) RetryPolicies.alwaysGiveUp
private def getRetryPolicy[F[_]: Applicative: Clock](retries: Retries): F[RetryPolicy[F]] =
if (retries.attempts.contains(0)) RetryPolicies.alwaysGiveUp[F].pure[F]
else {
val policy = retries.strategy match {
case Strategy.Jitter => RetryPolicies.fullJitter[F](retries.backoff)
@@ -67,10 +90,24 @@

retries.cumulativeBound match {
case Some(bound) =>
RetryPolicies.limitRetriesByCumulativeDelay(bound, withAttempts)
joinWithCumulativeBound(withAttempts, bound)
case None =>
withAttempts
withAttempts.pure[F]
}
}

private def joinWithCumulativeBound[F[_]: Applicative: Clock](policy: RetryPolicy[F], bound: FiniteDuration): F[RetryPolicy[F]] =
Clock[F].realTime.map { startedAt =>
val withCumulativeBound = RetryPolicy[F] { _ =>
Clock[F].realTime.map { now =>
if (now - startedAt <= bound)
PolicyDecision.DelayAndRetry(Duration.Zero)
else
PolicyDecision.GiveUp
}
}

policy.join(withCumulativeBound)
}

implicit val detailsShow: Show[RetryDetails] =
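The core of the change is `joinWithCumulativeBound` above: the configured policy is built inside `F` so that the start time can be captured with `Clock[F].realTime`, then joined with a second policy that gives up once `cumulativeBound` of wall-clock time has elapsed. The following is a minimal, self-contained sketch of that idea using cats-retry and cats-effect directly; the object name, `boundedBy`, and the short durations are illustrative, not the loader's code:

```scala
import scala.concurrent.duration._
import cats.effect.{Clock, IO, IOApp}
import retry.{PolicyDecision, RetryDetails, RetryPolicies, RetryPolicy}

object CumulativeBoundSketch extends IOApp.Simple {

  // A policy that keeps retrying (adding no extra delay of its own) until
  // `bound` wall-clock time has passed since `startedAt`, then gives up.
  def boundedBy(startedAt: FiniteDuration, bound: FiniteDuration): RetryPolicy[IO] =
    RetryPolicy[IO] { _ =>
      Clock[IO].realTime.map { now =>
        if (now - startedAt <= bound) PolicyDecision.DelayAndRetry(Duration.Zero)
        else PolicyDecision.GiveUp
      }
    }

  val logError: (Throwable, RetryDetails) => IO[Unit] =
    (e, details) => IO.println(s"Load failed (${e.getMessage}); retry details: $details")

  def run: IO[Unit] =
    for {
      startedAt <- Clock[IO].realTime
      // `join` retries only while *both* policies want to retry, sleeping for the
      // longer of the two proposed delays, so the exponential backoff is cut off
      // once the cumulative bound is exceeded.
      policy = RetryPolicies.exponentialBackoff[IO](100.millis)
                 .join(boundedBy(startedAt, 1.second))
      outcome <- retry
                   .retryingOnAllErrors(policy, logError)(
                     IO.raiseError[Unit](new Exception("connection refused"))
                   )
                   .attempt
      _ <- IO.println(s"Gave up after the bound elapsed: $outcome")
    } yield ()
}
```

In the loader itself the joined policy is then passed to cats-retry through the new `Retry.retryingOnSomeErrors` / `Retry.retryingOnAllErrors` helpers, which is why the call sites in Loader, RetryingTransaction and RetryingTransactor now go through `Retry` instead of building a `RetryPolicy` directly.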
@@ -12,7 +12,7 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.transactors

import cats.effect.Resource
import cats.effect.{Clock, Resource}
import cats.effect.kernel.{MonadCancelThrow, Temporal}
import cats.syntax.all._
import doobie.Transactor
@@ -68,19 +68,19 @@ object RetryingTransactor {
def wrap[F[_]: Temporal: Logging: Sleep, A](
config: Config.Retries,
inner: Transactor.Aux[F, A]
): Transactor.Aux[F, A] = {
val policy = Retry.getRetryPolicy[Resource[F, *]](config)
inner.copy(connect0 = a => wrapResource(policy, inner.connect(a)))
}
): Transactor.Aux[F, A] =
inner.copy(connect0 = a => wrapResource(config, inner.connect(a)))

private def wrapResource[F[_]](
policy: RetryPolicy[Resource[F, *]],
config: Config.Retries,
resource: Resource[F, Connection]
)(implicit F: MonadCancelThrow[F],
L: Logging[F],
S: Sleep[Resource[F, *]]
S: Sleep[Resource[F, *]],
C: Clock[Resource[F, *]]
): Resource[F, Connection] =
retryingOnSomeErrors(policy, isTransientError.andThen(_.pure[Resource[F, *]]), onError[F](_, _))(resource)
Retry
.retryingOnSomeErrors(config, isTransientError.andThen(_.pure[Resource[F, *]]), onError[F](_, _), resource)
.adaptError {
case t if isTransientError(t) => new ExceededRetriesException(t)
case t: Throwable => new UnskippableConnectionException(t)
@@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.rdbloader.dsl
import cats.arrow.FunctionK
import cats.data.Kleisli
import cats.effect.testkit.TestControl
import cats.effect.{IO, Resource}
import cats.effect.{Clock, IO, Resource}
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global
import doobie.{ConnectionIO, Transactor}
@@ -27,7 +27,7 @@ import retry.Sleep
import com.snowplowanalytics.snowplow.rdbloader.SpecHelpers
import com.snowplowanalytics.snowplow.rdbloader.config.Config
import com.snowplowanalytics.snowplow.rdbloader.db.Statement
import com.snowplowanalytics.snowplow.rdbloader.test.{Pure, PureDAO, PureLogging, PureOps, PureSleep, PureTransaction, TestState}
import com.snowplowanalytics.snowplow.rdbloader.test.{Pure, PureClock, PureDAO, PureLogging, PureOps, PureSleep, PureTransaction, TestState}
import com.snowplowanalytics.snowplow.rdbloader.test.TestState.LogEntry
import com.snowplowanalytics.snowplow.rdbloader.transactors.RetryingTransactor
import scala.concurrent.duration.DurationInt
@@ -41,6 +41,7 @@ class RetryingTransactionSpec extends Specification {
"not retry transaction when there are no errors" in {
implicit val logging: Logging[Pure] = PureLogging.interpreter(noop = true)
implicit val sleep: Sleep[Pure] = PureSleep.interpreter
implicit val clock: Clock[Pure] = PureClock.interpreter

val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init)

@@ -49,6 +50,7 @@
val program = dao.executeUpdate(Statement.VacuumEvents, DAO.Purpose.NonLoading)

val expected = List(
LogEntry.Message("TICK REALTIME"),
PureTransaction.StartMessage,
LogEntry.Sql(Statement.VacuumEvents),
PureTransaction.CommitMessage
@@ -62,6 +64,7 @@
"not retry no-transaction when there are no errors" in {
implicit val logging: Logging[Pure] = PureLogging.interpreter(noop = true)
implicit val sleep: Sleep[Pure] = PureSleep.interpreter
implicit val clock: Clock[Pure] = PureClock.interpreter

val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init)

@@ -70,6 +73,7 @@
val program = dao.executeUpdate(Statement.VacuumEvents, DAO.Purpose.NonLoading)

val expected = List(
LogEntry.Message("TICK REALTIME"),
PureTransaction.NoTransactionMessage,
LogEntry.Sql(Statement.VacuumEvents)
)
@@ -82,6 +86,7 @@
"retry transaction when there is an error" in {
implicit val logging: Logging[Pure] = PureLogging.interpreter(noop = true)
implicit val sleep: Sleep[Pure] = PureSleep.interpreter
implicit val clock: Clock[Pure] = PureClock.interpreter

val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init.withExecuteUpdate(isFirstAttempt, raiseException))

@@ -90,8 +95,10 @@
val program = dao.executeUpdate(Statement.VacuumEvents, DAO.Purpose.NonLoading)

val expected = List(
LogEntry.Message("TICK REALTIME"),
PureTransaction.StartMessage,
PureTransaction.RollbackMessage,
LogEntry.Message("TICK REALTIME"),
LogEntry.Message("SLEEP 30000000000 nanoseconds"),
PureTransaction.StartMessage,
LogEntry.Sql(Statement.VacuumEvents),
@@ -106,6 +113,7 @@
"retry no-transaction when there is an error" in {
implicit val logging: Logging[Pure] = PureLogging.interpreter(noop = true)
implicit val sleep: Sleep[Pure] = PureSleep.interpreter
implicit val clock: Clock[Pure] = PureClock.interpreter

val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init.withExecuteUpdate(isFirstAttempt, raiseException))

@@ -114,7 +122,9 @@
val program = dao.executeUpdate(Statement.VacuumEvents, DAO.Purpose.NonLoading)

val expected = List(
LogEntry.Message("TICK REALTIME"),
PureTransaction.NoTransactionMessage,
LogEntry.Message("TICK REALTIME"),
LogEntry.Message("SLEEP 30000000000 nanoseconds"),
PureTransaction.NoTransactionMessage,
LogEntry.Sql(Statement.VacuumEvents)
@@ -16,6 +16,7 @@ import java.time.Instant
import java.sql.SQLException
import scala.concurrent.duration.FiniteDuration
import cats.syntax.option._
import cats.effect.Clock
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{Processor, Timestamps, TypesInfo}
@@ -33,6 +34,7 @@ import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, Colum
import com.snowplowanalytics.snowplow.rdbloader.test.TestState.LogEntry
import com.snowplowanalytics.snowplow.rdbloader.test.{
Pure,
PureClock,
PureDAO,
PureIglu,
PureLoadAuthService,
@@ -168,6 +170,7 @@ class LoadSpec extends Specification {
implicit val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init.withExecuteUpdate(isBeforeFirstCommit, failCommit))
implicit val iglu: Iglu[Pure] = PureIglu.interpreter
implicit val sleep: Sleep[Pure] = PureSleep.interpreter
implicit val clock: Clock[Pure] = PureClock.interpreter
implicit val transaction: Transaction[Pure, Pure] = RetryingTransaction.wrap(validConfig.retries, PureTransaction.interpreter)
implicit val loadAuthService: LoadAuthService[Pure] = PureLoadAuthService.interpreter

@@ -176,11 +179,15 @@
"s3://assets/com.acme/json_context_1.json".key
)
val expected = List(
LogEntry.Message("TICK REALTIME"),
PureTransaction.NoTransactionMessage,
LogEntry.Sql(Statement.ReadyCheck),
LogEntry.Message("TICK REALTIME"),
PureTransaction.NoTransactionMessage, // Migration.build
LogEntry.Message("TICK REALTIME"),
PureTransaction.NoTransactionMessage, // setStage and migrations.preTransactions

LogEntry.Message("TICK REALTIME"),
PureTransaction.StartMessage,
LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)),
LogEntry.Sql(
@@ -196,6 +203,7 @@
),
LogEntry.Sql(Statement.ShreddedCopy(info, Compression.Gzip, LoadAuthMethod.NoCreds)),
PureTransaction.RollbackMessage,
LogEntry.Message("TICK REALTIME"),
LogEntry.Message("SLEEP 30000000000 nanoseconds"),
PureTransaction.StartMessage,
LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)),
@@ -54,7 +54,7 @@ class ConfigSpec extends Specification {
None,
defaultSchedules,
exampleTimeouts,
exampleRetries.copy(cumulativeBound = None),
exampleRetries.copy(cumulativeBound = Some(20.minutes)),
exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds),
exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes)),
exampleFeatureFlags,
@@ -105,7 +105,7 @@ class ConfigSpec extends Specification {
None,
defaultSchedules,
exampleTimeouts,
exampleRetries.copy(cumulativeBound = None),
exampleRetries.copy(cumulativeBound = Some(20.minutes)),
exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds),
exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes)),
exampleFeatureFlags,
@@ -135,7 +135,7 @@ class ConfigSpec extends Specification {
None,
defaultSchedules,
exampleTimeouts,
exampleRetries.copy(cumulativeBound = None),
exampleRetries.copy(cumulativeBound = Some(20.minutes)),
exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds),
exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes)),
exampleFeatureFlags,