diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala index 971a6006d..eb9eb5de2 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala @@ -16,6 +16,7 @@ import cats.data.{EitherT, NonEmptyList} import cats.implicits._ import cats.{Applicative, Monad, MonadThrow} import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} +import com.snowplowanalytics.iglu.core.SchemaKey.ordering import com.snowplowanalytics.iglu.schemaddl.IgluSchema import com.snowplowanalytics.iglu.schemaddl.redshift._ import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel.RecoveryModel @@ -25,6 +26,8 @@ import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable import com.snowplowanalytics.snowplow.rdbloader.{LoaderAction, LoaderError, readSchemaKey} import doobie.Fragment +import scala.math.Ordered.orderingToOrdered + /** * Sequences of DDL statement executions that have to be applied to a DB in order to make it * compatible with a certain `DataDiscovery` (batch of data) Unlike `Block`, which is set of @@ -118,7 +121,7 @@ object Migration { * @note * since all [[Item]]s contain `Fragment` there's no safe `equals` operations */ - sealed trait Item { + sealed trait Item extends Product with Serializable { def statement: Statement } @@ -144,6 +147,11 @@ object Migration { final case class CreateTable(createTable: Fragment) extends Item { val statement: Statement = Statement.CreateTable(createTable) } + + /** COMMENT ON. Can be combined with [[CreateTable]] or [[AddColumn]] in [[Block]] */ + final case class CommentOn(tableName: String, comment: String) extends Item { + val statement: Statement = Statement.CommentOn(tableName, comment) + } } /** Find the maximum SchemaKey for all table names in a given set of shredded types */ @@ -195,32 +203,44 @@ object Migration { def empty[F[_]: Applicative]: Migration[F] = Migration[F](Nil, Applicative[F].unit) - def buildBlock[F[_]: MonadThrow: DAO, I](description: Description, target: Target[I]): F[List[Block]] = + implicit val ord: Ordering[SchemaKey] = ordering + + def createMissingRecoveryTables[F[_]: Monad: DAO, I]( + target: Target[I], + recoveryModels: Map[SchemaKey, RecoveryModel], + highestSchemaKey: SchemaKey + ): F[List[Block]] = + recoveryModels.toList + .filter(_._1 <= highestSchemaKey) + .traverseFilter[F, Block] { case (_, rm) => + Control.tableExists[F](rm.tableName).ifM(Applicative[F].pure(None), Applicative[F].pure(Some(target.createTable(rm)))) + } + + def updateGoodTable[F[_]: Monad: DAO, I]( + target: Target[I], + goodModel: ShredModel.GoodModel, + highestSchemaKey: SchemaKey + ): F[List[Block]] = + for { + schemaKeyInTable <- getVersion[F](goodModel.tableName) + matches = goodModel.schemaKey == schemaKeyInTable + block <- if (matches) Monad[F].pure(Nil) + else Monad[F].pure(List(target.updateTable(goodModel, schemaKeyInTable, highestSchemaKey))) + } yield block + + def buildBlock[F[_]: Monad: DAO, I](description: Description, target: Target[I]): F[List[Block]] = description match { case Description.Table(schemas, highestSchemaKey) => val schemaKeysWithModels = foldMapMergeRedshiftSchemas(schemas) val goodModel = schemaKeysWithModels.goodModel - val goodTableName = goodModel.tableName - val optUpdateGoodTable: F[Option[Block]] = + val createTables: F[List[Block]] 
+            = for {
-            schemaKeyInTable <- getVersion[F](goodTableName)
-            matches = goodModel.schemaKey == schemaKeyInTable
-            block <- if (matches) Monad[F].pure(None)
-            else Monad[F].pure(target.updateTable(goodModel, schemaKeyInTable, highestSchemaKey).some)
-          } yield block
-
-          val createMissingRecoveryTables: F[List[Block]] = schemaKeysWithModels.recoveryModels.values.toList
-            .traverseFilter(rm =>
-              Control.tableExists[F](rm.tableName).ifM(Applicative[F].pure(Option.empty[RecoveryModel]), Applicative[F].pure(rm.some))
-            )
-            .map(_.map(target.createTable))
-
-          val createTables: F[List[Block]] = Control
-            .tableExists[F](goodTableName)
-            .ifM(createMissingRecoveryTables, createMissingRecoveryTables.map(_.::(target.createTable(goodModel))))
-
-          optUpdateGoodTable.flatMap(_.fold(createTables)(ugt => createTables.map(_ :+ ugt)))
+            createGood <- Monad[F].pure(target.createTable(goodModel))
+            createRecovery <- createMissingRecoveryTables[F, I](target, schemaKeysWithModels.recoveryModels, highestSchemaKey)
+          } yield createGood :: createRecovery
+
+          Control.tableExists[F](goodModel.tableName).ifM(updateGoodTable[F, I](target, goodModel, highestSchemaKey), createTables)

        case Description.WideRow(info) =>
          Monad[F].pure(target.extendTable(info))
diff --git a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala
index 8961c008f..3d2cd0687 100644
--- a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala
+++ b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala
@@ -57,27 +57,33 @@ object Redshift {
    ): Block = {
      val outTransactions = shredModel.migrations.outTransaction(Some(currentSchemaKey), Some(highestSchemaKey))
      val inTransactions = shredModel.migrations.inTransaction(Some(currentSchemaKey), Some(highestSchemaKey))
-    val outTransactionToSql =
+
+    val preTransaction =
       outTransactions.map { varcharExtension =>
-        s"""ALTER TABLE $schema.${shredModel.tableName}
-           | ALTER COLUMN "${varcharExtension.old.columnName}" TYPE ${varcharExtension.newEntry.columnType.show}
-           |""".stripMargin
-      }
-    val inTransactionToSql =
-      if (inTransactions.isEmpty) {
-        // No added columns can be expressed in SQL migration
-        s"COMMENT ON TABLE $schema.${shredModel.tableName} IS '${shredModel.schemaKey.toSchemaUri}'" :: Nil
-      } else {
-        inTransactions.map { columnAddition =>
-          s"""ALTER TABLE $schema.${shredModel.tableName}
-             | ADD COLUMN "${columnAddition.column.columnName}" ${columnAddition.column.columnType.show} ${columnAddition.column.compressionEncoding.show}
+        Item.AlterColumn(
+          Fragment.const0(
+            s"""ALTER TABLE $schema.${shredModel.tableName}
+               | ALTER COLUMN "${varcharExtension.old.columnName}" TYPE ${varcharExtension.newEntry.columnType.show}
               |""".stripMargin
-        } :+ s"COMMENT ON TABLE $schema.${shredModel.tableName} IS '${shredModel.schemaKey.toSchemaUri}'"
+          )
+        )
       }
-    val preTransaction = outTransactionToSql.map(s => Item.AlterColumn(Fragment.const0(s)))
-    val inTransaction = inTransactionToSql.map(s => Item.AddColumn(Fragment.const0(s), Nil))
-    Block(preTransaction, inTransaction, Entity.Table(schema, shredModel.schemaKey, shredModel.tableName))
+    val inTransaction =
+      if (inTransactions.isEmpty) Nil
+      else
+        inTransactions.map { columnAddition =>
+          Item.AddColumn(
+            Fragment.const0(
+              s"""ALTER TABLE $schema.${shredModel.tableName}
+                 | ADD COLUMN "${columnAddition.column.columnName}" ${columnAddition.column.columnType.show} ${columnAddition.column.compressionEncoding.show}
"${columnAddition.column.columnName}" ${columnAddition.column.columnType.show} ${columnAddition.column.compressionEncoding.show} + |""".stripMargin + ), + Nil + ) + } :+ Item.CommentOn(shredModel.tableName, highestSchemaKey.toSchemaUri) + + Block(preTransaction, inTransaction, Entity.Table(schema, highestSchemaKey, shredModel.tableName)) } override def extendTable(info: ShreddedType.Info): List[Block] = @@ -115,7 +121,23 @@ object Redshift { override def createTable(shredModel: ShredModel): Block = Block( Nil, - List(Item.CreateTable(Fragment.const0(shredModel.toTableSql(schema)))), + List( + Item.CreateTable( + Fragment.const0( + s""" + |CREATE TABLE IF NOT EXISTS $schema.${shredModel.tableName} ( + |${shredModel.entries.show}, + | FOREIGN KEY (root_id) REFERENCES $schema.events(event_id) + |) + |DISTSTYLE KEY + |DISTKEY (root_id) + |SORTKEY (root_tstamp); + | + |COMMENT ON TABLE $schema.${shredModel.tableName} IS '${shredModel.schemaKey.toSchemaUri}'; + |""".stripMargin + ) + ) + ), Entity.Table(schema, shredModel.schemaKey, shredModel.tableName) ) diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala index a067447fb..c5d6f9847 100644 --- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala +++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala @@ -40,24 +40,18 @@ class RedshiftSpec extends Specification { "create a Block with in-transaction migration" in { implicit val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.custom(jdbcResults)) - val description = Description.Table(MigrationSpec.schemaListTwo) + val description = Description.Table(MigrationSpec.schemaListTwo, MigrationSpec.schema101.self.schemaKey) val (_, result) = Migration.buildBlock[Pure, Unit](description, redshift).run val expected = - """Fragment(" - |BEGIN TRANSACTION; - | - | ALTER TABLE atomic.com_acme_context_1 - | ADD COLUMN "three" VARCHAR(4096) ENCODE ZSTD; - | - | COMMENT ON TABLE atomic.com_acme_context_1 IS 'iglu:com.acme/context/jsonschema/1-0-1'; - | - |END TRANSACTION;")""".stripMargin + """Fragment("ALTER TABLE atomic.com_acme_context_1 + | ADD COLUMN "three" VARCHAR(4096) ENCODE ZSTD + |")""".stripMargin result must beLike { case Right(f :: Nil) => f.preTransaction must haveSize(0) - f.inTransaction must haveSize(1) + f.inTransaction must haveSize(2) f.inTransaction.head must beLike { case Migration.Item.AddColumn(fragment, Nil) => fragment.toString() must beEqualTo(expected) case i => ko(s"unexpected migration item: $i") @@ -70,12 +64,12 @@ class RedshiftSpec extends Specification { "create a Block with pre-transaction migration" in { implicit val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.custom(jdbcResults)) - val description = Description.Table(MigrationSpec.schemaListThree) + val description = Description.Table(MigrationSpec.schemaListThree, MigrationSpec.schema201.self.schemaKey) val (_, result) = Migration.buildBlock[Pure, Unit](description, redshift).run val expected = - """Fragment(" ALTER TABLE atomic.com_acme_context_2 - | ALTER COLUMN "one" TYPE VARCHAR(64); + """Fragment("ALTER TABLE atomic.com_acme_context_2 + | ALTER COLUMN "one" TYPE VARCHAR(64) |")""".stripMargin result must beLike {