Skip to content

Commit

Permalink
Loader: Pre-transaction migrations must always run to completion
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter authored and oguzhanunlu committed Sep 5, 2023
1 parent cbacfb9 commit e73eca3
Showing 1 changed file with 21 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,14 @@ object Migration {
migration.addPreTransaction(action)

case (migration, b @ Block(pre, in, entity)) if pre.nonEmpty && in.nonEmpty =>
val preAction = preMigration[F](shouldAdd, entity, pre)
val preActions = preMigrations[F](shouldAdd, entity, pre)
val inAction = Logging[F].info(s"Migrating ${b.getName} (in-transaction)") *>
in.traverse_(item => DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading)) *>
DAO[F].executeUpdate(b.getCommentOn, DAO.Purpose.NonLoading) *>
Logging[F].info(s"${b.getName} migration completed")
migration.addPreTransaction(preAction).addInTransaction(inAction)
preActions
.foldLeft(migration)(_.addPreTransaction(_))
.addInTransaction(inAction)

case (migration, b @ Block(Nil, in, target)) if b.isCreation =>
val inAction = Logging[F].info(s"Creating ${b.getName} table for ${target.getInfo.toSchemaUri}") *>
Expand All @@ -262,28 +264,34 @@ object Migration {
migration.addInTransaction(inAction)

case (migration, b @ Block(pre, Nil, Entity.Table(_, _, _))) =>
val preAction = Logging[F].info(s"Migrating ${b.getName} (pre-transaction)") *>
pre.traverse_(item => DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading).void)
val preActions = pre.map { item =>
Logging[F].info(s"Migrating ${b.getName} $item (pre-transaction)") *>
DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading).void
}
val commentAction =
DAO[F].executeUpdate(b.getCommentOn, DAO.Purpose.NonLoading).void *>
Logging[F].info(s"${b.getName} migration completed")
migration.addPreTransaction(preAction).addPreTransaction(commentAction)
preActions
.foldLeft(migration)(_.addPreTransaction(_))
.addPreTransaction(commentAction)

case (migration, Block(pre, Nil, column)) =>
val preAction = preMigration[F](shouldAdd, column, pre)
migration.addPreTransaction(preAction)
val preActions = preMigrations[F](shouldAdd, column, pre)
preActions.foldLeft(migration)(_.addPreTransaction(_))
}
}

def preMigration[F[_]: DAO: Logging: Monad](
def preMigrations[F[_]: DAO: Logging: Monad](
shouldAdd: Entity => Boolean,
entity: Entity,
items: List[Item]
) =
): List[F[Unit]] =
if (shouldAdd(entity))
Logging[F].info(s"Migrating ${entity.getName} (pre-transaction)") *>
items.traverse_(item => DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading).void)
else Monad[F].unit
items.map { item =>
Logging[F].info(s"Migrating ${entity.getName} $item (pre-transaction)") *>
DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading).void
}
else List(Monad[F].unit)

/** Find the latest schema version in the table and confirm that it is the latest in `schemas` */
def getVersion[F[_]: DAO](tableName: String): F[SchemaKey] =
Expand Down Expand Up @@ -329,13 +337,7 @@ object Migration {
entity match {
case Entity.Table(_, _, _) => false
case Entity.Column(info) =>
val f = !columns.map(_.toLowerCase).contains(info.getNameFull.toLowerCase)
if (f) {
println(columns)
println(info.getNameFull.toLowerCase)
println(s"True for ${info}")
} else println(s"False for ${info}")
f
!columns.map(_.toLowerCase).contains(info.getNameFull.toLowerCase)
}
}
}
Expand Down

0 comments on commit e73eca3

Please sign in to comment.