Skip to content

Commit

Permalink
Updating dispatching methods to ensure dbt-synapse adapter can use ad… (
Browse files Browse the repository at this point in the history
#178)

* Updating dispatching methods to ensure dbt-synapse adapter can use adapter dispatch methods

* additional changes

* Updating dbt-fabric adapter to stay compatible with dbt-synapse adapter

* changes to drop_relation_if_exists

* Updated tests and drop_relation

* adding temp relation to table materialization to support temp view drop relation

* adding a log

* dropping and creating temp relation in table materialization and removing it from adapter create_table_as.sql macro

* removing hardcoded fabric__ references to not to break dbt-synapse and dbt-sqlserver adapters

* moving temp relation drop to table, incremental and snapshot materializations

* adding drop relation to create_table_as to support test_store_tests

* adding log statements

* removing comments

* Update

* Update dbt/include/fabric/macros/adapters/columns.sql

Co-authored-by: Jeremy Cohen <jeremy@fishtownanalytics.com>

* Update dbt/include/fabric/macros/materializations/models/incremental/incremental.sql

Co-authored-by: Jeremy Cohen <jeremy@fishtownanalytics.com>

* Resolving comments

---------

Co-authored-by: Jeremy Cohen <jeremy@fishtownanalytics.com>
  • Loading branch information
prdpsvs and jtcohen6 authored May 30, 2024
1 parent 45dacbc commit 4ed06da
Show file tree
Hide file tree
Showing 29 changed files with 171 additions and 288 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/integration-tests-azure.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ jobs:
name: Regular
strategy:
fail-fast: false
max-parallel: 1
matrix:
python_version: ["3.8", "3.9", "3.10", "3.11"]
profile: ["ci_azure_auto"]
python_version: ["3.11"]
msodbc_version: ["17", "18"]
max-parallel: 1

runs-on: ubuntu-latest
container:
image: ghcr.io/${{ github.repository }}:CI-${{ matrix.python_version }}-msodbc${{ matrix.msodbc_version }}
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/fabric/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.8.5"
version = "1.8.6"
6 changes: 6 additions & 0 deletions dbt/adapters/fabric/fabric_connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,12 @@ def open(cls, connection: Connection) -> Connection:

con_str.append(f"Database={credentials.database}")

#Enabling trace flag
if credentials.trace_flag:
con_str.append("SQL_ATTR_TRACE=SQL_OPT_TRACE_ON")
else:
con_str.append("SQL_ATTR_TRACE=SQL_OPT_TRACE_OFF")

assert credentials.authentication is not None

if "ActiveDirectory" in credentials.authentication:
Expand Down
3 changes: 3 additions & 0 deletions dbt/adapters/fabric/fabric_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class FabricCredentials(Credentials):
UID: Optional[str] = None
PWD: Optional[str] = None
windows_login: Optional[bool] = False
trace_flag: Optional[bool] = False
tenant_id: Optional[str] = None
client_id: Optional[str] = None
client_secret: Optional[str] = None
Expand All @@ -36,6 +37,7 @@ class FabricCredentials(Credentials):
"app_secret": "client_secret",
"TrustServerCertificate": "trust_cert",
"schema_auth": "schema_authorization",
"SQL_ATTR_TRACE": "trace_flag",
}

@property
Expand Down Expand Up @@ -63,6 +65,7 @@ def _connection_keys(self):
"retries",
"login_timeout",
"query_timeout",
"trace_flag",
)

@property
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/fabric/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@

{% macro fabric__alter_column_type(relation, column_name, new_column_type) %}

{%- set table_name= tmp_relation.include(database=False).include(schema=False)-%}
{%- set schema_name = tmp_relation.include(database=False).include(identifier=False) -%}
{%- set table_name= relation.identifier -%}
{%- set schema_name = relation.schema -%}

{% set generate_tmp_relation_script %}
SELECT TRIM(REPLACE(STRING_AGG(ColumnName + ' ', ',-'), '-', CHAR(10))) AS ColumnDef
Expand Down
12 changes: 10 additions & 2 deletions dbt/include/fabric/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
information_schema
{%- endmacro %}

{% macro get_use_database_sql(database) %}
{{ return(adapter.dispatch('get_use_database_sql', 'dbt')(database)) }}
{% endmacro %}

{%- macro fabric__get_use_database_sql(database) -%}
USE [{{database}}];
{%- endmacro -%}

{% macro fabric__list_schemas(database) %}
{% call statement('list_schemas', fetch_result=True, auto_begin=False) -%}
select name as [schema]
Expand All @@ -27,7 +35,7 @@

