From e73eca3feb3311decc7177a56092c6ee81c04ed1 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Mon, 10 Jul 2023 23:18:50 +0100 Subject: [PATCH] Loader: Pre-transaction migrations must always run to completion --- .../snowplow/rdbloader/db/Migration.scala | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala index 4766db2e2..58bb23b8f 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala @@ -240,12 +240,14 @@ object Migration { migration.addPreTransaction(action) case (migration, b @ Block(pre, in, entity)) if pre.nonEmpty && in.nonEmpty => - val preAction = preMigration[F](shouldAdd, entity, pre) + val preActions = preMigrations[F](shouldAdd, entity, pre) val inAction = Logging[F].info(s"Migrating ${b.getName} (in-transaction)") *> in.traverse_(item => DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading)) *> DAO[F].executeUpdate(b.getCommentOn, DAO.Purpose.NonLoading) *> Logging[F].info(s"${b.getName} migration completed") - migration.addPreTransaction(preAction).addInTransaction(inAction) + preActions + .foldLeft(migration)(_.addPreTransaction(_)) + .addInTransaction(inAction) case (migration, b @ Block(Nil, in, target)) if b.isCreation => val inAction = Logging[F].info(s"Creating ${b.getName} table for ${target.getInfo.toSchemaUri}") *> @@ -262,28 +264,34 @@ object Migration { migration.addInTransaction(inAction) case (migration, b @ Block(pre, Nil, Entity.Table(_, _, _))) => - val preAction = Logging[F].info(s"Migrating ${b.getName} (pre-transaction)") *> - pre.traverse_(item => DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading).void) + val preActions = pre.map { item => + Logging[F].info(s"Migrating ${b.getName} $item (pre-transaction)") *> + DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading).void + } val commentAction = DAO[F].executeUpdate(b.getCommentOn, DAO.Purpose.NonLoading).void *> Logging[F].info(s"${b.getName} migration completed") - migration.addPreTransaction(preAction).addPreTransaction(commentAction) + preActions + .foldLeft(migration)(_.addPreTransaction(_)) + .addPreTransaction(commentAction) case (migration, Block(pre, Nil, column)) => - val preAction = preMigration[F](shouldAdd, column, pre) - migration.addPreTransaction(preAction) + val preActions = preMigrations[F](shouldAdd, column, pre) + preActions.foldLeft(migration)(_.addPreTransaction(_)) } } - def preMigration[F[_]: DAO: Logging: Monad]( + def preMigrations[F[_]: DAO: Logging: Monad]( shouldAdd: Entity => Boolean, entity: Entity, items: List[Item] - ) = + ): List[F[Unit]] = if (shouldAdd(entity)) - Logging[F].info(s"Migrating ${entity.getName} (pre-transaction)") *> - items.traverse_(item => DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading).void) - else Monad[F].unit + items.map { item => + Logging[F].info(s"Migrating ${entity.getName} $item (pre-transaction)") *> + DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading).void + } + else List(Monad[F].unit) /** Find the latest schema version in the table and confirm that it is the latest in `schemas` */ def getVersion[F[_]: DAO](tableName: String): F[SchemaKey] = @@ -329,13 +337,7 @@ object Migration { entity match { case Entity.Table(_, _, _) => false case Entity.Column(info) => - val f = !columns.map(_.toLowerCase).contains(info.getNameFull.toLowerCase) - if (f) { - println(columns) - println(info.getNameFull.toLowerCase) - println(s"True for ${info}") - } else println(s"False for ${info}") - f + !columns.map(_.toLowerCase).contains(info.getNameFull.toLowerCase) } } }