Skip to content

Commit

Permalink
Upgrade schema-ddl to 0.18.0 for transformers
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Apr 6, 2023
1 parent 93f9698 commit d99cc98
Show file tree
Hide file tree
Showing 26 changed files with 250 additions and 448 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object Processing {
): Stream[F, Unit] = {
val transformer: Transformer[F] = config.formats match {
case f: TransformerConfig.Formats.Shred =>
Transformer.ShredTransformer(resources.igluResolver, resources.propertiesCache, f, processor)
Transformer.ShredTransformer(resources.igluResolver, resources.shredModelCache, f, processor)
case f: TransformerConfig.Formats.WideRow =>
Transformer.WideRowTransformer(resources.igluResolver, f, processor)
}
Expand Down Expand Up @@ -260,7 +260,9 @@ object Processing {
implicit class TransformedOps(t: Transformed) {
def getPath: SinkPath = t match {
case p: Transformed.Shredded =>
val suffix = Some(s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/")
val suffix = Some(
s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}"
)
val pathType = if (p.isGood) SinkPath.PathType.Good else SinkPath.PathType.Bad
SinkPath(suffix, pathType)
case p: Transformed.WideRow =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import io.sentry.SentryClient
import com.snowplowanalytics.iglu.client.resolver.{CreateResolverCache, Resolver}
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup}
import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.rdbloader.common.Sentry
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils.EventParser
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, ShredModelCache}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Config.Output.Bad
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.metrics.Metrics
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks.BadSink
Expand All @@ -49,7 +50,7 @@ import org.http4s.blaze.client.BlazeClientBuilder

case class Resources[F[_], C](
igluResolver: Resolver[F],
propertiesCache: PropertiesCache[F],
shredModelCache: ShredModelCache[F],
eventParser: EventParser,
producer: Queue.Producer[F],
instanceId: String,
Expand Down Expand Up @@ -82,7 +83,7 @@ object Resources {
producer <- mkQueue(config.queue)
resolverConfig <- mkResolverConfig(igluConfig)
resolver <- mkResolver(resolverConfig)
propertiesCache <- Resource.eval(CreateLruMap[F, PropertiesKey, Properties].create(resolverConfig.cacheSize))
shredModelCache <- Resource.eval(CreateLruMap[F, SchemaKey, ShredModel].create(resolverConfig.cacheSize))
httpClient <- BlazeClientBuilder[F].withExecutionContext(executionContext).resource
implicit0(registryLookup: RegistryLookup[F]) <- Resource.pure(Http4sRegistryLookup[F](httpClient))
eventParser <- mkEventParser(resolver, config)
Expand All @@ -104,7 +105,7 @@ object Resources {
badSink <- mkBadSink(config, mkBadQueue)
} yield Resources(
resolver,
propertiesCache,
shredModelCache,
eventParser,
producer,
instanceId.toString,
Expand Down Expand Up @@ -145,13 +146,17 @@ object Resources {
case Left(error) => Sync[F].raiseError[Resolver[F]](error)
}
}
private def mkEventParser[F[_]: Sync: Clock](igluResolver: Resolver[F], config: Config): Resource[F, EventParser] = Resource.eval {
mkAtomicLengths(igluResolver, config).flatMap {
case Right(atomicLengths) => Sync[F].pure(Event.parser(atomicLengths))
case Left(error) => Sync[F].raiseError[EventParser](error)
private def mkEventParser[F[_]: Sync: RegistryLookup: Clock](igluResolver: Resolver[F], config: Config): Resource[F, EventParser] =
Resource.eval {
mkAtomicLengths(igluResolver, config).flatMap {
case Right(atomicLengths) => Sync[F].pure(Event.parser(atomicLengths))
case Left(error) => Sync[F].raiseError[EventParser](error)
}
}
}
private def mkAtomicLengths[F[_]: Sync: Clock](igluResolver: Resolver[F], config: Config): F[Either[RuntimeException, Map[String, Int]]] =
private def mkAtomicLengths[F[_]: Sync: RegistryLookup: Clock](
igluResolver: Resolver[F],
config: Config
): F[Either[RuntimeException, Map[String, Int]]] =
if (config.featureFlags.truncateAtomicFields) {
EventUtils.getAtomicLengths(igluResolver)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.Common
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{SnowplowEntity, TypesInfo}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats.WideRow
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.{
AtomicFieldsProvider,
Expand All @@ -45,7 +45,7 @@ sealed trait Transformer[F[_]] extends Product with Serializable {
object Transformer {
case class ShredTransformer[F[_]: Monad: RegistryLookup: Clock](
igluResolver: Resolver[F],
propertiesCache: PropertiesCache[F],
shredModelCache: ShredModelCache[F],
formats: Formats.Shred,
processor: Processor
) extends Transformer[F] {
Expand All @@ -59,12 +59,12 @@ object Transformer {
else TypesInfo.Shredded.ShreddedFormat.JSON

def goodTransform(event: Event): EitherT[F, BadRow, List[Transformed]] =
Transformed.shredEvent[F](igluResolver, propertiesCache, isTabular, processor)(event)
Transformed.shredEvent[F](igluResolver, shredModelCache, isTabular, processor)(event)

def badTransform(badRow: BadRow): Transformed = {
val SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)) = badRow.schemaKey
val SchemaKey(vendor, name, _, SchemaVer.Full(model, revision, addition)) = badRow.schemaKey
val data = Transformed.Data.DString(badRow.compact)
Transformed.Shredded.Json(false, vendor, name, model, data)
Transformed.Shredded.Json(false, vendor, name, model, revision, addition, data)
}

def typesInfo(types: Set[Data.ShreddedType]): TypesInfo = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,28 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0"
)
)
actualOptimizelyRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1/revision=0/addition=0"
)
)
actualConsentRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1/revision=0/addition=0"
)
)
actualBadRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/revision=0/addition=0"
)
)

Expand Down Expand Up @@ -99,14 +99,14 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0"
)
)
actualBadRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/revision=0/addition=0"
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import io.circe.optics.JsonPath._
import io.circe.parser.{parse => parseCirce}
import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.Registry
import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, PropertiesKey, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{Processing, Transformer}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.{Checkpointer, ParsedC}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.TestApplication._
Expand All @@ -47,12 +48,12 @@ class TransformingSpec extends Specification {

val testFileNameMap = List(
Transformed.Shredded
.Tabular("com.snowplowanalytics.snowplow", "atomic", 1, dummyTransformedData)
.Tabular("com.snowplowanalytics.snowplow", "atomic", 1, 0, 0, dummyTransformedData)
.getPath -> "com.snowplowanalytics.snowplow-atomic",
Transformed.Shredded
.Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, dummyTransformedData)
.Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, 0, 0, dummyTransformedData)
.getPath -> "com.snowplowanalytics.snowplow-consent_document",
Transformed.Shredded.Tabular("com.optimizely", "state", 1, dummyTransformedData).getPath -> "com.optimizely-state"
Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath -> "com.optimizely-state"
).toMap

