Update docs
Schulte, Johannes committed Mar 5, 2021
1 parent df32762 commit 02bdd02
Showing 1 changed file with 19 additions and 6 deletions.
Convert schemaless, primitive Kafka messages into ConnectData

## Why?

Most sink connectors in kafka-connect require a schema to insert the data. When dealing with topics that contain plain JSON records, it is not possible to insert them without an inline schema, as explained [here](https://www.confluent.de/blog/kafka-connect-deep-dive-converters-serialization-explained/#string-json-data).

Sometimes it is just not possible to change the JSON records to include the schema, and we are okay with having the JSON "as-is" in the sink system. This still allows a "schema-on-read" strategy, supported by RDBMS like MySQL and Postgres with their JSON data types. The difference is sketched below.
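
To make this concrete, here is a hypothetical record as it might sit on the topic (field names and values are made up for illustration):

```
{"key": "abc-123", "payload": {"profileId": 42}}
```

and the same record in the schema-plus-payload envelope that the stock JsonConverter would need instead, which quickly gets unwieldy:

```
{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      {"field": "key", "type": "string", "optional": false},
      {"field": "payload", "type": "struct", "optional": false, "fields": [
        {"field": "profileId", "type": "int64", "optional": false}
      ]}
    ]
  },
  "payload": {"key": "abc-123", "payload": {"profileId": 42}}
}
```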

See [Further options](#further-options)

## Installation

Copy a release jar from this repository's releases into your [connect plugin path](https://docs.confluent.io/home/connect/userguide.html#installing-kconnect-plugins).
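
The plugin path is a worker-level setting. A minimal sketch of the relevant line in the worker properties file (the directory is just an example; use whatever path your workers are configured with):

```
plugin.path=/usr/local/share/kafka/plugins
```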
and the following table created in the sink db

```
create table `super_event` (
  `ID` VARCHAR(255) NOT NULL,
  `VALUE` JSON,
  PRIMARY KEY (ID)
) ENGINE = InnoDB
```

A jdbc-sink connector configured like this

```
topics=super_event
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.baunz.kafka.connect.StringAsStructConverter
connection.url=jdbc:mysql://awesome-db/
insert.mode=UPSERT
pk.mode=record_key
pk.fields=id
```

creates a connector that fills the target table and allows the data to be queried like this (MySQL example):

```
select
  value->>'$.key',
  value->>'$.payload.profileId'
from
  super_event;
```
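
Under the hood, the converter's job is to wrap each raw message string into a single-field Struct so that schema-requiring sinks have something typed to work with. A minimal sketch of that idea in Java (the field name and implementation details are assumptions, not necessarily what StringAsStructConverter does internally):

```
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.storage.Converter;

// Hypothetical sketch; the real StringAsStructConverter may use a
// different field name and handle edge cases differently.
public class StringAsStructConverterSketch implements Converter {

    // a fixed schema with a single string field holding the raw JSON
    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .field("value", Schema.STRING_SCHEMA)
            .build();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure in this sketch
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        if (value == null) {
            return SchemaAndValue.NULL;
        }
        // wrap the raw bytes as a string inside a Struct, so sinks like
        // the jdbc-sink see a schema'd record with one string column
        Struct struct = new Struct(VALUE_SCHEMA)
                .put("value", new String(value, StandardCharsets.UTF_8));
        return new SchemaAndValue(VALUE_SCHEMA, struct);
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        if (value == null) {
            return null;
        }
        // unwrap: write the contained string back out as raw bytes
        return ((Struct) value).getString("value").getBytes(StandardCharsets.UTF_8);
    }
}
```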
## Further Options

* Apply a schema to the raw JSON via transforms (https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-json-schema/transformations/FromJson.html)

* Attach a schema with KSQL (https://rmoff.net/2020/01/22/kafka-connect-and-schemas/)
