Skip to content

Commit

Permalink
use highest schema key in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Jul 20, 2023
1 parent c8ac626 commit 29bf442
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ object Migration {
* Works with separate tables (both create and update) and does support migration (hence all
* schema info)
*/
final case class Table(schemaList: NonEmptyList[IgluSchema]) extends Description
final case class Table(schemaList: NonEmptyList[IgluSchema], highestSchemaKey: SchemaKey) extends Description

/** Works with only `events` table, creating a column for every new schema */
final case class WideRow(shreddedType: ShreddedType.Info) extends Description
Expand Down Expand Up @@ -146,15 +146,30 @@ object Migration {
}
}

/** Find the maximum SchemaKey for all table names in a given set of shredded types */
def getMaxSchemaKeyPerTableName(shreddedTypes: List[ShreddedType]): Map[String, SchemaKey] =
shreddedTypes
.filterNot(_.isAtomic)
.filter {
case _: ShreddedType.Tabular => true
case _ => false
}
.groupBy(_.info.getName)
.mapValues(_.maxBy(_.info.version).info.getSchemaKey)

/** Inspect DB state and create a [[Migration]] object that contains all necessary actions */
def build[F[_]: Transaction[*[_], C]: MonadThrow: Iglu, C[_]: MonadThrow: Logging: DAO, I](
discovery: DataDiscovery,
target: Target[I]
): F[Migration[C]] = {
val maxSchemaKeyPerName = getMaxSchemaKeyPerTableName(discovery.shreddedTypes)

val descriptions: LoaderAction[F, List[Description]] =
discovery.shreddedTypes.filterNot(_.isAtomic).traverse {
case s: ShreddedType.Tabular =>
EitherT(Iglu[F].getSchemasWithSameModel(s.info.getSchemaKey)).map(Description.Table)
EitherT(Iglu[F].getSchemasWithSameModel(s.info.getSchemaKey)).map { schemas =>
Description.Table(schemas, maxSchemaKeyPerName.getOrElse(s.info.getName, s.info.getSchemaKey))
}
case ShreddedType.Widerow(info) =>
EitherT.rightT[F, LoaderError](Description.WideRow(info))
case ShreddedType.Json(_, _) =>
Expand Down Expand Up @@ -182,11 +197,10 @@ object Migration {

def buildBlock[F[_]: MonadThrow: DAO, I](description: Description, target: Target[I]): F[List[Block]] =
description match {
case Description.Table(schemas) =>
case Description.Table(schemas, highestSchemaKey) =>
val schemaKeysWithModels = foldMapMergeRedshiftSchemas(schemas)
val goodModel = schemaKeysWithModels.goodModel
val goodTableName = goodModel.tableName
val highestSchemaKey = schemas.last.self.schemaKey

val optUpdateGoodTable: F[Option[Block]] =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ trait Target[I] {
def getEventTable: Statement

/** Generate a DB-specification migration Block for updating a *separate* table */
def updateTable(shredModel: ShredModel.GoodModel, currentSchemaKey: SchemaKey, highestSchemaKey: SchemaKey): Block
def updateTable(
shredModel: ShredModel.GoodModel,
currentSchemaKey: SchemaKey,
highestSchemaKey: SchemaKey
): Block

/** Create a table with columns dervived from list of Iglu schemas */
def createTable(shredModel: ShredModel): Block
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.snowplowanalytics.snowplow.rdbloader.db

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.SnowplowEntity
import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType
import org.specs2.mutable.Specification

class MigrationSpec extends Specification {

"getMaxSchemaKeyPerTableName" should {
"ignore non-tsv & detect max schema key per table name" in {
val shreddedTypes = List(
ShreddedType.Tabular(
ShreddedType.Info(
BlobStorage.Folder.coerce("s3://shredded/archive"),
"com.acme",
"some_context",
SchemaVer.Full(2, 0, 0),
SnowplowEntity.Context
)
),
ShreddedType.Tabular(
ShreddedType.Info(
BlobStorage.Folder.coerce("s3://shredded/archive"),
"com.acme",
"some_context",
SchemaVer.Full(2, 1, 0),
SnowplowEntity.Context
)
),
ShreddedType.Json(
ShreddedType.Info(
BlobStorage.Folder.coerce("s3://shredded/archive"),
"com.acme",
"some_event",
SchemaVer.Full(1, 0, 0),
SnowplowEntity.Context
),
BlobStorage.Key.coerce("s3://shredded/jsonpaths")
)
)

val result = Migration.getMaxSchemaKeyPerTableName(shreddedTypes)

result must beEqualTo(Map(("com_acme_some_context_2", SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 1, 0)))))
}

"detect max schema key for all table names" in {
val shreddedTypes = List(
ShreddedType.Tabular(
ShreddedType.Info(
BlobStorage.Folder.coerce("s3://shredded/archive"),
"com.acme",
"some_context",
SchemaVer.Full(2, 0, 0),
SnowplowEntity.Context
)
),
ShreddedType.Tabular(
ShreddedType.Info(
BlobStorage.Folder.coerce("s3://shredded/archive"),
"com.acme",
"some_context",
SchemaVer.Full(2, 0, 1),
SnowplowEntity.Context
)
),
ShreddedType.Tabular(
ShreddedType.Info(
BlobStorage.Folder.coerce("s3://shredded/archive"),
"com.acme",
"example",
SchemaVer.Full(1, 0, 0),
SnowplowEntity.Context
)
),
ShreddedType.Tabular(
ShreddedType.Info(
BlobStorage.Folder.coerce("s3://shredded/archive"),
"com.acme",
"example",
SchemaVer.Full(1, 1, 0),
SnowplowEntity.Context
)
)
)

val result = Migration.getMaxSchemaKeyPerTableName(shreddedTypes)

result must beEqualTo(
Map(
("com_acme_some_context_2", SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 1))),
("com_acme_example_1", SchemaKey("com.acme", "example", "jsonschema", SchemaVer.Full(1, 1, 0)))
)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ object PureDAO {
override def getEventTable: Statement =
Statement.CreateTable(Fragment.const0("CREATE events"))

def updateTable(shredModel: ShredModel.GoodModel, currentSchemaKey: SchemaKey): Migration.Block =
def updateTable(
shredModel: ShredModel.GoodModel,
currentSchemaKey: SchemaKey,
highestSchemaKey: SchemaKey
): Migration.Block =
throw new Throwable("Not implemented in test suite")

def extendTable(info: ShreddedType.Info): List[Block] =
Expand Down

0 comments on commit 29bf442

Please sign in to comment.