diff --git a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala index a00b3c137..46a419aae 100644 --- a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala +++ b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala @@ -71,7 +71,7 @@ object Processing { ): Stream[F, Unit] = { val transformer: Transformer[F] = config.formats match { case f: TransformerConfig.Formats.Shred => - Transformer.ShredTransformer(resources.igluResolver, resources.propertiesCache, f, processor) + Transformer.ShredTransformer(resources.igluResolver, resources.shredModelCache, f, processor) case f: TransformerConfig.Formats.WideRow => Transformer.WideRowTransformer(resources.igluResolver, f, processor) } @@ -260,7 +260,9 @@ object Processing { implicit class TransformedOps(t: Transformed) { def getPath: SinkPath = t match { case p: Transformed.Shredded => - val suffix = Some(s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/") + val suffix = Some( + s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}" + ) val pathType = if (p.isGood) SinkPath.PathType.Good else SinkPath.PathType.Bad SinkPath(suffix, pathType) case p: Transformed.WideRow => diff --git a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Resources.scala b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Resources.scala index 307a5ad80..98ab2c887 100644 --- a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Resources.scala +++ b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Resources.scala @@ -30,14 +30,15 @@ import io.sentry.SentryClient import com.snowplowanalytics.iglu.client.resolver.{CreateResolverCache, Resolver} import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup} -import com.snowplowanalytics.iglu.schemaddl.Properties +import com.snowplowanalytics.iglu.core.SchemaKey +import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel import com.snowplowanalytics.lrumap.CreateLruMap import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.rdbloader.common.Sentry import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue} import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils.EventParser -import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey} +import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, ShredModelCache} import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Config.Output.Bad import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.metrics.Metrics import 
com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks.BadSink @@ -49,7 +50,7 @@ import org.http4s.blaze.client.BlazeClientBuilder case class Resources[F[_], C]( igluResolver: Resolver[F], - propertiesCache: PropertiesCache[F], + shredModelCache: ShredModelCache[F], eventParser: EventParser, producer: Queue.Producer[F], instanceId: String, @@ -82,7 +83,7 @@ object Resources { producer <- mkQueue(config.queue) resolverConfig <- mkResolverConfig(igluConfig) resolver <- mkResolver(resolverConfig) - propertiesCache <- Resource.eval(CreateLruMap[F, PropertiesKey, Properties].create(resolverConfig.cacheSize)) + shredModelCache <- Resource.eval(CreateLruMap[F, SchemaKey, ShredModel].create(resolverConfig.cacheSize)) httpClient <- BlazeClientBuilder[F].withExecutionContext(executionContext).resource implicit0(registryLookup: RegistryLookup[F]) <- Resource.pure(Http4sRegistryLookup[F](httpClient)) eventParser <- mkEventParser(resolver, config) @@ -104,7 +105,7 @@ object Resources { badSink <- mkBadSink(config, mkBadQueue) } yield Resources( resolver, - propertiesCache, + shredModelCache, eventParser, producer, instanceId.toString, @@ -145,13 +146,17 @@ object Resources { case Left(error) => Sync[F].raiseError[Resolver[F]](error) } } - private def mkEventParser[F[_]: Sync: Clock](igluResolver: Resolver[F], config: Config): Resource[F, EventParser] = Resource.eval { - mkAtomicLengths(igluResolver, config).flatMap { - case Right(atomicLengths) => Sync[F].pure(Event.parser(atomicLengths)) - case Left(error) => Sync[F].raiseError[EventParser](error) + private def mkEventParser[F[_]: Sync: RegistryLookup: Clock](igluResolver: Resolver[F], config: Config): Resource[F, EventParser] = + Resource.eval { + mkAtomicLengths(igluResolver, config).flatMap { + case Right(atomicLengths) => Sync[F].pure(Event.parser(atomicLengths)) + case Left(error) => Sync[F].raiseError[EventParser](error) + } } - } - private def mkAtomicLengths[F[_]: Sync: Clock](igluResolver: Resolver[F], config: Config): F[Either[RuntimeException, Map[String, Int]]] = + private def mkAtomicLengths[F[_]: Sync: RegistryLookup: Clock]( + igluResolver: Resolver[F], + config: Config + ): F[Either[RuntimeException, Map[String, Int]]] = if (config.featureFlags.truncateAtomicFields) { EventUtils.getAtomicLengths(igluResolver) } else { diff --git a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Transformer.scala b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Transformer.scala index 00fb62a99..f75ec54d1 100644 --- a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Transformer.scala +++ b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Transformer.scala @@ -25,7 +25,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.Common import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{SnowplowEntity, TypesInfo} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats.WideRow -import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, Transformed} +import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, Transformed} import 
com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.{ AtomicFieldsProvider, @@ -45,7 +45,7 @@ sealed trait Transformer[F[_]] extends Product with Serializable { object Transformer { case class ShredTransformer[F[_]: Monad: RegistryLookup: Clock]( igluResolver: Resolver[F], - propertiesCache: PropertiesCache[F], + shredModelCache: ShredModelCache[F], formats: Formats.Shred, processor: Processor ) extends Transformer[F] { @@ -59,12 +59,12 @@ object Transformer { else TypesInfo.Shredded.ShreddedFormat.JSON def goodTransform(event: Event): EitherT[F, BadRow, List[Transformed]] = - Transformed.shredEvent[F](igluResolver, propertiesCache, isTabular, processor)(event) + Transformed.shredEvent[F](igluResolver, shredModelCache, isTabular, processor)(event) def badTransform(badRow: BadRow): Transformed = { - val SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)) = badRow.schemaKey + val SchemaKey(vendor, name, _, SchemaVer.Full(model, revision, addition)) = badRow.schemaKey val data = Transformed.Data.DString(badRow.compact) - Transformed.Shredded.Json(false, vendor, name, model, data) + Transformed.Shredded.Json(false, vendor, name, model, revision, addition, data) } def typesInfo(types: Set[Data.ShreddedType]): TypesInfo = { diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala index 57c3eaa31..3d3f897bc 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/ShredTsvProcessingSpec.scala @@ -39,28 +39,28 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec { readStringRowsFrom( Path( outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1" + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0" ) ) actualOptimizelyRows <- readStringRowsFrom( Path( outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1" + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1/revision=0/addition=0" ) ) actualConsentRows <- readStringRowsFrom( Path( outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1" + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1/revision=0/addition=0" ) ) actualBadRows <- readStringRowsFrom( Path( outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/" + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/revision=0/addition=0" ) ) @@ -99,14 +99,14 
@@ class ShredTsvProcessingSpec extends BaseProcessingSpec { readStringRowsFrom( Path( outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1" + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0" ) ) actualBadRows <- readStringRowsFrom( Path( outputDirectory.toString + - s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/" + s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/revision=0/addition=0" ) ) diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala index f804e92fb..039ef6e29 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala @@ -26,13 +26,14 @@ import io.circe.optics.JsonPath._ import io.circe.parser.{parse => parseCirce} import com.snowplowanalytics.iglu.client.Resolver import com.snowplowanalytics.iglu.client.resolver.registries.Registry -import com.snowplowanalytics.iglu.schemaddl.Properties +import com.snowplowanalytics.iglu.core.SchemaKey +import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel import com.snowplowanalytics.lrumap.CreateLruMap import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig -import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, PropertiesKey, Transformed} +import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, Transformed} import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{Processing, Transformer} import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.{Checkpointer, ParsedC} import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.TestApplication._ @@ -47,12 +48,12 @@ class TransformingSpec extends Specification { val testFileNameMap = List( Transformed.Shredded - .Tabular("com.snowplowanalytics.snowplow", "atomic", 1, dummyTransformedData) + .Tabular("com.snowplowanalytics.snowplow", "atomic", 1, 0, 0, dummyTransformedData) .getPath -> "com.snowplowanalytics.snowplow-atomic", Transformed.Shredded - .Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, dummyTransformedData) + .Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, 0, 0, dummyTransformedData) .getPath -> "com.snowplowanalytics.snowplow-consent_document", - Transformed.Shredded.Tabular("com.optimizely", "state", 1, dummyTransformedData).getPath -> "com.optimizely-state" + Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath -> "com.optimizely-state" ).toMap val expectedTransformedMap = @@ 
-128,12 +129,12 @@ object TransformingSpec { val defaultWindow = Window(1, 1, 1, 1, 1) val dummyTransformedData = Transformed.Data.DString("") - def propertiesCache: PropertiesCache[IO] = CreateLruMap[IO, PropertiesKey, Properties].create(100).unsafeRunSync() + def shredModelCache: ShredModelCache[IO] = CreateLruMap[IO, SchemaKey, ShredModel].create(100).unsafeRunSync() def createTransformer(formats: TransformerConfig.Formats): Transformer[IO] = formats match { case f: TransformerConfig.Formats.Shred => - Transformer.ShredTransformer(defaultIgluResolver, propertiesCache, f, TestProcessor) + Transformer.ShredTransformer(defaultIgluResolver, shredModelCache, f, TestProcessor) case f: TransformerConfig.Formats.WideRow => Transformer.WideRowTransformer(defaultIgluResolver, f, TestProcessor) } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/EventUtils.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/EventUtils.scala index ec595a8cb..c8282d584 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/EventUtils.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/EventUtils.scala @@ -20,7 +20,7 @@ import java.time.format.DateTimeParseException import io.circe.Json import cats.Monad -import cats.data.{EitherT, NonEmptyList} +import cats.data.NonEmptyList import cats.implicits._ import cats.effect.Clock @@ -30,16 +30,13 @@ import com.snowplowanalytics.iglu.core._ import com.snowplowanalytics.iglu.client.{ClientError, Resolver} import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -import com.snowplowanalytics.iglu.schemaddl.migrations.FlatData import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._ import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, ParsingError} import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload, Processor} -import com.snowplowanalytics.snowplow.rdbloader.common.Common -import Flattening.NullCharacter -import com.snowplowanalytics.iglu.schemaddl.Properties import com.snowplowanalytics.snowplow.analytics.scalasdk.decode.TSVParser +import com.snowplowanalytics.snowplow.rdbloader.common.Common object EventUtils { @@ -138,34 +135,6 @@ object EventUtils { "events" ) - /** - * Transform a self-desribing entity into tabular format, using its known schemas to get a correct - * order of columns - * @param resolver - * Iglu resolver to get list of known schemas - * @param instance - * self-describing JSON that needs to be transformed - * @return - * list of columns or flattening error - */ - def flatten[F[_]: Monad: Clock: RegistryLookup]( - resolver: Resolver[F], - propertiesCache: PropertiesCache[F], - instance: SelfDescribingData[Json] - ): EitherT[F, FailureDetails.LoaderIgluError, List[String]] = - Flattening - .getDdlProperties(resolver, propertiesCache, instance.schema) - .map(props => mapProperties(props, instance)) - - private def mapProperties(props: Properties, instance: SelfDescribingData[Json]) = - props - .map { case (pointer, _) => - FlatData.getPath(pointer.forData, instance.data, getString, NullCharacter) - } - - def getString(json: Json): String = - json.fold(NullCharacter, transformBool, _ => json.show, escape, _ => escape(json.noSpaces), _ => escape(json.noSpaces)) - def getEntities(event: Event): 
List[SelfDescribingData[Json]] = event.unstruct_event.data.toList ++ event.derived_contexts.data ++ @@ -178,11 +147,6 @@ object EventUtils { case _ => 0 } - /** Prevents data with newlines and tabs from breaking the loading process */ - private def escape(s: String): String = - if (s == NullCharacter) "\\\\N" - else s.replace('\t', ' ').replace('\n', ' ') - private def getLength(schema: Schema): Option[Int] = schema.maxLength.map(_.value.toInt) } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Flattening.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Flattening.scala deleted file mode 100644 index e2c4a212d..000000000 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Flattening.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright (c) 2012-2021 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.rdbloader.common.transformation - -import io.circe.Json -import cats.Monad -import cats.data.EitherT -import cats.syntax.either._ -import cats.effect.Clock -import com.snowplowanalytics.iglu.client.resolver.Resolver.{ResolverResult, SchemaListKey} -import com.snowplowanalytics.iglu.core._ -import com.snowplowanalytics.iglu.core.circe.implicits._ -import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -import com.snowplowanalytics.iglu.client.{ClientError, Resolver} -import com.snowplowanalytics.iglu.schemaddl.{IgluSchema, Properties} -import com.snowplowanalytics.iglu.schemaddl.migrations.{FlatSchema, SchemaList => DdlSchemaList} -import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema -import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._ -import com.snowplowanalytics.snowplow.badrows.FailureDetails - -object Flattening { - - /** Redshift default NULL string */ - val NullCharacter: String = "\\N" - - val MetaSchema = SchemaKey("com.snowplowanalyics.self-desc", "schema", "jsonschema", SchemaVer.Full(1, 0, 0)) - - def getOrdered[F[_]: Monad: RegistryLookup: Clock]( - resolver: Resolver[F], - vendor: String, - name: String, - model: Int - ): EitherT[F, FailureDetails.LoaderIgluError, DdlSchemaList] = { - val criterion = SchemaCriterion(vendor, name, "jsonschema", Some(model), None, None) - val schemaList = resolver.listSchemas(vendor, name, model) - for { - schemaList <- EitherT[F, ClientError.ResolutionError, SchemaList](schemaList).leftMap(error => - FailureDetails.LoaderIgluError.SchemaListNotFound(criterion, error) - ) - ordered <- DdlSchemaList.fromSchemaList(schemaList, fetch(resolver)) - } yield ordered - } - - def getDdlProperties[F[_]: Monad: Clock: RegistryLookup]( - resolver: Resolver[F], - propertiesCache: PropertiesCache[F], - schemaKey: SchemaKey - ): EitherT[F, FailureDetails.LoaderIgluError, Properties] = { - val 
criterion = SchemaCriterion(schemaKey.vendor, schemaKey.name, "jsonschema", Some(schemaKey.version.model), None, None) - - EitherT(resolver.listSchemasResult(schemaKey.vendor, schemaKey.name, schemaKey.version.model)) - .leftMap(error => FailureDetails.LoaderIgluError.SchemaListNotFound(criterion, error)) - .flatMap { - case cached: ResolverResult.Cached[SchemaListKey, SchemaList] => - lookupInCache(resolver, propertiesCache, cached) - case ResolverResult.NotCached(schemaList) => - evaluateProperties(schemaList, resolver) - } - } - - def fetch[F[_]: Monad: RegistryLookup: Clock]( - resolver: Resolver[F] - )( - key: SchemaKey - ): EitherT[F, FailureDetails.LoaderIgluError, IgluSchema] = - for { - json <- EitherT(resolver.lookupSchema(key)).leftMap(error => FailureDetails.LoaderIgluError.IgluError(key, error)) - schema <- EitherT.fromEither(parseSchema(json)) - } yield schema - - private def lookupInCache[F[_]: Monad: RegistryLookup: Clock]( - resolver: Resolver[F], - propertiesCache: PropertiesCache[F], - resolvedSchemaList: ResolverResult.Cached[SchemaListKey, SchemaList] - ): EitherT[F, FailureDetails.LoaderIgluError, Properties] = { - val propertiesKey = (resolvedSchemaList.key, resolvedSchemaList.timestamp) - - EitherT.liftF(propertiesCache.get(propertiesKey)).flatMap { - case Some(properties) => - EitherT.pure[F, FailureDetails.LoaderIgluError](properties) - case None => - evaluateProperties(resolvedSchemaList.value, resolver) - .semiflatTap(props => propertiesCache.put(propertiesKey, props)) - } - } - - private def evaluateProperties[F[_]: Monad: RegistryLookup: Clock]( - schemaList: SchemaList, - resolver: Resolver[F] - ): EitherT[F, FailureDetails.LoaderIgluError, Properties] = - DdlSchemaList - .fromSchemaList(schemaList, fetch(resolver)) - .map(FlatSchema.extractProperties) - - /** Parse JSON into self-describing schema, or return `FlatteningError` */ - private def parseSchema(json: Json): Either[FailureDetails.LoaderIgluError, IgluSchema] = - for { - selfDescribing <- SelfDescribingSchema.parse(json).leftMap(invalidSchema(json)) - parsed <- Schema.parse(selfDescribing.schema).toRight(invalidSchema(selfDescribing)) - } yield SelfDescribingSchema(selfDescribing.self, parsed) - - private def invalidSchema(json: Json)(code: ParseError): FailureDetails.LoaderIgluError = { - val error = s"Cannot parse ${json.noSpaces} as self-describing schema, ${code.code}" - FailureDetails.LoaderIgluError.InvalidSchema(MetaSchema, error) - } - - private def invalidSchema(schema: SelfDescribingSchema[_]): FailureDetails.LoaderIgluError = { - val error = s"Cannot be parsed as JSON Schema AST" - FailureDetails.LoaderIgluError.InvalidSchema(schema.self.schemaKey, error) - } -} diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Transformed.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Transformed.scala index e12af3b55..6d84b44af 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Transformed.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Transformed.scala @@ -20,12 +20,16 @@ import cats.data.{EitherT, NonEmptyList} import cats.effect.Clock import com.snowplowanalytics.iglu.client.Resolver import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -import com.snowplowanalytics.iglu.core.SchemaKey +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, 
SelfDescribingSchema} +import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, FieldValue} +import com.snowplowanalytics.iglu.schemaddl.redshift._ import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.badrows.{BadRow, FailureDetails, Processor} import com.snowplowanalytics.snowplow.rdbloader.common.Common.AtomicSchema import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.Shredded.ShreddedFormat +import com.snowplowanalytics.snowplow.rdbloader.common.SchemaProvider +import com.snowplowanalytics.snowplow.rdbloader.common.SchemaProvider.SchemaWithKey /** Represents transformed data in blob storage */ @@ -55,6 +59,8 @@ object Transformed { def name: String def format: ShreddedFormat def model: Int + def revision: Int + def addition: Int def data: Data.DString } @@ -68,6 +74,8 @@ object Transformed { vendor: String, name: String, model: Int, + revision: Int, + addition: Int, data: Data.DString ) extends Shredded { val format = ShreddedFormat.JSON @@ -78,6 +86,8 @@ object Transformed { vendor: String, name: String, model: Int, + revision: Int, + addition: Int, data: Data.DString ) extends Shredded { val isGood = true // We don't support TSV shredding for bad data @@ -111,7 +121,7 @@ object Transformed { */ def shredEvent[F[_]: Monad: Clock: RegistryLookup]( igluResolver: Resolver[F], - propertiesCache: PropertiesCache[F], + shredModelCache: ShredModelCache[F], isTabular: SchemaKey => Boolean, processor: Processor )( @@ -121,18 +131,54 @@ object Transformed { .fromEvent(event) .traverse { hierarchy => val tabular = isTabular(hierarchy.entity.schema) - fromHierarchy(tabular, igluResolver, propertiesCache)(hierarchy) + fromHierarchy(tabular, igluResolver, shredModelCache)(hierarchy) } .leftMap(error => EventUtils.shreddingBadRow(event, processor)(NonEmptyList.one(error))) .map { shredded => val data = EventUtils.alterEnrichedEvent(event) - val atomic = Shredded.Tabular(AtomicSchema.vendor, AtomicSchema.name, AtomicSchema.version.model, Transformed.Data.DString(data)) + val atomic = Shredded.Tabular( + AtomicSchema.vendor, + AtomicSchema.name, + AtomicSchema.version.model, + AtomicSchema.version.revision, + AtomicSchema.version.addition, + Transformed.Data.DString(data) + ) atomic :: shredded } def wideRowEvent(event: Event): Transformed = WideRow(good = true, Transformed.Data.DString(event.toJson(true).noSpaces)) + /** + * Lookup ShredModel for given SchemaKey and evaluate if not found + */ + def lookupShredModel[F[_]: Monad: Clock: RegistryLookup]( + schemaKey: SchemaKey, + shredModelCache: ShredModelCache[F], + resolver: => Resolver[F] + ): EitherT[F, FailureDetails.LoaderIgluError, ShredModel] = + EitherT + .liftF(shredModelCache.get(schemaKey)) + .flatMap( + _.fold( + SchemaProvider + .fetchSchemasWithSameModel(resolver, schemaKey) + .flatMap { schemaWithKeyList => + EitherT + .fromOption[F][FailureDetails.LoaderIgluError, NonEmptyList[SchemaWithKey]]( + NonEmptyList.fromList(schemaWithKeyList), + FailureDetails.LoaderIgluError.InvalidSchema(schemaKey, s"Empty resolver response for $schemaKey") + ) + .map { nel => + val schemas = nel.map(swk => SelfDescribingSchema[Schema](SchemaMap(swk.schemaKey), swk.schema)) + foldMapRedshiftSchemas(schemas)(schemaKey) + } + .semiflatTap(shredModel => shredModelCache.put(schemaKey, shredModel)) + } + )(EitherT.pure(_)) + ) + /** * Transform JSON `Hierarchy`, extracted from enriched into a `Shredded` entity, specifying how 
it * should look like in destination: JSON or TSV If flattening algorithm failed at any point - it @@ -148,20 +194,37 @@ object Transformed { private def fromHierarchy[F[_]: Monad: Clock: RegistryLookup]( tabular: Boolean, resolver: => Resolver[F], - propertiesCache: PropertiesCache[F] + shredModelCache: ShredModelCache[F] )( hierarchy: Hierarchy ): EitherT[F, FailureDetails.LoaderIgluError, Transformed] = { val vendor = hierarchy.entity.schema.vendor val name = hierarchy.entity.schema.name + val meta = EventUtils.buildMetadata(hierarchy.eventId, hierarchy.collectorTstamp, hierarchy.entity.schema) if (tabular) { - EventUtils.flatten(resolver, propertiesCache, hierarchy.entity).map { columns => - val meta = EventUtils.buildMetadata(hierarchy.eventId, hierarchy.collectorTstamp, hierarchy.entity.schema) - Shredded.Tabular(vendor, name, hierarchy.entity.schema.version.model, Transformed.Data.DString((meta ++ columns).mkString("\t"))) - } + lookupShredModel(hierarchy.entity.schema, shredModelCache, resolver) + .map { shredModel => + val columns = shredModel.jsonToStrings(hierarchy.entity.data) + Shredded.Tabular( + vendor, + name, + hierarchy.entity.schema.version.model, + hierarchy.entity.schema.version.revision, + hierarchy.entity.schema.version.addition, + Transformed.Data.DString((meta ++ columns).mkString("\t")) + ) + } } else EitherT.pure[F, FailureDetails.LoaderIgluError]( - Shredded.Json(true, vendor, name, hierarchy.entity.schema.version.model, Transformed.Data.DString(hierarchy.dumpJson)) + Shredded.Json( + true, + vendor, + name, + hierarchy.entity.schema.version.model, + hierarchy.entity.schema.version.revision, + hierarchy.entity.schema.version.addition, + Transformed.Data.DString(hierarchy.dumpJson) + ) ) } } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/package.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/package.scala index 4d7f78e96..a42799873 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/package.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/package.scala @@ -14,20 +14,18 @@ */ package com.snowplowanalytics.snowplow.rdbloader.common -import com.snowplowanalytics.iglu.client.resolver.Resolver.SchemaListKey -import com.snowplowanalytics.iglu.client.resolver.StorageTime +import com.snowplowanalytics.iglu.core.SchemaKey +import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel import java.time.{Instant, ZoneOffset} import java.time.format.DateTimeFormatter import com.snowplowanalytics.lrumap.LruMap -import com.snowplowanalytics.iglu.schemaddl.Properties package object transformation { private val Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") - type PropertiesKey = (SchemaListKey, StorageTime) - type PropertiesCache[F[_]] = LruMap[F, PropertiesKey, Properties] + type ShredModelCache[F[_]] = LruMap[F, SchemaKey, ShredModel] implicit class InstantOps(time: Instant) { def formatted: String = diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CachedFlatteningSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CachedFlatteningSpec.scala deleted file mode 100644 index bc5487eb1..000000000 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CachedFlatteningSpec.scala +++ /dev/null @@ -1,207 +0,0 @@ -package 
com.snowplowanalytics.snowplow.rdbloader.common - -import cats.effect.Clock -import cats.{Applicative, Id, Monad} -import com.snowplowanalytics.iglu.client.resolver.Resolver -import com.snowplowanalytics.iglu.client.resolver.registries.{Registry, RegistryError, RegistryLookup} -import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaList, SelfDescribingData} -import com.snowplowanalytics.iglu.schemaddl.Properties -import com.snowplowanalytics.lrumap.CreateLruMap -import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey} -import io.circe.Json -import io.circe.literal.JsonStringContext -import org.specs2.mutable.Specification - -import scala.concurrent.duration.{DurationInt, FiniteDuration} - -import java.util.concurrent.TimeUnit - -class CachedFlatteningSpec extends Specification { - - // single 'field1' field - val `original schema - 1 field`: Json = - json""" - { - "$$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", - "description": "Test schema 1", - "self": { - "vendor": "com.snowplowanalytics.snowplow", - "name": "test_schema", - "format": "jsonschema", - "version": "1-0-0" - }, - - "type": "object", - "properties": { - "field1": { "type": "string"} - } - } - """ - - // same key as schema1, but with additional `field2` field. - val `patched schema - 2 fields`: Json = - json""" - { - "$$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", - "description": "Test schema 1", - "self": { - "vendor": "com.snowplowanalytics.snowplow", - "name": "test_schema", - "format": "jsonschema", - "version": "1-0-0" - }, - - "type": "object", - "properties": { - "field1": { "type": "string" }, - "field2": { "type": "integer" } - } - } - """ - - val cacheTtl = 10.seconds - val dataToFlatten = json"""{ "field1": "1", "field2": 2 }""" - val schemaKey = "iglu:com.snowplowanalytics.snowplow/test_schema/jsonschema/1-0-0" - - // (vendor, name, model) - val propertiesKey = ("com.snowplowanalytics.snowplow", "test_schema", 1) - - "Cached properties during flattening should be in sync with cached schemas/lists in iglu client" >> { - - "(1) original schema only, 1 flatten call => 1 field flattened" in { - val propertiesCache = getCache - val result = flatten(propertiesCache, getResolver)( - currentTime = 1000, // ms - schemaInRegistry = `original schema - 1 field` - ) - - result must beEqualTo(List("1")) - - // Properties are cached after first call (1 second) - propertiesCache.get((propertiesKey, 1.second)) must beSome - } - - "(2) original schema is patched between calls, no delay => original schema is still cached => 1 field flattened" in { - val propertiesCache = getCache - val resolver = getResolver - - // first call - flatten(propertiesCache, resolver)( - currentTime = 1000, // ms - schemaInRegistry = `original schema - 1 field` - ) - - // second call, same time - val result = flatten(propertiesCache, resolver)( - currentTime = 1000, // ms - schemaInRegistry = `patched schema - 2 fields` // different schema with the same key! 
- ) - - // no data from patched schema - result must beEqualTo(List("1")) - - // Properties are cached after first call (1 second) - propertiesCache.get((propertiesKey, 1.second)) must beSome - } - - "(3) schema is patched, delay between flatten calls is less than cache TTL => original schema is still cached => 1 field flattened" in { - val propertiesCache = getCache - val resolver = getResolver - - // first call - flatten(propertiesCache, resolver)( - currentTime = 1000, // ms - schemaInRegistry = `original schema - 1 field` - ) - - // second call, 2s later, less than 10s TTL - val result = flatten(propertiesCache, resolver)( - currentTime = 3000, // ms - schemaInRegistry = `patched schema - 2 fields` // different schema with the same key! - ) - - // no data from patched schema - result must beEqualTo(List("1")) - - // Properties are cached after first call (1 second) - propertiesCache.get((propertiesKey, 1.second)) must beSome - - // Properties are not cached after second call (3 seconds) - propertiesCache.get((propertiesKey, 3.second)) must beNone - } - - "(4) schema is patched, delay between flatten calls is greater than cache TTL => original schema is expired => using patched schema => 2 field flattened" in { - val propertiesCache = getCache - val resolver = getResolver - - // first call - flatten(propertiesCache, resolver)( - currentTime = 1000, // ms - schemaInRegistry = `original schema - 1 field` - ) - - // second call, 12s later, greater than 10s TTL - val result = flatten(propertiesCache, resolver)( - currentTime = 13000, // ms - schemaInRegistry = `patched schema - 2 fields` // different schema with the same key! - ) - - // Cache content expired, patched schema is fetched => 2 fields flattened - result must beEqualTo(List("1", "2")) - - // Properties are cached after first call (1 second) - propertiesCache.get((propertiesKey, 1.second)) must beSome - - // Properties are cached after second call (13 seconds) - propertiesCache.get((propertiesKey, 13.second)) must beSome - } - } - - // Helper method to wire all test dependencies and execute EventUtils.flatten - private def flatten( - propertiesCache: PropertiesCache[Id], - resolver: Resolver[Id] - )( - currentTime: Long, - schemaInRegistry: Json - ): List[String] = { - - // To return value stored in the schemaInRegistry variable, passed registry is ignored - val testRegistryLookup: RegistryLookup[Id] = new RegistryLookup[Id] { - override def lookup(registry: Registry, schemaKey: SchemaKey): Id[Either[RegistryError, Json]] = - Right(schemaInRegistry) - - override def list( - registry: Registry, - vendor: String, - name: String, - model: Int - ): Id[Either[RegistryError, SchemaList]] = - Right(SchemaList(List(SchemaKey.fromUri("iglu:com.snowplowanalytics.snowplow/test_schema/jsonschema/1-0-0").right.get))) - } - - val staticClock: Clock[Id] = new Clock[Id] { - override def applicative: Applicative[Id] = Applicative[Id] - - override def monotonic: Id[FiniteDuration] = FiniteDuration(currentTime * 1000000, TimeUnit.NANOSECONDS) - - override def realTime: Id[FiniteDuration] = FiniteDuration(currentTime, TimeUnit.MILLISECONDS) - } - - val data = SelfDescribingData(schema = SchemaKey.fromUri(schemaKey).right.get, data = dataToFlatten) - - EventUtils.flatten(resolver, propertiesCache, data)(Monad[Id], staticClock, testRegistryLookup).value.right.get - } - - private def getCache: PropertiesCache[Id] = CreateLruMap[Id, PropertiesKey, Properties].create(100) - - private def getResolver: Resolver[Id] = - Resolver.init[Id]( - cacheSize = 10, - 
cacheTtl = Some(cacheTtl), - refs = Registry.Embedded( // not used in test as we fix returned schema in custom test RegistryLookup - Registry.Config("Test", 0, List.empty), - path = "/fake" - ) - ) -} diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CachedShredModelSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CachedShredModelSpec.scala new file mode 100644 index 000000000..d0e119a79 --- /dev/null +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CachedShredModelSpec.scala @@ -0,0 +1,97 @@ +package com.snowplowanalytics.snowplow.rdbloader.common + +import cats.{Applicative, Id, Monad} +import cats.effect.Clock +import com.snowplowanalytics.iglu.client.resolver.registries.{Registry, RegistryError, RegistryLookup} +import com.snowplowanalytics.iglu.client.resolver.Resolver +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaList, SchemaVer} +import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel +import com.snowplowanalytics.lrumap.CreateLruMap +import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, Transformed} +import io.circe.Json +import io.circe.literal.JsonStringContext +import org.specs2.mutable.Specification + +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +import java.util.concurrent.TimeUnit + +class CachedShredModelSpec extends Specification { + + val schemaKey = SchemaKey("com.snowplowanalytics.snowplow", "test_schema", "jsonschema", SchemaVer.Full(1, 0, 0)) + + val data = json"""{ "field1": "1" }""" + + val schema100: Json = + json""" + { + "$$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "description": "Test schema 1", + "self": { + "vendor": "com.snowplowanalytics.snowplow", + "name": "test_schema", + "format": "jsonschema", + "version": "1-0-0" + }, + + "type": "object", + "properties": { + "field1": { "type": "string"} + } + } + """ + + "Cached shred model" should { + "original schema only, 1 field flattened" in { + + val cache = getCache + val result: ShredModel = helper(schemaKey, cache, 1000, schema100) + + result.jsonToStrings(data) must beEqualTo(List("1")) + cache.get(schemaKey) must beSome + } + } + private def helper( + schemaKey: SchemaKey, + shredModelCache: ShredModelCache[Id], + currentTime: Long, + schemaInRegistry: Json + ): ShredModel = { + + // To return value stored in the schemaInRegistry variable, passed registry is ignored + val testRegistryLookup: RegistryLookup[Id] = new RegistryLookup[Id] { + override def lookup(registry: Registry, schemaKey: SchemaKey): Id[Either[RegistryError, Json]] = + Right(schemaInRegistry) + + override def list( + registry: Registry, + vendor: String, + name: String, + model: Int + ): Id[Either[RegistryError, SchemaList]] = + Right(SchemaList(List(SchemaKey.fromUri("iglu:com.snowplowanalytics.snowplow/test_schema/jsonschema/1-0-0").right.get))) + } + + val clock: Clock[Id] = new Clock[Id] { + override def applicative: Applicative[Id] = Applicative[Id] + + override def monotonic: Id[FiniteDuration] = FiniteDuration(currentTime * 1000000, TimeUnit.NANOSECONDS) + + override def realTime: Id[FiniteDuration] = FiniteDuration(currentTime, TimeUnit.MILLISECONDS) + } + + Transformed.lookupShredModel[Id](schemaKey, shredModelCache, getResolver)(Monad[Id], clock, testRegistryLookup).value.right.get + } + + private def getCache: ShredModelCache[Id] = CreateLruMap[Id, SchemaKey, 
ShredModel].create(100) + + private def getResolver: Resolver[Id] = + Resolver.init[Id]( + cacheSize = 10, + cacheTtl = Some(10.seconds), + refs = Registry.Embedded( // not used in test as we fix returned schema in custom test RegistryLookup + Registry.Config("Test", 0, List.empty), + path = "/fake" + ) + ) +} diff --git a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala index bb6d9dd0b..c6a4324ea 100644 --- a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala +++ b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala @@ -20,7 +20,7 @@ import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig import com.snowplowanalytics.iglu.schemaddl.parquet.FieldValue import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.ParquetTransformer import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields -import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.spark.singleton.{IgluSingleton, PropertiesCacheSingleton} +import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.spark.singleton.{IgluSingleton, ShredModelCacheSingleton} import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType @@ -92,7 +92,7 @@ object Transformer { Transformed .shredEvent[Id]( IgluSingleton.get(resolverConfig), - PropertiesCacheSingleton.get(resolverConfig), + ShredModelCacheSingleton.get(resolverConfig), isTabular, ShredJob.BadRowsProcessor )(event) @@ -107,10 +107,10 @@ object Transformer { } def badTransform(badRow: BadRow, badEventsCounter: LongAccumulator): Transformed = { - val SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)) = badRow.schemaKey + val SchemaKey(vendor, name, _, SchemaVer.Full(model, revision, addition)) = badRow.schemaKey val data = Transformed.Data.DString(badRow.compact) badEventsCounter.add(1L) - Transformed.Shredded.Json(false, vendor, name, model, data) + Transformed.Shredded.Json(false, vendor, name, model, revision, addition, data) } def typesInfo: TypesInfo = TypesInfo.Shredded(typesAccumulator.value.toList) @@ -213,7 +213,7 @@ object Transformer { } type WideRowTuple = (String, String) - type ShreddedTuple = (String, String, String, String, Int, String) + type ShreddedTuple = (String, String, String, String, Int, Int, Int, String) private implicit class TransformedOps(t: Transformed) { def wideRow: Option[WideRowTuple] = t match { @@ -226,7 +226,7 @@ object Transformer { def shredded: Option[ShreddedTuple] = t match { case p: Transformed.Shredded => val outputType = if (p.isGood) "good" else "bad" - (outputType, p.vendor, p.name, p.format.path, p.model, p.data.value).some + (outputType, p.vendor, p.name, p.format.path, p.model, p.revision, p.addition, p.data.value).some case _ => None } diff --git a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/Sink.scala b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/Sink.scala index 404883605..757446e33 100644 --- a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/Sink.scala +++ 
b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/Sink.scala @@ -24,15 +24,15 @@ object Sink { def writeShredded( spark: SparkSession, compression: Compression, - data: RDD[(String, String, String, String, Int, String)], + data: RDD[(String, String, String, String, Int, Int, Int, String)], outFolder: String ): Unit = { import spark.implicits._ data - .toDF("output", "vendor", "name", "format", "model", "data") + .toDF("output", "vendor", "name", "format", "model", "revision", "addition", "data") .write .withCompression(compression) - .partitionBy("output", "vendor", "name", "format", "model") + .partitionBy("output", "vendor", "name", "format", "model", "revision", "addition") .mode(SaveMode.Append) .text(outFolder) } diff --git a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/singleton.scala b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/singleton.scala index 44723af74..17a8d0245 100644 --- a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/singleton.scala +++ b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/singleton.scala @@ -21,14 +21,14 @@ import cats.syntax.show._ import com.snowplowanalytics.iglu.client.Resolver import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig -import com.snowplowanalytics.iglu.schemaddl.Properties - +import com.snowplowanalytics.iglu.core.SchemaKey +import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel import com.snowplowanalytics.lrumap.CreateLruMap import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.eventsmanifest.{EventsManifest, EventsManifestConfig} import com.snowplowanalytics.snowplow.rdbloader.common._ import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils.EventParser -import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey} +import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, ShredModelCache} import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.Config /** Singletons needed for unserializable or stateful classes. 
*/ @@ -97,14 +97,14 @@ object singleton { } } - object PropertiesCacheSingleton { - @volatile private var instance: PropertiesCache[Id] = _ + object ShredModelCacheSingleton { + @volatile private var instance: ShredModelCache[Id] = _ - def get(resolverConfig: ResolverConfig): PropertiesCache[Id] = { + def get(resolverConfig: ResolverConfig): ShredModelCache[Id] = { if (instance == null) { synchronized { if (instance == null) { - instance = CreateLruMap[Id, PropertiesKey, Properties].create(resolverConfig.cacheSize) + instance = CreateLruMap[Id, SchemaKey, ShredModel].create(resolverConfig.cacheSize) } } } diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/PartitionDataFilterSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/PartitionDataFilterSpec.scala index a34ad1867..0838e97bf 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/PartitionDataFilterSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/PartitionDataFilterSpec.scala @@ -103,9 +103,9 @@ class PartitionDataFilterSpec extends Specification { private def goodWide(content: String) = WideRow(good = true, DString(content)) private def badWide(content: String) = WideRow(good = false, DString(content)) - private def goodTabShred(content: String) = Transformed.Shredded.Tabular("vendor", "name", 1, DString(content)) - private def goodJsonShred(content: String) = Transformed.Shredded.Json(isGood = true, "vendor", "name", 1, DString(content)) - private def badShred(content: String) = Transformed.Shredded.Json(isGood = false, "vendor", "name", 1, DString(content)) + private def goodTabShred(content: String) = Transformed.Shredded.Tabular("vendor", "name", 1, 0, 0, DString(content)) + private def goodJsonShred(content: String) = Transformed.Shredded.Json(isGood = true, "vendor", "name", 1, 0, 0, DString(content)) + private def badShred(content: String) = Transformed.Shredded.Json(isGood = false, "vendor", "name", 1, 0, 0, DString(content)) } object PartitionDataFilterSpec { diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala index 64bbe6d8a..2c4af2250 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala @@ -68,7 +68,7 @@ object ShredJobSpec { val Name = "snowplow-transformer-batch" val Version = BuildInfo.version - val AtomicFolder = "vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1" + val AtomicFolder = "vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0" sealed trait Events diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/CrossBatchDeduplicationSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/CrossBatchDeduplicationSpec.scala index 6ae60c4c1..0d9abc058 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/CrossBatchDeduplicationSpec.scala 
+++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/CrossBatchDeduplicationSpec.scala @@ -57,7 +57,7 @@ object CrossBatchDeduplicationSpec { ) object expected { - val additionalContextPath = "vendor=com.snowplowanalytics.snowplow/name=ua_parser_context/format=json/model=1" + val additionalContextPath = "vendor=com.snowplowanalytics.snowplow/name=ua_parser_context/format=json/model=1/revision=0/addition=0" val additionalContextContents1 = s""" |{ diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/DerivedContextsSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/DerivedContextsSpec.scala index 0e28b6655..763e72a19 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/DerivedContextsSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/DerivedContextsSpec.scala @@ -25,7 +25,7 @@ object DerivedContextsSpec { ) object expected { - val path = s"vendor=org.schema/name=WebPage/format=json/model=1" + val path = s"vendor=org.schema/name=WebPage/format=json/model=1/revision=0/addition=0" val contents = s"""|{ |"schema":{ diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/EmptySchemaSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/EmptySchemaSpec.scala index 8a0c80bad..81db33448 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/EmptySchemaSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/EmptySchemaSpec.scala @@ -72,8 +72,8 @@ object EmptySchemaSpec { val contexBContents = "com.snowplowanalytics.iglu\tanything-b\tjsonschema\t1-0-0\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"anything-b\"]\tevents" - val contextAPath = "vendor=com.snowplowanalytics.iglu/name=anything-a/format=tsv/model=1" - val contextBPath = "vendor=com.snowplowanalytics.iglu/name=anything-b/format=tsv/model=1" + val contextAPath = "vendor=com.snowplowanalytics.iglu/name=anything-a/format=tsv/model=1/revision=0/addition=0" + val contextBPath = "vendor=com.snowplowanalytics.iglu/name=anything-b/format=tsv/model=1/revision=0/addition=0" // Removed three JSON columns and added 7 columns at the end val event = diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/EventDeduplicationSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/EventDeduplicationSpec.scala index 4bfaf7964..7c4c8bf46 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/EventDeduplicationSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/EventDeduplicationSpec.scala @@ -72,7 +72,7 @@ class EventDeduplicationSpec extends Specification with ShredJobSpec { "A job which is provided with a two events with same event_id and both natural and synthetic deduplication are enabled" should { object expected { - val path = 
"vendor=com.snowplowanalytics.snowplow/name=duplicate/format=json/model=1" + val path = "vendor=com.snowplowanalytics.snowplow/name=duplicate/format=json/model=1/revision=0/addition=0" val contents = s"""|{ |"schema":{ @@ -93,7 +93,7 @@ class EventDeduplicationSpec extends Specification with ShredJobSpec { |} |}""".stripMargin.replaceAll("[\n\r]", "") - val additionalContextPath = "vendor=com.snowplowanalytics.snowplow/name=ua_parser_context/format=json/model=1" + val additionalContextPath = "vendor=com.snowplowanalytics.snowplow/name=ua_parser_context/format=json/model=1/revision=0/addition=0" val additionalContextContents = s""" |{ @@ -187,7 +187,7 @@ class EventDeduplicationSpec extends Specification with ShredJobSpec { "A job which is provided with a two events with same event_id and both natural and synthetic deduplication are disabled" should { object expected { - val additionalContextPath = "vendor=com.snowplowanalytics.snowplow/name=ua_parser_context/format=json/model=1" + val additionalContextPath = "vendor=com.snowplowanalytics.snowplow/name=ua_parser_context/format=json/model=1/revision=0/addition=0" val additionalContextContents = s""" |{ diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/ForwardCompatibleContextSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/ForwardCompatibleContextSpec.scala index 8185fbcb3..05e275ee2 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/ForwardCompatibleContextSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/ForwardCompatibleContextSpec.scala @@ -25,7 +25,7 @@ object ForwardCompatibleContextSpec { ) object expected { - val path = "vendor=org.schema/name=WebPage/format=json/model=1" + val path = "vendor=org.schema/name=WebPage/format=json/model=1/revision=0/addition=0" val contents = s"""|{ |"schema":{ diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/LinkClickEventSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/LinkClickEventSpec.scala index d9bd88eb2..486437f3b 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/LinkClickEventSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/LinkClickEventSpec.scala @@ -25,7 +25,7 @@ object LinkClickEventSpec { ) object expected { - val path = "vendor=com.snowplowanalytics.snowplow/name=link_click/format=json/model=1" + val path = "vendor=com.snowplowanalytics.snowplow/name=link_click/format=json/model=1/revision=0/addition=0" val contents = s"""|{ |"schema":{ diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/WebsitePageContextSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/WebsitePageContextSpec.scala index 660d4850b..becc947c0 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/WebsitePageContextSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/WebsitePageContextSpec.scala @@ -24,7 +24,7 @@ 
object WebsitePageContextSpec { ) object expected { - val path = "vendor=org.schema/name=WebPage/format=json/model=1" + val path = "vendor=org.schema/name=WebPage/format=json/model=1/revision=0/addition=0" val contents = s"""|{ |"schema":{ diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/tabular/NewlineSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/tabular/NewlineSpec.scala index 4f387bbee..0acf72cde 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/tabular/NewlineSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/tabular/NewlineSpec.scala @@ -70,7 +70,7 @@ object NewlineSpec { ) object expected { - val contextPath = s"vendor=com.snowplowanalytics.snowplow/name=change_form/format=tsv/model=1" + val contextPath = s"vendor=com.snowplowanalytics.snowplow/name=change_form/format=tsv/model=1/revision=0/addition=0" val contextContents = "com.snowplowanalytics.snowplow\tchange_form\tjsonschema\t1-0-0\tdeadbeef-dead-beef-dead-0000beefdead\t1970-01-01 00:00:00.000\tevents\t[\"events\",\"change_form\"]\tevents\tb\ta\tTEXTAREA\t\\N\t\\N\tline 1 line2 column2" diff --git a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/tabular/TabularOutputSpec.scala b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/tabular/TabularOutputSpec.scala index eec497b74..7b69cd93e 100644 --- a/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/tabular/TabularOutputSpec.scala +++ b/modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/good/tabular/TabularOutputSpec.scala @@ -65,7 +65,7 @@ object TabularOutputSpec { "stackTrace": null, "isFatal": true, "className": "AbstractSingletonFactoryBean", - "causeStackTrace": "this column should be last" + "causeStackTrace": "trace" } } }""".noSpaces @@ -75,15 +75,15 @@ object TabularOutputSpec { ) object expected { - val contextPath = s"vendor=org.schema/name=WebPage/format=tsv/model=1" + val contextPath = s"vendor=org.schema/name=WebPage/format=tsv/model=1/revision=0/addition=0" val contextContents = "org.schema\tWebPage\tjsonschema\t1-0-0\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"WebPage\"]\tevents\tJonathan Almeida\t[\"blog\",\"releases\"]\t\\N\t\\N\t2014-07-23T00:00:00Z\tblog\ten-US\t[\"snowplow\",\"analytics\",\"java\",\"jvm\",\"tracker\"]" - val eventPath = s"vendor=com.snowplowanalytics.snowplow/name=application_error/format=tsv/model=1" + val eventPath = s"vendor=com.snowplowanalytics.snowplow/name=application_error/format=tsv/model=1/revision=0/addition=2" val eventContents = - "com.snowplowanalytics.snowplow\tapplication_error\tjsonschema\t1-0-2\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"application_error\"]\tevents\tundefined is not a function\tJAVASCRIPT\tAbstractSingletonFactoryBean\t\\N\t1\t\\N\t\\N\t14\t\\N\t\\N\t\\N\tthis column should be last" + "com.snowplowanalytics.snowplow\tapplication_error\tjsonschema\t1-0-2\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"application_error\"]\tevents\tundefined is not a 
function\tJAVASCRIPT\ttrace\tAbstractSingletonFactoryBean\t\\N\t\\N\t1\t\\N\t\\N\t\\N\t14\t\\N" // Removed three JSON columns and added 7 columns at the end val event = diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d689cdbed..2019d5e2b 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -44,7 +44,7 @@ object Dependencies { val spark = "3.3.1" val eventsManifest = "0.3.0" - val schemaDdl = "0.17.1" + val schemaDdl = "0.18.0-M18" val jacksonModule = "2.14.2" // Override incompatible version in spark runtime val jacksonDatabind = "2.14.2" val parquet4s = "2.10.0"
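
Note on the new partition layout (illustrative only, not part of the patch): shredded output is now partitioned by the full schema version, i.e. vendor=/name=/format=/model=/revision=/addition=, as asserted by the updated test paths above. The sketch below shows how such a prefix can be derived from an iglu-core SchemaKey; it assumes iglu-core is on the classpath, and the helper name shreddedDirFor is hypothetical, chosen here just to mirror what TransformedOps.getPath and Sink.writeShredded produce.

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

object PartitionPathSketch {
  // Builds the directory prefix used for shredded output in this patch;
  // `format` is "tsv" or "json" in the transformer output.
  def shreddedDirFor(key: SchemaKey, format: String): String = {
    val SchemaKey(vendor, name, _, SchemaVer.Full(model, revision, addition)) = key
    s"vendor=$vendor/name=$name/format=$format/model=$model/revision=$revision/addition=$addition"
  }

  def main(args: Array[String]): Unit = {
    val atomic = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1, 0, 0))
    // Prints: vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0
    // which matches the AtomicFolder constant in the updated ShredJobSpec.
    println(shreddedDirFor(atomic, "tsv"))
  }
}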