Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Handle reserved words in db, schema and table names #242

Merged
merged 10 commits into from
Aug 13, 2024
Merged
13 changes: 8 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ jobs:
fail-fast: false
matrix:
python-version:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
- "3.12"
os: ["ubuntu-latest", "macos-latest", "windows-latest"]
- "3.11"
- "3.10"
- "3.9"
- "3.8"
os:
- "ubuntu-latest"
# - "macos-latest"
# - "windows-latest"
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
Expand Down
38 changes: 13 additions & 25 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,23 @@ packages = [
[tool.poetry.dependencies]
python = ">=3.8"
cryptography = ">=40,<43"
singer-sdk = "~=0.39.0"
snowflake-sqlalchemy = "~=1.6.1"
snowflake-connector-python = { version = "<4.0.0", extras = ["secure-local-storage"] }
sqlalchemy = "<2"

[tool.poetry.dependencies.singer-sdk]
git = "https://github.com/meltano/sdk.git"
rev = "edgarrmondragon/refactor/handle-sql-ident-normalization"

[tool.poetry.group.dev.dependencies]
coverage = ">=7.2.7"
pytest = ">=7.4.3"
pytest-xdist = ">=3.3.1"
singer-sdk = { version="~=0.39.0", extras = ["testing"] }

[tool.poetry.group.dev.dependencies.singer-sdk]
git = "https://github.com/meltano/sdk.git"
rev = "edgarrmondragon/refactor/handle-sql-ident-normalization"
extras = ["testing"]

[tool.ruff]
line-length = 120
Expand Down Expand Up @@ -68,3 +75,6 @@ target-snowflake = 'target_snowflake.target:TargetSnowflake.cli'
enable = true
vcs = "git"
style = "semver"

[tool.pytest.ini_options]
addopts = '--durations=10'
4 changes: 2 additions & 2 deletions target_snowflake/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from snowflake.sqlalchemy import URL
from snowflake.sqlalchemy.base import SnowflakeIdentifierPreparer
from snowflake.sqlalchemy.snowdialect import SnowflakeDialect
from sqlalchemy.sql import quoted_name, text
from sqlalchemy.sql import text

from target_snowflake.snowflake_types import NUMBER, TIMESTAMP_NTZ, VARIANT

Expand Down Expand Up @@ -388,7 +388,7 @@ def _get_merge_from_stage_statement( # noqa: ANN202
dedup = f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {dedup_cols} ORDER BY SEQ8() DESC) = 1"
return (
text(
f"merge into {quoted_name(full_table_name, quote=True)} d using " # noqa: ISC003
f"merge into {full_table_name} d using " # noqa: ISC003
+ f"(select {json_casting_selects} from '@~/target-snowflake/{sync_id}'" # noqa: S608
+ f"(file_format => {file_format}) {dedup}) s "
+ f"on {join_expr} "
Expand Down
2 changes: 1 addition & 1 deletion target_snowflake/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
}


class SnowflakeSink(SQLSink):
class SnowflakeSink(SQLSink[SnowflakeConnector]):
"""Snowflake target sink class."""

connector_class = SnowflakeConnector
Expand Down
57 changes: 57 additions & 0 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,61 @@ def setup(self) -> None:
)


class SnowflakeTargetExistingReservedNameTableAlter(TargetFileTestTemplate):
    """Load the ``order`` stream into a table that already exists.

    ``setup`` pre-creates a table whose name is the reserved word ``ORDER``
    (quoted), then the ``reserved_words_in_table.singer`` stream is loaded
    on top of it.

    NOTE(review): the original comment here read "This sends a schema that
    will request altering from TIMESTAMP_NTZ to VARCHAR". In the fixture,
    ``col_ts`` is a date-time and ``col_int`` is an integer, while the table
    below declares ``COL_INT STRING`` -- confirm which column the alter is
    actually expected to touch.
    """

    name = "existing_reserved_name_table_alter"

    @property
    def singer_filepath(self) -> Path:
        """Return the path of the Singer fixture that sits next to this module."""
        tests_dir = Path(__file__).resolve().parent
        return tests_dir / "target_test_streams" / "reserved_words_in_table.singer"

    def setup(self) -> None:
        """Create (or replace) the quoted ``"ORDER"`` table before the sync runs."""
        connector = self.target.default_sink_class.connector_class(self.target.config)
        database = self.target.config["database"]
        schema = self.target.config["default_target_schema"]
        # ``.upper()`` is applied to the whole name, quoted part included,
        # so the identifier that reaches Snowflake is ``"ORDER"``.
        table = f'{database}.{schema}."order"'.upper()
        # The statement text is kept verbatim, including its line layout.
        create_table_sql = f"""
CREATE OR REPLACE TABLE {table} (
ID VARCHAR(16777216),
COL_STR VARCHAR(16777216),
COL_TS TIMESTAMP_NTZ(9),
COL_INT STRING,
COL_BOOL BOOLEAN,
COL_VARIANT VARIANT,
_SDC_BATCHED_AT TIMESTAMP_NTZ(9),
_SDC_DELETED_AT VARCHAR(16777216),
_SDC_EXTRACTED_AT TIMESTAMP_NTZ(9),
_SDC_RECEIVED_AT TIMESTAMP_NTZ(9),
_SDC_SEQUENCE NUMBER(38,0),
_SDC_TABLE_VERSION NUMBER(38,0),
PRIMARY KEY (ID)
)
"""
        connector.connection.execute(create_table_sql)


class SnowflakeTargetReservedWordsInTable(TargetFileTestTemplate):
    """Sync a stream whose name (``order``) is a Snowflake reserved word.

    Reserved words are listed at
    https://docs.snowflake.com/en/sql-reference/reserved-keywords

    NOTE(review): the original comment said the test "syncs records then
    alters schema by adding a non-reserved word column"; the fixture
    contains a single SCHEMA and a single RECORD message -- confirm whether
    an alter step is still intended.
    """

    name = "reserved_words_in_table"

    @property
    def singer_filepath(self) -> Path:
        """Return the path of the Singer fixture that sits next to this module."""
        tests_dir = Path(__file__).resolve().parent
        return tests_dir / "target_test_streams" / "reserved_words_in_table.singer"

    def validate(self) -> None:
        """Check that exactly one row with 13 columns landed in ``"ORDER"``."""
        connector = self.target.default_sink_class.connector_class(self.target.config)
        database = self.target.config["database"]
        schema = self.target.config["default_target_schema"]
        # Upper-casing covers the quoted part too, so the lookup targets ``"ORDER"``.
        table = f'{database}.{schema}."order"'.upper()
        result = connector.connection.execute(f"select * from {table}")
        assert result.rowcount == 1
        row = result.first()
        # The stream declares six properties; the rest are presumably the
        # ``_sdc_*`` metadata columns added by the target -- TODO confirm.
        column_count = len(row)
        assert column_count == 13, f"Row has unexpected length {column_count}"


class SnowflakeTargetTypeEdgeCasesTest(TargetFileTestTemplate):
name = "type_edge_cases"

Expand Down Expand Up @@ -540,6 +595,8 @@ def singer_filepath(self) -> Path:
SnowflakeTargetColonsInColName,
SnowflakeTargetExistingTable,
SnowflakeTargetExistingTableAlter,
SnowflakeTargetExistingReservedNameTableAlter,
SnowflakeTargetReservedWordsInTable,
SnowflakeTargetTypeEdgeCasesTest,
SnowflakeTargetColumnOrderMismatch,
],
Expand Down
2 changes: 2 additions & 0 deletions tests/target_test_streams/reserved_words_in_table.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{ "type": "SCHEMA", "stream": "order", "schema": { "properties": { "id": { "type": [ "string", "null" ] }, "col_str": { "type": [ "string", "null" ] }, "col_ts": { "format": "date-time", "type": [ "string", "null" ] }, "col_int": { "type": "integer" }, "col_bool": { "type": [ "boolean", "null" ] }, "col_variant": {"type": "object"} }, "type": "object" }, "key_properties": [ "id" ], "bookmark_properties": [ "col_ts" ] }
{ "type": "RECORD", "stream": "order", "record": { "id": "123", "col_str": "foo", "col_ts": "2023-06-13 11:50:04.072", "col_int": 5, "col_bool": true, "col_variant": {"key": "val"} }, "time_extracted": "2023-06-14T18:08:23.074716+00:00" }
Loading