Skip to content

Commit

Permalink
add schema validation to concepts (#284)
Browse files Browse the repository at this point in the history
  • Loading branch information
sehz authored Oct 26, 2024
1 parent aea5e08 commit afe098d
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 0 deletions.
2 changes: 2 additions & 0 deletions sdf/_embeds/schema/config-json.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
config:
converter: json
53 changes: 53 additions & 0 deletions sdf/_embeds/schema/dataflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
apiVersion: 0.5.0
meta:
name: person-age-validation
version: 0.1.0
namespace: examples
config:
converter: json
consumer:
default_starting_offset:
value: 0
position: End

types:
user:
type: object
properties:
name:
type: string
age:
type: u8

topics:
user-topic:
name: user
schema:
value:
type: user

message-topic:
name: message
schema:
value:
type: string


services:
check-adult:
sources:
- type: topic
id: user-topic
transforms:
- operator: map
run: |
fn age_check(user: User) -> Result<String> {
if user.age < 18 {
Ok("minor".to_string())
} else {
Ok("adult".to_string())
}
}
sinks:
- type: topic
id: message-topic
2 changes: 2 additions & 0 deletions sdf/_embeds/schema/schema-error.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
$ sdf log -f
Error deserializing input value ExpectedUnsigned at character 0
11 changes: 11 additions & 0 deletions sdf/_embeds/schema/show-state.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
>> show state
Namespace Keys Type
check-adult/user-topic/topic.offset 1 offset
check-adult/age-check/metrics 1 table
request-processing/request/metrics 1 table
request-processing/request/topic.offset 1 offset
check-adult/user-topic/metrics 1 table
>> show state check-adult/user-topic/metrics
Key Window succeeded failed last_error_offset
stats * 4 2 5
>>
5 changes: 5 additions & 0 deletions sdf/_embeds/schema/topic-schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
topics:
persons:
schema:
value:
type: person
4 changes: 4 additions & 0 deletions sdf/_embeds/schema/user-bad.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"name": "joe",
"age": "30"
}
4 changes: 4 additions & 0 deletions sdf/_embeds/schema/user-good.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"name": "joe",
"age": 30
}
8 changes: 8 additions & 0 deletions sdf/_embeds/schema/user-good.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
types:
person:
type: object
properties:
name:
type: string
weight:
type: u8
5 changes: 5 additions & 0 deletions sdf/_embeds/schema/version.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
apiVersion: 0.5.0
meta:
name: person-age-validation
version: 0.1.0
namespace: examples
89 changes: 89 additions & 0 deletions sdf/concepts/schema_validation.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
---
title: Schema Validation
description: Schema Validation
sidebar_position: 60
---

import CodeBlock from '@theme/CodeBlock';
import UserGood from '!!raw-loader!../_embeds/schema/user-good.yaml';
import TypeSchema from '!!raw-loader!../_embeds/schema/topic-schema.yaml';
import JsonConfig from '!!raw-loader!../_embeds/schema/config-json.yaml';
import UserGoodData from '!!raw-loader!../_embeds/schema/user-good.json';
import UserBadData from '!!raw-loader!../_embeds/schema/user-bad.json';
import ShowState from '!!raw-loader!../_embeds/schema/show-state.txt';
import SchemaErrorLog from '!!raw-loader!../_embeds/schema/schema-error.log';
import Version from '!!raw-loader!../_embeds/schema/version.yaml';
import DataFlow from '!!raw-loader!../_embeds/schema/dataflow.yaml';

SDF provides a schema validation feature to ensure that the data flowing through the dataflow is in the correct format.

# Schema

First step is to define data schema. Schema is defined in the [types] section of the dataflow. The types can be define as inline or in the package which can be shared across multiple dataflows.

For example, the following is a simple object type representing a person.

<CodeBlock language="yaml">{UserGood}</CodeBlock>

To enforce schema, all you have to is to specify the schema in the `topic` section. For example, the following is a topic definition with schema:

<CodeBlock language="yaml">{TypeSchema}</CodeBlock>

Schema can be enforced for both key and value part of the record.

Once defined, it can used to enforce the schema on the data from the source. The enforcement is specific to serialization format. Currently, SDF supports JSON serialization format but it can be extended to other formats in the future.

The serialization format is defined int the configuration section:

<CodeBlock language="yaml">{JsonConfig}</CodeBlock>

This will use `json` for all topics. But you can override per topic.


Given the schema above, the following JSON object will pass the schema validation:

<CodeBlock language="json">{UserGoodData}</CodeBlock>

However, the following JSON object will fail the schema validation:

<CodeBlock language="json">{UserBadData}</CodeBlock>

The schema validation error will be reported in the [operator log]. The error message will indicate the field that failed the validation. The failed record will be skipped and the dataflow will continue to process the next record.

For example, with bad user data above, the error message will be:

<CodeBlock language="bash">{SchemaErrorLog}</CodeBlock>

Number of failed records will be also reflected in the internal metrics. The metrics can be accessed via the `sdf show state <operator>/metrics` command.

<CodeBlock>{ShowState}</CodeBlock>

The SDF type supports following concepts in the schema:
- primitive types such as string, integer, float, boolean.
- enum types
- composite objects with nested properties
- array or list of objects

# Versioning

Inline schema's version is inherited from dataflow version. If you want to version the schema, you can define the schema in the package and then version the package. The versioned package can be then used in the dataflow.

The schema package then can be published to [Hub] and imported into the dataflow.

Version follows semver syntax. For example, the following is a versioned schema package:

<CodeBlock language="yaml">{Version}</CodeBlock>

The `apiVersion` is the pkg syntax version and `version` in the `meta` section is the schema version.


# Dataflow

Full dataflow is defined as follows:
<CodeBlock language="yaml">{DataFlow}</CodeBlock>



[types]: /sdf/concepts/types.mdx
[operator log]: /sdf/cli/log.mdx
[Hub]: /sdf/composition/hub.mdx

0 comments on commit afe098d

Please sign in to comment.