val expectedTransformedMap =
Expand Down Expand Up @@ -128,12 +129,12 @@ object TransformingSpec {
val defaultWindow = Window(1, 1, 1, 1, 1)
val dummyTransformedData = Transformed.Data.DString("")

def propertiesCache: PropertiesCache[IO] = CreateLruMap[IO, PropertiesKey, Properties].create(100).unsafeRunSync()
def shredModelCache: ShredModelCache[IO] = CreateLruMap[IO, SchemaKey, ShredModel].create(100).unsafeRunSync()

def createTransformer(formats: TransformerConfig.Formats): Transformer[IO] =
formats match {
case f: TransformerConfig.Formats.Shred =>
Transformer.ShredTransformer(defaultIgluResolver, propertiesCache, f, TestProcessor)
Transformer.ShredTransformer(defaultIgluResolver, shredModelCache, f, TestProcessor)
case f: TransformerConfig.Formats.WideRow =>
Transformer.WideRowTransformer(defaultIgluResolver, f, TestProcessor)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.time.format.DateTimeParseException

import io.circe.Json
import cats.Monad
import cats.data.{EitherT, NonEmptyList}
import cats.data.NonEmptyList
import cats.implicits._

import cats.effect.Clock
Expand All @@ -30,16 +30,13 @@ import com.snowplowanalytics.iglu.core._
import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.iglu.schemaddl.migrations.FlatData
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._

import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, ParsingError}
import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload, Processor}
import com.snowplowanalytics.snowplow.rdbloader.common.Common
import Flattening.NullCharacter
import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.snowplow.analytics.scalasdk.decode.TSVParser
import com.snowplowanalytics.snowplow.rdbloader.common.Common

object EventUtils {

Expand Down Expand Up @@ -138,34 +135,6 @@ object EventUtils {
"events"
)

/**
* Transform a self-desribing entity into tabular format, using its known schemas to get a correct
* order of columns
* @param resolver
* Iglu resolver to get list of known schemas
* @param instance
* self-describing JSON that needs to be transformed
* @return
* list of columns or flattening error
*/
def flatten[F[_]: Monad: Clock: RegistryLookup](
resolver: Resolver[F],
propertiesCache: PropertiesCache[F],
instance: SelfDescribingData[Json]
): EitherT[F, FailureDetails.LoaderIgluError, List[String]] =
Flattening
.getDdlProperties(resolver, propertiesCache, instance.schema)
.map(props => mapProperties(props, instance))

private def mapProperties(props: Properties, instance: SelfDescribingData[Json]) =
props
.map { case (pointer, _) =>
FlatData.getPath(pointer.forData, instance.data, getString, NullCharacter)
}

def getString(json: Json): String =
json.fold(NullCharacter, transformBool, _ => json.show, escape, _ => escape(json.noSpaces), _ => escape(json.noSpaces))

def getEntities(event: Event): List[SelfDescribingData[Json]] =
event.unstruct_event.data.toList ++
event.derived_contexts.data ++
Expand All @@ -178,11 +147,6 @@ object EventUtils {
case _ => 0
}

/** Prevents data with newlines and tabs from breaking the loading process */
private def escape(s: String): String =
if (s == NullCharacter) "\\\\N"
else s.replace('\t', ' ').replace('\n', ' ')

private def getLength(schema: Schema): Option[Int] =
schema.maxLength.map(_.value.toInt)
}
Loading

0 comments on commit d99cc98

Please sign in to comment.