Skip to content

Commit

Permalink
Include 'supersedes' field to schema output (close #144)
Browse files Browse the repository at this point in the history
  • Loading branch information
stanch authored and spenes committed Oct 17, 2023
1 parent bfbed2c commit f1204d1
Show file tree
Hide file tree
Showing 19 changed files with 546 additions and 388 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ object Server {
client <- BlazeClientBuilder[IO](httpPool).resource
webhookClient = Webhook.WebhookClient(config.webhooks, client)
storage <- Storage.initialize[IO](config.database)
_ <- Resource.eval(storage.addSupersededByColumn)
_ <- Resource.eval(storage.runAutomaticMigrations)
cache <- CachingMiddleware.initResponseCache[IO](1000, CacheTtl)
blocker <- Blocker[IO],
} yield BlazeServerBuilder[IO](httpPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ END';"""

body JSON NOT NULL,

superseded_by VARCHAR(128) NULL
superseded_by VARCHAR(32) NULL,
supersedes VARCHAR(32) ARRAY NULL
)""")

val draftsCreate = (fr"CREATE TABLE IF NOT EXISTS" ++ Postgres.DraftsTable ++ fr"""(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ object Fifth {
val previousPublic = schemas.forall(_.isPublic)
val versions = schemas.map(_.map.schemaKey.version)
if ((previousPublic && isPublic) || (!previousPublic && !isPublic) || schemas.isEmpty)
VersionCursor.isAllowed(current.schemaKey.version, versions, patchesAllowed = false)
VersionCursor.isAllowed(current.schemaKey.version, versions, patchesAllowed = false, List.empty)
else
Inconsistency.Availability(isPublic, previousPublic).asLeft
}
Expand Down
87 changes: 49 additions & 38 deletions src/main/scala/com/snowplowanalytics/iglu/server/model/Schema.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ package com.snowplowanalytics.iglu.server.model

import java.time.Instant

import cats.data.NonEmptyList
import cats.implicits._

import io.circe._
import io.circe.syntax._
import io.circe.{Decoder, Encoder, FailedCursor, Json}
import io.circe.{Decoder, Encoder, Json}
import io.circe.generic.semiauto._

import doobie._
Expand All @@ -33,10 +32,15 @@ import com.snowplowanalytics.iglu.core.circe.implicits._

import Schema.Metadata

case class Schema(schemaMap: SchemaMap, metadata: Metadata, body: Json, supersededBy: Option[SchemaVer.Full]) {
case class Schema(
schemaMap: SchemaMap,
metadata: Metadata,
body: Json,
supersedingInfo: Schema.SupersedingInfo
) {
def withFormat(repr: Schema.Repr.Format): Schema.Repr = repr match {
case Schema.Repr.Format.Canonical =>
Schema.Repr.Canonical(canonical, supersededBy)
Schema.Repr.Canonical(canonical, supersedingInfo)
case Schema.Repr.Format.Uri =>
Schema.Repr.Uri(schemaMap.schemaKey)
case Schema.Repr.Format.Meta =>
Expand Down Expand Up @@ -70,7 +74,7 @@ object Schema {
object Repr {

/** Canonical self-describing representation */
case class Canonical(schema: SelfDescribingSchema[Json], supersededBy: Option[SchemaVer.Full]) extends Repr
case class Canonical(schema: SelfDescribingSchema[Json], supersedingInfo: SupersedingInfo) extends Repr

/** Non-vanilla representation for UIs/non-validation clients */
case class Full(schema: Schema) extends Repr
Expand All @@ -80,8 +84,8 @@ object Schema {

def apply(schema: Schema): Repr = Full(schema)
def apply(uri: SchemaMap): Repr = Uri(uri.schemaKey)
def apply(schema: SelfDescribingSchema[Json], supersededBy: Option[SchemaVer.Full]): Repr =
Canonical(schema, supersededBy)
def apply(schema: SelfDescribingSchema[Json], supersedingInfo: SupersedingInfo): Repr =
Canonical(schema, supersedingInfo)

sealed trait Format extends Product with Serializable
object Format {
Expand All @@ -101,15 +105,14 @@ object Schema {

sealed trait SchemaBody extends Product with Serializable
object SchemaBody {
case class SelfDescribing(schema: SelfDescribingSchema[Json], supersedingInfo: Option[SupersedingInfo])
extends SchemaBody
case class BodyOnly(schema: Json, supersedingInfo: Option[SupersedingInfo]) extends SchemaBody
case class SelfDescribing(schema: SelfDescribingSchema[Json], supersedingInfo: SupersedingInfo) extends SchemaBody
case class BodyOnly(schema: Json, supersedingInfo: SupersedingInfo) extends SchemaBody

implicit val schemaBodyCirceDecoder: Decoder[SchemaBody] =
Decoder.instance { cursor =>
for {
removed <- SupersedingInfo.removeSupersedingInfoFields(cursor)
supersedingInfo <- cursor.as[Option[SupersedingInfo]]
supersedingInfo <- cursor.as[SupersedingInfo]
res <- SelfDescribingSchema.parse(removed) match {
case Right(schema) => SelfDescribing(schema, supersedingInfo).asRight
case Left(_) => removed.as[JsonObject].map(obj => BodyOnly(Json.fromJsonObject(obj), supersedingInfo))
Expand All @@ -118,29 +121,35 @@ object Schema {
}
}

sealed trait SupersedingInfo extends Product with Serializable
case class SupersedingInfo(supersededBy: Option[SchemaVer.Full], supersedes: List[SchemaVer.Full])

object SupersedingInfo {
case class SupersededBy(version: SchemaVer.Full) extends SupersedingInfo
case class Superseded(versions: NonEmptyList[SchemaVer.Full]) extends SupersedingInfo
def empty = SupersedingInfo(Option.empty, List.empty)

implicit val supersedingInfoDecoder: Decoder[Option[SupersedingInfo]] =
implicit val supersedingInfoDecoder: Decoder[SupersedingInfo] =
Decoder.instance { json =>
json.downField(SupersededByField) match {
case _: FailedCursor =>
json.downField(SupersedesField) match {
case _: FailedCursor => None.asRight
case c => c.as[NonEmptyList[SchemaVer.Full]].map(Superseded(_).some)
}
case c => c.as[SchemaVer.Full].map(SupersededBy(_).some)
}
for {
supersededBy <- json.getOrElse[Option[SchemaVer.Full]](SupersededByField)(None)
supersedes <- json.getOrElse[List[SchemaVer.Full]](SupersedesField)(List.empty)
} yield SupersedingInfo(supersededBy, supersedes)
}

implicit val supersedingInfoEncoder: Encoder[SupersedingInfo] =
Encoder.instance { info =>
Json
.obj(
SupersededByField -> info.supersededBy.map(_.asString).asJson,
SupersedesField -> info.supersedes.map(_.asString).asJson
)
.dropNullValues
.dropEmptyValues
}

def removeSupersedingInfoFields(json: HCursor) =
for {
map <- json.as[JsonObject].map(_.toMap)
r = map - SupersededByField - SupersedesField
j = Json.fromJsonObject(JsonObject.fromMap(r))
} yield j
obj <- json.as[JsonObject]
removed = obj.remove(SupersededByField).remove(SupersedesField)
} yield Json.fromJsonObject(removed)
}

sealed trait Format extends Product with Serializable
Expand Down Expand Up @@ -171,31 +180,28 @@ object Schema {
case None => schema
}

private def supersededByJson(supersededBy: Option[SchemaVer.Full]): Json =
supersededBy.map(v => Json.obj(SupersededByField -> v.asString.asJson)).getOrElse(JsonObject.empty.asJson)

implicit val schemaEncoder: Encoder[Schema] =
Encoder.instance { schema =>
Json
.obj(
"self" -> schema.schemaMap.asJson,
"metadata" -> schema.metadata.asJson(Metadata.metadataEncoder)
)
.deepMerge(supersededByJson(schema.supersededBy))
.deepMerge(schema.supersedingInfo.asJson)
.deepMerge(schema.body)
}

implicit val representationEncoder: Encoder[Repr] =
Encoder.instance {
case Repr.Full(s) => orderedSchema(schemaEncoder.apply(s))
case Repr.Uri(u) => Encoder[String].apply(u.toSchemaUri)
case Repr.Canonical(schema, supersededBy) =>
case Repr.Canonical(schema, supersedingInfo) =>
orderedSchema {
Json
.obj(
s"$$schema" -> CanonicalUri.asJson
)
.deepMerge(supersededByJson(supersededBy))
.deepMerge(supersedingInfo.asJson)
.deepMerge(schema.normalize)
}
}
Expand All @@ -209,16 +215,21 @@ object Schema {
body = bodyJson.toList.filterNot {
case (key, _) => List("self", "metadata", SupersededByField, SupersedesField).contains(key)
}
supersededBy <- cursor.downField(SupersededByField).as[Option[SchemaVer.Full]]
} yield Schema(self, meta, Json.fromFields(body), supersededBy)
supersedingInfo <- cursor.value.as[SupersedingInfo]
} yield Schema(self, meta, Json.fromFields(body), supersedingInfo)
}

implicit val schemaVerFull: Read[Option[SchemaVer.Full]] =
Read[Option[String]].map(_.flatMap(v => SchemaVer.parseFull(v).toOption))

implicit val schemaVerFullList: Read[List[SchemaVer.Full]] =
Read[Option[List[String]]].map(_.fold(List.empty[SchemaVer.Full]) {
_.flatMap(v => SchemaVer.parseFull(v).toOption)
})

implicit val schemaDoobieRead: Read[Schema] =
Read[(SchemaMap, Metadata, Json, Option[SchemaVer.Full])].map {
case (schemaMap, meta, body, supersededBy) =>
Schema(schemaMap, meta, body, supersededBy)
Read[(SchemaMap, Metadata, Json, Option[SchemaVer.Full], List[SchemaVer.Full])].map {
case (schemaMap, meta, body, supersededBy, supersedes) =>
Schema(schemaMap, meta, body, SupersedingInfo(supersededBy, supersedes))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package com.snowplowanalytics.iglu.server.model

import cats.Show
import cats.syntax.either._

import com.snowplowanalytics.iglu.core.SchemaVer

/** ADT representing a place of schema in SchemaVer tree */
Expand All @@ -39,13 +38,19 @@ object VersionCursor {
sealed trait Inconsistency extends Product with Serializable
object Inconsistency {
case object PreviousMissing extends Inconsistency
case object SupersededMissing extends Inconsistency
case object SupersededInvalid extends Inconsistency
case object AlreadyExists extends Inconsistency
case class Availability(isPublic: Boolean, previousPublic: Boolean) extends Inconsistency

implicit val inconsistencyShowInstance: Show[Inconsistency] =
Show.show {
case PreviousMissing =>
"Preceding SchemaVer in the group is missing, check that schemas published in proper order"
case SupersededMissing =>
s"Superseded schema version(s) do not exist"
case SupersededInvalid =>
s"Superseded schema version(s) must be below the superseding version"
case AlreadyExists =>
"Schema already exists"
case Availability(isPublic, previousPublic) =>
Expand All @@ -56,11 +61,14 @@ object VersionCursor {
def isAllowed(
version: SchemaVer.Full,
existing: List[SchemaVer.Full],
patchesAllowed: Boolean
patchesAllowed: Boolean,
supersedes: List[SchemaVer.Full]
): Either[Inconsistency, Unit] =
if (existing.contains(version) && !patchesAllowed) Inconsistency.AlreadyExists.asLeft
else if (previousExists(existing, get(version))) ().asRight
else Inconsistency.PreviousMissing.asLeft
else if (!previousExists(existing, get(version))) Inconsistency.PreviousMissing.asLeft
else if (supersedes.diff(existing).nonEmpty) Inconsistency.SupersededMissing.asLeft
else if (supersedes.exists(Ordering[SchemaVer.Full].gt(_, version))) Inconsistency.SupersededInvalid.asLeft
else ().asRight

def get(version: SchemaVer.Full): VersionCursor = version match {
case SchemaVer.Full(1, 0, 0) => Initial
Expand Down
Loading

0 comments on commit f1204d1

Please sign in to comment.