{% macro fabric__list_relations_without_caching(schema_relation) -%}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
USE [{{ schema_relation.database }}];
{{ get_use_database_sql(schema_relation.database) }}
with base as (
select
DB_NAME() as [database],
Expand All @@ -51,7 +59,7 @@

{% macro fabric__get_relation_without_caching(schema_relation) -%}
{% call statement('get_relation_without_caching', fetch_result=True) -%}
USE [{{ schema_relation.database }}];
{{ get_use_database_sql(schema_relation.database) }}
with base as (
select
DB_NAME() as [database],
Expand Down
53 changes: 23 additions & 30 deletions dbt/include/fabric/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,44 @@
{{ return(temp_relation) }}
{% endmacro %}

{% macro fabric__drop_relation(relation) -%}
{% call statement('drop_relation', auto_begin=False) -%}
{{ fabric__drop_relation_script(relation) }}
{%- endcall %}
{% endmacro %}

{% macro fabric__drop_relation_script(relation) -%}

{% if relation.type == 'view' -%}
{% macro fabric__get_drop_sql(relation) -%}
{% if relation.type == 'view' -%}
{% call statement('find_references', fetch_result=true) %}
USE [{{ relation.database }}];
select
sch.name as schema_name,
obj.name as view_name
from sys.sql_expression_dependencies refs
inner join sys.objects obj
on refs.referencing_id = obj.object_id
inner join sys.schemas sch
on obj.schema_id = sch.schema_id
where refs.referenced_database_name = '{{ relation.database }}'
and refs.referenced_schema_name = '{{ relation.schema }}'
and refs.referenced_entity_name = '{{ relation.identifier }}'
and refs.referencing_class = 1
and obj.type = 'V'
{{ get_use_database_sql(relation.database) }}
select
sch.name as schema_name,
obj.name as view_name
from sys.sql_expression_dependencies refs
inner join sys.objects obj
on refs.referencing_id = obj.object_id
inner join sys.schemas sch
on obj.schema_id = sch.schema_id
where refs.referenced_database_name = '{{ relation.database }}'
and refs.referenced_schema_name = '{{ relation.schema }}'
and refs.referenced_entity_name = '{{ relation.identifier }}'
and refs.referencing_class = 1
and obj.type = 'V'
{% endcall %}
{% set references = load_result('find_references')['data'] %}
{% for reference in references -%}
-- dropping referenced view {{ reference[0] }}.{{ reference[1] }}
{{ fabric__drop_relation_script(relation.incorporate(
type="view",
path={"schema": reference[0], "identifier": reference[1]})) }}
-- dropping referenced view {{ reference[0] }}.{{ reference[1] }}
{% do adapter.drop_relation
(api.Relation.create(
identifier = reference[1], schema = reference[0], database = relation.database, type='view'
))%}
{% endfor %}
{% elif relation.type == 'table'%}
{% set object_id_type = 'U' %}

{%- else -%}
{{ exceptions.raise_not_implemented('Invalid relation being dropped: ' ~ relation) }}
{% endif %}
USE [{{ relation.database }}];
{{ get_use_database_sql(relation.database) }}
EXEC('DROP {{ relation.type }} IF EXISTS {{ relation.include(database=False) }};');
{% endmacro %}

{% macro fabric__rename_relation(from_relation, to_relation) -%}
{% call statement('rename_relation') -%}
USE [{{ from_relation.database }}];
{{ get_use_database_sql(from_relation.database) }}
EXEC sp_rename '{{ from_relation.schema }}.{{ from_relation.identifier }}', '{{ to_relation.identifier }}'
{%- endcall %}
{% endmacro %}
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/fabric/macros/adapters/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

{% macro fabric__create_schema_with_authorization(relation, schema_authorization) -%}
{% call statement('create_schema') -%}
USE [{{ relation.database }}];
{{ get_use_database_sql(relation.database) }}
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{{ relation.schema }}')
BEGIN
EXEC('CREATE SCHEMA [{{ relation.schema }}] AUTHORIZATION [{{ schema_authorization }}]')
Expand All @@ -27,7 +27,7 @@
identifier=row[1],
type=row[3]
) -%}
{% do drop_relation(schema_relation) %}
{% do adapter.drop_relation(schema_relation) %}
{%- endfor %}

{% call statement('drop_schema') -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@

{% materialization incremental, adapter='fabric' -%}

{%- set full_refresh_mode = (should_full_refresh()) -%}
{% set target_relation = this.incorporate(type='table') %}
{%- set relations_list = fabric__get_relation_without_caching(target_relation) -%}
{%- set relation = load_cached_relation(this) -%}

{%- set existing_relation = none %}
{% if (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema)
and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] == target_relation.type)%}
{% if relation.type == 'table' %}
{% set existing_relation = target_relation %}
{% elif (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema)
and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] != target_relation.type) %}
{% set existing_relation = get_or_create_relation(relations_list[0][0], relations_list[0][2] , relations_list[0][1] , relations_list[0][3])[1] %}
{% elif relation.type == 'view' %}
{% set existing_relation = get_or_create_relation(relation.database, relation.schema, relation.identifier, relation.type)[1] %}
{% endif %}

{{ log("Full refresh mode" ~ full_refresh_mode)}}
{{ log("existing relation : "~existing_relation ~ " type "~ existing_relation.type ~ " is view? "~existing_relation.is_view) }}
{{ log("target relation: " ~target_relation ~ " type "~ target_relation.type ~ " is view? "~target_relation.is_view) }}

