Upgrade schema-ddl to 0.19.4 #1230

Merged: 2 commits, Sep 5, 2023

10 changes: 10 additions & 0 deletions config/loader/aws/redshift.config.reference.hocon
@@ -266,4 +266,14 @@
# Version of the terraform module that deployed the app
"moduleVersion": "1.0.0"
}

# Optional. Enable features that are still in beta, or which are here to enable smoother upgrades
"featureFlags": {
# List of SchemaKeys with partial SchemaVer to disable migrations for (Redshift only).
# The Redshift Loader will skip all migrations and recovery table creation for tables
# that belong to the provided schema keys.
# e.g. [ "iglu:com.example/myschema1/jsonschema/1-*-*", "iglu:com.example/myschema2/jsonschema/1-*-*"]
# Optional, empty list by default
"disableMigration": []
}
}
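For reference, a populated version of the new block might look like the following; the schema URIs are the hypothetical examples from the comment above, shown only to illustrate the expected partial-SchemaVer shape (model pinned, revision and addition wildcarded):

```hocon
"featureFlags": {
  # Illustrative values only, taken from the example in the reference comment above
  "disableMigration": [
    "iglu:com.example/myschema1/jsonschema/1-*-*",
    "iglu:com.example/myschema2/jsonschema/1-*-*"
  ]
}
```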
@@ -68,7 +68,7 @@ object Processing {
implicit val lookup: RegistryLookup[F] = resources.registryLookup
val transformer: Transformer[F] = config.formats match {
case f: TransformerConfig.Formats.Shred =>
Transformer.ShredTransformer(resources.igluResolver, resources.propertiesCache, f, processor)
Transformer.ShredTransformer(resources.igluResolver, resources.shredModelCache, f, processor)
case f: TransformerConfig.Formats.WideRow =>
Transformer.WideRowTransformer(resources.igluResolver, f, processor)
}
@@ -257,7 +257,9 @@ object Processing {
implicit class TransformedOps(t: Transformed) {
def getPath: SinkPath = t match {
case p: Transformed.Shredded =>
val suffix = Some(s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/")
val suffix = Some(
s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}"
)
val pathType = if (p.isGood) SinkPath.PathType.Good else SinkPath.PathType.Bad
SinkPath(suffix, pathType)
case p: Transformed.WideRow =>
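To make the new partitioning concrete, here is a minimal self-contained sketch (the case class and helper name are illustrative, not the project's own types) of how the good-output suffix is now assembled, with revision and addition alongside model:

```scala
// Sketch only: mirrors the suffix format used in the change above, under assumed types.
final case class ShreddedPathParts(
  vendor: String,
  name: String,
  format: String,
  model: Int,
  revision: Int,
  addition: Int
)

def shreddedSuffix(p: ShreddedPathParts): String =
  s"vendor=${p.vendor}/name=${p.name}/format=${p.format.toLowerCase}/" +
    s"model=${p.model}/revision=${p.revision}/addition=${p.addition}"

// shreddedSuffix(ShreddedPathParts("com.acme", "checkout", "TSV", 1, 0, 0))
// == "vendor=com.acme/name=checkout/format=tsv/model=1/revision=0/addition=0"
```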
@@ -8,44 +8,37 @@
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common

import java.util.UUID

import scala.concurrent.ExecutionContext

import org.typelevel.log4cats.slf4j.Slf4jLogger
import org.typelevel.log4cats.Logger

import io.circe.Json

import cats.{Applicative, Monad, MonadThrow}
import cats.implicits._
import cats.effect._
import cats.effect.std.Random

import io.sentry.SentryClient
import com.snowplowanalytics.iglu.client.resolver.{CreateResolverCache, Resolver}
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup}
import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.rdbloader.common.Sentry
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils.EventParser
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, ShredModelCache, ShredModelCacheKey}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.parquet.ParquetOps
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Config.Output.Bad
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.metrics.Metrics
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks.BadSink
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.Checkpointer

import com.snowplowanalytics.snowplow.scalatracker.Tracking

import org.http4s.blaze.client.BlazeClientBuilder

case class Resources[F[_], C](
igluResolver: Resolver[F],
propertiesCache: PropertiesCache[F],
shredModelCache: ShredModelCache[F],
eventParser: EventParser,
producer: Queue.Producer[F],
instanceId: String,
@@ -81,7 +74,7 @@ object Resources {
producer <- mkQueue(config.queue)
resolverConfig <- mkResolverConfig(igluConfig)
resolver <- mkResolver(resolverConfig)
propertiesCache <- Resource.eval(CreateLruMap[F, PropertiesKey, Properties].create(resolverConfig.cacheSize))
shredModelCache <- Resource.eval(CreateLruMap[F, ShredModelCacheKey, ShredModel].create(resolverConfig.cacheSize))
httpClient <- BlazeClientBuilder[F].withExecutionContext(executionContext).resource
implicit0(registryLookup: RegistryLookup[F]) <- Resource.pure(Http4sRegistryLookup[F](httpClient))
eventParser <- mkEventParser(resolver, config)
@@ -103,7 +96,7 @@
badSink <- mkBadSink(config, mkBadQueue)
} yield Resources(
resolver,
propertiesCache,
shredModelCache,
eventParser,
producer,
instanceId.toString,
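A rough sketch of the cache swap in isolation, mirroring the CreateLruMap call above. ShredModelCacheKey and ShredModelCache come from rdbloader.common.transformation and their definitions are not part of this diff, so treat the exact shapes as assumptions:

```scala
import cats.effect.IO
import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
import com.snowplowanalytics.lrumap.{CreateLruMap, LruMap}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.ShredModelCacheKey

// Build an LRU cache sized from the resolver config, keyed by ShredModelCacheKey
// and holding compiled ShredModel values instead of the old Properties entries.
def mkShredModelCache(cacheSize: Int): IO[LruMap[IO, ShredModelCacheKey, ShredModel]] =
  CreateLruMap[IO, ShredModelCacheKey, ShredModel].create(cacheSize)
```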
@@ -20,7 +20,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.Common
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{SnowplowEntity, TypesInfo}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats.WideRow
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.{
AtomicFieldsProvider,
@@ -40,7 +40,7 @@ sealed trait Transformer[F[_]] extends Product with Serializable {
object Transformer {
case class ShredTransformer[F[_]: Monad: RegistryLookup: Clock](
igluResolver: Resolver[F],
propertiesCache: PropertiesCache[F],
shredModelCache: ShredModelCache[F],
formats: Formats.Shred,
processor: Processor
) extends Transformer[F] {
@@ -54,12 +54,12 @@ object Transformer {
else TypesInfo.Shredded.ShreddedFormat.JSON

def goodTransform(event: Event): EitherT[F, BadRow, List[Transformed]] =
Transformed.shredEvent[F](igluResolver, propertiesCache, isTabular, processor)(event)
Transformed.shredEvent[F](igluResolver, shredModelCache, isTabular, processor)(event)

def badTransform(badRow: BadRow): Transformed = {
val SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)) = badRow.schemaKey
val SchemaKey(vendor, name, _, SchemaVer.Full(model, revision, addition)) = badRow.schemaKey
val data = Transformed.Data.DString(badRow.compact)
Transformed.Shredded.Json(false, vendor, name, model, data)
Transformed.Shredded.Json(false, vendor, name, model, revision, addition, data)
}

def typesInfo(types: Set[Data.ShreddedType]): TypesInfo = {
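The badTransform change stops discarding the revision and addition of the failing entity's schema version. The same destructuring in isolation (the helper name is illustrative), using the iglu-core types referenced in the pattern above:

```scala
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

// Extract every component of a full SchemaVer so revision and addition can be
// threaded into the bad-row output path, matching the pattern used above.
def versionedParts(key: SchemaKey): (String, String, Int, Int, Int) = {
  val SchemaKey(vendor, name, _, SchemaVer.Full(model, revision, addition)) = key
  (vendor, name, model, revision, addition)
}
```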
@@ -32,28 +32,28 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0"
)
)
actualOptimizelyRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1/revision=0/addition=0"
)
)
actualConsentRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1/revision=0/addition=0"
)
)
actualBadRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/revision=0/addition=0"
)
)

@@ -92,14 +92,14 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0"
)
)
actualBadRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/revision=0/addition=0"
)
)

@@ -20,13 +20,13 @@ import io.circe.parser.{parse => parseCirce}
import org.http4s.client.{Client => Http4sClient}
import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, Registry, RegistryLookup}
import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, PropertiesKey, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, ShredModelCacheKey, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{Processing, Transformer}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.{Checkpointer, ParsedC}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.TestApplication._
Expand All @@ -41,12 +41,12 @@ class TransformingSpec extends Specification {

val testFileNameMap = List(
Transformed.Shredded
.Tabular("com.snowplowanalytics.snowplow", "atomic", 1, dummyTransformedData)
.Tabular("com.snowplowanalytics.snowplow", "atomic", 1, 0, 0, dummyTransformedData)
.getPath -> "com.snowplowanalytics.snowplow-atomic",
Transformed.Shredded
.Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, dummyTransformedData)
.Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, 0, 0, dummyTransformedData)
.getPath -> "com.snowplowanalytics.snowplow-consent_document",
Transformed.Shredded.Tabular("com.optimizely", "state", 1, dummyTransformedData).getPath -> "com.optimizely-state"
Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath -> "com.optimizely-state"
).toMap

val expectedTransformedMap =
@@ -128,12 +128,12 @@ object TransformingSpec {
val defaultWindow = Window(1, 1, 1, 1, 1)
val dummyTransformedData = Transformed.Data.DString("")

def propertiesCache: PropertiesCache[IO] = CreateLruMap[IO, PropertiesKey, Properties].create(100).unsafeRunSync()
def shredModelCache: ShredModelCache[IO] = CreateLruMap[IO, ShredModelCacheKey, ShredModel].create(100).unsafeRunSync()

def createTransformer(formats: TransformerConfig.Formats): Transformer[IO] =
formats match {
case f: TransformerConfig.Formats.Shred =>
Transformer.ShredTransformer(defaultIgluResolver, propertiesCache, f, TestProcessor)
Transformer.ShredTransformer(defaultIgluResolver, shredModelCache, f, TestProcessor)
case f: TransformerConfig.Formats.WideRow =>
Transformer.WideRowTransformer(defaultIgluResolver, f, TestProcessor)
}
@@ -32,7 +32,7 @@ object Common {
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss").withZone(ZoneId.from(ZoneOffset.UTC))

def entityPath(entity: TypesInfo.Shredded.Type) =
s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}"
s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}/revision=${entity.schemaKey.version.revision}/addition=${entity.schemaKey.version.addition}"

def entityPathFull(base: BlobStorage.Folder, entity: TypesInfo.Shredded.Type): BlobStorage.Folder =
BlobStorage.Folder.append(base, entityPath(entity))
@@ -16,7 +16,7 @@ import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits.toSchema
import com.snowplowanalytics.snowplow.badrows.FailureDetails
import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError

object SchemaProvider {

@@ -31,7 +31,7 @@
def getSchema[F[_]: Clock: Monad: RegistryLookup](
resolver: Resolver[F],
schemaKey: SchemaKey
): EitherT[F, FailureDetails.LoaderIgluError, Schema] =
): EitherT[F, LoaderIgluError, Schema] =
for {
json <- EitherT(resolver.lookupSchema(schemaKey)).leftMap(resolverBadRow(schemaKey))
schema <- EitherT.fromOption[F](Schema.parse(json), parseSchemaBadRow(schemaKey))
@@ -40,7 +40,7 @@
def fetchSchemasWithSameModel[F[_]: Clock: Monad: RegistryLookup](
resolver: Resolver[F],
schemaKey: SchemaKey
): EitherT[F, FailureDetails.LoaderIgluError, List[SchemaWithKey]] =
): EitherT[F, LoaderIgluError, List[SchemaWithKey]] =
EitherT(resolver.listSchemasLike(schemaKey))
.leftMap(resolverFetchBadRow(schemaKey.vendor, schemaKey.name, schemaKey.format, schemaKey.version.model))
.map(_.schemas)
@@ -59,13 +59,13 @@
model: Int
)(
e: ClientError.ResolutionError
): FailureDetails.LoaderIgluError =
FailureDetails.LoaderIgluError.SchemaListNotFound(SchemaCriterion(vendor = vendor, name = name, format = format, model = model), e)
): LoaderIgluError =
LoaderIgluError.SchemaListNotFound(SchemaCriterion(vendor = vendor, name = name, format = format, model = model), e)

private def resolverBadRow(schemaKey: SchemaKey)(e: ClientError.ResolutionError): FailureDetails.LoaderIgluError =
FailureDetails.LoaderIgluError.IgluError(schemaKey, e)
private def resolverBadRow(schemaKey: SchemaKey)(e: ClientError.ResolutionError): LoaderIgluError =
LoaderIgluError.IgluError(schemaKey, e)

private def parseSchemaBadRow(schemaKey: SchemaKey): FailureDetails.LoaderIgluError =
FailureDetails.LoaderIgluError.InvalidSchema(schemaKey, "Cannot be parsed as JSON Schema AST")
private def parseSchemaBadRow(schemaKey: SchemaKey): LoaderIgluError =
LoaderIgluError.InvalidSchema(schemaKey, "Cannot be parsed as JSON Schema AST")

}
@@ -13,7 +13,7 @@ import java.time.format.DateTimeParseException

import io.circe.Json
import cats.Monad
import cats.data.{EitherT, NonEmptyList}
import cats.data.NonEmptyList
import cats.implicits._

import cats.effect.Clock
@@ -23,16 +23,13 @@ import com.snowplowanalytics.iglu.core._
import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.iglu.schemaddl.migrations.FlatData
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._

import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, ParsingError}
import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload, Processor}
import com.snowplowanalytics.snowplow.rdbloader.common.Common
import Flattening.NullCharacter
import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.snowplow.analytics.scalasdk.decode.TSVParser
import com.snowplowanalytics.snowplow.rdbloader.common.Common

object EventUtils {

@@ -131,34 +128,6 @@ object EventUtils {
"events"
)

/**
* Transform a self-describing entity into tabular format, using its known schemas to get a correct
* order of columns
* @param resolver
* Iglu resolver to get list of known schemas
* @param instance
* self-describing JSON that needs to be transformed
* @return
* list of columns or flattening error
*/
def flatten[F[_]: Monad: Clock: RegistryLookup](
resolver: Resolver[F],
propertiesCache: PropertiesCache[F],
instance: SelfDescribingData[Json]
): EitherT[F, FailureDetails.LoaderIgluError, List[String]] =
Flattening
.getDdlProperties(resolver, propertiesCache, instance.schema)
.map(props => mapProperties(props, instance))

private def mapProperties(props: Properties, instance: SelfDescribingData[Json]) =
props
.map { case (pointer, _) =>
FlatData.getPath(pointer.forData, instance.data, getString, NullCharacter)
}

def getString(json: Json): String =
json.fold(NullCharacter, transformBool, _ => json.show, escape, _ => escape(json.noSpaces), _ => escape(json.noSpaces))

def getEntities(event: Event): List[SelfDescribingData[Json]] =
event.unstruct_event.data.toList ++
event.derived_contexts.data ++
@@ -171,11 +140,6 @@
case _ => 0
}

/** Prevents data with newlines and tabs from breaking the loading process */
private def escape(s: String): String =
if (s == NullCharacter) "\\\\N"
else s.replace('\t', ' ').replace('\n', ' ')

private def getLength(schema: Schema): Option[Int] =
schema.maxLength.map(_.value.toInt)
}