Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(targets): SQL target developers can now more easily override the mapping from JSON schema to SQL column type #2732

Merged
merged 14 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/classes/singer_sdk.connectors.sql.JSONSchemaToSQL.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
singer_sdk.connectors.sql.JSONSchemaToSQL
=========================================

.. currentmodule:: singer_sdk.connectors.sql

.. autoclass:: JSONSchemaToSQL
:members:
:special-members: __init__, __call__
1 change: 1 addition & 0 deletions docs/guides/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ pagination-classes
custom-clis
config-schema
sql-tap
sql-target
```
52 changes: 52 additions & 0 deletions docs/guides/sql-target.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Building SQL targets

## Mapping JSON Schema to SQL types

Starting with version `0.42.0`, the Meltano Singer SDK provides a clean way to map JSON Schema to SQL types. This is useful when your SQL dialect needs special handling for certain JSON Schema types.

### Custom JSON Schema mapping

If the default [`JSONSchemaToSQL`](connectors.sql.JSONSchemaToSQL) instance doesn't cover all the types supported by the SQLAlchemy dialect in your target, you can override the {attr}`SQLConnector.jsonschema_to_sql <singer_sdk.SQLConnector.jsonschema_to_sql>` property and register a new type handler for the type you need to support:

```python
import functools

import sqlalchemy as sa
from singer_sdk import typing as th
from singer_sdk.connectors import JSONSchemaToSQL, SQLConnector

from my_sqlalchemy_dialect import VectorType


def custom_array_to_sql(jsonschema: dict) -> VectorType | sa.types.VARCHAR:
    """Map arrays of numbers to a vector column and other arrays to VARCHAR.

    Args:
        jsonschema: The JSON Schema object describing the array property.

    Returns:
        A ``VectorType`` when the array's ``items`` schema has type ``number``,
        otherwise a ``VARCHAR``.
    """
    item_schema = jsonschema.get("items")
    if item_schema and item_schema.get("type") == "number":
        return VectorType()

    return sa.types.VARCHAR()


class MyConnector(SQLConnector):
    """Connector whose JSON Schema mapper uses a custom handler for arrays."""

    @functools.cached_property
    def jsonschema_to_sql(self):
        """Build the mapper and route the ``array`` type to the custom handler."""
        mapper = JSONSchemaToSQL()
        mapper.register_type_handler("array", custom_array_to_sql)
        return mapper
```

### Custom string format mapping

You can also register a new format handler for custom string formats:

```python
from my_sqlalchemy_dialect import URI


class MyConnector(SQLConnector):
    """Connector whose JSON Schema mapper uses a custom type for ``uri`` strings."""

    @functools.cached_property
    def jsonschema_to_sql(self):
        """Build the mapper and route the ``uri`` string format to ``URI``."""
        mapper = JSONSchemaToSQL()
        mapper.register_format_handler("uri", URI)
        return mapper
