Loader: Add legacy file hierarchy support #1320

Closed · wants to merge 2 commits
4 changes: 4 additions & 0 deletions config/loader/aws/redshift.config.reference.hocon
@@ -275,5 +275,9 @@
# e.g. [ "iglu:com.example/myschema1/jsonschema/1-*-*", "iglu:com.example/myschema2/jsonschema/1-*-*"]
# Optional, empty list by default
"disableMigration": []

# Use the legacy file hierarchy, i.e. vendor/name/format/model, to decide which path to load transformed events from
# This should be kept in sync with the transformer configuration
"legacyPartitioning": false
}
}
@@ -166,6 +166,10 @@
# When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
# Optional, default "false".
"truncateAtomicFields": false

# Whether to use the old directory structure, i.e. vendor/name/format/model, for transformed events
# This should be enabled when upgrading from older versions of the loader
"legacyPartitioning": false
}

# Observability and reporting options
@@ -234,6 +234,10 @@

# When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
# Optional, default "false".
"truncateAtomicFields": false
"truncateAtomicFields": false

# Whether to use the old directory structure, i.e. vendor/name/format/model, for transformed events
# This should be enabled when upgrading from older versions of the loader
"legacyPartitioning": false
}
}
@@ -150,5 +150,9 @@
# When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
# Optional, default "false".
"truncateAtomicFields": false

# Whether to use the old directory structure, i.e. vendor/name/format/model, for transformed events
# This should be enabled when upgrading from older versions of the loader
"legacyPartitioning": false
}
}
@@ -143,5 +143,9 @@
# When enabled, event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
# Optional, default "false".
"truncateAtomicFields": false

# Whether to use the old directory structure, i.e. vendor/name/format/model, for transformed events
# This should be enabled when upgrading from older versions of the loader
"legacyPartitioning": false
}
}
@@ -24,7 +24,8 @@
"featureFlags": {
"legacyMessageFormat": false,
"enableMaxRecordsPerFile": true,
"truncateAtomicFields": false
"truncateAtomicFields": false,
"legacyPartitioning": false
}

"monitoring": {
@@ -100,7 +100,7 @@ object Processing {
source
.through(transform(transformer, config.validations, processor))
.through(incrementMetrics(resources.metrics))
.through(handleTransformResult(transformer))
.through(handleTransformResult(transformer, config.featureFlags.legacyPartitioning))
.through(windowing)

val sink: Pipe[F, Record[Window, List[(SinkPath, Transformed.Data)], State[C]], Unit] =
@@ -226,14 +226,15 @@
* to where it should sink. Processes in batches for efficiency.
*/
def handleTransformResult[F[_], C: Checkpointer[F, *]](
transformer: Transformer[F]
transformer: Transformer[F],
legacyPartitioning: Boolean
): Pipe[F, TransformationResults[C], SerializationResults[C]] =
_.map { case (items, checkpointer) =>
val state = State.fromEvents(items).withCheckpointer(checkpointer)
val mapped = items.flatMap(
_.fold(
bad => transformer.badTransform(bad).split :: Nil,
success => success.output.map(_.split)
bad => transformer.badTransform(bad).split(legacyPartitioning) :: Nil,
success => success.output.map(_.split(legacyPartitioning))
)
)
(mapped, state)
@@ -255,11 +256,15 @@
}

implicit class TransformedOps(t: Transformed) {
def getPath: SinkPath = t match {
def getPath(legacyPartitioning: Boolean): SinkPath = t match {
case p: Transformed.Shredded =>
val suffix = Some(
s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}"
)
val suffix =
if (legacyPartitioning)
Some(s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/")
else
Some(
s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}"
)
val pathType = if (p.isGood) SinkPath.PathType.Good else SinkPath.PathType.Bad
SinkPath(suffix, pathType)
case p: Transformed.WideRow =>
@@ -269,6 +274,6 @@
case _: Transformed.Parquet =>
SinkPath(None, SinkPath.PathType.Good)
}
def split: (SinkPath, Transformed.Data) = (getPath, t.data)
def split(legacyPartitioning: Boolean): (SinkPath, Transformed.Data) = (getPath(legacyPartitioning), t.data)
}
}
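
For reference, a minimal self-contained Scala sketch of the two prefixes that getPath now produces. The Entity type and the com.acme/my_schema 2-1-3 values are illustrative stand-ins, not the project's real Transformed.Shredded and SinkPath types; only the path templates follow the code above.

object PartitioningSketch {
  // Stand-in for a shredded entity; the real code reads these fields from Transformed.Shredded.
  final case class Entity(vendor: String, name: String, format: String, model: Int, revision: Int, addition: Int)

  // Mirrors the suffix logic in getPath above: the legacy layout stops at the model,
  // the current layout adds revision and addition.
  def suffix(e: Entity, legacyPartitioning: Boolean): String =
    if (legacyPartitioning)
      s"vendor=${e.vendor}/name=${e.name}/format=${e.format}/model=${e.model}/"
    else
      s"vendor=${e.vendor}/name=${e.name}/format=${e.format}/model=${e.model}/revision=${e.revision}/addition=${e.addition}"

  def main(args: Array[String]): Unit = {
    val entity = Entity("com.acme", "my_schema", "tsv", 2, 1, 3)
    println(suffix(entity, legacyPartitioning = true))  // vendor=com.acme/name=my_schema/format=tsv/model=2/
    println(suffix(entity, legacyPartitioning = false)) // vendor=com.acme/name=my_schema/format=tsv/model=2/revision=1/addition=3
  }
}
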
@@ -24,7 +24,7 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
inputEventsPath = "/processing-spec/1/input/events"
)

val config = TransformerConfig(appConfig(outputDirectory), igluConfig)
val config = TransformerConfig(appConfig(outputDirectory, false), igluConfig)

for {
output <- process(inputStream, config)
@@ -84,7 +84,7 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
inputEventsPath = "/processing-spec/3/input/events"
)

val config = TransformerConfig(appConfig(outputDirectory), igluConfig)
val config = TransformerConfig(appConfig(outputDirectory, false), igluConfig)

for {
output <- process(inputStream, config)
@@ -114,11 +114,71 @@
}
.unsafeRunSync()
}

"respect legacyPartitioning flag" in {
temporaryDirectory
.use { outputDirectory =>
val inputStream = InputEventsProvider.eventStream(
inputEventsPath = "/processing-spec/1/input/events"
)

val config = TransformerConfig(appConfig(outputDirectory, true), igluConfig)

for {
output <- process(inputStream, config)
actualAtomicRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1"
)
)
actualOptimizelyRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1"
)
)
actualConsentRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1"
)
)
actualBadRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2"
)
)

expectedCompletionMessage <- readMessageFromResource("/processing-spec/1/output/good/tsv/completion.json", outputDirectory)
expectedAtomicRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-atomic")
expectedOptimizelyRows <- readLinesFromResource("/processing-spec/1/output/good/tsv/com.optimizely-state")
expectedConsentRows <-
readLinesFromResource("/processing-spec/1/output/good/tsv/com.snowplowanalytics.snowplow-consent_document")
expectedBadRows <- readLinesFromResource("/processing-spec/1/output/bad")
} yield {
removeAppId(output.completionMessages.toList) must beEqualTo(Vector(expectedCompletionMessage))
output.checkpointed must beEqualTo(1)

assertStringRows(removeAppId(actualAtomicRows), expectedAtomicRows)
assertStringRows(removeAppId(actualOptimizelyRows), expectedOptimizelyRows)
assertStringRows(removeAppId(actualConsentRows), expectedConsentRows)

assertStringRows(removeAppId(actualBadRows), expectedBadRows)
}
}
.unsafeRunSync()
}
}
}

object ShredTsvProcessingSpec {
private val appConfig = (outputPath: Path) => s"""|{
private val appConfig = (outputPath: Path, legacyPartitioning: Boolean) => s"""|{
| "input": {
| "type": "pubsub"
| "subscription": "projects/project-id/subscriptions/subscription-id"
@@ -137,6 +197,9 @@
| "region": "eu-central-1"
| }
| "windowing": "1 minute"
| "featureFlags": {
| "legacyPartitioning": $legacyPartitioning
| }
| "formats": {
| "transformationType": "shred"
| "default": "TSV"
@@ -42,11 +42,11 @@ class TransformingSpec extends Specification {
val testFileNameMap = List(
Transformed.Shredded
.Tabular("com.snowplowanalytics.snowplow", "atomic", 1, 0, 0, dummyTransformedData)
.getPath -> "com.snowplowanalytics.snowplow-atomic",
.getPath(false) -> "com.snowplowanalytics.snowplow-atomic",
Transformed.Shredded
.Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, 0, 0, dummyTransformedData)
.getPath -> "com.snowplowanalytics.snowplow-consent_document",
Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath -> "com.optimizely-state"
.getPath(false) -> "com.snowplowanalytics.snowplow-consent_document",
Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath(false) -> "com.optimizely-state"
).toMap

val expectedTransformedMap =
@@ -149,7 +149,7 @@ object TransformingSpec {

val eventStream = parsedEventStream(resourcePath)
.through(Processing.transform(transformer, validations, TestProcessor))
.through(Processing.handleTransformResult(transformer))
.through(Processing.handleTransformResult(transformer, legacyPartitioning = false))

val transformed = eventStream.compile.toList.unsafeRunSync().flatMap(_._1)
(transformed.flatMap(_.getGood), transformed.flatMap(_.getBad))
@@ -26,16 +26,22 @@ object Common {
val AtomicSchema: SchemaKey =
SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1, 0, 0))
val AtomicType = TypesInfo.Shredded.Type(AtomicSchema, TypesInfo.Shredded.ShreddedFormat.TSV, SnowplowEntity.SelfDescribingEvent)
val AtomicPath: String = entityPath(AtomicType)

val FolderTimeFormatter: DateTimeFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss").withZone(ZoneId.from(ZoneOffset.UTC))

def entityPath(entity: TypesInfo.Shredded.Type) =
s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}/revision=${entity.schemaKey.version.revision}/addition=${entity.schemaKey.version.addition}"
def entityPath(entity: TypesInfo.Shredded.Type, legacyPartitioning: Boolean) =
if (legacyPartitioning)
s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}"
else
s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}/revision=${entity.schemaKey.version.revision}/addition=${entity.schemaKey.version.addition}"

def entityPathFull(base: BlobStorage.Folder, entity: TypesInfo.Shredded.Type): BlobStorage.Folder =
BlobStorage.Folder.append(base, entityPath(entity))
def entityPathFull(
base: BlobStorage.Folder,
entity: TypesInfo.Shredded.Type,
legacyPartitioning: Boolean
): BlobStorage.Folder =
BlobStorage.Folder.append(base, entityPath(entity, legacyPartitioning))

/**
* Remove all occurrences of access key id and secret access key from message Helps to avoid
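
To illustrate why the loader config comment says the flag must stay in sync with the transformer, here is a self-contained sketch with stand-in types: the GoodPrefix value and the ShreddedType case class are illustrative (the spec paths in this PR use output=good), while the path templates mirror entityPath above. If the transformer wrote legacy paths but the loader computes the new layout, discovery points at a folder that was never written.

object EntityPathSketch {
  val GoodPrefix = "output=good" // stand-in value; matches the output=good prefix seen in the spec paths

  final case class ShreddedType(vendor: String, name: String, format: String, model: Int, revision: Int, addition: Int)

  // Mirrors entityPath above, with plain strings instead of SchemaKey/TypesInfo.
  def entityPath(e: ShreddedType, legacyPartitioning: Boolean): String =
    if (legacyPartitioning)
      s"$GoodPrefix/vendor=${e.vendor}/name=${e.name}/format=${e.format}/model=${e.model}"
    else
      s"$GoodPrefix/vendor=${e.vendor}/name=${e.name}/format=${e.format}/model=${e.model}/revision=${e.revision}/addition=${e.addition}"

  def main(args: Array[String]): Unit = {
    val entity = ShreddedType("com.acme", "my_schema", "TSV", 2, 1, 3)
    val writtenByTransformer = entityPath(entity, legacyPartitioning = true)
    val expectedByLoader     = entityPath(entity, legacyPartitioning = false)
    println(writtenByTransformer == expectedByLoader) // false: the loader would look in a folder the transformer never wrote
  }
}
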
@@ -144,7 +144,8 @@ object TransformerConfig {
legacyMessageFormat: Boolean,
sparkCacheEnabled: Option[Boolean],
enableMaxRecordsPerFile: Boolean,
truncateAtomicFields: Boolean
truncateAtomicFields: Boolean,
legacyPartitioning: Boolean
)

object FeatureFlags {
@@ -48,13 +48,23 @@ object Databricks {
discovery: DataDiscovery,
eventTableColumns: EventTableColumns,
i: Unit,
disableMigration: List[SchemaCriterion]
disableMigration: List[SchemaCriterion],
legacyPartitioning: Boolean
): LoadStatements = {
val toCopy = columnsToCopyFromDiscoveredData(discovery)
val toSkip = ColumnsToSkip(getEntityColumnsPresentInDbOnly(eventTableColumns, toCopy))

NonEmptyList.one(loadAuthMethod =>
Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip, discovery.typesInfo, loadAuthMethod, i)
Statement.EventsCopy(
discovery.base,
discovery.compression,
toCopy,
toSkip,
discovery.typesInfo,
loadAuthMethod,
i,
legacyPartitioning
)
)
}

@@ -117,7 +127,7 @@ object Databricks {
sql"""COPY INTO $frTableName
FROM (SELECT _C0::VARCHAR(512) RUN_ID FROM '$frPath' $frAuth)
FILEFORMAT = CSV"""
case Statement.EventsCopy(path, _, toCopy, toSkip, _, loadAuthMethod, _) =>
case Statement.EventsCopy(path, _, toCopy, toSkip, _, loadAuthMethod, _, _) =>
val updatedPath = replaceScheme(path)
val frTableName = Fragment.const(qualify(EventsTable.MainName))
val frPath = Fragment.const0(updatedPath.append("output=good"))
@@ -64,10 +64,10 @@ class DatabricksSpec extends Specification {
)

val results = target
.getLoadStatements(discovery, eventsColumns, (), Nil)
.getLoadStatements(discovery, eventsColumns, (), Nil, false)
.map(f => f(LoadAuthMethod.NoCreds))

results should be like { case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, _, _, _), Nil) =>
results should be like { case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, _, _, _, _), Nil) =>
path must beEqualTo(baseFolder)
compression must beEqualTo(Compression.Gzip)

@@ -117,7 +117,8 @@ class DatabricksSpec extends Specification {
toSkip,
TypesInfo.WideRow(PARQUET, List.empty),
LoadAuthMethod.NoCreds,
()
(),
false
)

target.toFragment(statement).toString must beLike { case sql =>
@@ -143,7 +144,16 @@
)
val loadAuthMethod = LoadAuthMethod.TempCreds.AWS("testAccessKey", "testSecretKey", "testSessionToken", Instant.now.plusSeconds(3600))
val statement =
Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), loadAuthMethod, ())
Statement.EventsCopy(
baseFolder,
Compression.Gzip,
toCopy,
toSkip,
TypesInfo.WideRow(PARQUET, List.empty),
loadAuthMethod,
(),
false
)

target.toFragment(statement).toString must beLike { case sql =>
sql must contain(
@@ -169,7 +179,8 @@
toSkip,
TypesInfo.WideRow(PARQUET, List.empty),
LoadAuthMethod.NoCreds,
()
(),
false
)

val testTarget = Databricks
@@ -201,7 +212,16 @@
BlobStorage.Folder.coerce("https://test.blob.core.windows.net/test-container/path1/path2")
val loadAuthMethod = LoadAuthMethod.TempCreds.Azure("testToken", Instant.now.plusSeconds(3600))
val statement =
Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), loadAuthMethod, ())
Statement.EventsCopy(
baseFolder,
Compression.Gzip,
toCopy,
toSkip,
TypesInfo.WideRow(PARQUET, List.empty),
loadAuthMethod,
(),
false
)

target.toFragment(statement).toString must beLike { case sql =>
sql must contain(
@@ -243,7 +263,7 @@ object DatabricksSpec {
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.FeatureFlags(addLoadTstampColumn = true, disableMigration = Nil),
Config.FeatureFlags(addLoadTstampColumn = true, disableMigration = Nil, false),
exampleTelemetry
)