-- configs
{%- set unique_key = config.get('unique_key') -%}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
Expand All @@ -28,35 +21,39 @@

{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if existing_relation is none %}
-- naming a temp relation
{% set tmp_relation_view = target_relation.incorporate(path={"identifier": target_relation.identifier ~ '__dbt_tmp_vw'}, type='view')-%}

-- Fabric & Synapse adapters use temp relation because of lack of CTE support for CTE in CTAS, Insert
-- drop temp relation if exists
{% do adapter.drop_relation(tmp_relation_view) %}

{% if existing_relation is none %}
{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{{ get_create_table_as_sql(False, target_relation, sql)}}
{%- endcall -%}

{% elif existing_relation.is_view %}

{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(existing_relation) }}
{% do adapter.drop_relation(existing_relation) %}

{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{{ get_create_table_as_sql(False, target_relation, sql)}}
{%- endcall -%}

{% elif full_refresh_mode %}

{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{{ get_create_table_as_sql(False, target_relation, sql)}}
{%- endcall -%}

{% else %}

{%- call statement('create_tmp_relation') -%}
{{ fabric__create_table_as(True, temp_relation, sql)}}
{{ get_create_table_as_sql(True, temp_relation, sql)}}
{%- endcall -%}
{% do adapter.expand_target_column_types(
from_relation=temp_relation,
to_relation=target_relation) %}
from_relation=temp_relation,
to_relation=target_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
{% if not dest_columns %}
Expand All @@ -72,14 +69,12 @@
{%- endcall -%}
{% endif %}

{% do drop_relation_if_exists(temp_relation) %}
{% do adapter.drop_relation(tmp_relation_view) %}
{% do adapter.drop_relation(temp_relation) %}
{{ run_hooks(post_hooks, inside_transaction=True) }}

{% set target_relation = target_relation.incorporate(type='table') %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}
{% do adapter.commit() %}
{{ return({'relations': [target_relation]}) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
{%- set target_relation = this.incorporate(type='table') -%}

{% call statement('main') %}
{{ fabric__drop_relation_script(target_relation) }}
{% do adapter.drop_relation(target_relation) %}
{{ create_or_replace_clone(target_relation, defer_relation) }}
{% endcall %}
{{ return({'relations': [target_relation]}) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
{% macro build_columns_constraints(relation) %}
{{ return(adapter.dispatch('build_columns_constraints', 'dbt')(relation)) }}
{% endmacro %}

{% macro fabric__build_columns_constraints(relation) %}
{# loop through user_provided_columns to create DDL with data types and constraints #}
{%- set raw_column_constraints = adapter.render_raw_columns_constraints(raw_columns=model['columns']) -%}
Expand All @@ -8,6 +12,10 @@
)
{% endmacro %}

{% macro build_model_constraints(relation) %}
{{ return(adapter.dispatch('build_model_constraints', 'dbt')(relation)) }}
{% endmacro %}

{% macro fabric__build_model_constraints(relation) %}
{# loop through user_provided_columns to create DDL with data types and constraints #}
{%- set raw_model_constraints = adapter.render_raw_model_constraints(raw_constraints=model['constraints']) -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
{% macro fabric__create_table_as(temporary, relation, sql) -%}

{% set tmp_relation = relation.incorporate(
path={"identifier": relation.identifier.replace("#", "") ~ '_temp_view'},
type='view')-%}
{% do run_query(fabric__drop_relation_script(tmp_relation)) %}
{% set tmp_relation = relation.incorporate(path={"identifier": relation.identifier ~ '__dbt_tmp_vw'}, type='view')-%}
{{ get_create_view_as_sql(tmp_relation, sql) }}

{% set contract_config = config.get('contract') %}

{{ fabric__create_view_as(tmp_relation, sql) }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}

CREATE TABLE [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}]
{{ fabric__build_columns_constraints(relation) }}
{{ build_columns_constraints(relation) }}
{{ get_assert_columns_equivalent(sql) }}

{% set listColumns %}
{% for column in model['columns'] %}
{{ "["~column~"]" }}{{ ", " if not loop.last }}
Expand All @@ -24,9 +19,7 @@
({{listColumns}}) SELECT {{listColumns}} FROM [{{tmp_relation.database}}].[{{tmp_relation.schema}}].[{{tmp_relation.identifier}}];

{%- else %}
EXEC('CREATE TABLE [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}] AS (SELECT * FROM [{{tmp_relation.database}}].[{{tmp_relation.schema}}].[{{tmp_relation.identifier}}]);');
EXEC('CREATE TABLE [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}] AS (SELECT * FROM [{{tmp_relation.database}}].[{{tmp_relation.schema}}].[{{tmp_relation.identifier}}]);');
{% endif %}

{{ fabric__drop_relation_script(tmp_relation) }}

{% do adapter.drop_relation(tmp_relation)%}
{% endmacro %}
Loading

0 comments on commit 4ed06da

Please sign in to comment.