```
1 change: 1 addition & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,4 @@ Other
:template: class.rst

connectors.sql.SQLToJSONSchema
connectors.sql.JSONSchemaToSQL
241 changes: 238 additions & 3 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
else:
from warnings import deprecated

if sys.version_info < (3, 10):
from typing_extensions import TypeAlias
else:
from typing import TypeAlias # noqa: ICN003

if t.TYPE_CHECKING:
from sqlalchemy.engine import Engine
from sqlalchemy.engine.reflection import Inspector
Expand Down Expand Up @@ -192,6 +197,227 @@
return th.BooleanType.type_dict # type: ignore[no-any-return]


# A handler that produces a SQLAlchemy type for a JSON Schema object. It is
# either a SQLAlchemy type class (a ``TypeEngine`` subclass) or a callable that
# takes the JSON Schema dict and returns a SQLAlchemy type instance.
JSONtoSQLHandler: TypeAlias = t.Union[
t.Type[sa.types.TypeEngine],
t.Callable[[dict], sa.types.TypeEngine],
]


class JSONSchemaToSQL:
    """A configurable mapper for converting JSON Schema types to SQLAlchemy types.

    Handlers are looked up by the JSON Schema ``type`` keyword and, for string
    schemas, by the ``format`` keyword. A handler is either a SQLAlchemy type
    class (instantiated with no arguments) or a callable that receives the
    schema dict and returns a SQLAlchemy type instance. When no handler
    applies, :attr:`fallback_type` is instantiated instead.

    .. versionadded:: 0.42.0
    """

    def __init__(self) -> None:
        """Initialize the mapper with default type mappings."""
        # Default type mappings, keyed by the JSON Schema ``type`` keyword.
        self._type_mapping: dict[str, JSONtoSQLHandler] = {
            "string": self._handle_string_type,
            "integer": sa.types.INTEGER,
            "number": sa.types.DECIMAL,
            "boolean": sa.types.BOOLEAN,
            "object": sa.types.VARCHAR,
            "array": sa.types.VARCHAR,
        }

        # Format handlers for string types, keyed by the ``format`` keyword.
        self._format_handlers: dict[str, JSONtoSQLHandler] = {
            # Default date-like formats
            "date-time": sa.types.DATETIME,
            "time": sa.types.TIME,
            "date": sa.types.DATE,
            # Common string formats with sensible defaults
            "uuid": sa.types.UUID,
            "email": lambda _: sa.types.VARCHAR(254),  # RFC 5321
            "uri": lambda _: sa.types.VARCHAR(2083),  # Common browser limit
            "hostname": lambda _: sa.types.VARCHAR(253),  # RFC 1035
            "ipv4": lambda _: sa.types.VARCHAR(15),
            "ipv6": lambda _: sa.types.VARCHAR(45),
        }

        self._fallback_type: type[sa.types.TypeEngine] = sa.types.VARCHAR

    def _invoke_handler(  # noqa: PLR6301
        self,
        handler: JSONtoSQLHandler,
        schema: dict,
    ) -> sa.types.TypeEngine:
        """Invoke a handler, handling both type classes and callables.

        Args:
            handler: The handler to invoke.
            schema: The schema to pass to callable handlers.

        Returns:
            The resulting SQLAlchemy type.
        """
        # Type classes are instantiated without arguments; any other callable
        # receives the full schema so it can inspect keywords like ``maxLength``.
        if isinstance(handler, type):
            return handler()  # type: ignore[no-any-return]
        return handler(schema)

    @property
    def fallback_type(self) -> type[sa.types.TypeEngine]:
        """Return the fallback type.

        Returns:
            The fallback type.
        """
        return self._fallback_type

    @fallback_type.setter
    def fallback_type(self, value: type[sa.types.TypeEngine]) -> None:
        """Set the fallback type.

        Args:
            value: The new fallback type.
        """
        self._fallback_type = value

    def register_type_handler(self, json_type: str, handler: JSONtoSQLHandler) -> None:
        """Register a custom type handler.

        Replaces any handler previously registered for the same type.

        Args:
            json_type: The JSON Schema type to handle.
            handler: Either a SQLAlchemy type class or a callable that takes a schema
                dict and returns a SQLAlchemy type instance.
        """
        self._type_mapping[json_type] = handler

    def register_format_handler(
        self,
        format_name: str,
        handler: JSONtoSQLHandler,
    ) -> None:
        """Register a custom format handler.

        Replaces any handler previously registered for the same format.

        Args:
            format_name: The format string (e.g., "date-time", "email", "custom-format").
            handler: Either a SQLAlchemy type class or a callable that takes a schema
                dict and returns a SQLAlchemy type instance.
        """  # noqa: E501
        self._format_handlers[format_name] = handler

    def handle_multiple_types(  # noqa: PLR6301
        self,
        types: t.Sequence[str],  # noqa: ARG002
    ) -> sa.types.TypeEngine:
        """Handle multiple types by returning a VARCHAR.

        Override this method to choose a different column type for schemas
        that declare more than one non-null type.

        Args:
            types: The list of types to handle.

        Returns:
            A VARCHAR type.
        """
        return sa.types.VARCHAR()

    def handle_raw_string(self, schema: dict) -> sa.types.TypeEngine:  # noqa: PLR6301
        """Handle a string type generically.

        Args:
            schema: The JSON Schema object.

        Returns:
            A VARCHAR whose length is the schema's ``maxLength``, if present.
        """
        max_length: int | None = schema.get("maxLength")
        return sa.types.VARCHAR(max_length)

    def _get_type_from_schema(self, schema: dict) -> sa.types.TypeEngine | None:
        """Try to get a SQL type from a single schema object.

        Args:
            schema: The JSON Schema object.

        Returns:
            SQL type if one can be determined, None otherwise.
        """
        # Check if this is a string with format first
        if schema.get("type") == "string" and "format" in schema:
            format_type = self._handle_format(schema)
            if format_type is not None:
                return format_type

        # Then check regular types
        if schema_type := schema.get("type"):
            if isinstance(schema_type, (list, tuple)):
                # Filter out null type if present
                non_null_types = [
                    type_name for type_name in schema_type if type_name != "null"
                ]

                # Multiple non-null types are delegated to the overridable hook.
                # The result must be returned; otherwise the hook's choice is
                # silently discarded and the caller falls through to the
                # fallback type.
                if len(non_null_types) > 1:
                    return self.handle_multiple_types(non_null_types)

                # If we have exactly one non-null type, use its handler
                if len(non_null_types) == 1 and non_null_types[0] in self._type_mapping:
                    handler = self._type_mapping[non_null_types[0]]
                    return self._invoke_handler(handler, schema)

            elif type_handler := self._type_mapping.get(schema_type):
                return self._invoke_handler(type_handler, schema)

        return None

    def _handle_format(self, schema: dict) -> sa.types.TypeEngine | None:
        """Handle format-specific type conversion.

        Args:
            schema: The JSON Schema object.

        Returns:
            The format-specific SQL type if applicable, None otherwise.
        """
        if "format" not in schema:
            return None

        format_string: str = schema["format"]

        if handler := self._format_handlers.get(format_string):
            return self._invoke_handler(handler, schema)

        return None

    def _handle_string_type(self, schema: dict) -> sa.types.TypeEngine:
        """Handle string type conversion with special cases for formats.

        Args:
            schema: The JSON Schema object.

        Returns:
            Appropriate SQLAlchemy type.
        """
        # Check for format-specific handling first
        if format_type := self._handle_format(schema):
            return format_type

        return self.handle_raw_string(schema)

    def to_sql_type(self, schema: dict) -> sa.types.TypeEngine:
        """Convert a JSON Schema type definition to a SQLAlchemy type.

        The schema's own ``type`` (and ``format``) is tried first, then each
        non-null ``anyOf`` subschema in order, and finally the fallback type.

        Args:
            schema: The JSON Schema object.

        Returns:
            The corresponding SQLAlchemy type.
        """
        if sql_type := self._get_type_from_schema(schema):
            return sql_type

        # Handle anyOf
        if "anyOf" in schema:
            for subschema in schema["anyOf"]:
                # Skip null types in anyOf
                if subschema.get("type") == "null":
                    continue

                if sql_type := self._get_type_from_schema(subschema):
                    return sql_type

        # Fallback
        return self.fallback_type()


class SQLConnector: # noqa: PLR0904
"""Base class for SQLAlchemy-based connectors.

Expand Down Expand Up @@ -255,6 +481,16 @@
"""
return SQLToJSONSchema()

@functools.cached_property
def jsonschema_to_sql(self) -> JSONSchemaToSQL:
    """The JSON-to-SQL type mapper object for this SQL connector.

    Subclasses can override this property to supply a mapper customized for
    their SQL dialect.

    .. versionadded:: 0.42.0
    """
    mapper = JSONSchemaToSQL()
    return mapper

@contextmanager
def _connect(self) -> t.Iterator[sa.engine.Connection]:
with self._engine.connect().execution_options(stream_results=True) as conn:
Expand Down Expand Up @@ -418,8 +654,7 @@
msg = f"Unexpected type received: '{type(sql_type).__name__}'"
raise ValueError(msg)

@staticmethod
def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine:
def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine:

Check warning on line 657 in singer_sdk/connectors/sql.py

View workflow job for this annotation

GitHub Actions / Check API Changes

SQLConnector.to_sql_type(jsonschema_type)

Positional parameter was moved: `` -> ``

Check warning on line 657 in singer_sdk/connectors/sql.py

View workflow job for this annotation

GitHub Actions / Check API Changes

SQLConnector.to_sql_type(self)

Parameter was added as required: `` -> ``
"""Return a JSON Schema representation of the provided type.

By default will call `typing.to_sql_type()`.
Expand All @@ -435,7 +670,7 @@
Returns:
The SQLAlchemy type representation of the data type.
"""
return th.to_sql_type(jsonschema_type)
return self.jsonschema_to_sql.to_sql_type(jsonschema_type)

@staticmethod
def get_fully_qualified_name(
Expand Down
4 changes: 4 additions & 0 deletions singer_sdk/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,10 @@ def _jsonschema_type_check(jsonschema_type: dict, type_check: tuple[str]) -> boo
)


@deprecated(
"Use `JSONSchemaToSQL` instead.",
category=DeprecationWarning,
)
def to_sql_type( # noqa: PLR0911, C901
jsonschema_type: dict,
) -> sa.types.TypeEngine:
Expand Down
Loading
Loading