Loader: Support legacy file hierarchy (close #1319)
oguzhanunlu committed Nov 6, 2023
1 parent 6cc1956 commit 5dba819
Showing 17 changed files with 214 additions and 71 deletions.
4 changes: 4 additions & 0 deletions config/loader/aws/redshift.config.reference.hocon
@@ -275,5 +275,9 @@
# e.g. [ "iglu:com.example/myschema1/jsonschema/1-*-*", "iglu:com.example/myschema2/jsonschema/1-*-*"]
# Optional, empty list by default
"disableMigration": []

# Use legacy file hierarchy, i.e. vendor/name/format/model, to decide which path to load transformed events from
# This should be in sync with transformer configuration
"legacyPartitioning": false
}
}
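
For illustration, the two directory layouts that the new flag toggles can be sketched as plain path strings. This is a minimal sketch, not code from the commit: it assumes a hypothetical schema iglu:com.acme/checkout/jsonschema/2-1-3 in TSV format and the output=good prefix that appears elsewhere in this diff, and it mirrors the entityPath change in Common.scala below.

// Hypothetical shredded type iglu:com.acme/checkout/jsonschema/2-1-3, TSV format
// legacyPartitioning = true: partition only down to the schema model
val legacyPath = "output=good/vendor=com.acme/name=checkout/format=tsv/model=2"
// legacyPartitioning = false (default): revision and addition are also part of the path
val currentPath = "output=good/vendor=com.acme/name=checkout/format=tsv/model=2/revision=1/addition=3"

The loader appends one of these suffixes to the run folder's base when building COPY statements, which is why the setting has to agree with how the transformer laid out the run's output.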
@@ -26,16 +26,22 @@ object Common {
val AtomicSchema: SchemaKey =
SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1, 0, 0))
val AtomicType = TypesInfo.Shredded.Type(AtomicSchema, TypesInfo.Shredded.ShreddedFormat.TSV, SnowplowEntity.SelfDescribingEvent)
val AtomicPath: String = entityPath(AtomicType)

val FolderTimeFormatter: DateTimeFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss").withZone(ZoneId.from(ZoneOffset.UTC))

def entityPath(entity: TypesInfo.Shredded.Type) =
s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}/revision=${entity.schemaKey.version.revision}/addition=${entity.schemaKey.version.addition}"
def entityPath(entity: TypesInfo.Shredded.Type, legacyPartitioning: Boolean) =
if (legacyPartitioning)
s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}"
else
s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}/revision=${entity.schemaKey.version.revision}/addition=${entity.schemaKey.version.addition}"

def entityPathFull(base: BlobStorage.Folder, entity: TypesInfo.Shredded.Type): BlobStorage.Folder =
BlobStorage.Folder.append(base, entityPath(entity))
def entityPathFull(
base: BlobStorage.Folder,
entity: TypesInfo.Shredded.Type,
legacyPartitioning: Boolean
): BlobStorage.Folder =
BlobStorage.Folder.append(base, entityPath(entity, legacyPartitioning))

/**
* Remove all occurrences of access key id and secret access key from message. Helps to avoid
@@ -48,13 +48,23 @@ object Databricks {
discovery: DataDiscovery,
eventTableColumns: EventTableColumns,
i: Unit,
disableMigration: List[SchemaCriterion]
disableMigration: List[SchemaCriterion],
legacyPartitioning: Boolean
): LoadStatements = {
val toCopy = columnsToCopyFromDiscoveredData(discovery)
val toSkip = ColumnsToSkip(getEntityColumnsPresentInDbOnly(eventTableColumns, toCopy))

NonEmptyList.one(loadAuthMethod =>
Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip, discovery.typesInfo, loadAuthMethod, i)
Statement.EventsCopy(
discovery.base,
discovery.compression,
toCopy,
toSkip,
discovery.typesInfo,
loadAuthMethod,
i,
legacyPartitioning
)
)
}

@@ -117,7 +127,7 @@ object Databricks {
sql"""COPY INTO $frTableName
FROM (SELECT _C0::VARCHAR(512) RUN_ID FROM '$frPath' $frAuth)
FILEFORMAT = CSV"""
case Statement.EventsCopy(path, _, toCopy, toSkip, _, loadAuthMethod, _) =>
case Statement.EventsCopy(path, _, toCopy, toSkip, _, loadAuthMethod, _, _) =>
val updatedPath = replaceScheme(path)
val frTableName = Fragment.const(qualify(EventsTable.MainName))
val frPath = Fragment.const0(updatedPath.append("output=good"))
@@ -64,10 +64,10 @@ class DatabricksSpec extends Specification {
)

val results = target
.getLoadStatements(discovery, eventsColumns, (), Nil)
.getLoadStatements(discovery, eventsColumns, (), Nil, false)
.map(f => f(LoadAuthMethod.NoCreds))

results should be like { case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, _, _, _), Nil) =>
results should be like { case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, _, _, _, _), Nil) =>
path must beEqualTo(baseFolder)
compression must beEqualTo(Compression.Gzip)

@@ -117,7 +117,8 @@ class DatabricksSpec extends Specification {
toSkip,
TypesInfo.WideRow(PARQUET, List.empty),
LoadAuthMethod.NoCreds,
()
(),
false
)

target.toFragment(statement).toString must beLike { case sql =>
@@ -143,7 +144,16 @@
)
val loadAuthMethod = LoadAuthMethod.TempCreds.AWS("testAccessKey", "testSecretKey", "testSessionToken", Instant.now.plusSeconds(3600))
val statement =
Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), loadAuthMethod, ())
Statement.EventsCopy(
baseFolder,
Compression.Gzip,
toCopy,
toSkip,
TypesInfo.WideRow(PARQUET, List.empty),
loadAuthMethod,
(),
false
)

target.toFragment(statement).toString must beLike { case sql =>
sql must contain(
@@ -169,7 +179,8 @@
toSkip,
TypesInfo.WideRow(PARQUET, List.empty),
LoadAuthMethod.NoCreds,
()
(),
false
)

val testTarget = Databricks
@@ -201,7 +212,16 @@
BlobStorage.Folder.coerce("https://test.blob.core.windows.net/test-container/path1/path2")
val loadAuthMethod = LoadAuthMethod.TempCreds.Azure("testToken", Instant.now.plusSeconds(3600))
val statement =
Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), loadAuthMethod, ())
Statement.EventsCopy(
baseFolder,
Compression.Gzip,
toCopy,
toSkip,
TypesInfo.WideRow(PARQUET, List.empty),
loadAuthMethod,
(),
false
)

target.toFragment(statement).toString must beLike { case sql =>
sql must contain(
@@ -243,7 +263,7 @@ object DatabricksSpec {
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.FeatureFlags(addLoadTstampColumn = true, disableMigration = Nil),
Config.FeatureFlags(addLoadTstampColumn = true, disableMigration = Nil, false),
exampleTelemetry
)

3 changes: 2 additions & 1 deletion modules/loader/src/main/resources/application.conf
@@ -46,7 +46,8 @@
},
"featureFlags": {
"addLoadTstampColumn": true,
"disableMigration": []
"disableMigration": [],
"legacyPartitioning": false
}
"telemetry": {
"disable": false
@@ -208,7 +208,15 @@ object Loader {
start <- Clock[F].realTimeInstant
_ <- discovery.origin.timestamps.min.map(t => Monitoring[F].periodicMetrics.setEarliestKnownUnloadedData(t)).sequence.void
result <-
Load.load[F, C, I](setStageC, incrementAttemptsC, discovery, initQueryResult, target, config.featureFlags.disableMigration)
Load.load[F, C, I](
setStageC,
incrementAttemptsC,
discovery,
initQueryResult,
target,
config.featureFlags.disableMigration,
config.featureFlags.legacyPartitioning
)
attempts <- control.getAndResetAttempts
_ <- result match {
case Load.LoadSuccess(ingested) =>
@@ -124,7 +124,11 @@ object Config {
backoff: FiniteDuration,
cumulativeBound: Option[FiniteDuration]
)
final case class FeatureFlags(addLoadTstampColumn: Boolean, disableMigration: List[SchemaCriterion])
final case class FeatureFlags(
addLoadTstampColumn: Boolean,
disableMigration: List[SchemaCriterion],
legacyPartitioning: Boolean
)

sealed trait Strategy
object Strategy {
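
As a minimal sketch of how the new flag surfaces in the loader's configuration model, the featureFlags defaults from application.conf above would materialise as the case class added in this hunk roughly like so (decoder wiring omitted; only the assignment of defaults is shown):

// Mirrors the defaults in application.conf: legacy partitioning stays off unless explicitly enabled
val defaultFlags = Config.FeatureFlags(
  addLoadTstampColumn = true,
  disableMigration = Nil, // List[SchemaCriterion], empty by default
  legacyPartitioning = false
)

A deployment that still produces the pre-revision/addition layout would override only legacyPartitioning in its loader config, leaving the other flags untouched.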
@@ -61,7 +61,8 @@ object Statement {
columnsToSkip: ColumnsToSkip,
typesInfo: TypesInfo,
loadAuthMethod: LoadAuthMethod,
initQueryResult: T
initQueryResult: T,
legacyPartitioning: Boolean
) extends Statement
with Loading {
def table: String = EventsTable.MainName
@@ -72,11 +73,12 @@
compression: Compression,
loadAuthMethod: LoadAuthMethod,
shredModel: ShredModel,
tableName: String
tableName: String,
legacyPartitioning: Boolean
) extends Statement
with Loading {
def table: String = shreddedType.info.getName
def path: String = shreddedType.getLoadPath
def path: String = shreddedType.getLoadPath(legacyPartitioning)
def title = s"COPY $table FROM $path"
}
case class CreateTempEventTable(table: String) extends Loading {
@@ -44,7 +44,8 @@ trait Target[I] {
discovery: DataDiscovery,
eventTableColumns: EventTableColumns,
initQueryResult: I,
disableMigration: List[SchemaCriterion]
disableMigration: List[SchemaCriterion],
legacyPartitioning: Boolean
): LoadStatements

/** Get DDL of a manifest table */
@@ -31,7 +31,7 @@ sealed trait ShreddedType {
def info: ShreddedType.Info

/** Get S3 prefix which DB should COPY FROM */
def getLoadPath: String
def getLoadPath(legacyPartitioning: Boolean): String

/** Human-readable form */
def show: String
@@ -60,8 +60,11 @@ object ShreddedType {
* existing JSONPaths file
*/
final case class Json(info: Info, jsonPaths: BlobStorage.Key) extends ShreddedType {
def getLoadPath: String =
s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=json/model=${info.version.model}/revision=${info.version.revision}/addition=${info.version.addition}"
def getLoadPath(legacyPartitioning: Boolean): String =
if (legacyPartitioning)
s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=json/model=${info.version.model}"
else
s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=json/model=${info.version.model}/revision=${info.version.revision}/addition=${info.version.addition}"

def show: String = s"${info.toCriterion.asString} ($jsonPaths)"
}
@@ -74,14 +77,17 @@ object ShreddedType {
* raw metadata extracted from S3 Key
*/
final case class Tabular(info: Info) extends ShreddedType {
def getLoadPath: String =
s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.version.model}/revision=${info.version.revision}/addition=${info.version.addition}"
def getLoadPath(legacyPartitioning: Boolean): String =
if (legacyPartitioning)
s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.version.model}"
else
s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.version.model}/revision=${info.version.revision}/addition=${info.version.addition}"

def show: String = s"${info.toCriterion.asString} TSV"
}

final case class Widerow(info: Info) extends ShreddedType {
def getLoadPath: String = s"${info.base}${Common.GoodPrefix}"
def getLoadPath(legacyPartitioning: Boolean): String = s"${info.base}${Common.GoodPrefix}"

def show: String = s"${info.toCriterion.asString} WIDEROW"
}
@@ -88,14 +88,17 @@ object Load {
discovery: DataDiscovery.WithOrigin,
initQueryResult: I,
target: Target[I],
disableMigration: List[SchemaCriterion]
disableMigration: List[SchemaCriterion],
legacyPartitioning: Boolean
): F[LoadResult] =
for {
_ <- TargetCheck.prepareTarget[F, C]
migrations <- Migration.build[F, C, I](discovery.discovery, target, disableMigration)
_ <- getPreTransactions(setStage, migrations.preTransaction, incrementAttempt).traverse_(Transaction[F, C].run(_))
result <- Transaction[F, C].transact {
getTransaction[C, I](setStage, discovery, initQueryResult, target, disableMigration)(migrations.inTransaction)
getTransaction[C, I](setStage, discovery, initQueryResult, target, disableMigration, legacyPartitioning)(
migrations.inTransaction
)
.onError { case _: Throwable => incrementAttempt }
}
} yield result
@@ -124,7 +127,8 @@ object Load {
discovery: DataDiscovery.WithOrigin,
initQueryResult: I,
target: Target[I],
disableMigration: List[SchemaCriterion]
disableMigration: List[SchemaCriterion],
legacyPartitioning: Boolean
)(
inTransactionMigrations: F[Unit]
): F[LoadResult] =
@@ -143,7 +147,7 @@
Logging[F].info(s"Loading transaction for ${discovery.origin.base} has started") *>
setStage(Stage.MigrationIn) *>
inTransactionMigrations *>
run[F, I](setLoading, discovery.discovery, initQueryResult, target, disableMigration) *>
run[F, I](setLoading, discovery.discovery, initQueryResult, target, disableMigration, legacyPartitioning) *>
setStage(Stage.Committing) *>
Manifest.add[F](discovery.origin.toManifestItem) *>
Manifest
@@ -198,20 +202,22 @@
discovery: DataDiscovery,
initQueryResult: I,
target: Target[I],
disableMigration: List[SchemaCriterion]
disableMigration: List[SchemaCriterion],
legacyPartitioning: Boolean
): F[Unit] =
for {
_ <- Logging[F].info(s"Loading ${discovery.base}")
existingEventTableColumns <- if (target.requiresEventsColumns) Control.getColumns[F](EventsTable.MainName) else Nil.pure[F]
_ <- target.getLoadStatements(discovery, existingEventTableColumns, initQueryResult, disableMigration).traverse_ { genStatement =>
for {
loadAuthMethod <- LoadAuthService[F].forLoadingEvents
// statement must be generated as late as possible, to have fresh and valid credentials.
statement = genStatement(loadAuthMethod)
_ <- Logging[F].info(statement.title)
_ <- setLoading(statement.table)
_ <- DAO[F].executeUpdate(statement, DAO.Purpose.Loading).void
} yield ()
_ <- target.getLoadStatements(discovery, existingEventTableColumns, initQueryResult, disableMigration, legacyPartitioning).traverse_ {
genStatement =>
for {
loadAuthMethod <- LoadAuthService[F].forLoadingEvents
// statement must be generated as late as possible, to have fresh and valid credentials.
statement = genStatement(loadAuthMethod)
_ <- Logging[F].info(statement.title)
_ <- setLoading(statement.table)
_ <- DAO[F].executeUpdate(statement, DAO.Purpose.Loading).void
} yield ()
}
_ <- Logging[F].info(s"Folder [${discovery.base}] has been loaded (not committed yet)")
} yield ()
@@ -218,7 +218,8 @@ object ConfigSpec {
1.hour
)
val exampleInitRetries: Config.Retries = Config.Retries(Config.Strategy.Exponential, Some(3), 30.seconds, Some(1.hour))
val exampleFeatureFlags: Config.FeatureFlags = Config.FeatureFlags(addLoadTstampColumn = true, disableMigration = Nil)
val exampleFeatureFlags: Config.FeatureFlags =
Config.FeatureFlags(addLoadTstampColumn = true, disableMigration = Nil, legacyPartitioning = false)
val exampleCloud: Config.Cloud = Config.Cloud.AWS(exampleRegion, exampleMessageQueue)
val exampleTelemetry =
Telemetry.Config(