diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala index b501d67db..971a6006d 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala @@ -100,7 +100,7 @@ object Migration { * Works with separate tables (both create and update) and does support migration (hence all * schema info) */ - final case class Table(schemaList: NonEmptyList[IgluSchema]) extends Description + final case class Table(schemaList: NonEmptyList[IgluSchema], highestSchemaKey: SchemaKey) extends Description /** Works with only `events` table, creating a column for every new schema */ final case class WideRow(shreddedType: ShreddedType.Info) extends Description @@ -146,15 +146,30 @@ object Migration { } } + /** Find the maximum SchemaKey for all table names in a given set of shredded types */ + def getMaxSchemaKeyPerTableName(shreddedTypes: List[ShreddedType]): Map[String, SchemaKey] = + shreddedTypes + .filterNot(_.isAtomic) + .filter { + case _: ShreddedType.Tabular => true + case _ => false + } + .groupBy(_.info.getName) + .mapValues(_.maxBy(_.info.version).info.getSchemaKey) + /** Inspect DB state and create a [[Migration]] object that contains all necessary actions */ def build[F[_]: Transaction[*[_], C]: MonadThrow: Iglu, C[_]: MonadThrow: Logging: DAO, I]( discovery: DataDiscovery, target: Target[I] ): F[Migration[C]] = { + val maxSchemaKeyPerName = getMaxSchemaKeyPerTableName(discovery.shreddedTypes) + val descriptions: LoaderAction[F, List[Description]] = discovery.shreddedTypes.filterNot(_.isAtomic).traverse { case s: ShreddedType.Tabular => - EitherT(Iglu[F].getSchemasWithSameModel(s.info.getSchemaKey)).map(Description.Table) + EitherT(Iglu[F].getSchemasWithSameModel(s.info.getSchemaKey)).map { schemas => + Description.Table(schemas, maxSchemaKeyPerName.getOrElse(s.info.getName, s.info.getSchemaKey)) + } case ShreddedType.Widerow(info) => EitherT.rightT[F, LoaderError](Description.WideRow(info)) case ShreddedType.Json(_, _) => @@ -182,11 +197,10 @@ object Migration { def buildBlock[F[_]: MonadThrow: DAO, I](description: Description, target: Target[I]): F[List[Block]] = description match { - case Description.Table(schemas) => + case Description.Table(schemas, highestSchemaKey) => val schemaKeysWithModels = foldMapMergeRedshiftSchemas(schemas) val goodModel = schemaKeysWithModels.goodModel val goodTableName = goodModel.tableName - val highestSchemaKey = schemas.last.self.schemaKey val optUpdateGoodTable: F[Option[Block]] = for { diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala index f7a9b9177..19e3fda90 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala @@ -57,7 +57,11 @@ trait Target[I] { def getEventTable: Statement /** Generate a DB-specification migration Block for updating a *separate* table */ - def updateTable(shredModel: ShredModel.GoodModel, currentSchemaKey: SchemaKey, highestSchemaKey: SchemaKey): Block + def updateTable( + shredModel: ShredModel.GoodModel, + currentSchemaKey: SchemaKey, + highestSchemaKey: SchemaKey + ): Block /** Create a table with columns dervived from list of Iglu schemas */ def createTable(shredModel: ShredModel): Block diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/db/MigrationSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/db/MigrationSpec.scala new file mode 100644 index 000000000..e9905043b --- /dev/null +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/db/MigrationSpec.scala @@ -0,0 +1,99 @@ +package com.snowplowanalytics.snowplow.rdbloader.db + +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} +import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.SnowplowEntity +import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType +import org.specs2.mutable.Specification + +class MigrationSpec extends Specification { + + "getMaxSchemaKeyPerTableName" should { + "ignore non-tsv & detect max schema key per table name" in { + val shreddedTypes = List( + ShreddedType.Tabular( + ShreddedType.Info( + BlobStorage.Folder.coerce("s3://shredded/archive"), + "com.acme", + "some_context", + SchemaVer.Full(2, 0, 0), + SnowplowEntity.Context + ) + ), + ShreddedType.Tabular( + ShreddedType.Info( + BlobStorage.Folder.coerce("s3://shredded/archive"), + "com.acme", + "some_context", + SchemaVer.Full(2, 1, 0), + SnowplowEntity.Context + ) + ), + ShreddedType.Json( + ShreddedType.Info( + BlobStorage.Folder.coerce("s3://shredded/archive"), + "com.acme", + "some_event", + SchemaVer.Full(1, 0, 0), + SnowplowEntity.Context + ), + BlobStorage.Key.coerce("s3://shredded/jsonpaths") + ) + ) + + val result = Migration.getMaxSchemaKeyPerTableName(shreddedTypes) + + result must beEqualTo(Map(("com_acme_some_context_2", SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 1, 0))))) + } + + "detect max schema key for all table names" in { + val shreddedTypes = List( + ShreddedType.Tabular( + ShreddedType.Info( + BlobStorage.Folder.coerce("s3://shredded/archive"), + "com.acme", + "some_context", + SchemaVer.Full(2, 0, 0), + SnowplowEntity.Context + ) + ), + ShreddedType.Tabular( + ShreddedType.Info( + BlobStorage.Folder.coerce("s3://shredded/archive"), + "com.acme", + "some_context", + SchemaVer.Full(2, 0, 1), + SnowplowEntity.Context + ) + ), + ShreddedType.Tabular( + ShreddedType.Info( + BlobStorage.Folder.coerce("s3://shredded/archive"), + "com.acme", + "example", + SchemaVer.Full(1, 0, 0), + SnowplowEntity.Context + ) + ), + ShreddedType.Tabular( + ShreddedType.Info( + BlobStorage.Folder.coerce("s3://shredded/archive"), + "com.acme", + "example", + SchemaVer.Full(1, 1, 0), + SnowplowEntity.Context + ) + ) + ) + + val result = Migration.getMaxSchemaKeyPerTableName(shreddedTypes) + + result must beEqualTo( + Map( + ("com_acme_some_context_2", SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 1))), + ("com_acme_example_1", SchemaKey("com.acme", "example", "jsonschema", SchemaVer.Full(1, 1, 0))) + ) + ) + } + } +} diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala index d4f9e2292..43c682d13 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala @@ -113,7 +113,11 @@ object PureDAO { override def getEventTable: Statement = Statement.CreateTable(Fragment.const0("CREATE events")) - def updateTable(shredModel: ShredModel.GoodModel, currentSchemaKey: SchemaKey): Migration.Block = + def updateTable( + shredModel: ShredModel.GoodModel, + currentSchemaKey: SchemaKey, + highestSchemaKey: SchemaKey + ): Migration.Block = throw new Throwable("Not implemented in test suite") def extendTable(info: ShreddedType.Info): List[Block] =