Skip to content

Commit

Permalink
drop table instead of truncating
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Sep 7, 2023
1 parent 6832ca1 commit 42961af
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 48 deletions.
9 changes: 0 additions & 9 deletions samples/sample_target_sqlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,6 @@ def get_sqlalchemy_url(self, config: dict[str, t.Any]) -> str:
"""Generates a SQLAlchemy URL for SQLite."""
return f"sqlite:///{config[DB_PATH_CONFIG]}"

@staticmethod
def get_truncate_table_ddl(
table_name: str,
) -> tuple[text, dict]:
return (
text(f"DELETE FROM {table_name}"), # noqa: S608
{},
)


class SQLiteSink(SQLSink):
"""The Sink class for SQLite.
Expand Down
48 changes: 9 additions & 39 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,44 +744,6 @@ def prepare_schema(self, schema_name: str) -> None:
if not schema_exists:
self.create_schema(schema_name)

@staticmethod
def get_truncate_table_ddl(
table_name: str,
) -> tuple[TextClause, dict]:
"""Get the truncate table SQL statement.
Override this if your database uses a different syntax for truncating tables.
Args:
table_name: Fully qualified table name of column to alter.
Returns:
A tuple of the SQL statement and a dictionary of bind parameters.
"""
return (
text(f"TRUNCATE TABLE {table_name}"),
{},
)

def truncate_table(self, full_table_name: str) -> None:
"""Truncate target table.
Args:
full_table_name: Fully qualified table name of column to alter.
Raises:
NotImplementedError: if truncating tables is not supported.
"""
if not self.allow_overwrite:
msg = "Truncating tables is not supported."
raise NotImplementedError(msg)

truncate_table_ddl, kwargs = self.get_truncate_table_ddl(
table_name=full_table_name,
)
with self._connect() as conn, conn.begin():
conn.execute(truncate_table_ddl, **kwargs)

def prepare_table(
self,
full_table_name: str,
Expand Down Expand Up @@ -809,7 +771,15 @@ def prepare_table(
)
return
if self.config["load_method"] == TargetLoadMethods.OVERWRITE:
self.truncate_table(full_table_name=full_table_name)
self.get_table(full_table_name=full_table_name).drop(self._engine)
self.create_empty_table(
full_table_name=full_table_name,
schema=schema,
primary_keys=primary_keys,
partition_keys=partition_keys,
as_temp_table=as_temp_table,
)
return

for property_name, property_def in schema["properties"].items():
self.prepare_column(
Expand Down

0 comments on commit 42961af

Please sign in to comment.