diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala index 7ab8c2437..ba33dbbc4 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala @@ -18,7 +18,6 @@ import cats.{Applicative, Monad, MonadThrow} import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.iglu.schemaddl.IgluSchema import com.snowplowanalytics.iglu.schemaddl.redshift._ -import com.snowplowanalytics.snowplow.rdbloader.db.Columns.ColumnName import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, Transaction} import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable @@ -195,14 +194,8 @@ object Migration { for { schemaKeyInTable <- getVersion[F](tableName) matches = goodModel.schemaKey == schemaKeyInTable - block <- if (matches) emptyBlock[F] - else - Control.getColumns[F](tableName).flatMap[Option[Block]] { (columns: List[ColumnName]) => - migrateTable(target, schemaKeyInTable, goodModel, columns, schemas) match { - case Left(migrationError) => MonadThrow[F].raiseError(migrationError) - case Right(block) => Monad[F].pure(block.some) - } - } + block <- if (matches) Monad[F].pure(None) + else Monad[F].pure(target.updateTable(goodModel, schemaKeyInTable).some) } yield block val recoveryBlocks: List[Block] = schemaKeysWithModels.values @@ -221,23 +214,6 @@ object Migration { Monad[F].pure(Nil) } - def migrateTable[I]( - target: Target[I], - current: SchemaKey, - shredModel: ShredModel.GoodModel, - columns: List[ColumnName], - schemaList: NonEmptyList[IgluSchema] - ): Either[LoaderError, Block] = - if (schemaList.size > 1) { - target.updateTable(shredModel, current).asRight - } else { - val message = - s"Illegal State: updateTable called for a table with known single schema [${shredModel.schemaKey.toSchemaUri}]\ncolumns: ${columns - .map(_.value) - .mkString(", ")}\nstate: $schemaList" - LoaderError.MigrationError(message).asLeft - } - def fromBlocks[F[_]: Monad: DAO: Logging](blocks: List[Block]): F[Migration[F]] = getPredicate[F](blocks).map { shouldAdd => blocks.foldLeft(Migration.empty[F]) { @@ -291,9 +267,6 @@ object Migration { items.traverse_(item => DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading).void) else Monad[F].unit - def emptyBlock[F[_]: Monad]: F[Option[Block]] = - Monad[F].pure(None) - /** Find the latest schema version in the table and confirm that it is the latest in `schemas` */ def getVersion[F[_]: DAO](tableName: String): F[SchemaKey] = DAO[F].executeQuery[SchemaKey](Statement.GetVersion(tableName))(readSchemaKey)