Upgrade schema-ddl to 0.19.4 #1230
Conversation
oguzhanunlu commented Apr 4, 2023 (edited)
- batch transformer is yet to be tested
Force-pushed from 3b43079 to d99cc98
Nice work! Turned out to be smaller than I expected.
It would be nice to have Processing.spec changes for batch and stream, as they are the closest thing we have to an integration test.
Resolved review thread (outdated): ...c/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Flattening.scala
Force-pushed from 07f3af8 to 71ac670
Resolved review thread (outdated): modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala
Force-pushed from eecc449 to 2314134
Force-pushed from 2314134 to 52ecf5d
Resolved review thread (outdated): modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala
Resolved review threads (3, outdated): ...redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala
Force-pushed from 52ecf5d to 2446613
Resolved review threads (3, outdated): ...redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala
Resolved review thread (outdated): modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala
Force-pushed from e9ba726 to fb8d691
Rebased on top of develop, hence the force push. Fixed table creation in migration, which broke after splitting statements. Tests are still running.
Force-pushed from fb8d691 to 0754c62
This is looking good @oguzhanunlu! I need to see your changes to schema-ddl before I can finish the review.
Also pointing out that #1287 will need rebasing onto your latest changes in this PR.
package object transformation {

  private val Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

  type PropertiesKey = (SchemaListKey, StorageTime)
  type PropertiesCache[F[_]] = LruMap[F, PropertiesKey, Properties]
  type ShredModelCache[F[_]] = LruMap[F, SchemaKey, ShredModel]
In the previous version we used StorageTime as part of the key for the cache. We did this so that the cache entry expires in sync with when the iglu-scala-client cache expires. Is it possible to keep StorageTime as part of the cache key in the new version?
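A minimal sketch of what keeping StorageTime in the key could look like, assuming the snowplow lru-map CreateLruMap API and the SchemaKey, StorageTime, and ShredModel types already in scope in this module (names are illustrative, not the PR's actual code):

import com.snowplowanalytics.lrumap.{CreateLruMap, LruMap}

object ShredModelCaching {
  // Hypothetical key: pair the schema key with the time Iglu served it, so a
  // cache entry expires together with the corresponding iglu-scala-client entry.
  type ShredModelKey = (SchemaKey, StorageTime)
  type ShredModelCache[F[_]] = LruMap[F, ShredModelKey, ShredModel]

  // Build the cache with a bounded size; oldest entries are evicted first.
  def create[F[_]](size: Int)(implicit C: CreateLruMap[F, ShredModelKey, ShredModel]): F[ShredModelCache[F]] =
    C.create(size)
}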
Good point, just pushed a new commit.
@@ -191,7 +246,7 @@ object Redshift {
               | ACCEPTINVCHARS
               | $frCompression""".stripMargin
          case ShreddedType.Tabular(_) =>
-           sql"""COPY $frTableName FROM '$frPath'
+           sql"""COPY $frTableName ($frColumns) FROM '$frPath'
❤️
val frPath = Fragment.const0(shreddedType.getLoadPath)
val frCredentials = loadAuthMethodFragment(loadAuthMethod, storage.roleArn)
val frRegion = Fragment.const0(region.name)
val frMaxError = Fragment.const0(maxError.toString)
val frCompression = getCompressionFormat(compression)
val extraCols = ShredModelEntry.extraCols.map(_._1.replaceAll(""""""", ""))
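A note on the replaceAll in the last line: ShredModelEntry.extraCols presumably yields quoted (identifier, DDL type) pairs, and stripping the double quotes leaves bare column names for use in the COPY column list. An illustrative sketch (the example values are assumptions):

// If extraCols looks something like this (hypothetical values):
val extraCols: List[(String, String)] =
  List("\"schema_vendor\"" -> "VARCHAR(128)", "\"schema_name\"" -> "VARCHAR(128)")

// then stripping the quotes yields bare column names:
val bareNames: List[String] = extraCols.map(_._1.replaceAll("\"", ""))
// bareNames == List("schema_vendor", "schema_name")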
I guess you have local changes to schema-ddl which you have not pushed to GitHub yet? I think I need to see those changes before I comment on this section.
You're right, I just pushed a new commit at snowplow/schema-ddl#193.
-  columns: List[String]
+  wideColumns: List[String],
+  shredModels: Map[SchemaKey, MergeRedshiftSchemasResult],
+  disableMigration: List[SchemaCriterion]
Here you copy disableMigration from the config into the DataDiscovery case class. You do this so that you can use it in the Migration object.
Did you consider instead simply passing the relevant config down into the Migration object, but without adding it to the DataDiscovery class?
I don't know if that would look neater or not. Ultimately we should do whichever implementation looks neatest.
I suggest the alternative because DataDiscovery kinda represents information we discover dynamically from the message queue or from Iglu. And disableMigration does not seem to fit that description.
But I will leave it to your judgement whether your implementation is neater than my suggestion. I haven't thought through the details or the impact on the code.
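To make the suggestion concrete, the two shapes might look roughly like this (all names, fields, and signatures here are hypothetical, for comparison only):

import com.snowplowanalytics.iglu.core.SchemaCriterion

// Shape in this PR: the config flag rides inside the discovery record.
final case class DiscoveryWithFlag(discoveredColumns: List[String], disableMigration: List[SchemaCriterion])

// Suggested alternative: discovery stays purely "discovered" data, and the
// flag is threaded in separately where the Migration is built.
final case class DiscoveryOnly(discoveredColumns: List[String])

def buildMigration(discovery: DiscoveryOnly, disableMigration: List[SchemaCriterion]): Unit =
  () // placeholder body; the real method would build the Migration here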
getLoadStatements also uses disableMigration to decide which table name should be used, but I agree with your sentiment. I just pushed a new commit.
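A rough sketch of that table-name decision, assuming iglu-core's SchemaCriterion.matches; chooseTableName and its parameters are illustrative, not the PR's actual code:

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}

// Hypothetical helper: when migrations are disabled for a schema, fall back
// to the legacy versioned table name instead of the merged shred-model name.
def chooseTableName(
  schemaKey: SchemaKey,
  shredModelName: String,
  legacyName: String,
  disableMigration: List[SchemaCriterion]
): String =
  if (disableMigration.exists(_.matches(schemaKey))) legacyName
  else shredModelName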
🎉
@@ -62,7 +63,8 @@ object Databricks {

   override def initQuery[F[_]: DAO: Monad]: F[Unit] = Monad[F].unit

-  override def createTable(schemas: SchemaList): Block = Block(Nil, Nil, Entity.Table(tgt.schema, schemas.latest.schemaKey))
+  override def createTable(shredModel: ShredModel): Block =
+    Block(Nil, Nil, Entity.Table(tgt.schema, shredModel.schemaKey, shredModel.tableName))
Am I right that this createTable should never get called for Databricks? Or for Snowflake?
If yes, I think I'd prefer to see it implemented as:

override def createTable(shredModel: ShredModel): Block =
  throw new IllegalStateException("createTable should never be called for Databricks")

Otherwise, I find it confusing to see it return an object using shredModel.tableName.
Force-pushed from e9354fa to b5e190a
Force-pushed from b5e190a to 4fa257f
Force-pushed from c4091e3 to bc229f8