From 5dba81921dd8453f454f8fc482dbcd898bf8f8b9 Mon Sep 17 00:00:00 2001 From: Oguzhan Unlu Date: Mon, 6 Nov 2023 14:59:30 +0300 Subject: [PATCH] Loader: Support legacy file hierarchy (close #1319) --- .../aws/redshift.config.reference.hocon | 4 ++ .../snowplow/rdbloader/common/Common.scala | 16 +++-- .../loader/databricks/Databricks.scala | 16 ++++- .../loader/databricks/DatabricksSpec.scala | 34 +++++++--- .../src/main/resources/application.conf | 3 +- .../snowplow/rdbloader/Loader.scala | 10 ++- .../snowplow/rdbloader/config/Config.scala | 6 +- .../snowplow/rdbloader/db/Statement.scala | 8 ++- .../snowplow/rdbloader/db/Target.scala | 3 +- .../rdbloader/discovery/ShreddedType.scala | 18 ++++-- .../snowplow/rdbloader/loading/Load.scala | 34 +++++----- .../snowplow/rdbloader/ConfigSpec.scala | 3 +- .../snowplow/rdbloader/loading/LoadSpec.scala | 30 +++++---- .../snowplow/rdbloader/test/PureDAO.scala | 8 ++- .../snowplow/loader/redshift/Redshift.scala | 19 +++--- .../loader/redshift/RedshiftSpec.scala | 63 ++++++++++++++++++- .../snowplow/loader/snowflake/Snowflake.scala | 10 +-- 17 files changed, 214 insertions(+), 71 deletions(-) diff --git a/config/loader/aws/redshift.config.reference.hocon b/config/loader/aws/redshift.config.reference.hocon index 2a25fd8ba..a1f88699a 100644 --- a/config/loader/aws/redshift.config.reference.hocon +++ b/config/loader/aws/redshift.config.reference.hocon @@ -275,5 +275,9 @@ # e.g. [ "iglu:com.example/myschema1/jsonschema/1-*-*", "iglu:com.example/myschema2/jsonschema/1-*-*"] # Optional, empty list by default "disableMigration": [] + + # Use legacy file hierarchy, i.e. vendor/name/format/model , to decide which path to load transformed events from + # This should be in sync with transformer configuration + "legacyPartitioning": false } } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Common.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Common.scala index 65314f0ff..b79c633a3 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Common.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Common.scala @@ -26,16 +26,22 @@ object Common { val AtomicSchema: SchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1, 0, 0)) val AtomicType = TypesInfo.Shredded.Type(AtomicSchema, TypesInfo.Shredded.ShreddedFormat.TSV, SnowplowEntity.SelfDescribingEvent) - val AtomicPath: String = entityPath(AtomicType) val FolderTimeFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss").withZone(ZoneId.from(ZoneOffset.UTC)) - def entityPath(entity: TypesInfo.Shredded.Type) = - s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}/revision=${entity.schemaKey.version.revision}/addition=${entity.schemaKey.version.addition}" + def entityPath(entity: TypesInfo.Shredded.Type, legacyPartitioning: Boolean) = + if (legacyPartitioning) + s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}" + else + s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}/revision=${entity.schemaKey.version.revision}/addition=${entity.schemaKey.version.addition}" - def entityPathFull(base: BlobStorage.Folder, entity: 
TypesInfo.Shredded.Type): BlobStorage.Folder = - BlobStorage.Folder.append(base, entityPath(entity)) + def entityPathFull( + base: BlobStorage.Folder, + entity: TypesInfo.Shredded.Type, + legacyPartitioning: Boolean + ): BlobStorage.Folder = + BlobStorage.Folder.append(base, entityPath(entity, legacyPartitioning)) /** * Remove all occurrences of access key id and secret access key from message Helps to avoid diff --git a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala index 765684a98..0af7f60e1 100644 --- a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala +++ b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala @@ -48,13 +48,23 @@ object Databricks { discovery: DataDiscovery, eventTableColumns: EventTableColumns, i: Unit, - disableMigration: List[SchemaCriterion] + disableMigration: List[SchemaCriterion], + legacyPartitioning: Boolean ): LoadStatements = { val toCopy = columnsToCopyFromDiscoveredData(discovery) val toSkip = ColumnsToSkip(getEntityColumnsPresentInDbOnly(eventTableColumns, toCopy)) NonEmptyList.one(loadAuthMethod => - Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip, discovery.typesInfo, loadAuthMethod, i) + Statement.EventsCopy( + discovery.base, + discovery.compression, + toCopy, + toSkip, + discovery.typesInfo, + loadAuthMethod, + i, + legacyPartitioning + ) ) } @@ -117,7 +127,7 @@ object Databricks { sql"""COPY INTO $frTableName FROM (SELECT _C0::VARCHAR(512) RUN_ID FROM '$frPath' $frAuth) FILEFORMAT = CSV""" - case Statement.EventsCopy(path, _, toCopy, toSkip, _, loadAuthMethod, _) => + case Statement.EventsCopy(path, _, toCopy, toSkip, _, loadAuthMethod, _, _) => val updatedPath = replaceScheme(path) val frTableName = Fragment.const(qualify(EventsTable.MainName)) val frPath = Fragment.const0(updatedPath.append("output=good")) diff --git a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala index 2906f0318..3deb2f786 100644 --- a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala +++ b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala @@ -64,10 +64,10 @@ class DatabricksSpec extends Specification { ) val results = target - .getLoadStatements(discovery, eventsColumns, (), Nil) + .getLoadStatements(discovery, eventsColumns, (), Nil, false) .map(f => f(LoadAuthMethod.NoCreds)) - results should be like { case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, _, _, _), Nil) => + results should be like { case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, _, _, _, _), Nil) => path must beEqualTo(baseFolder) compression must beEqualTo(Compression.Gzip) @@ -117,7 +117,8 @@ class DatabricksSpec extends Specification { toSkip, TypesInfo.WideRow(PARQUET, List.empty), LoadAuthMethod.NoCreds, - () + (), + false ) target.toFragment(statement).toString must beLike { case sql => @@ -143,7 +144,16 @@ class DatabricksSpec extends Specification { ) val loadAuthMethod = LoadAuthMethod.TempCreds.AWS("testAccessKey", "testSecretKey", 
"testSessionToken", Instant.now.plusSeconds(3600)) val statement = - Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), loadAuthMethod, ()) + Statement.EventsCopy( + baseFolder, + Compression.Gzip, + toCopy, + toSkip, + TypesInfo.WideRow(PARQUET, List.empty), + loadAuthMethod, + (), + false + ) target.toFragment(statement).toString must beLike { case sql => sql must contain( @@ -169,7 +179,8 @@ class DatabricksSpec extends Specification { toSkip, TypesInfo.WideRow(PARQUET, List.empty), LoadAuthMethod.NoCreds, - () + (), + false ) val testTarget = Databricks @@ -201,7 +212,16 @@ class DatabricksSpec extends Specification { BlobStorage.Folder.coerce("https://test.blob.core.windows.net/test-container/path1/path2") val loadAuthMethod = LoadAuthMethod.TempCreds.Azure("testToken", Instant.now.plusSeconds(3600)) val statement = - Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), loadAuthMethod, ()) + Statement.EventsCopy( + baseFolder, + Compression.Gzip, + toCopy, + toSkip, + TypesInfo.WideRow(PARQUET, List.empty), + loadAuthMethod, + (), + false + ) target.toFragment(statement).toString must beLike { case sql => sql must contain( @@ -243,7 +263,7 @@ object DatabricksSpec { Config.Retries(Config.Strategy.Constant, None, 1.minute, None), Config.Retries(Config.Strategy.Constant, None, 1.minute, None), Config.Retries(Config.Strategy.Constant, None, 1.minute, None), - Config.FeatureFlags(addLoadTstampColumn = true, disableMigration = Nil), + Config.FeatureFlags(addLoadTstampColumn = true, disableMigration = Nil, false), exampleTelemetry ) diff --git a/modules/loader/src/main/resources/application.conf b/modules/loader/src/main/resources/application.conf index fbd998e6d..8fdf060e6 100644 --- a/modules/loader/src/main/resources/application.conf +++ b/modules/loader/src/main/resources/application.conf @@ -46,7 +46,8 @@ }, "featureFlags": { "addLoadTstampColumn": true, - "disableMigration": [] + "disableMigration": [], + "legacyPartitioning": false } "telemetry": { "disable": false diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala index 87ebe0568..eaea03bc1 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala @@ -208,7 +208,15 @@ object Loader { start <- Clock[F].realTimeInstant _ <- discovery.origin.timestamps.min.map(t => Monitoring[F].periodicMetrics.setEarliestKnownUnloadedData(t)).sequence.void result <- - Load.load[F, C, I](setStageC, incrementAttemptsC, discovery, initQueryResult, target, config.featureFlags.disableMigration) + Load.load[F, C, I]( + setStageC, + incrementAttemptsC, + discovery, + initQueryResult, + target, + config.featureFlags.disableMigration, + config.featureFlags.legacyPartitioning + ) attempts <- control.getAndResetAttempts _ <- result match { case Load.LoadSuccess(ingested) => diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala index ec6dbb956..86b2fc9f2 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala @@ 
-124,7 +124,11 @@ object Config { backoff: FiniteDuration, cumulativeBound: Option[FiniteDuration] ) - final case class FeatureFlags(addLoadTstampColumn: Boolean, disableMigration: List[SchemaCriterion]) + final case class FeatureFlags( + addLoadTstampColumn: Boolean, + disableMigration: List[SchemaCriterion], + legacyPartitioning: Boolean + ) sealed trait Strategy object Strategy { diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala index 9bd0fa467..162f2dad7 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala @@ -61,7 +61,8 @@ object Statement { columnsToSkip: ColumnsToSkip, typesInfo: TypesInfo, loadAuthMethod: LoadAuthMethod, - initQueryResult: T + initQueryResult: T, + legacyPartitioning: Boolean ) extends Statement with Loading { def table: String = EventsTable.MainName @@ -72,11 +73,12 @@ object Statement { compression: Compression, loadAuthMethod: LoadAuthMethod, shredModel: ShredModel, - tableName: String + tableName: String, + legacyPartitioning: Boolean ) extends Statement with Loading { def table: String = shreddedType.info.getName - def path: String = shreddedType.getLoadPath + def path: String = shreddedType.getLoadPath(legacyPartitioning) def title = s"COPY $table FROM $path" } case class CreateTempEventTable(table: String) extends Loading { diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala index b9c5ee157..48a3d93c4 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala @@ -44,7 +44,8 @@ trait Target[I] { discovery: DataDiscovery, eventTableColumns: EventTableColumns, initQueryResult: I, - disableMigration: List[SchemaCriterion] + disableMigration: List[SchemaCriterion], + legacyPartitioning: Boolean ): LoadStatements /** Get DDL of a manifest table */ diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala index 81f0e6970..2230ac0d5 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala @@ -31,7 +31,7 @@ sealed trait ShreddedType { def info: ShreddedType.Info /** Get S3 prefix which DB should COPY FROM */ - def getLoadPath: String + def getLoadPath(legacyPartitioning: Boolean): String /** Human-readable form */ def show: String @@ -60,8 +60,11 @@ object ShreddedType { * existing JSONPaths file */ final case class Json(info: Info, jsonPaths: BlobStorage.Key) extends ShreddedType { - def getLoadPath: String = - s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=json/model=${info.version.model}/revision=${info.version.revision}/addition=${info.version.addition}" + def getLoadPath(legacyPartitioning: Boolean): String = + if (legacyPartitioning) + s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=json/model=${info.version.model}" + else + 
s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=json/model=${info.version.model}/revision=${info.version.revision}/addition=${info.version.addition}" def show: String = s"${info.toCriterion.asString} ($jsonPaths)" } @@ -74,14 +77,17 @@ object ShreddedType { * raw metadata extracted from S3 Key */ final case class Tabular(info: Info) extends ShreddedType { - def getLoadPath: String = - s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.version.model}/revision=${info.version.revision}/addition=${info.version.addition}" + def getLoadPath(legacyPartitioning: Boolean): String = + if (legacyPartitioning) + s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.version.model}" + else + s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.version.model}/revision=${info.version.revision}/addition=${info.version.addition}" def show: String = s"${info.toCriterion.asString} TSV" } final case class Widerow(info: Info) extends ShreddedType { - def getLoadPath: String = s"${info.base}${Common.GoodPrefix}" + def getLoadPath(legacyPartitioning: Boolean): String = s"${info.base}${Common.GoodPrefix}" def show: String = s"${info.toCriterion.asString} WIDEROW" } diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala index c92c0bbcb..259bc69bb 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala @@ -88,14 +88,17 @@ object Load { discovery: DataDiscovery.WithOrigin, initQueryResult: I, target: Target[I], - disableMigration: List[SchemaCriterion] + disableMigration: List[SchemaCriterion], + legacyPartitioning: Boolean ): F[LoadResult] = for { _ <- TargetCheck.prepareTarget[F, C] migrations <- Migration.build[F, C, I](discovery.discovery, target, disableMigration) _ <- getPreTransactions(setStage, migrations.preTransaction, incrementAttempt).traverse_(Transaction[F, C].run(_)) result <- Transaction[F, C].transact { - getTransaction[C, I](setStage, discovery, initQueryResult, target, disableMigration)(migrations.inTransaction) + getTransaction[C, I](setStage, discovery, initQueryResult, target, disableMigration, legacyPartitioning)( + migrations.inTransaction + ) .onError { case _: Throwable => incrementAttempt } } } yield result @@ -124,7 +127,8 @@ object Load { discovery: DataDiscovery.WithOrigin, initQueryResult: I, target: Target[I], - disableMigration: List[SchemaCriterion] + disableMigration: List[SchemaCriterion], + legacyPartitioning: Boolean )( inTransactionMigrations: F[Unit] ): F[LoadResult] = @@ -143,7 +147,7 @@ object Load { Logging[F].info(s"Loading transaction for ${discovery.origin.base} has started") *> setStage(Stage.MigrationIn) *> inTransactionMigrations *> - run[F, I](setLoading, discovery.discovery, initQueryResult, target, disableMigration) *> + run[F, I](setLoading, discovery.discovery, initQueryResult, target, disableMigration, legacyPartitioning) *> setStage(Stage.Committing) *> Manifest.add[F](discovery.origin.toManifestItem) *> Manifest @@ -198,20 +202,22 @@ object Load { discovery: DataDiscovery, initQueryResult: I, target: Target[I], - disableMigration: List[SchemaCriterion] + disableMigration: List[SchemaCriterion], + 
legacyPartitioning: Boolean ): F[Unit] = for { _ <- Logging[F].info(s"Loading ${discovery.base}") existingEventTableColumns <- if (target.requiresEventsColumns) Control.getColumns[F](EventsTable.MainName) else Nil.pure[F] - _ <- target.getLoadStatements(discovery, existingEventTableColumns, initQueryResult, disableMigration).traverse_ { genStatement => - for { - loadAuthMethod <- LoadAuthService[F].forLoadingEvents - // statement must be generated as late as possible, to have fresh and valid credentials. - statement = genStatement(loadAuthMethod) - _ <- Logging[F].info(statement.title) - _ <- setLoading(statement.table) - _ <- DAO[F].executeUpdate(statement, DAO.Purpose.Loading).void - } yield () + _ <- target.getLoadStatements(discovery, existingEventTableColumns, initQueryResult, disableMigration, legacyPartitioning).traverse_ { + genStatement => + for { + loadAuthMethod <- LoadAuthService[F].forLoadingEvents + // statement must be generated as late as possible, to have fresh and valid credentials. + statement = genStatement(loadAuthMethod) + _ <- Logging[F].info(statement.title) + _ <- setLoading(statement.table) + _ <- DAO[F].executeUpdate(statement, DAO.Purpose.Loading).void + } yield () } _ <- Logging[F].info(s"Folder [${discovery.base}] has been loaded (not committed yet)") } yield () diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala index 83f174408..b0a5f7225 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala @@ -218,7 +218,8 @@ object ConfigSpec { 1.hour ) val exampleInitRetries: Config.Retries = Config.Retries(Config.Strategy.Exponential, Some(3), 30.seconds, Some(1.hour)) - val exampleFeatureFlags: Config.FeatureFlags = Config.FeatureFlags(addLoadTstampColumn = true, disableMigration = Nil) + val exampleFeatureFlags: Config.FeatureFlags = + Config.FeatureFlags(addLoadTstampColumn = true, disableMigration = Nil, legacyPartitioning = false) val exampleCloud: Config.Cloud = Config.Cloud.AWS(exampleRegion, exampleMessageQueue) val exampleTelemetry = Telemetry.Config( diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala index 2861edabb..133e1f5db 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala @@ -83,10 +83,11 @@ class LoadSpec extends Specification { ColumnsToSkip(List.empty), TypesInfo.Shredded(List.empty), LoadAuthMethod.NoCreds, - () + (), + false ) ), - LogEntry.Sql(Statement.ShreddedCopy(info, Compression.Gzip, LoadAuthMethod.NoCreds, model, model.tableName)), + LogEntry.Sql(Statement.ShreddedCopy(info, Compression.Gzip, LoadAuthMethod.NoCreds, model, model.tableName, false)), LogEntry.Sql(Statement.ManifestAdd(LoadSpec.dataDiscoveryWithOrigin.origin.toManifestItem)), LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)), PureTransaction.CommitMessage @@ -99,7 +100,8 @@ class LoadSpec extends Specification { LoadSpec.dataDiscoveryWithOrigin, (), PureDAO.DummyTarget, - Nil + Nil, + false ) .runS @@ -131,7 +133,8 @@ class LoadSpec extends Specification { LoadSpec.dataDiscoveryWithOrigin, 
(), PureDAO.DummyTarget, - Nil + Nil, + false ) .runS @@ -164,7 +167,8 @@ class LoadSpec extends Specification { LoadSpec.dataDiscoveryWithOrigin, (), PureDAO.DummyTarget, - Nil + Nil, + false ) .runS @@ -209,10 +213,11 @@ class LoadSpec extends Specification { ColumnsToSkip(List.empty), TypesInfo.Shredded(List.empty), LoadAuthMethod.NoCreds, - () + (), + false ) ), - LogEntry.Sql(Statement.ShreddedCopy(info, Compression.Gzip, LoadAuthMethod.NoCreds, model, model.tableName)), + LogEntry.Sql(Statement.ShreddedCopy(info, Compression.Gzip, LoadAuthMethod.NoCreds, model, model.tableName, false)), PureTransaction.RollbackMessage, LogEntry.Message("TICK REALTIME"), LogEntry.Message("SLEEP 30000000000 nanoseconds"), @@ -226,10 +231,11 @@ class LoadSpec extends Specification { ColumnsToSkip(List.empty), TypesInfo.Shredded(List.empty), LoadAuthMethod.NoCreds, - () + (), + false ) ), - LogEntry.Sql(Statement.ShreddedCopy(info, Compression.Gzip, LoadAuthMethod.NoCreds, model, model.tableName)), + LogEntry.Sql(Statement.ShreddedCopy(info, Compression.Gzip, LoadAuthMethod.NoCreds, model, model.tableName, false)), LogEntry.Sql(Statement.ManifestAdd(LoadSpec.dataDiscoveryWithOrigin.origin.toManifestItem)), LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)), PureTransaction.CommitMessage @@ -241,7 +247,8 @@ class LoadSpec extends Specification { LoadSpec.dataDiscoveryWithOrigin, (), PureDAO.DummyTarget, - Nil + Nil, + false ) .runS @@ -282,7 +289,8 @@ class LoadSpec extends Specification { LoadSpec.dataDiscoveryWithOrigin, (), PureDAO.DummyTarget, - Nil + Nil, + false ) .runS diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala index 5f3156ea4..05fdf4374 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala @@ -85,7 +85,8 @@ object PureDAO { discovery: DataDiscovery, eventTableColumns: EventTableColumns, i: Unit, - disableMigration: List[SchemaCriterion] + disableMigration: List[SchemaCriterion], + legacyPartitioning: Boolean ): LoadStatements = NonEmptyList( loadAuthMethod => @@ -96,7 +97,8 @@ object PureDAO { ColumnsToSkip(List.empty), TypesInfo.Shredded(List.empty), loadAuthMethod, - i + i, + legacyPartitioning ), discovery.shreddedTypes.map { shredded => val mergeResult = discovery.shredModels(shredded.info.getSchemaKey) @@ -104,7 +106,7 @@ object PureDAO { mergeResult.recoveryModels.getOrElse(shredded.info.getSchemaKey, mergeResult.goodModel) val isMigrationDisabled = disableMigration.contains(shredded.info.toCriterion) val tableName = if (isMigrationDisabled) mergeResult.goodModel.tableName else shredModel.tableName - loadAuthMethod => Statement.ShreddedCopy(shredded, Compression.Gzip, loadAuthMethod, shredModel, tableName) + loadAuthMethod => Statement.ShreddedCopy(shredded, Compression.Gzip, loadAuthMethod, shredModel, tableName, false) } ) diff --git a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala index 2b43a781d..1e355716c 100644 --- a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala +++ b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala @@ 
-85,11 +85,12 @@ object Redshift { discovery: DataDiscovery, eventTableColumns: EventTableColumns, i: Unit, - disableMigration: List[SchemaCriterion] + disableMigration: List[SchemaCriterion], + legacyPartitioning: Boolean ): LoadStatements = { val shreddedStatements = discovery.shreddedTypes .filterNot(_.isAtomic) - .groupBy(_.getLoadPath) + .groupBy(_.getLoadPath(legacyPartitioning)) .values .map(_.head) // So we get only one copy statement for given path .map { shreddedType => @@ -104,7 +105,8 @@ object Redshift { discovery.compression, loadAuthMethod, shredModel, - tableName + tableName, + legacyPartitioning ) } .toList @@ -117,7 +119,8 @@ object Redshift { ColumnsToSkip.none, discovery.typesInfo, loadAuthMethod, - i + i, + legacyPartitioning ) } NonEmptyList(atomic, shreddedStatements) @@ -196,10 +199,10 @@ object Redshift { | CREDENTIALS '$frCredentials' | REGION '$frRegion' | DELIMITER '$EventFieldSeparator'""".stripMargin - case Statement.EventsCopy(path, compression, columnsToCopy, _, _, loadAuthMethod, _) => + case Statement.EventsCopy(path, compression, columnsToCopy, _, _, loadAuthMethod, _, legacyPartitioning) => // For some reasons Redshift JDBC doesn't handle interpolation in COPY statements val frTableName = Fragment.const(EventsTable.withSchema(schema)) - val frPath = Fragment.const0(Common.entityPathFull(path, Common.AtomicType)) + val frPath = Fragment.const0(Common.entityPathFull(path, Common.AtomicType, legacyPartitioning)) val frCredentials = loadAuthMethodFragment(loadAuthMethod, storage.roleArn) val frRegion = Fragment.const0(region.name) val frMaxError = Fragment.const0(maxError.toString) @@ -218,9 +221,9 @@ object Redshift { | ACCEPTINVCHARS | $frCompression""".stripMargin - case Statement.ShreddedCopy(shreddedType, compression, loadAuthMethod, shredModel, tableName) => + case Statement.ShreddedCopy(shreddedType, compression, loadAuthMethod, shredModel, tableName, legacyPartitioning) => val frTableName = Fragment.const0(qualify(tableName)) - val frPath = Fragment.const0(shreddedType.getLoadPath) + val frPath = Fragment.const0(shreddedType.getLoadPath(legacyPartitioning)) val frCredentials = loadAuthMethodFragment(loadAuthMethod, storage.roleArn) val frRegion = Fragment.const0(region.name) val frMaxError = Fragment.const0(maxError.toString) diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala index 89c7dbda3..f9686cb62 100644 --- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala +++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala @@ -82,7 +82,7 @@ class RedshiftSpec extends Specification { } } - "getLoadStatements should return one COPY per unique schema (vendor, name, model)" in { + "getLoadStatements should return one COPY per unique schema (vendor, name, model, revision, addition)" in { val shreddedTypes = List( Info( vendor = "com.acme", @@ -128,7 +128,7 @@ class RedshiftSpec extends Specification { ) val result = redshift - .getLoadStatements(discovery, List.empty, (), Nil) + .getLoadStatements(discovery, List.empty, (), Nil, false) .map(f => f(LoadAuthService.LoadAuthMethod.NoCreds).title) result.size must beEqualTo(3) @@ -140,6 +140,65 @@ class RedshiftSpec extends Specification { ) ) } + + "getLoadStatements should respect legacyPartitioning flag" in { + val shreddedTypes = 
List( + Info( + vendor = "com.acme", + name = "event", + version = SchemaVer.Full(2, 0, 0), + entity = SelfDescribingEvent, + base = Folder.coerce("s3://my-bucket/my-path") + ), + Info( + vendor = "com.acme", + name = "event", + version = SchemaVer.Full(2, 0, 0), + entity = Context, + base = Folder.coerce("s3://my-bucket/my-path") + ), + Info( + vendor = "com.acme", + name = "event", + version = SchemaVer.Full(3, 0, 0), + entity = SelfDescribingEvent, + base = Folder.coerce("s3://my-bucket/my-path") + ), + Info( + vendor = "com.acme", + name = "event", + version = SchemaVer.Full(3, 0, 0), + entity = Context, + base = Folder.coerce("s3://my-bucket/my-path") + ) + ).map(Tabular) + + val discovery = DataDiscovery( + Folder.coerce("s3://my-bucket/my-path"), + shreddedTypes, + Compression.None, + TypesInfo.Shredded(List.empty), + Nil, + shreddedTypes.map { s => + s.info.getSchemaKey -> foldMapMergeRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s.info.getSchemaKey), Schema())) + ) + }.toMap + ) + + val result = redshift + .getLoadStatements(discovery, List.empty, (), Nil, legacyPartitioning = true) + .map(f => f(LoadAuthService.LoadAuthMethod.NoCreds).title) + + result.size must beEqualTo(3) + result.toList must containTheSameElementsAs( + List( + "COPY events FROM s3://my-bucket/my-path/", // atomic + "COPY com_acme_event_2 FROM s3://my-bucket/my-path/output=good/vendor=com.acme/name=event/format=tsv/model=2", + "COPY com_acme_event_3 FROM s3://my-bucket/my-path/output=good/vendor=com.acme/name=event/format=tsv/model=3" + ) + ) + } } } diff --git a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala index bdbaeabf5..4bbd17ba4 100644 --- a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala +++ b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala @@ -67,7 +67,8 @@ object Snowflake { discovery: DataDiscovery, eventTableColumns: EventTableColumns, initQueryResult: InitQueryResult, - disableMigration: List[SchemaCriterion] + disableMigration: List[SchemaCriterion], + legacyPartitioning: Boolean ): LoadStatements = { val columnsToCopy = columnsToCopyFromDiscoveredData(discovery) @@ -81,7 +82,8 @@ object Snowflake { ColumnsToSkip.none, discovery.typesInfo, loadAuthMethod, - initQueryResult + initQueryResult, + legacyPartitioning ) ) case _: StorageTarget.LoadAuthMethod.TempCreds => @@ -165,7 +167,7 @@ object Snowflake { case Statement.FoldersCopy(_, _, _) => throw new IllegalStateException("Init query result has wrong format in FoldersCopy") - case Statement.EventsCopy(p, _, columns, _, typesInfo, _, initQueryResult: InitQueryResult) => + case Statement.EventsCopy(p, _, columns, _, typesInfo, _, initQueryResult: InitQueryResult, _) => val updatedPath = replaceScheme(p) // This is validated on config decoding stage val stage = tgt.transformedStage.getOrElse( @@ -184,7 +186,7 @@ object Snowflake { |$frFileFormat |$frOnError""".stripMargin - case Statement.EventsCopy(_, _, _, _, _, _, _) => + case Statement.EventsCopy(_, _, _, _, _, _, _, _) => throw new IllegalStateException("Init query result has wrong format in EventsCopy") case Statement.StagePath(stage) =>
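
A minimal standalone sketch of the two prefix layouts the new featureFlags.legacyPartitioning toggle (default false in the loader config) switches between. The object, helper signature and sample schema version 2-0-1 below are illustrative only; the path formats themselves mirror Common.entityPath and ShreddedType.getLoadPath in the diff above.

object PartitioningSketch {
  final case class SchemaVer(model: Int, revision: Int, addition: Int)

  // Illustrative helper, not the production code path.
  def entityPath(
    vendor: String,
    name: String,
    format: String,
    version: SchemaVer,
    legacyPartitioning: Boolean
  ): String =
    if (legacyPartitioning)
      // legacy transformer layout: partitioning stops at the model
      s"output=good/vendor=$vendor/name=$name/format=$format/model=${version.model}"
    else
      // current layout: partitions down to revision and addition
      s"output=good/vendor=$vendor/name=$name/format=$format/model=${version.model}" +
        s"/revision=${version.revision}/addition=${version.addition}"

  def main(args: Array[String]): Unit = {
    val v = SchemaVer(2, 0, 1)
    // output=good/vendor=com.acme/name=event/format=tsv/model=2
    println(entityPath("com.acme", "event", "tsv", v, legacyPartitioning = true))
    // output=good/vendor=com.acme/name=event/format=tsv/model=2/revision=0/addition=1
    println(entityPath("com.acme", "event", "tsv", v, legacyPartitioning = false))
  }
}

One knock-on effect, covered by the new RedshiftSpec case: because the legacy layout stops at the model, shredded types that differ only in revision or addition share a load path, so the Redshift loader groups them and emits a single COPY per vendor/name/format/model rather than one per full schema version. As the config comment notes, the flag must match the partitioning scheme the transformer actually writes.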