From bc64327944414cbc1623a3d453aebb44be220129 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Mon, 27 Mar 2023 08:55:24 +0100
Subject: [PATCH] Loader: Retry failures for all warehouse operations (close #1225)

---
 .../snowplow/rdbloader/Loader.scala           |   7 +-
 .../snowplow/rdbloader/dsl/Environment.scala  |   3 +-
 .../rdbloader/dsl/FolderMonitoring.scala      |  13 +-
 .../rdbloader/dsl/RetryingTransaction.scala   |  62 +++++
 .../snowplow/rdbloader/dsl/Transaction.scala  |  39 ++-
 .../rdbloader/dsl/VacuumScheduling.scala      |  32 +--
 .../snowplow/rdbloader/loading/Load.scala     |  39 ++-
 .../snowplow/rdbloader/loading/Retry.scala    |  23 +-
 .../rdbloader/loading/TargetCheck.scala       |  33 +--
 .../transactors/RetryingTransactor.scala      |  93 +++++++
 .../{utils => transactors}/SSH.scala          |   2 +-
 .../rdbloader/dsl/FolderMonitoringSpec.scala  |   3 -
 .../dsl/RetryingTransactionSpec.scala         | 226 ++++++++++++++++++
 .../snowplow/rdbloader/loading/LoadSpec.scala |  17 +-
 .../rdbloader/test/PureTransaction.scala      |   2 -
 15 files changed, 455 insertions(+), 139 deletions(-)
 create mode 100644 modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/RetryingTransaction.scala
 create mode 100644 modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transactors/RetryingTransactor.scala
 rename modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/{utils => transactors}/SSH.scala (99%)
 create mode 100644 modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/RetryingTransactionSpec.scala

diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala
index dfd5c4095..63bc25435 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala
@@ -81,7 +81,6 @@ object Loader {
       initQueryResult <- initQuery[F, C, I](target)
       _ <- FolderMonitoring.run[F, C, I](
              config.monitoring.folders,
-             config.readyCheck,
              control.isBusy,
              initQueryResult,
              target.prepareAlertTable
@@ -109,7 +108,7 @@ object Loader {
     def initRetry(f: F[Unit]) = retryingOnAllErrors(Retry.getRetryPolicy[F](config.initRetries), initRetryLog[F])(f)
 
-    val blockUntilReady = initRetry(TargetCheck.blockUntilReady[F, C](config.readyCheck)) *>
+    val blockUntilReady = initRetry(TargetCheck.prepareTarget[F, C]) *>
       Logging[F].info("Target check is completed")
     val noOperationPrepare = NoOperation.prepare(config.schedules.noOperation, control.makePaused) *>
       Logging[F].info("No operation prepare step is completed")
@@ -189,6 +188,8 @@ object Loader {
 
     val setStageC: Stage => C[Unit] =
       stage => Transaction[F, C].arrowBack(control.setStage(stage))
+    val incrementAttemptsC: C[Unit] =
+      Transaction[F, C].arrowBack(control.incrementAttempts)
     val addFailure: Throwable => F[Boolean] =
       control.addFailure(config.retryQueue)(folder)(_)
 
@@ -196,7 +197,7 @@ object Loader {
     for {
       start <- Clock[F].realTimeInstant
       _ <- discovery.origin.timestamps.min.map(t => Monitoring[F].periodicMetrics.setEarliestKnownUnloadedData(t)).sequence.void
-      result <- Load.load[F, C, I](config, setStageC, control.incrementAttempts, discovery, initQueryResult, target)
+      result <- Load.load[F, C, I](setStageC, incrementAttemptsC, discovery, initQueryResult, target)
       attempts <- control.getAndResetAttempts
       _ <- result match {
             case Load.LoadSuccess(ingested) =>
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala
index 1378d84ca..f14331d88 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala
@@ -112,7 +112,8 @@ object Environment {
         Monitoring.monitoringInterpreter[F](tracker, sentry, reporters, cli.config.monitoring.webhook, httpClient, periodicMetrics)
       implicit0(secretStore: SecretStore[F]) = cloudServices.secretStore
       implicit0(dispatcher: Dispatcher[F]) <- Dispatcher.parallel[F]
-      transaction <- Transaction.interpreter[F](cli.config.storage, cli.config.timeouts)
+      transaction <- Transaction.interpreter[F](cli.config.storage, cli.config.timeouts, cli.config.readyCheck)
+      transaction <- Resource.pure(RetryingTransaction.wrap(cli.config.retries, transaction))
       telemetry <- Telemetry.build[F](
                      cli.config.telemetry,
                      appName,
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala
index 0ed8a82f2..a6d532423 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala
@@ -163,8 +163,6 @@ object FolderMonitoring {
    *   `shredding_complete.json` and turned into corresponding `AlertPayload`
    * @param loadFrom
    *   list shredded folders
-   * @param readyCheck
-   *   config for retry logic
    * @param initQueryResult
    *   results of the queries sent to warehouse when application is initialized
    * @param prepareAlertTable
@@ -174,7 +172,6 @@ object FolderMonitoring {
    */
  def check[F[_]: MonadThrow: BlobStorage: Sleep: Transaction[*[_], C]: Logging, C[_]: DAO: Monad: LoadAuthService, I](
    loadFrom: BlobStorage.Folder,
-   readyCheck: Config.Retries,
    initQueryResult: I,
    prepareAlertTable: List[Statement]
  ): F[List[AlertPayload]] = {
@@ -186,7 +183,7 @@ object FolderMonitoring {
       } yield onlyS3Batches
 
     for {
-      _ <- TargetCheck.blockUntilReady[F, C](readyCheck)
+      _ <- TargetCheck.prepareTarget[F, C]
       onlyS3Batches <- Transaction[F, C].transact(getBatches)
       foldersWithChecks <- checkShreddingComplete[F](onlyS3Batches)
     } yield foldersWithChecks.map { case (folder, exists) =>
@@ -218,14 +215,13 @@ object FolderMonitoring {
    */
  def run[F[_]: Async: BlobStorage: Transaction[*[_], C]: Logging: Monitoring: MonadThrow, C[_]: DAO: LoadAuthService: Monad, I](
    foldersCheck: Option[Config.Folders],
-   readyCheck: Config.Retries,
    isBusy: Stream[F, Boolean],
    initQueryResult: I,
    prepareAlertTable: List[Statement]
  ): Stream[F, Unit] =
     foldersCheck match {
       case Some(folders) =>
-        stream[F, C, I](folders, readyCheck, isBusy, initQueryResult, prepareAlertTable)
+        stream[F, C, I](folders, isBusy, initQueryResult, prepareAlertTable)
       case None =>
         Stream.eval[F, Unit](Logging[F].info("Configuration for monitoring.folders hasn't been provided - monitoring is disabled"))
     }
@@ -237,8 +233,6 @@ object FolderMonitoring {
    *
    * @param folders
    *   configuration for folders monitoring
-   * @param readyCheck
-   *   configuration for target ready check
    * @param isBusy
    *   discrete stream signalling when folders monitoring should not work
    * @param initQueryResult
@@ -248,7 +242,6 @@ object FolderMonitoring {
    */
  def stream[F[_]: Transaction[*[_], C]: Async: BlobStorage: Logging: Monitoring: MonadThrow, C[_]: DAO: LoadAuthService: Monad, I](
    folders: Config.Folders,
-   readyCheck: Config.Retries,
    isBusy: Stream[F, Boolean],
    initQueryResult: I,
    prepareAlertTable: List[Statement]
@@ -263,7 +256,7 @@ object FolderMonitoring {
           Logging[F].info("Monitoring shredded folders") *>
             sinkFolders[F](folders.since, folders.until, folders.transformerOutput, outputFolder).ifM(
               for {
-                alerts <- check[F, C, I](outputFolder, readyCheck, initQueryResult, prepareAlertTable)
+                alerts <- check[F, C, I](outputFolder, initQueryResult, prepareAlertTable)
                 _ <- alerts.traverse_ { payload =>
                        val warn = payload.base match {
                          case Some(folder) => Logging[F].warning(s"${payload.message} $folder")
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/RetryingTransaction.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/RetryingTransaction.scala
new file mode 100644
index 000000000..1e6d36e28
--- /dev/null
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/RetryingTransaction.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.rdbloader.dsl
+
+import cats.{MonadThrow, ~>}
+import cats.implicits._
+import retry._
+
+import com.snowplowanalytics.snowplow.rdbloader.config.Config
+import com.snowplowanalytics.snowplow.rdbloader.loading.Retry
+import com.snowplowanalytics.snowplow.rdbloader.loading.Retry._
+import com.snowplowanalytics.snowplow.rdbloader.transactors.RetryingTransactor
+
+object RetryingTransaction {
+
+  /** A Transaction-handler that retries the io if there is an exception */
+  def wrap[F[_]: MonadThrow: Logging: Sleep, C[_]](
+    retries: Config.Retries,
+    inner: Transaction[F, C]
+  ): Transaction[F, C] = {
+    val policy = Retry.getRetryPolicy[F](retries)
+    new Transaction[F, C] {
+
+      def transact[A](io: C[A]): F[A] =
+        withErrorAdaption(policy) {
+          inner.transact(io)
+        }
+
+      def run[A](io: C[A]): F[A] =
+        withErrorAdaption(policy) {
+          inner.run(io)
+        }
+
+      def arrowBack: F ~> C = inner.arrowBack
+    }
+  }
+
+  private def withErrorAdaption[F[_]: MonadThrow: Sleep: Logging, A](policy: RetryPolicy[F])(io: F[A]): F[A] =
+    retryingOnSomeErrors(policy, isWorthRetry.andThen(_.pure[F]), onError[F](_, _))(io)
+
+  private val isWorthRetry: Throwable => Boolean = {
+    case _: RetryingTransactor.ExceededRetriesException =>
+      // The relevant retry policy has already been applied and exceeded
+      false
+    case e =>
+      Retry.isWorth(e)
+  }
+
+  private def onError[F[_]: Logging](t: Throwable, d: RetryDetails): F[Unit] =
+    Logging[F].error(t)(show"Error executing transaction. $d")
+
+}
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Transaction.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Transaction.scala
index 10cbd6dc2..04cf27a68 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Transaction.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Transaction.scala
@@ -26,10 +26,11 @@ import doobie.implicits._
 import doobie.util.transactor.Strategy
 import doobie.hikari._
 import com.zaxxer.hikari.HikariConfig
+import retry.Sleep
 
 import java.sql.SQLException
 import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
-import com.snowplowanalytics.snowplow.rdbloader.utils.SSH
+import com.snowplowanalytics.snowplow.rdbloader.transactors.{RetryingTransactor, SSH}
 import com.snowplowanalytics.snowplow.rdbloader.common.cloud.SecretStore
 
 /**
@@ -62,11 +63,6 @@ trait Transaction[F[_], C[_]] {
    */
   def run[A](io: C[A]): F[A]
 
-  /**
-   * Same as run, but narrowed down to transaction to allow migration error handling.
-   */
-  def run_(io: C[Unit]): F[Unit]
-
   /**
    * A kind-function (`mapK`) to downcast `F` into `C` This is a very undesirable, but necessary
    * hack that allows us to chain `F` effects (real side-effects) with `C` (DB) in both directions.
@@ -109,8 +105,9 @@ object Transaction {
       ds.setDataSourceProperties(target.properties)
     }
 
-  def buildPool[F[_]: Async: SecretStore](
-    target: StorageTarget
+  def buildPool[F[_]: Async: SecretStore: Logging: Sleep](
+    target: StorageTarget,
+    retries: Config.Retries
   ): Resource[F, Transactor[F]] =
     for {
       ce <- ExecutionContexts.fixedThreadPool[F](2)
@@ -123,6 +120,7 @@ object Transaction {
       xa <- HikariTransactor
               .newHikariTransactor[F](target.driver, target.connectionUrl, target.username, password, ce)
       _ <- Resource.eval(xa.configure(configureHikari[F](target, _)))
+      xa <- Resource.pure(RetryingTransactor.wrap(retries, xa))
       xa <- target.sshTunnel.fold(Resource.pure[F, Transactor[F]](xa))(SSH.transactor(_, xa))
     } yield xa
 
@@ -131,11 +129,14 @@ object Transaction {
    * close a JDBC connection. If connection could not be acquired, it will retry several times
    * according to `retryPolicy`
    */
-  def interpreter[F[_]: Async: Dispatcher: Monitoring: SecretStore](
+  def interpreter[F[_]: Async: Dispatcher: Logging: Monitoring: SecretStore: Sleep](
     target: StorageTarget,
-    timeouts: Config.Timeouts
+    timeouts: Config.Timeouts,
+    connectionRetries: Config.Retries
   ): Resource[F, Transaction[F, ConnectionIO]] =
-    buildPool[F](target).map(xa => Transaction.jdbcRealInterpreter[F](target, timeouts, xa))
+    buildPool[F](target, connectionRetries).map { xa =>
+      Transaction.jdbcRealInterpreter[F](target, timeouts, xa)
+    }
 
   def defaultStrategy(rollbackCommitTimeout: FiniteDuration): Strategy = {
     val timeoutSeconds = rollbackCommitTimeout.toSeconds.toInt
@@ -174,9 +175,8 @@ object Transaction {
           case (fiber, Outcome.Canceled()) =>
             fiber.cancel.timeout(timeouts.rollbackCommit)
         }
-        .adaptError {
-          case e: SQLException => new TransactionException(s"${e.getMessage} - SqlState: ${e.getSQLState}", e)
-          case e => new TransactionException(e.getMessage, e)
+        .adaptError { case e: SQLException =>
+          new TransactionException(s"${e.getMessage} - SqlState: ${e.getSQLState}", e)
         }
     }
 
@@ -186,17 +186,6 @@ object Transaction {
       def run[A](io: ConnectionIO[A]): F[A] =
         NoCommitTransactor.trans.apply(io).withErrorAdaption
 
-      val awsColumnResizeError: String =
-        raw"""\[Amazon\]\(500310\) Invalid operation: cannot alter column "[^\s]+" of relation "[^\s]+", target column size should be different; - SqlState: 0A000"""
-
-      // If premigration was successful, but migration failed. It would leave the columns resized.
-      // This recovery makes it so resizing error would be ignored.
-      // Note: AWS will return 500310 error code other SQL errors (i.e. COPY errors), don't use for pattern matching.
-      def run_(io: ConnectionIO[Unit]): F[Unit] =
-        run[Unit](io).recoverWith {
-          case e: TransactionException if e.getMessage matches awsColumnResizeError => ().pure[F]
-        }
-
       def arrowBack: F ~> ConnectionIO =
         new FunctionK[F, ConnectionIO] {
           def apply[A](fa: F[A]): ConnectionIO[A] =
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/VacuumScheduling.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/VacuumScheduling.scala
index 60b0927a5..5cc9b68b2 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/VacuumScheduling.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/VacuumScheduling.scala
@@ -1,37 +1,15 @@
 package com.snowplowanalytics.snowplow.rdbloader.dsl
 
 import fs2.Stream
-import retry._
-import retry.syntax.all._
 import cats.syntax.all._
-import cats.effect.Concurrent
 import cats.MonadThrow
 import cats.effect.kernel.Async
 import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
 import com.snowplowanalytics.snowplow.rdbloader.db.Statement
 import eu.timepit.fs2cron.cron4s.Cron4sScheduler
-import retry.RetryDetails.{GivingUp, WillDelayAndRetry}
-
-import scala.concurrent.duration._
 
 object VacuumScheduling {
 
-  def retryPolicy[F[_]: Concurrent]: RetryPolicy[F] =
-    RetryPolicies.fibonacciBackoff[F](1.minute) join RetryPolicies.limitRetries[F](10)
-
-  def logError[F[_]: Logging](err: Throwable, details: RetryDetails): F[Unit] = details match {
-
-    case WillDelayAndRetry(nextDelay: FiniteDuration, retriesSoFar: Int, cumulativeDelay: FiniteDuration) =>
-      Logging[F].warning(
-        s"Failed to vacuum with ${err.getMessage}. So far we have retried $retriesSoFar times over for $cumulativeDelay. Next attempt in $nextDelay."
-      )
-
-    case GivingUp(totalRetries: Int, totalDelay: FiniteDuration) =>
-      Logging[F].error(
-        s"Failed to vacuum with ${err.getMessage}. Giving up after $totalRetries retries after $totalDelay."
-      )
-  }
-
   def run[F[_]: Transaction[*[_], C]: Async: Logging, C[_]: DAO: MonadThrow: Logging](
     tgt: StorageTarget,
     cfg: Config.Schedules
@@ -49,8 +27,9 @@ object VacuumScheduling {
                 Logging[C].info("initiating events vacuum") *> DAO[C].executeQuery(Statement.VacuumEvents) *> Logging[C]
                   .info("vacuum events complete")
               )
-              .retryingOnAllErrors(retryPolicy[F], logError[F])
-              .orElse(().pure[F])
+              .recoverWith { case t: Throwable =>
+                Logging[F].error(t)("Failed to vacuum events table")
+              }
           }
       case _ => Stream.empty[F]
     }
@@ -70,8 +49,9 @@ object VacuumScheduling {
                 Logging[C].info("initiating manifest vacuum") *> DAO[C].executeQuery(Statement.VacuumManifest) *> Logging[C]
                   .info("vacuum manifest complete")
              )
-              .retryingOnAllErrors(retryPolicy[F], logError[F])
-              .orElse(().pure[F])
+              .recoverWith { case t: Throwable =>
+                Logging[F].error(t)("Failed to vacuum manifest table")
+              }
           }
      case _ => Stream.empty[F]
     }
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala
index 7e5cd920c..a0be5495a 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala
@@ -21,7 +21,6 @@ import retry.Sleep
 // This project
 import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
 import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
-import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
 import com.snowplowanalytics.snowplow.rdbloader.db.{Control, Manifest, Migration, Target}
 import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService
 import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery
@@ -74,8 +73,6 @@ object Load {
    * exception in `F` context The only failure that the function silently handles is duplicated dir,
    * in which case left AlertPayload is returned
    *
-   * @param config
-   *   RDB Loader app configuration
    * @param setStage
    *   function setting a stage in global state
    * @param incrementAttempt
@@ -90,20 +87,24 @@ object Load {
    *   either alert payload in case of duplicate event or ingestion timestamp in case of success
    */
  def load[F[_]: MonadThrow: Logging: Iglu: Sleep: Transaction[*[_], C], C[_]: MonadThrow: Logging: LoadAuthService: DAO, I](
-   config: Config[StorageTarget],
    setStage: Stage => C[Unit],
-   incrementAttempt: F[Unit],
+   incrementAttempt: C[Unit],
    discovery: DataDiscovery.WithOrigin,
    initQueryResult: I,
    target: Target[I]
  ): F[LoadResult] =
     for {
-      _ <- TargetCheck.blockUntilReady[F, C](config.readyCheck)
+      _ <- TargetCheck.prepareTarget[F, C]
       migrations <- Migration.build[F, C, I](discovery.discovery, target)
-      _ <- Transaction[F, C].run(setStage(Stage.MigrationPre))
-      _ <- migrations.preTransaction.traverse_(Transaction[F, C].run_)
-      transaction = getTransaction[C, I](setStage, discovery, initQueryResult, target)(migrations.inTransaction)
-      result <- Retry.retryLoad(config.retries, incrementAttempt, Transaction[F, C].transact(transaction))
+      _ <- Transaction[F, C].run {
+             getPreTransaction[C](setStage, migrations.preTransaction)
+           }
+      result <- Transaction[F, C].transact {
+                  getTransaction[C, I](setStage, discovery, initQueryResult, target)(migrations.inTransaction)
+                    .onError { case _: Throwable =>
+                      incrementAttempt
+                    }
+                }
     } yield result
 
   /**
@@ -157,6 +158,24 @@ object Load {
       }
     } yield result
 
+  val awsColumnResizeError: String =
+    raw"""\[Amazon\]\(500310\) Invalid operation: cannot alter column "[^\s]+" of relation "[^\s]+", target column size should be different; - SqlState: 0A000"""
+
+  def getPreTransaction[C[_]: Logging: MonadThrow: DAO](
+    setStage: Stage => C[Unit],
+    preTransactionMigrations: List[C[Unit]]
+  ): C[Unit] = for {
+    _ <- setStage(Stage.MigrationPre)
+    _ <- preTransactionMigrations.map { io =>
+           io.recoverWith {
+             // If the pre-migration was successful but the migration failed, it would leave the columns resized.
+             // This recovery makes it so the resizing error is ignored.
+             // Note: AWS will return the 500310 error code for other SQL errors (i.e. COPY errors), so don't use it for pattern matching.
+             case e if e.getMessage.matches(awsColumnResizeError) => ().pure[C]
+           }
+         }.sequence_
+  } yield ()
+
   /**
    * Run loading actions for atomic and shredded data
    *
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Retry.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Retry.scala
index aa800e3f3..71f4708c7 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Retry.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Retry.scala
@@ -12,11 +12,10 @@
  */
 package com.snowplowanalytics.snowplow.rdbloader.loading
 
-import cats.{Applicative, MonadThrow, Show}
+import cats.{Applicative, Show}
 import cats.implicits._
 import com.snowplowanalytics.snowplow.rdbloader.config.Config.{Retries, Strategy}
-import com.snowplowanalytics.snowplow.rdbloader.dsl.Logging
-import retry.{RetryDetails, RetryPolicies, RetryPolicy, Sleep}
+import retry.{RetryDetails, RetryPolicies, RetryPolicy}
 import retry._
 
 /**
@@ -25,24 +24,6 @@ import retry._
  */
 object Retry {
 
-  /**
-   * This retry policy will attempt several times with short pauses (30 + 60 + 90 sec) Because most
-   * of errors such connection drops should be happening in in connection acquisition The error
-   * handler will also abort the transaction (it should start in the original action again)
-   */
-  def retryLoad[F[_]: MonadThrow: Sleep: Logging, A](
-    config: Retries,
-    incrementAttempt: F[Unit],
-    fa: F[A]
-  ): F[A] = {
-    val onError = (e: Throwable, d: RetryDetails) => incrementAttempt *> log[F](e, d)
-    val retryPolicy = getRetryPolicy[F](config)
-    retryingOnSomeErrors(retryPolicy, { t: Throwable => isWorth(t).pure[F] }, onError)(fa)
-  }
-
-  def log[F[_]: Logging](e: Throwable, d: RetryDetails): F[Unit] =
-    Logging[F].error(show"Transaction aborted. $d. Caught exception: ${e.toString}")
-
   /** Check if error is worth retrying */
   def isWorth(e: Throwable): Boolean = {
     val isFatal = FatalFailures.foldLeft(false)((isPreviousFatal, predicate) => predicate(e) || isPreviousFatal)
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/TargetCheck.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/TargetCheck.scala
index d7783d4fd..2871a33f7 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/TargetCheck.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/TargetCheck.scala
@@ -12,44 +12,21 @@
  */
 package com.snowplowanalytics.snowplow.rdbloader.loading
 
-import cats.{Applicative, MonadThrow}
+import cats.Applicative
 import cats.implicits._
 import retry._
 
-import java.sql.SQLTransientConnectionException
-
-import com.snowplowanalytics.snowplow.rdbloader.config.Config
 import com.snowplowanalytics.snowplow.rdbloader.db.Statement
 import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Logging, Transaction}
-import com.snowplowanalytics.snowplow.rdbloader.loading.Retry._
 
 /**
- * Module checks whether target is ready to load or not. It blocks the application until target
- * become ready to accept statements.
+ * Module prepares the target to make it ready for loading.
  */
 object TargetCheck {
 
   /**
-   * Probe the target database to find out if it is operational. Continue to make this check until
-   * it become ready.
+   * Prepare the target to make it ready for loading, e.g. start the warehouse running
    */
-  def blockUntilReady[F[_]: MonadThrow: Transaction[*[_], C]: Logging: Sleep, C[_]: DAO](
-    readyCheckConfig: Config.Retries
-  ): F[Unit] = {
-    val onError = (e: Throwable, d: RetryDetails) => log(e, d)
-    val retryPolicy = Retry.getRetryPolicy[F](readyCheckConfig)
-    val fa: F[Unit] = Transaction[F, C].run(DAO[C].executeQuery[Unit](Statement.ReadyCheck)).void
-    val isWorthF = isWorth.andThen(_.pure[F])
-    retryingOnSomeErrors(retryPolicy, isWorthF, onError)(fa)
-  }
-
-  def log[F[_]: Logging: Applicative](e: Throwable, d: RetryDetails): F[Unit] =
-    Logging[F].info(show"Target is not ready. $d") *>
-      Logging[F].debug(show"Caught exception during target check: ${e.toString}")
-
-  /** Check if error is worth retrying */
-  def isWorth: Throwable => Boolean = {
-    case e: SQLTransientConnectionException if Option(e.getCause).isEmpty => true
-    case _ => false
-  }
+  def prepareTarget[F[_]: Applicative: Transaction[*[_], C]: Logging: Sleep, C[_]: DAO]: F[Unit] =
+    Transaction[F, C].run(DAO[C].executeQuery[Unit](Statement.ReadyCheck)).void
 }
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transactors/RetryingTransactor.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transactors/RetryingTransactor.scala
new file mode 100644
index 000000000..656b4689b
--- /dev/null
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transactors/RetryingTransactor.scala
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.rdbloader.transactors
+
+import cats.effect.Resource
+import cats.effect.kernel.{MonadCancelThrow, Temporal}
+import cats.syntax.all._
+import doobie.Transactor
+import retry._
+
+import java.sql.Connection
+import java.sql.{SQLException, SQLTransientConnectionException}
+
+import com.snowplowanalytics.snowplow.rdbloader.config.Config
+import com.snowplowanalytics.snowplow.rdbloader.dsl.Logging
+import com.snowplowanalytics.snowplow.rdbloader.loading.Retry
+import com.snowplowanalytics.snowplow.rdbloader.loading.Retry._
+
+object RetryingTransactor {
+
+  class ExceededRetriesException(cause: Throwable) extends Exception("Exceeded retry limits trying to get a JDBC connection", cause)
+
+  /**
+   * A doobie transactor that retries getting a connection if the HikariPool times out. It blocks
+   * the application until the connection is available, or until the retry limits are exceeded.
+   */
+  def wrap[F[_]: Temporal: Logging: Sleep, A](
+    config: Config.Retries,
+    inner: Transactor.Aux[F, A]
+  ): Transactor.Aux[F, A] = {
+    val policy = Retry.getRetryPolicy[Resource[F, *]](config)
+    inner.copy(connect0 = a => wrapResource(policy, inner.connect(a)))
+  }
+
+  private def wrapResource[F[_]](
+    policy: RetryPolicy[Resource[F, *]],
+    resource: Resource[F, Connection]
+  )(implicit F: MonadCancelThrow[F],
+    L: Logging[F],
+    S: Sleep[Resource[F, *]]
+  ): Resource[F, Connection] =
+    retryingOnSomeErrors(policy, isConnectionError.andThen(_.pure[Resource[F, *]]), onError[F](_, _))(resource)
+      .adaptError {
+        case t if isConnectionError(t) => new ExceededRetriesException(t)
+        case t: Throwable => t
+      }
+
+  /**
+   * Matches the exception against the recognizable signatures which tell us the Hikari pool
+   * could not get any connection within a time limit. For Databricks, it likely tells us the
+   * cluster is still starting up.
+   */
+  private val isConnectionError: Throwable => Boolean = {
+    case e: SQLTransientConnectionException =>
+      Option(e.getCause) match {
+        case None =>
+          // Expected when:
+          // - The HikariPool has only just started up, and has not yet made any connection
+          // - ...and the JDBC server (e.g. Databricks cluster) times out on a connection
+          true
+        case Some(cause: SQLException) =>
+          Option(cause.getSQLState) match {
+            case Some("HYT01") =>
+              // Expected when:
+              // - The HikariPool has already received 1+ connection timeouts from the JDBC server
+              // - ...and the JDBC server (e.g. Databricks cluster) times out on follow-up attempts to get a connection
+              true
+            case _ =>
+              // Expected when the JDBC driver cannot connect to the server for any other reason,
+              // e.g. authorization failure.
+              // We don't want to retry these exceptions, so return false
+              false
+          }
+        case _ =>
+          false
+      }
+    case _: Throwable =>
+      false
+  }
+
+  private def onError[F[_]: Logging](t: Throwable, d: RetryDetails): Resource[F, Unit] =
+    Resource.eval(Logging[F].info(show"Target is not ready. $d. ${t.getMessage}"))
+}
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/SSH.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transactors/SSH.scala
similarity index 99%
rename from modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/SSH.scala
rename to modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transactors/SSH.scala
index a03920db2..7faa91e4b 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/utils/SSH.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transactors/SSH.scala
@@ -10,7 +10,7 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
  */
-package com.snowplowanalytics.snowplow.rdbloader.utils
+package com.snowplowanalytics.snowplow.rdbloader.transactors
 
 import cats.Monad
 import cats.effect.{Async, Resource, Sync}
diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala
index 8907062e8..f27f26b56 100644
--- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala
+++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala
@@ -77,7 +77,6 @@ class FolderMonitoringSpec extends Specification {
       FolderMonitoring
         .check[Pure, Pure, Unit](
           loadFrom,
-          exampleReadyCheckConfig,
           (),
           Target.defaultPrepareAlertTable
         )
@@ -126,7 +125,6 @@ class FolderMonitoringSpec extends Specification {
       FolderMonitoring
         .check[Pure, Pure, Unit](
          loadFrom,
-         exampleReadyCheckConfig,
          (),
          Target.defaultPrepareAlertTable
        )
@@ -268,5 +266,4 @@ object FolderMonitoringSpec {
       }
     }
 
-  val exampleReadyCheckConfig: Config.Retries = Config.Retries(Config.Strategy.Exponential, Some(3), 30.seconds, Some(1.hour))
 }
diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/RetryingTransactionSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/RetryingTransactionSpec.scala
new file mode 100644
index 000000000..4d26af088
--- /dev/null
+++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/RetryingTransactionSpec.scala
@@ -0,0 +1,226 @@
+/*
+ * Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.rdbloader.dsl
+
+import cats.arrow.FunctionK
+import cats.data.Kleisli
+import cats.effect.testkit.TestControl
+import cats.effect.{IO, Resource}
+import cats.effect.std.Dispatcher
+import cats.effect.unsafe.implicits.global
+import doobie.{ConnectionIO, Transactor}
+import doobie.util.transactor.Strategy
+import doobie.free.connection.ConnectionOp
+import doobie.implicits._
+import java.sql.{Connection, SQLTransientConnectionException, SQLTransientException}
+import retry.Sleep
+import com.snowplowanalytics.snowplow.rdbloader.SpecHelpers
+import com.snowplowanalytics.snowplow.rdbloader.config.Config
+import com.snowplowanalytics.snowplow.rdbloader.db.Statement
+import com.snowplowanalytics.snowplow.rdbloader.test.{Pure, PureDAO, PureLogging, PureOps, PureSleep, PureTransaction, TestState}
+import com.snowplowanalytics.snowplow.rdbloader.test.TestState.LogEntry
+import com.snowplowanalytics.snowplow.rdbloader.transactors.RetryingTransactor
+import scala.concurrent.duration.DurationInt
+
+import org.specs2.mutable.Specification
+
+class RetryingTransactionSpec extends Specification {
+  import RetryingTransactorSpec._
+
+  "the retrying transactor" should {
+    "not retry transaction when there are no errors" in {
+      implicit val logging: Logging[Pure] = PureLogging.interpreter(noop = true)
+      implicit val sleep: Sleep[Pure] = PureSleep.interpreter
+
+      val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init)
+
+      val transaction: Transaction[Pure, Pure] = RetryingTransaction.wrap(testRetries, PureTransaction.interpreter)
+
+      val program = dao.executeUpdate(Statement.VacuumEvents, DAO.Purpose.NonLoading)
+
+      val expected = List(
+        PureTransaction.StartMessage,
+        LogEntry.Sql(Statement.VacuumEvents),
+        PureTransaction.CommitMessage
+      )
+
+      val result = transaction.transact(program).runS
+
+      result.getLog must beEqualTo(expected)
+    }
+
+    "not retry no-transaction when there are no errors" in {
+      implicit val logging: Logging[Pure] = PureLogging.interpreter(noop = true)
+      implicit val sleep: Sleep[Pure] = PureSleep.interpreter
+
+      val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init)
+
+      val transaction: Transaction[Pure, Pure] = RetryingTransaction.wrap(testRetries, PureTransaction.interpreter)
+
+      val program = dao.executeUpdate(Statement.VacuumEvents, DAO.Purpose.NonLoading)
+
+      val expected = List(
+        PureTransaction.NoTransactionMessage,
+        LogEntry.Sql(Statement.VacuumEvents)
+      )
+
+      val result = transaction.run(program).runS
+
+      result.getLog must beEqualTo(expected)
+    }
+
+    "retry transaction when there is an error" in {
+      implicit val logging: Logging[Pure] = PureLogging.interpreter(noop = true)
+      implicit val sleep: Sleep[Pure] = PureSleep.interpreter
+
+      val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init.withExecuteUpdate(isFirstAttempt, raiseException))
+
+      val transaction: Transaction[Pure, Pure] = RetryingTransaction.wrap(testRetries, PureTransaction.interpreter)
+
+      val program = dao.executeUpdate(Statement.VacuumEvents, DAO.Purpose.NonLoading)
+
+      val expected = List(
+        PureTransaction.StartMessage,
+        PureTransaction.RollbackMessage,
+        LogEntry.Message("SLEEP 30000000000 nanoseconds"),
+        PureTransaction.StartMessage,
+        LogEntry.Sql(Statement.VacuumEvents),
+        PureTransaction.CommitMessage
+      )
+
+      val result = transaction.transact(program).runS
+
+      result.getLog must beEqualTo(expected)
+    }
+
+    "retry no-transaction when there is an error" in {
+      implicit val logging: Logging[Pure] = PureLogging.interpreter(noop = true)
+      implicit val sleep: Sleep[Pure] = PureSleep.interpreter
+
+      val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init.withExecuteUpdate(isFirstAttempt, raiseException))
+
+      val transaction: Transaction[Pure, Pure] = RetryingTransaction.wrap(testRetries, PureTransaction.interpreter)
+
+      val program = dao.executeUpdate(Statement.VacuumEvents, DAO.Purpose.NonLoading)
+
+      val expected = List(
+        PureTransaction.NoTransactionMessage,
+        LogEntry.Message("SLEEP 30000000000 nanoseconds"),
+        PureTransaction.NoTransactionMessage,
+        LogEntry.Sql(Statement.VacuumEvents)
+      )
+
+      val result = transaction.run(program).runS
+
+      result.getLog must beEqualTo(expected)
+    }
+
+    "retry according to the target check retry config if we cannot get a connection" in {
+      // Simulates the case when we are waiting for the Databricks cluster to start up.
+
+      // The known exception we get from Hikari Pool when the target is not ready
+      val targetNotReadyException = new SQLTransientConnectionException("timeout getting connection")
+
+      // Retry config that tries for 10 * 30 seconds = 300 seconds
+      val targetCheckRetries = Config.Retries(Config.Strategy.Constant, Some(10), 30.seconds, Some(1.hour))
+
+      // Retry config that tries for 2 * 10 seconds = 20 seconds
+      val executionRetries = Config.Retries(Config.Strategy.Constant, Some(2), 10.seconds, Some(1.hour))
+
+      val resources = for {
+        implicit0(dispatcher: Dispatcher[IO]) <- Dispatcher.parallel[IO]
+        implicit0(logging: Logging[IO]) = Logging.noOp[IO]
+        transactor = RetryingTransactor.wrap(targetCheckRetries, failingTransactor(targetNotReadyException))
+        realTransaction = Transaction.jdbcRealInterpreter(SpecHelpers.validConfig.storage, SpecHelpers.validConfig.timeouts, transactor)
+      } yield RetryingTransaction.wrap(executionRetries, realTransaction)
+
+      val io = resources.use { retryingTransaction =>
+        for {
+          either <- retryingTransaction.transact(simpleConnectionIO).attempt
+          now <- IO.monotonic
+        } yield (either, now)
+      }
+
+      val (either, timeTaken) = TestControl.executeEmbed(io).unsafeRunSync()
+
+      either must beLeft(haveClass[RetryingTransactor.ExceededRetriesException])
+      timeTaken must_== 300.seconds // expected time taken when using the target check retries config
+    }
+
+    "retry according to the main retry config if the connection has an error" in {
+
+      // The kind of exception we get from Hikari when a connection is made but results in error
+      val unexpectedException =
+        new SQLTransientConnectionException("timeout getting connection", new SQLTransientException("some underlying problem"))
+
+      // Retry config that tries for 10 * 30 seconds = 300 seconds
+      val targetCheckRetries = Config.Retries(Config.Strategy.Constant, Some(10), 30.seconds, Some(1.hour))
+
+      // Retry config that tries for 2 * 10 seconds = 20 seconds
+      val executionRetries = Config.Retries(Config.Strategy.Constant, Some(2), 10.seconds, Some(1.hour))
+
+      val resources = for {
+        implicit0(dispatcher: Dispatcher[IO]) <- Dispatcher.parallel[IO]
+        implicit0(logging: Logging[IO]) = Logging.noOp[IO]
+        transactor = RetryingTransactor.wrap(targetCheckRetries, failingTransactor(unexpectedException))
+        realTransaction = Transaction.jdbcRealInterpreter(SpecHelpers.validConfig.storage, SpecHelpers.validConfig.timeouts, transactor)
+      } yield RetryingTransaction.wrap(executionRetries, realTransaction)
+
+      val io = resources.use { retryingTransaction =>
+        for {
+          either <- retryingTransaction.transact(simpleConnectionIO).attempt
+          now <- IO.monotonic
+        } yield (either, now)
+      }
+
+      val (either, timeTaken) = TestControl.executeEmbed(io).unsafeRunSync()
+
+      either must beLeft(haveClass[Transaction.TransactionException])
+      timeTaken must_== 20.seconds // expected time taken when using the main (not target check) retries config
+    }
+  }
+}
+
+object RetryingTransactorSpec {
+
+  val testRetries: Config.Retries = Config.Retries(Config.Strategy.Exponential, Some(3), 30.seconds, Some(1.hour))
+
+  def isFirstAttempt(sql: Statement, ts: TestState) =
+    sql match {
+      case Statement.VacuumEvents =>
+        ts.getLog.count {
+          case PureTransaction.StartMessage => true
+          case PureTransaction.NoTransactionMessage => true
+          case _ => false
+        } == 1
+      case _ => false
+    }
+
+  def raiseException: Pure[Int] =
+    Pure.fail(new RuntimeException("boom"))
+
+  // A doobie Transactor that throws an exception when trying to get a connection
+  def failingTransactor(exception: Throwable): Transactor.Aux[IO, Unit] = {
+    val interpreter = new FunctionK[ConnectionOp, Kleisli[IO, Connection, *]] {
+      def apply[A](fa: ConnectionOp[A]): Kleisli[IO, Connection, A] =
+        Kleisli(_ => IO.raiseError(new RuntimeException("interpreter error")))
+    }
+    val resource: Resource[IO, Connection] =
+      Resource.eval(IO.raiseError[Connection](exception))
+    Transactor((), _ => resource, interpreter, Strategy.void)
+  }
+
+  val simpleConnectionIO: ConnectionIO[Int] = for {
+    one <- sql"SELECT 1".query[Int].unique
+  } yield one
+}
diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala
index 9a4e93936..e5e94850b 100644
--- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala
+++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala
@@ -16,13 +16,13 @@ import java.time.Instant
 import scala.concurrent.duration.FiniteDuration
 import cats.syntax.option._
 import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
-import com.snowplowanalytics.snowplow.rdbloader.{LoaderError, SpecHelpers}
+import com.snowplowanalytics.snowplow.rdbloader.LoaderError
 import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
 import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{Processor, Timestamps, TypesInfo}
 import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
 import com.snowplowanalytics.snowplow.rdbloader.common.config.Semver
 import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType}
-import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, Transaction}
+import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, RetryingTransaction, Transaction}
 import com.snowplowanalytics.snowplow.rdbloader.db.{Manifest, Statement}
 import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService
 import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService.LoadAuthMethod
@@ -87,7 +87,6 @@ class LoadSpec extends Specification {
 
       val result = Load
         .load[Pure, Pure, Unit](
-          SpecHelpers.validCliConfig.config,
          LoadSpec.setStageNoOp,
          Pure.unit,
          LoadSpec.dataDiscoveryWithOrigin,
@@ -120,7 +119,6 @@ class LoadSpec extends Specification {
 
       val result = Load
        .load[Pure, Pure, Unit](
-         SpecHelpers.validCliConfig.config,
          LoadSpec.setStageNoOp,
          Pure.unit,
          LoadSpec.dataDiscoveryWithOrigin,
@@ -154,7 +152,6 @@ class LoadSpec extends Specification {
 
      val result = Load
        .load[Pure, Pure, Unit](
-         SpecHelpers.validCliConfig.config,
          LoadSpec.setStageNoOp,
          Pure.unit,
          LoadSpec.dataDiscoveryWithOrigin,
@@ -168,10 +165,10 @@ class LoadSpec extends Specification {
 
     "abort, sleep and start transaction again if first commit failed" in {
       implicit val logging: Logging[Pure] = PureLogging.interpreter(noop = true)
-      implicit val transaction: Transaction[Pure, Pure] = PureTransaction.interpreter
       implicit val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init.withExecuteUpdate(isBeforeFirstCommit, failCommit))
       implicit val iglu: Iglu[Pure] = PureIglu.interpreter
       implicit val sleep: Sleep[Pure] = PureSleep.interpreter
+      implicit val transaction: Transaction[Pure, Pure] = RetryingTransaction.wrap(validConfig.retries, PureTransaction.interpreter)
       implicit val loadAuthService: LoadAuthService[Pure] = PureLoadAuthService.interpreter
 
       val info = ShreddedType.Json(
@@ -220,7 +217,6 @@ class LoadSpec extends Specification {
       )
       val result = Load
         .load[Pure, Pure, Unit](
-          SpecHelpers.validCliConfig.config,
          LoadSpec.setStageNoOp,
          Pure.unit,
          LoadSpec.dataDiscoveryWithOrigin,
@@ -262,7 +258,6 @@ class LoadSpec extends Specification {
       )
       val result = Load
        .load[Pure, Pure, Unit](
-         SpecHelpers.validCliConfig.config,
          LoadSpec.setStageNoOp,
          Pure.unit,
          LoadSpec.dataDiscoveryWithOrigin,
@@ -353,7 +348,11 @@ object LoadSpec {
 
   def isBeforeFirstCommit(sql: Statement, ts: TestState) =
     sql match {
-      case Statement.ManifestAdd(_) => ts.getLog.length == 8
+      case Statement.ManifestAdd(_) =>
+        ts.getLog.count {
+          case PureTransaction.StartMessage => true
+          case _ => false
+        } == 1
       case _ => false
     }
 
diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureTransaction.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureTransaction.scala
index 10101efad..49ff83bed 100644
--- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureTransaction.scala
+++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureTransaction.scala
@@ -49,8 +49,6 @@ object PureTransaction {
       Pure.modify(_.log(NoTransaction)) *>
         io
 
-    override def run_(io: Pure[Unit]): Pure[Unit] = run[Unit](io)
-
     def arrowBack: Pure ~> Pure = new FunctionK[Pure, Pure] {
       def apply[A](fa: Pure[A]): Pure[A] = fa
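
The patch composes two independent retry layers: RetryingTransactor.wrap retries only connection acquisition (configured by cli.config.readyCheck and surfacing ExceededRetriesException when exhausted), while RetryingTransaction.wrap retries the warehouse operation itself (configured by cli.config.retries) and deliberately refuses to retry ExceededRetriesException so the two policies never compound. The sketch below shows the outer decorator idea in isolation. It is a minimal, self-contained illustration assuming cats-effect 3 and cats-retry; MiniTransaction, maxRetries and baseDelay are hypothetical stand-ins for the loader's Transaction[F, C] algebra and Config.Retries, not the project's actual API.

import cats.MonadThrow
import cats.syntax.all._
import retry.{RetryDetails, RetryPolicies, Sleep, retryingOnSomeErrors}
import scala.concurrent.duration.FiniteDuration

// Simplified stand-in for the loader's Transaction[F, C] algebra (illustrative only)
trait MiniTransaction[F[_], C[_]] {
  def transact[A](io: C[A]): F[A] // run `io` inside a database transaction
  def run[A](io: C[A]): F[A] // run `io` in auto-commit mode
}

object RetryingMiniTransaction {

  // Decorate `inner` so that every `transact` and `run` is retried on errors
  // the caller deems worth retrying (cf. Retry.isWorth in the patch).
  def wrap[F[_]: MonadThrow: Sleep, C[_]](
    maxRetries: Int, // hypothetical knobs standing in for Config.Retries
    baseDelay: FiniteDuration,
    isWorthRetry: Throwable => Boolean,
    inner: MiniTransaction[F, C]
  ): MiniTransaction[F, C] = {
    // Exponential backoff capped by a retry count
    val policy = RetryPolicies.exponentialBackoff[F](baseDelay) join RetryPolicies.limitRetries[F](maxRetries)

    def retried[A](fa: F[A]): F[A] =
      retryingOnSomeErrors(
        policy,
        (t: Throwable) => isWorthRetry(t).pure[F],
        (_: Throwable, _: RetryDetails) => ().pure[F] // the real interpreter logs each retry here
      )(fa)

    new MiniTransaction[F, C] {
      def transact[A](io: C[A]): F[A] = retried(inner.transact(io))
      def run[A](io: C[A]): F[A] = retried(inner.run(io))
    }
  }
}

In the patch the analogous wiring lives in Environment.scala: Transaction.interpreter[F](cli.config.storage, cli.config.timeouts, cli.config.readyCheck) builds the pool with the connection-level retries, and RetryingTransaction.wrap(cli.config.retries, transaction) then decorates the resulting interpreter with the operation-level retries.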