fix & make it more readable
oguzhanunlu committed Jul 20, 2023
1 parent 29bf442 commit fb8d691
Showing 3 changed files with 89 additions and 53 deletions.
@@ -16,6 +16,7 @@ import cats.data.{EitherT, NonEmptyList}
import cats.implicits._
import cats.{Applicative, Monad, MonadThrow}
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.iglu.core.SchemaKey.ordering
import com.snowplowanalytics.iglu.schemaddl.IgluSchema
import com.snowplowanalytics.iglu.schemaddl.redshift._
import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel.RecoveryModel
@@ -25,6 +26,8 @@ import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable
import com.snowplowanalytics.snowplow.rdbloader.{LoaderAction, LoaderError, readSchemaKey}
import doobie.Fragment

import scala.math.Ordered.orderingToOrdered

/**
* Sequences of DDL statement executions that have to be applied to a DB in order to make it
compatible with a certain `DataDiscovery` (batch of data). Unlike `Block`, which is a set of
@@ -118,7 +121,7 @@ object Migration {
* @note
* since all [[Item]]s contain `Fragment` there's no safe `equals` operations
*/
sealed trait Item {
sealed trait Item extends Product with Serializable {
def statement: Statement
}

@@ -144,6 +147,11 @@
final case class CreateTable(createTable: Fragment) extends Item {
val statement: Statement = Statement.CreateTable(createTable)
}

/** COMMENT ON. Can be combined with [[CreateTable]] or [[AddColumn]] in [[Block]] */
final case class CommentOn(tableName: String, comment: String) extends Item {
val statement: Statement = Statement.CommentOn(tableName, comment)
}
}

/** Find the maximum SchemaKey for all table names in a given set of shredded types */
@@ -195,32 +203,44 @@ object Migration {
def empty[F[_]: Applicative]: Migration[F] =
Migration[F](Nil, Applicative[F].unit)

def buildBlock[F[_]: MonadThrow: DAO, I](description: Description, target: Target[I]): F[List[Block]] =
implicit val ord: Ordering[SchemaKey] = ordering

def createMissingRecoveryTables[F[_]: Monad: DAO, I](
target: Target[I],
recoveryModels: Map[SchemaKey, RecoveryModel],
highestSchemaKey: SchemaKey
): F[List[Block]] =
recoveryModels.toList
.filter(_._1 <= highestSchemaKey)
.traverseFilter[F, Block] { case (_, rm) =>
Control.tableExists[F](rm.tableName).ifM(Applicative[F].pure(None), Applicative[F].pure(Some(target.createTable(rm))))
}

def updateGoodTable[F[_]: Monad: DAO, I](
target: Target[I],
goodModel: ShredModel.GoodModel,
highestSchemaKey: SchemaKey
): F[List[Block]] =
for {
schemaKeyInTable <- getVersion[F](goodModel.tableName)
matches = goodModel.schemaKey == schemaKeyInTable
block <- if (matches) Monad[F].pure(Nil)
else Monad[F].pure(List(target.updateTable(goodModel, schemaKeyInTable, highestSchemaKey)))
} yield block

def buildBlock[F[_]: Monad: DAO, I](description: Description, target: Target[I]): F[List[Block]] =
description match {
case Description.Table(schemas, highestSchemaKey) =>
val schemaKeysWithModels = foldMapMergeRedshiftSchemas(schemas)
val goodModel = schemaKeysWithModels.goodModel
val goodTableName = goodModel.tableName

val optUpdateGoodTable: F[Option[Block]] =
val createTables: F[List[Block]] =
for {
schemaKeyInTable <- getVersion[F](goodTableName)
matches = goodModel.schemaKey == schemaKeyInTable
block <- if (matches) Monad[F].pure(None)
else Monad[F].pure(target.updateTable(goodModel, schemaKeyInTable, highestSchemaKey).some)
} yield block

val createMissingRecoveryTables: F[List[Block]] = schemaKeysWithModels.recoveryModels.values.toList
.traverseFilter(rm =>
Control.tableExists[F](rm.tableName).ifM(Applicative[F].pure(Option.empty[RecoveryModel]), Applicative[F].pure(rm.some))
)
.map(_.map(target.createTable))

val createTables: F[List[Block]] = Control
.tableExists[F](goodTableName)
.ifM(createMissingRecoveryTables, createMissingRecoveryTables.map(_.::(target.createTable(goodModel))))

optUpdateGoodTable.flatMap(_.fold(createTables)(ugt => createTables.map(_ :+ ugt)))
createGood <- Monad[F].pure(target.createTable(goodModel))
createRecovery <- createMissingRecoveryTables[F, I](target, schemaKeysWithModels.recoveryModels, highestSchemaKey)
} yield createGood :: createRecovery

Control.tableExists[F](goodModel.tableName).ifM(updateGoodTable[F, I](target, goodModel, highestSchemaKey), createTables)

case Description.WideRow(info) =>
Monad[F].pure(target.extendTable(info))
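Aside, not part of the diff: the new highestSchemaKey filtering works because the two imports added above supply an Ordering[SchemaKey] (SchemaKey.ordering) and lift it via scala.math.Ordered.orderingToOrdered, so <= can be used directly on SchemaKey values. A minimal standalone sketch of that mechanism (the object and helper names are invented; only the imports and the filter pattern come from the commit, and iglu-scala-core is assumed on the classpath):

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.iglu.core.SchemaKey.ordering
import scala.math.Ordered.orderingToOrdered

object SchemaKeyOrderingSketch {
  // Mirrors the implicit introduced in Migration above
  implicit val ord: Ordering[SchemaKey] = ordering

  // Keep only the schema keys that are not newer than the highest key in the batch
  def upTo(keys: List[SchemaKey], highest: SchemaKey): List[SchemaKey] =
    keys.filter(_ <= highest)

  def main(args: Array[String]): Unit = {
    def key(addition: Int): SchemaKey =
      SchemaKey("com.acme", "context", "jsonschema", SchemaVer.Full(1, 0, addition))
    // With highest = 1-0-1, only 1-0-0 and 1-0-1 survive the filter
    println(upTo(List(key(0), key(1), key(2)), key(1)).map(_.toSchemaUri))
  }
}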
@@ -57,27 +57,33 @@ object Redshift {
): Block = {
val outTransactions = shredModel.migrations.outTransaction(Some(currentSchemaKey), Some(highestSchemaKey))
val inTransactions = shredModel.migrations.inTransaction(Some(currentSchemaKey), Some(highestSchemaKey))
val outTransactionToSql =

val preTransaction =
outTransactions.map { varcharExtension =>
s"""ALTER TABLE $schema.${shredModel.tableName}
| ALTER COLUMN "${varcharExtension.old.columnName}" TYPE ${varcharExtension.newEntry.columnType.show}
|""".stripMargin
}
val inTransactionToSql =
if (inTransactions.isEmpty) {
// No added columns can be expressed in SQL migration
s"COMMENT ON TABLE $schema.${shredModel.tableName} IS '${shredModel.schemaKey.toSchemaUri}'" :: Nil
} else {
inTransactions.map { columnAddition =>
s"""ALTER TABLE $schema.${shredModel.tableName}
| ADD COLUMN "${columnAddition.column.columnName}" ${columnAddition.column.columnType.show} ${columnAddition.column.compressionEncoding.show}
Item.AlterColumn(
Fragment.const0(
s"""ALTER TABLE $schema.${shredModel.tableName}
| ALTER COLUMN "${varcharExtension.old.columnName}" TYPE ${varcharExtension.newEntry.columnType.show}
|""".stripMargin
} :+ s"COMMENT ON TABLE $schema.${shredModel.tableName} IS '${shredModel.schemaKey.toSchemaUri}'"
)
)
}
val preTransaction = outTransactionToSql.map(s => Item.AlterColumn(Fragment.const0(s)))
val inTransaction = inTransactionToSql.map(s => Item.AddColumn(Fragment.const0(s), Nil))

Block(preTransaction, inTransaction, Entity.Table(schema, shredModel.schemaKey, shredModel.tableName))
val inTransaction =
if (inTransactions.isEmpty) Nil
else
inTransactions.map { columnAddition =>
Item.AddColumn(
Fragment.const0(
s"""ALTER TABLE $schema.${shredModel.tableName}
| ADD COLUMN "${columnAddition.column.columnName}" ${columnAddition.column.columnType.show} ${columnAddition.column.compressionEncoding.show}
|""".stripMargin
),
Nil
)
} :+ Item.CommentOn(shredModel.tableName, highestSchemaKey.toSchemaUri)

Block(preTransaction, inTransaction, Entity.Table(schema, highestSchemaKey, shredModel.tableName))
}

override def extendTable(info: ShreddedType.Info): List[Block] =
@@ -115,7 +121,23 @@
override def createTable(shredModel: ShredModel): Block =
Block(
Nil,
List(Item.CreateTable(Fragment.const0(shredModel.toTableSql(schema)))),
List(
Item.CreateTable(
Fragment.const0(
s"""
|CREATE TABLE IF NOT EXISTS $schema.${shredModel.tableName} (
|${shredModel.entries.show},
| FOREIGN KEY (root_id) REFERENCES $schema.events(event_id)
|)
|DISTSTYLE KEY
|DISTKEY (root_id)
|SORTKEY (root_tstamp);
|
|COMMENT ON TABLE $schema.${shredModel.tableName} IS '${shredModel.schemaKey.toSchemaUri}';
|""".stripMargin
)
)
),
Entity.Table(schema, shredModel.schemaKey, shredModel.tableName)
)

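Aside, not part of the diff: the DDL above is wrapped with doobie's Fragment.const0, which, unlike Fragment.const, does not append a trailing space, so a fragment's rendered SQL is exactly the raw string; that is what the updated expectations in RedshiftSpec below rely on. A small self-contained check (doobie-core is assumed on the classpath; the object name and sample DDL are illustrative):

import doobie.Fragment

object FragmentConst0Sketch {
  def main(args: Array[String]): Unit = {
    val ddl =
      """ALTER TABLE atomic.com_acme_context_1
        | ADD COLUMN "three" VARCHAR(4096) ENCODE ZSTD
        |""".stripMargin
    // Prints Fragment("<the DDL string>"), matching the expected strings in the spec below
    println(Fragment.const0(ddl).toString)
  }
}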
@@ -40,24 +40,18 @@ class RedshiftSpec extends Specification {
"create a Block with in-transaction migration" in {

implicit val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.custom(jdbcResults))
val description = Description.Table(MigrationSpec.schemaListTwo)
val description = Description.Table(MigrationSpec.schemaListTwo, MigrationSpec.schema101.self.schemaKey)
val (_, result) = Migration.buildBlock[Pure, Unit](description, redshift).run

val expected =
"""Fragment("
|BEGIN TRANSACTION;
|
| ALTER TABLE atomic.com_acme_context_1
| ADD COLUMN "three" VARCHAR(4096) ENCODE ZSTD;
|
| COMMENT ON TABLE atomic.com_acme_context_1 IS 'iglu:com.acme/context/jsonschema/1-0-1';
|
|END TRANSACTION;")""".stripMargin
"""Fragment("ALTER TABLE atomic.com_acme_context_1
| ADD COLUMN "three" VARCHAR(4096) ENCODE ZSTD
|")""".stripMargin

result must beLike {
case Right(f :: Nil) =>
f.preTransaction must haveSize(0)
f.inTransaction must haveSize(1)
f.inTransaction must haveSize(2)
f.inTransaction.head must beLike {
case Migration.Item.AddColumn(fragment, Nil) => fragment.toString() must beEqualTo(expected)
case i => ko(s"unexpected migration item: $i")
@@ -70,12 +64,12 @@
"create a Block with pre-transaction migration" in {

implicit val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.custom(jdbcResults))
val description = Description.Table(MigrationSpec.schemaListThree)
val description = Description.Table(MigrationSpec.schemaListThree, MigrationSpec.schema201.self.schemaKey)
val (_, result) = Migration.buildBlock[Pure, Unit](description, redshift).run

val expected =
"""Fragment(" ALTER TABLE atomic.com_acme_context_2
| ALTER COLUMN "one" TYPE VARCHAR(64);
"""Fragment("ALTER TABLE atomic.com_acme_context_2
| ALTER COLUMN "one" TYPE VARCHAR(64)
|")""".stripMargin

result must beLike {
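Not part of the commit: since inTransaction now ends with the new CommentOn item, the first test could additionally assert on it along the lines below. This is an untested sketch in the same specs2 style; the expected table name and schema URI are inferred from the fixtures above rather than taken from the commit.

// Hypothetical follow-up assertion for the in-transaction CommentOn item
f.inTransaction.last must beLike {
  case Migration.Item.CommentOn(table, comment) =>
    (table must beEqualTo("com_acme_context_1")) and
      (comment must beEqualTo("iglu:com.acme/context/jsonschema/1-0-1"))
  case i => ko(s"unexpected migration item: $i")
}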
