Upgrade schema-ddl to 0.20.0 (close #1265)
oguzhanunlu committed Sep 5, 2023
1 parent af0997d commit 51c15e9
Showing 52 changed files with 991 additions and 943 deletions.
10 changes: 10 additions & 0 deletions config/loader/aws/redshift.config.reference.hocon
@@ -266,4 +266,14 @@
# Version of the terraform module that deployed the app
"moduleVersion": "1.0.0"
}

+ # Optional. Enable features that are still in beta, or which are here to enable smoother upgrades
+ "featureFlags": {
+ # List of SchemaKey with partial SchemaVer to disable migration for; Redshift only
+ # Redshift Loader will disable all migrations and recovery table creation for the tables
+ # that belong to the provided schema keys
+ # e.g. [ "iglu:com.example/myschema1/jsonschema/1-*-*", "iglu:com.example/myschema2/jsonschema/1-*-*"]
+ # Optional, empty list by default
+ "disableMigration": []
+ }
}
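For illustration, a configuration that opts two schema families out of migrations would look like the following (a sketch only; the schema URIs are the hypothetical examples from the comment above, not real schemas):

  "featureFlags": {
    "disableMigration": [
      "iglu:com.example/myschema1/jsonschema/1-*-*",
      "iglu:com.example/myschema2/jsonschema/1-*-*"
    ]
  }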
@@ -68,7 +68,7 @@ object Processing {
implicit val lookup: RegistryLookup[F] = resources.registryLookup
val transformer: Transformer[F] = config.formats match {
case f: TransformerConfig.Formats.Shred =>
- Transformer.ShredTransformer(resources.igluResolver, resources.propertiesCache, f, processor)
+ Transformer.ShredTransformer(resources.igluResolver, resources.shredModelCache, f, processor)
case f: TransformerConfig.Formats.WideRow =>
Transformer.WideRowTransformer(resources.igluResolver, f, processor)
}
@@ -257,7 +257,9 @@ object Processing {
implicit class TransformedOps(t: Transformed) {
def getPath: SinkPath = t match {
case p: Transformed.Shredded =>
- val suffix = Some(s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/")
+ val suffix = Some(
+   s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}"
+ )
val pathType = if (p.isGood) SinkPath.PathType.Good else SinkPath.PathType.Bad
SinkPath(suffix, pathType)
case p: Transformed.WideRow =>
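As a rough sketch of what the extra partition components mean (illustrative only; the vendor and entity names below are hypothetical and not part of the commit), the good-output suffix is now derived from the full SchemaVer rather than just its model:

  // Sketch: the full SchemaVer (model-revision-addition) now drives the shredded sink path.
  import com.snowplowanalytics.iglu.core.SchemaVer

  def shreddedSuffix(vendor: String, name: String, format: String, version: SchemaVer.Full): String =
    s"vendor=$vendor/name=$name/format=$format/model=${version.model}/revision=${version.revision}/addition=${version.addition}/"

  // e.g. shreddedSuffix("com.acme", "my_entity", "tsv", SchemaVer.Full(1, 0, 0))
  // ==> "vendor=com.acme/name=my_entity/format=tsv/model=1/revision=0/addition=0/"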
@@ -8,44 +8,37 @@
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common

import java.util.UUID

import scala.concurrent.ExecutionContext

import org.typelevel.log4cats.slf4j.Slf4jLogger
import org.typelevel.log4cats.Logger

import io.circe.Json

import cats.{Applicative, Monad, MonadThrow}
import cats.implicits._
import cats.effect._
import cats.effect.std.Random

import io.sentry.SentryClient
import com.snowplowanalytics.iglu.client.resolver.{CreateResolverCache, Resolver}
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup}
- import com.snowplowanalytics.iglu.schemaddl.Properties
+ import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.rdbloader.common.Sentry
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils.EventParser
- import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey}
+ import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, ShredModelCache, ShredModelCacheKey}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.parquet.ParquetOps
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Config.Output.Bad
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.metrics.Metrics
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks.BadSink
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.Checkpointer

import com.snowplowanalytics.snowplow.scalatracker.Tracking

import org.http4s.blaze.client.BlazeClientBuilder

case class Resources[F[_], C](
igluResolver: Resolver[F],
- propertiesCache: PropertiesCache[F],
+ shredModelCache: ShredModelCache[F],
eventParser: EventParser,
producer: Queue.Producer[F],
instanceId: String,
@@ -81,7 +74,7 @@ object Resources {
producer <- mkQueue(config.queue)
resolverConfig <- mkResolverConfig(igluConfig)
resolver <- mkResolver(resolverConfig)
- propertiesCache <- Resource.eval(CreateLruMap[F, PropertiesKey, Properties].create(resolverConfig.cacheSize))
+ shredModelCache <- Resource.eval(CreateLruMap[F, ShredModelCacheKey, ShredModel].create(resolverConfig.cacheSize))
httpClient <- BlazeClientBuilder[F].withExecutionContext(executionContext).resource
implicit0(registryLookup: RegistryLookup[F]) <- Resource.pure(Http4sRegistryLookup[F](httpClient))
eventParser <- mkEventParser(resolver, config)
@@ -103,7 +96,7 @@
badSink <- mkBadSink(config, mkBadQueue)
} yield Resources(
resolver,
- propertiesCache,
+ shredModelCache,
eventParser,
producer,
instanceId.toString,
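Judging from how the cache is constructed above (CreateLruMap keyed by ShredModelCacheKey with ShredModel values), ShredModelCache is presumably an alias over an LRU map. A sketch of what such aliases could look like; the key composition is an assumption, not taken from the commit:

  // Sketch under assumptions: the real aliases live in the rdbloader common.transformation
  // package (object), and the cache key may carry more than the SchemaKey (e.g. schema-list state).
  import com.snowplowanalytics.iglu.core.SchemaKey
  import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
  import com.snowplowanalytics.lrumap.LruMap

  type ShredModelCacheKey = SchemaKey // assumed key type
  type ShredModelCache[F[_]] = LruMap[F, ShredModelCacheKey, ShredModel]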
@@ -20,7 +20,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.Common
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{SnowplowEntity, TypesInfo}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats.WideRow
- import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, Transformed}
+ import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.{
AtomicFieldsProvider,
@@ -40,7 +40,7 @@ sealed trait Transformer[F[_]] extends Product with Serializable {
object Transformer {
case class ShredTransformer[F[_]: Monad: RegistryLookup: Clock](
igluResolver: Resolver[F],
- propertiesCache: PropertiesCache[F],
+ shredModelCache: ShredModelCache[F],
formats: Formats.Shred,
processor: Processor
) extends Transformer[F] {
@@ -54,12 +54,12 @@ object Transformer {
else TypesInfo.Shredded.ShreddedFormat.JSON

def goodTransform(event: Event): EitherT[F, BadRow, List[Transformed]] =
- Transformed.shredEvent[F](igluResolver, propertiesCache, isTabular, processor)(event)
+ Transformed.shredEvent[F](igluResolver, shredModelCache, isTabular, processor)(event)

def badTransform(badRow: BadRow): Transformed = {
- val SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)) = badRow.schemaKey
+ val SchemaKey(vendor, name, _, SchemaVer.Full(model, revision, addition)) = badRow.schemaKey
val data = Transformed.Data.DString(badRow.compact)
- Transformed.Shredded.Json(false, vendor, name, model, data)
+ Transformed.Shredded.Json(false, vendor, name, model, revision, addition, data)
}

def typesInfo(types: Set[Data.ShreddedType]): TypesInfo = {
@@ -32,28 +32,28 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
readStringRowsFrom(
Path(
outputDirectory.toString +
- s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1"
+ s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0"
)
)
actualOptimizelyRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
- s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1"
+ s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1/revision=0/addition=0"
)
)
actualConsentRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
- s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1"
+ s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1/revision=0/addition=0"
)
)
actualBadRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
- s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/"
+ s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/revision=0/addition=0"
)
)

@@ -92,14 +92,14 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
readStringRowsFrom(
Path(
outputDirectory.toString +
- s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1"
+ s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0"
)
)
actualBadRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
- s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/"
+ s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/revision=0/addition=0"
)
)

@@ -20,13 +20,13 @@ import io.circe.parser.{parse => parseCirce}
import org.http4s.client.{Client => Http4sClient}
import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, Registry, RegistryLookup}
- import com.snowplowanalytics.iglu.schemaddl.Properties
+ import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig
- import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, PropertiesKey, Transformed}
+ import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, ShredModelCacheKey, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{Processing, Transformer}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.{Checkpointer, ParsedC}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.TestApplication._
@@ -41,12 +41,12 @@ class TransformingSpec extends Specification {

val testFileNameMap = List(
Transformed.Shredded
- .Tabular("com.snowplowanalytics.snowplow", "atomic", 1, dummyTransformedData)
+ .Tabular("com.snowplowanalytics.snowplow", "atomic", 1, 0, 0, dummyTransformedData)
.getPath -> "com.snowplowanalytics.snowplow-atomic",
Transformed.Shredded
- .Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, dummyTransformedData)
+ .Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, 0, 0, dummyTransformedData)
.getPath -> "com.snowplowanalytics.snowplow-consent_document",
- Transformed.Shredded.Tabular("com.optimizely", "state", 1, dummyTransformedData).getPath -> "com.optimizely-state"
+ Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath -> "com.optimizely-state"
).toMap

val expectedTransformedMap =
@@ -128,12 +128,12 @@ object TransformingSpec {
val defaultWindow = Window(1, 1, 1, 1, 1)
val dummyTransformedData = Transformed.Data.DString("")

- def propertiesCache: PropertiesCache[IO] = CreateLruMap[IO, PropertiesKey, Properties].create(100).unsafeRunSync()
+ def shredModelCache: ShredModelCache[IO] = CreateLruMap[IO, ShredModelCacheKey, ShredModel].create(100).unsafeRunSync()

def createTransformer(formats: TransformerConfig.Formats): Transformer[IO] =
formats match {
case f: TransformerConfig.Formats.Shred =>
- Transformer.ShredTransformer(defaultIgluResolver, propertiesCache, f, TestProcessor)
+ Transformer.ShredTransformer(defaultIgluResolver, shredModelCache, f, TestProcessor)
case f: TransformerConfig.Formats.WideRow =>
Transformer.WideRowTransformer(defaultIgluResolver, f, TestProcessor)
}
@@ -32,7 +32,7 @@ object Common {
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss").withZone(ZoneId.from(ZoneOffset.UTC))

def entityPath(entity: TypesInfo.Shredded.Type) =
- s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}"
+ s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}/revision=${entity.schemaKey.version.revision}/addition=${entity.schemaKey.version.addition}"

def entityPathFull(base: BlobStorage.Folder, entity: TypesInfo.Shredded.Type): BlobStorage.Folder =
BlobStorage.Folder.append(base, entityPath(entity))
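For a concrete (hypothetical) example, an entity whose schema key is iglu:com.acme/my_entity/jsonschema/2-1-3, shredded as TSV, would now land under:

  $GoodPrefix/vendor=com.acme/name=my_entity/format=tsv/model=2/revision=1/addition=3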
@@ -16,7 +16,7 @@ import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits.toSchema
- import com.snowplowanalytics.snowplow.badrows.FailureDetails
+ import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError

object SchemaProvider {

@@ -31,7 +31,7 @@
def getSchema[F[_]: Clock: Monad: RegistryLookup](
resolver: Resolver[F],
schemaKey: SchemaKey
- ): EitherT[F, FailureDetails.LoaderIgluError, Schema] =
+ ): EitherT[F, LoaderIgluError, Schema] =
for {
json <- EitherT(resolver.lookupSchema(schemaKey)).leftMap(resolverBadRow(schemaKey))
schema <- EitherT.fromOption[F](Schema.parse(json), parseSchemaBadRow(schemaKey))
@@ -40,7 +40,7 @@
def fetchSchemasWithSameModel[F[_]: Clock: Monad: RegistryLookup](
resolver: Resolver[F],
schemaKey: SchemaKey
- ): EitherT[F, FailureDetails.LoaderIgluError, List[SchemaWithKey]] =
+ ): EitherT[F, LoaderIgluError, List[SchemaWithKey]] =
EitherT(resolver.listSchemasLike(schemaKey))
.leftMap(resolverFetchBadRow(schemaKey.vendor, schemaKey.name, schemaKey.format, schemaKey.version.model))
.map(_.schemas)
@@ -59,13 +59,13 @@ object SchemaProvider {
model: Int
)(
e: ClientError.ResolutionError
- ): FailureDetails.LoaderIgluError =
- FailureDetails.LoaderIgluError.SchemaListNotFound(SchemaCriterion(vendor = vendor, name = name, format = format, model = model), e)
+ ): LoaderIgluError =
+ LoaderIgluError.SchemaListNotFound(SchemaCriterion(vendor = vendor, name = name, format = format, model = model), e)

- private def resolverBadRow(schemaKey: SchemaKey)(e: ClientError.ResolutionError): FailureDetails.LoaderIgluError =
- FailureDetails.LoaderIgluError.IgluError(schemaKey, e)
+ private def resolverBadRow(schemaKey: SchemaKey)(e: ClientError.ResolutionError): LoaderIgluError =
+ LoaderIgluError.IgluError(schemaKey, e)

- private def parseSchemaBadRow(schemaKey: SchemaKey): FailureDetails.LoaderIgluError =
- FailureDetails.LoaderIgluError.InvalidSchema(schemaKey, "Cannot be parsed as JSON Schema AST")
+ private def parseSchemaBadRow(schemaKey: SchemaKey): LoaderIgluError =
+ LoaderIgluError.InvalidSchema(schemaKey, "Cannot be parsed as JSON Schema AST")

}
@@ -13,7 +13,7 @@ import java.time.format.DateTimeParseException

import io.circe.Json
import cats.Monad
- import cats.data.{EitherT, NonEmptyList}
+ import cats.data.NonEmptyList
import cats.implicits._

import cats.effect.Clock
@@ -23,16 +23,13 @@ import com.snowplowanalytics.iglu.core._
import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

- import com.snowplowanalytics.iglu.schemaddl.migrations.FlatData
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._

import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, ParsingError}
import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload, Processor}
- import com.snowplowanalytics.snowplow.rdbloader.common.Common
- import Flattening.NullCharacter
- import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.snowplow.analytics.scalasdk.decode.TSVParser
+ import com.snowplowanalytics.snowplow.rdbloader.common.Common

object EventUtils {

@@ -131,34 +128,6 @@
"events"
)

- /**
- * Transform a self-desribing entity into tabular format, using its known schemas to get a correct
- * order of columns
- * @param resolver
- * Iglu resolver to get list of known schemas
- * @param instance
- * self-describing JSON that needs to be transformed
- * @return
- * list of columns or flattening error
- */
- def flatten[F[_]: Monad: Clock: RegistryLookup](
- resolver: Resolver[F],
- propertiesCache: PropertiesCache[F],
- instance: SelfDescribingData[Json]
- ): EitherT[F, FailureDetails.LoaderIgluError, List[String]] =
- Flattening
- .getDdlProperties(resolver, propertiesCache, instance.schema)
- .map(props => mapProperties(props, instance))
-
- private def mapProperties(props: Properties, instance: SelfDescribingData[Json]) =
- props
- .map { case (pointer, _) =>
- FlatData.getPath(pointer.forData, instance.data, getString, NullCharacter)
- }
-
- def getString(json: Json): String =
- json.fold(NullCharacter, transformBool, _ => json.show, escape, _ => escape(json.noSpaces), _ => escape(json.noSpaces))

def getEntities(event: Event): List[SelfDescribingData[Json]] =
event.unstruct_event.data.toList ++
event.derived_contexts.data ++
@@ -171,11 +140,6 @@
case _ => 0
}

- /** Prevents data with newlines and tabs from breaking the loading process */
- private def escape(s: String): String =
- if (s == NullCharacter) "\\\\N"
- else s.replace('\t', ' ').replace('\n', ' ')

private def getLength(schema: Schema): Option[Int] =
schema.maxLength.map(_.value.toInt)
}